Merge lp:~pedronis/ubuntu-push/unicast-in-session into lp:ubuntu-push/automatic

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 148
Merged at revision: 149
Proposed branch: lp:~pedronis/ubuntu-push/unicast-in-session
Merge into: lp:ubuntu-push/automatic
Diff against target: 558 lines (+165/-63)
4 files modified
client/client.go (+10/-8)
client/client_test.go (+33/-33)
client/session/session.go (+36/-10)
client/session/session_test.go (+86/-12)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/unicast-in-session
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+219529@code.launchpad.net

Commit message

first pass of unicast support in session

Description of the change

first pass of unicast support in session

(persistence for recent message ids will come in the next branch)

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

Lovely.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'client/client.go'
2--- client/client.go 2014-05-14 12:28:56 +0000
3+++ client/client.go 2014-05-14 13:37:29 +0000
4@@ -235,7 +235,7 @@
5 }
6 }
7
8-// filterNotification finds out if the notification is about an actual
9+// filterBroadcastNotification finds out if the notification is about an actual
10 // upgrade for the device. It expects msg.Decoded entries to look
11 // like:
12 //
13@@ -243,7 +243,7 @@
14 // "IMAGE-CHANNEL/DEVICE-MODEL": [BUILD-NUMBER, CHANNEL-ALIAS]
15 // ...
16 // }
17-func (client *PushClient) filterNotification(msg *session.Notification) bool {
18+func (client *PushClient) filterBroadcastNotification(msg *session.BroadcastNotification) bool {
19 n := len(msg.Decoded)
20 if n == 0 {
21 return false
22@@ -279,8 +279,8 @@
23 }
24
25 // handleNotification deals with receiving a notification
26-func (client *PushClient) handleNotification(msg *session.Notification) error {
27- if !client.filterNotification(msg) {
28+func (client *PushClient) handleBroadcastNotification(msg *session.BroadcastNotification) error {
29+ if !client.filterBroadcastNotification(msg) {
30 return nil
31 }
32 action_id := ACTION_ID_SNOWFLAKE
33@@ -317,15 +317,17 @@
34 }
35
36 // doLoop connects events with their handlers
37-func (client *PushClient) doLoop(connhandler func(bool), clickhandler func(string) error, notifhandler func(*session.Notification) error, errhandler func(error)) {
38+func (client *PushClient) doLoop(connhandler func(bool), clickhandler func(string) error, bcasthandler func(*session.BroadcastNotification) error, errhandler func(error)) {
39 for {
40 select {
41 case state := <-client.connCh:
42 connhandler(state)
43 case action := <-client.actionsCh:
44 clickhandler(action.ActionId)
45- case msg := <-client.session.MsgCh:
46- notifhandler(msg)
47+ case bcast := <-client.session.BroadcastCh:
48+ bcasthandler(bcast)
49+ case _ = <-client.session.NotificationsCh:
50+ // XXX implement me: unicast notifications are received and dropped for now
51 case err := <-client.session.ErrCh:
52 errhandler(err)
53 case count := <-client.sessionConnectedCh:
54@@ -348,7 +350,7 @@
55 // Loop calls doLoop with the "real" handlers
56 func (client *PushClient) Loop() {
57 client.doLoop(client.handleConnState, client.handleClick,
58- client.handleNotification, client.handleErr)
59+ client.handleBroadcastNotification, client.handleErr)
60 }
61
62 func (client *PushClient) startService() error {
63
64=== modified file 'client/client_test.go'
65--- client/client_test.go 2014-05-14 12:28:56 +0000
66+++ client/client_test.go 2014-05-14 13:37:29 +0000
67@@ -491,7 +491,7 @@
68 }
69
70 /*****************************************************************
71- filterNotification tests
72+ filterBroadcastNotification tests
73 ******************************************************************/
74
75 var siInfoRes = &systemimage.InfoResult{
76@@ -501,23 +501,23 @@
77 LastUpdate: "Unknown",
78 }
79
80-func (cs *clientSuite) TestFilterNotification(c *C) {
81+func (cs *clientSuite) TestFilterBroadcastNotification(c *C) {
82 cli := NewPushClient(cs.configPath, cs.leveldbPath)
83 cli.systemImageInfo = siInfoRes
84 // empty
85- msg := &session.Notification{}
86- c.Check(cli.filterNotification(msg), Equals, false)
87+ msg := &session.BroadcastNotification{}
88+ c.Check(cli.filterBroadcastNotification(msg), Equals, false)
89 // same build number
90- msg = &session.Notification{
91+ msg = &session.BroadcastNotification{
92 Decoded: []map[string]interface{}{
93 map[string]interface{}{
94 "daily/mako": []interface{}{float64(102), "tubular"},
95 },
96 },
97 }
98- c.Check(cli.filterNotification(msg), Equals, false)
99+ c.Check(cli.filterBroadcastNotification(msg), Equals, false)
100 // higher build number and pick last
101- msg = &session.Notification{
102+ msg = &session.BroadcastNotification{
103 Decoded: []map[string]interface{}{
104 map[string]interface{}{
105 "daily/mako": []interface{}{float64(102), "tubular"},
106@@ -527,9 +527,9 @@
107 },
108 },
109 }
110- c.Check(cli.filterNotification(msg), Equals, true)
111+ c.Check(cli.filterBroadcastNotification(msg), Equals, true)
112 // going backward by a margin, assume switch of alias
113- msg = &session.Notification{
114+ msg = &session.BroadcastNotification{
115 Decoded: []map[string]interface{}{
116 map[string]interface{}{
117 "daily/mako": []interface{}{float64(102), "tubular"},
118@@ -539,47 +539,47 @@
119 },
120 },
121 }
122- c.Check(cli.filterNotification(msg), Equals, true)
123+ c.Check(cli.filterBroadcastNotification(msg), Equals, true)
124 }
125
126-func (cs *clientSuite) TestFilterNotificationRobust(c *C) {
127+func (cs *clientSuite) TestFilterBroadcastNotificationRobust(c *C) {
128 cli := NewPushClient(cs.configPath, cs.leveldbPath)
129 cli.systemImageInfo = siInfoRes
130- msg := &session.Notification{
131+ msg := &session.BroadcastNotification{
132 Decoded: []map[string]interface{}{
133 map[string]interface{}{},
134 },
135 }
136- c.Check(cli.filterNotification(msg), Equals, false)
137+ c.Check(cli.filterBroadcastNotification(msg), Equals, false)
138 for _, broken := range []interface{}{
139 5,
140 []interface{}{},
141 []interface{}{55},
142 } {
143- msg := &session.Notification{
144+ msg := &session.BroadcastNotification{
145 Decoded: []map[string]interface{}{
146 map[string]interface{}{
147 "daily/mako": broken,
148 },
149 },
150 }
151- c.Check(cli.filterNotification(msg), Equals, false)
152+ c.Check(cli.filterBroadcastNotification(msg), Equals, false)
153 }
154 }
155
156 /*****************************************************************
157- handleNotification tests
158+ handleBroadcastNotification tests
159 ******************************************************************/
160
161 var (
162- positiveNotification = &session.Notification{
163+ positiveBroadcastNotification = &session.BroadcastNotification{
164 Decoded: []map[string]interface{}{
165 map[string]interface{}{
166 "daily/mako": []interface{}{float64(103), "tubular"},
167 },
168 },
169 }
170- negativeNotification = &session.Notification{
171+ negativeBroadcastNotification = &session.BroadcastNotification{
172 Decoded: []map[string]interface{}{
173 map[string]interface{}{
174 "daily/mako": []interface{}{float64(102), "tubular"},
175@@ -588,13 +588,13 @@
176 }
177 )
178
179-func (cs *clientSuite) TestHandleNotification(c *C) {
180+func (cs *clientSuite) TestHandleBroadcastNotification(c *C) {
181 cli := NewPushClient(cs.configPath, cs.leveldbPath)
182 cli.systemImageInfo = siInfoRes
183 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1))
184 cli.notificationsEndp = endp
185 cli.log = cs.log
186- c.Check(cli.handleNotification(positiveNotification), IsNil)
187+ c.Check(cli.handleBroadcastNotification(positiveBroadcastNotification), IsNil)
188 // check we sent the notification
189 args := testibus.GetCallArgs(endp)
190 c.Assert(args, HasLen, 1)
191@@ -602,26 +602,26 @@
192 c.Check(cs.log.Captured(), Matches, `.* got notification id \d+\s*`)
193 }
194
195-func (cs *clientSuite) TestHandleNotificationNothingToDo(c *C) {
196+func (cs *clientSuite) TestHandleBroadcastNotificationNothingToDo(c *C) {
197 cli := NewPushClient(cs.configPath, cs.leveldbPath)
198 cli.systemImageInfo = siInfoRes
199 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1))
200 cli.notificationsEndp = endp
201 cli.log = cs.log
202- c.Check(cli.handleNotification(negativeNotification), IsNil)
203+ c.Check(cli.handleBroadcastNotification(negativeBroadcastNotification), IsNil)
204 // check we sent the notification
205 args := testibus.GetCallArgs(endp)
206 c.Assert(args, HasLen, 0)
207 c.Check(cs.log.Captured(), Matches, "")
208 }
209
210-func (cs *clientSuite) TestHandleNotificationFail(c *C) {
211+func (cs *clientSuite) TestHandleBroadcastNotificationFail(c *C) {
212 cli := NewPushClient(cs.configPath, cs.leveldbPath)
213 cli.systemImageInfo = siInfoRes
214 cli.log = cs.log
215 endp := testibus.NewTestingEndpoint(nil, condition.Work(false))
216 cli.notificationsEndp = endp
217- c.Check(cli.handleNotification(positiveNotification), NotNil)
218+ c.Check(cli.handleBroadcastNotification(positiveBroadcastNotification), NotNil)
219 }
220
221 /*****************************************************************
222@@ -660,7 +660,7 @@
223 c.Assert(cli.initSession(), IsNil)
224
225 ch := make(chan bool, 1)
226- go cli.doLoop(func(bool) { ch <- true }, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) {})
227+ go cli.doLoop(func(bool) { ch <- true }, func(_ string) error { return nil }, func(_ *session.BroadcastNotification) error { return nil }, func(error) {})
228 c.Check(takeNextBool(ch), Equals, true)
229 }
230
231@@ -674,7 +674,7 @@
232 cli.actionsCh = aCh
233
234 ch := make(chan bool, 1)
235- go cli.doLoop(func(bool) {}, func(_ string) error { ch <- true; return nil }, func(_ *session.Notification) error { return nil }, func(error) {})
236+ go cli.doLoop(func(bool) {}, func(_ string) error { ch <- true; return nil }, func(_ *session.BroadcastNotification) error { return nil }, func(error) {})
237 c.Check(takeNextBool(ch), Equals, true)
238 }
239
240@@ -683,11 +683,11 @@
241 cli.log = cs.log
242 cli.systemImageInfo = siInfoRes
243 c.Assert(cli.initSession(), IsNil)
244- cli.session.MsgCh = make(chan *session.Notification, 1)
245- cli.session.MsgCh <- &session.Notification{}
246+ cli.session.BroadcastCh = make(chan *session.BroadcastNotification, 1)
247+ cli.session.BroadcastCh <- &session.BroadcastNotification{}
248
249 ch := make(chan bool, 1)
250- go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { ch <- true; return nil }, func(error) {})
251+ go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.BroadcastNotification) error { ch <- true; return nil }, func(error) {})
252 c.Check(takeNextBool(ch), Equals, true)
253 }
254
255@@ -700,7 +700,7 @@
256 cli.session.ErrCh <- nil
257
258 ch := make(chan bool, 1)
259- go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) { ch <- true })
260+ go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.BroadcastNotification) error { return nil }, func(error) { ch <- true })
261 c.Check(takeNextBool(ch), Equals, true)
262 }
263
264@@ -750,7 +750,7 @@
265 cli.systemImageInfo = siInfoRes
266 c.Assert(cli.initSession(), IsNil)
267
268- cli.session.MsgCh = make(chan *session.Notification)
269+ cli.session.BroadcastCh = make(chan *session.BroadcastNotification)
270 cli.session.ErrCh = make(chan error)
271
272 // we use tick() to make sure things have been through the
273@@ -783,8 +783,8 @@
274 tick()
275 c.Check(cli.hasConnectivity, Equals, false)
276
277- // * session.MsgCh to the notifications handler
278- cli.session.MsgCh <- positiveNotification
279+ // * session.BroadcastCh to the notifications handler
280+ cli.session.BroadcastCh <- positiveBroadcastNotification
281 tick()
282 nargs := testibus.GetCallArgs(cli.notificationsEndp)
283 c.Check(nargs, HasLen, 1)
284
285=== modified file 'client/session/session.go'
286--- client/session/session.go 2014-05-02 10:42:16 +0000
287+++ client/session/session.go 2014-05-14 13:37:29 +0000
288@@ -43,7 +43,7 @@
289 wireVersionBytes = []byte{protocol.ProtocolWireVersion}
290 )
291
292-type Notification struct {
293+type BroadcastNotification struct {
294 TopLevel int64
295 Decoded []map[string]interface{}
296 }
297@@ -116,9 +116,10 @@
298 pingInterval time.Duration
299 retrier util.AutoRedialer
300 // status
301- stateP *uint32
302- ErrCh chan error
303- MsgCh chan *Notification
304+ stateP *uint32
305+ ErrCh chan error
306+ BroadcastCh chan *BroadcastNotification
307+ NotificationsCh chan *protocol.Notification
308 // authorization
309 auth string
310 // autoredial knobs
311@@ -370,7 +371,7 @@
312 return err
313 }
314
315-func (sess *ClientSession) decodeBroadcast(bcast *serverMsg) *Notification {
316+func (sess *ClientSession) decodeBroadcast(bcast *serverMsg) *BroadcastNotification {
317 decoded := make([]map[string]interface{}, 0)
318 for _, p := range bcast.Payloads {
319 var v map[string]interface{}
320@@ -381,7 +382,7 @@
321 }
322 decoded = append(decoded, v)
323 }
324- return &Notification{
325+ return &BroadcastNotification{
326 TopLevel: bcast.TopLevel,
327 Decoded: decoded,
328 }
329@@ -409,15 +410,37 @@
330 bcast.ChanId, bcast.AppId, bcast.TopLevel, bcast.Payloads)
331 if bcast.ChanId == protocol.SystemChannelId {
332 // the system channel id, the only one we care about for now
333- sess.Log.Debugf("sending it over")
334- sess.MsgCh <- sess.decodeBroadcast(bcast)
335- sess.Log.Debugf("sent it over")
336+ sess.Log.Debugf("sending bcast over")
337+ sess.BroadcastCh <- sess.decodeBroadcast(bcast)
338+ sess.Log.Debugf("sent bcast over")
339 } else {
340 sess.Log.Debugf("what is this weird channel, %#v?", bcast.ChanId)
341 }
342 return nil
343 }
344
345+// handle "notifications" messages
346+func (sess *ClientSession) handleNotifications(ucast *serverMsg) error {
347+ // the server assumes if we ack the notifications, we've updated
348+ // our state. XXX: no state is updated before the ack here yet.
349+ err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})
350+ if err != nil {
351+ sess.setState(Error)
352+ sess.Log.Errorf("unable to ack notifications: %s", err)
353+ return err
354+ }
355+ sess.clearShouldDelay()
356+ for i := range ucast.Notifications {
357+ notif := &ucast.Notifications[i]
358+ sess.Log.Debugf("unicast app:%v msg:%s payload:%s",
359+ notif.AppId, notif.MsgId, notif.Payload)
360+ sess.Log.Debugf("sending ucast over")
361+ sess.NotificationsCh <- notif
362+ sess.Log.Debugf("sent ucast over")
363+ }
364+ return nil
365+}
366+
367 // handle "connbroken" messages
368 func (sess *ClientSession) handleConnBroken(connBroken *serverMsg) error {
369 sess.setState(Error)
370@@ -449,6 +472,8 @@
371 err = sess.handlePing()
372 case "broadcast":
373 err = sess.handleBroadcast(&recv)
374+ case "notifications":
375+ err = sess.handleNotifications(&recv)
376 case "connbroken":
377 err = sess.handleConnBroken(&recv)
378 }
379@@ -534,7 +559,8 @@
380 err = starter()
381 if err == nil {
382 sess.ErrCh = make(chan error, 1)
383- sess.MsgCh = make(chan *Notification)
384+ sess.BroadcastCh = make(chan *BroadcastNotification)
385+ sess.NotificationsCh = make(chan *protocol.Notification)
386 go func() { sess.ErrCh <- looper() }()
387 }
388 }
389
390=== modified file 'client/session/session_test.go'
391--- client/session/session_test.go 2014-05-02 10:42:16 +0000
392+++ client/session/session_test.go 2014-05-14 13:37:29 +0000
393@@ -645,7 +645,8 @@
394 s.downCh = make(chan interface{}, 5)
395 s.sess.proto = &testProtocol{up: s.upCh, down: s.downCh}
396 // make the message channel buffered
397- s.sess.MsgCh = make(chan *Notification, 5)
398+ s.sess.BroadcastCh = make(chan *BroadcastNotification, 5)
399+ s.sess.NotificationsCh = make(chan *protocol.Notification, 5)
400 }
401
402 func (s *msgSuite) TestHandlePingWorks(c *C) {
403@@ -704,8 +705,8 @@
404 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
405 s.upCh <- nil // ack ok
406 c.Check(<-s.errCh, Equals, nil)
407- c.Assert(len(s.sess.MsgCh), Equals, 1)
408- c.Check(<-s.sess.MsgCh, DeepEquals, &Notification{
409+ c.Assert(len(s.sess.BroadcastCh), Equals, 1)
410+ c.Check(<-s.sess.BroadcastCh, DeepEquals, &BroadcastNotification{
411 TopLevel: 2,
412 Decoded: []map[string]interface{}{
413 map[string]interface{}{
414@@ -752,7 +753,7 @@
415 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
416 s.upCh <- nil // ack ok
417 c.Check(<-s.errCh, IsNil)
418- c.Check(len(s.sess.MsgCh), Equals, 0)
419+ c.Check(len(s.sess.BroadcastCh), Equals, 0)
420 }
421
422 func (s *msgSuite) TestHandleBroadcastWrongBrokenLevelmap(c *C) {
423@@ -770,7 +771,7 @@
424 // start returns with error
425 c.Check(<-s.errCh, Not(Equals), nil)
426 // no message sent out
427- c.Check(len(s.sess.MsgCh), Equals, 0)
428+ c.Check(len(s.sess.BroadcastCh), Equals, 0)
429 // and nak'ed it
430 c.Check(len(s.downCh), Equals, 1)
431 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"nak"})
432@@ -803,6 +804,59 @@
433 }
434
435 /****************************************************************
436+ handleNotifications() tests
437+****************************************************************/
438+
439+func (s *msgSuite) TestHandleNotificationsWorks(c *C) {
440+ s.sess.setShouldDelay()
441+ n1 := protocol.Notification{
442+ AppId: "app1",
443+ MsgId: "a",
444+ Payload: json.RawMessage(`{"m": 1}`),
445+ }
446+ n2 := protocol.Notification{
447+ AppId: "app2",
448+ MsgId: "b",
449+ Payload: json.RawMessage(`{"m": 2}`),
450+ }
451+ msg := serverMsg{"notifications",
452+ protocol.BroadcastMsg{},
453+ protocol.NotificationsMsg{
454+ Notifications: []protocol.Notification{n1, n2},
455+ }, protocol.ConnBrokenMsg{}}
456+ go func() { s.errCh <- s.sess.handleNotifications(&msg) }()
457+ c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
458+ s.upCh <- nil // ack ok
459+ c.Check(<-s.errCh, Equals, nil)
460+ c.Check(s.sess.ShouldDelay(), Equals, false)
461+ c.Assert(len(s.sess.NotificationsCh), Equals, 2)
462+ c.Check(<-s.sess.NotificationsCh, DeepEquals, &n1)
463+ c.Check(<-s.sess.NotificationsCh, DeepEquals, &n2)
464+}
465+
466+func (s *msgSuite) TestHandleNotificationsBadAckWrite(c *C) {
467+ s.sess.setShouldDelay()
468+ n1 := protocol.Notification{
469+ AppId: "app1",
470+ MsgId: "a",
471+ Payload: json.RawMessage(`{"m": 1}`),
472+ }
473+ msg := serverMsg{"notifications",
474+ protocol.BroadcastMsg{},
475+ protocol.NotificationsMsg{
476+ Notifications: []protocol.Notification{n1},
477+ }, protocol.ConnBrokenMsg{}}
478+ go func() { s.errCh <- s.sess.handleNotifications(&msg) }()
479+ c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
480+ failure := errors.New("ACK ACK ACK")
481+ s.upCh <- failure
482+ c.Assert(<-s.errCh, Equals, failure)
483+ c.Check(s.sess.State(), Equals, Error)
484+ // didn't get to clear
485+ c.Check(s.sess.ShouldDelay(), Equals, true)
486+}
487+
488+/****************************************************************
489 handleConnBroken() tests
490 ****************************************************************/
491
492@@ -897,6 +951,26 @@
493 c.Check(<-s.errCh, Equals, failure)
494 }
495
496+func (s *loopSuite) TestLoopNotifications(c *C) {
497+ c.Check(s.sess.State(), Equals, Running)
498+
499+ n1 := protocol.Notification{
500+ AppId: "app1",
501+ MsgId: "a",
502+ Payload: json.RawMessage(`{"m": 1}`),
503+ }
504+ msg := &protocol.NotificationsMsg{
505+ Type: "notifications",
506+ Notifications: []protocol.Notification{n1},
507+ }
508+ c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
509+ s.upCh <- msg
510+ c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
511+ failure := errors.New("ack")
512+ s.upCh <- failure
513+ c.Check(<-s.errCh, Equals, failure)
514+}
515+
516 func (s *loopSuite) TestLoopConnBroken(c *C) {
517 c.Check(s.sess.State(), Equals, Running)
518 broken := protocol.ConnBrokenMsg{
519@@ -1155,24 +1229,24 @@
520 func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) {
521 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
522 c.Assert(err, IsNil)
523- // just to make a point: until here we haven't set ErrCh & MsgCh (no
524+ // just to make a point: until here we haven't set ErrCh & BroadcastCh (no
525 // biggie if this stops being true)
526 c.Check(sess.ErrCh, IsNil)
527- c.Check(sess.MsgCh, IsNil)
528+ c.Check(sess.BroadcastCh, IsNil)
529 failureCh := make(chan error) // must be unbuffered
530- notf := &Notification{}
531+ notf := &BroadcastNotification{}
532 err = sess.run(
533 func() {},
534 func() error { return nil },
535 func() error { return nil },
536 func() error { return nil },
537 func() error { return nil },
538- func() error { sess.MsgCh <- notf; return <-failureCh })
539+ func() error { sess.BroadcastCh <- notf; return <-failureCh })
540 c.Check(err, Equals, nil)
541 // if run doesn't error it sets up the channels
542 c.Assert(sess.ErrCh, NotNil)
543- c.Assert(sess.MsgCh, NotNil)
544- c.Check(<-sess.MsgCh, Equals, notf)
545+ c.Assert(sess.BroadcastCh, NotNil)
546+ c.Check(<-sess.BroadcastCh, Equals, notf)
547 failure := errors.New("TestRunRunsEvenIfLoopFails")
548 failureCh <- failure
549 c.Check(<-sess.ErrCh, Equals, failure)
550@@ -1375,7 +1449,7 @@
551 c.Check(takeNext(downCh), Equals, protocol.AckMsg{"ack"})
552 upCh <- nil
553 // ...get bubbled up,
554- c.Check(<-sess.MsgCh, NotNil)
555+ c.Check(<-sess.BroadcastCh, NotNil)
556 // and their TopLevel remembered
557 levels, err := sess.Levels.GetAll()
558 c.Check(err, IsNil)

Subscribers

People subscribed via source and target branches