Merge lp:~pedronis/ubuntu-push/unicast-in-session into lp:ubuntu-push/automatic
- unicast-in-session
- Merge into automatic
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 148 |
Merged at revision: | 149 |
Proposed branch: | lp:~pedronis/ubuntu-push/unicast-in-session |
Merge into: | lp:ubuntu-push/automatic |
Diff against target: |
558 lines (+165/-63) 4 files modified
client/client.go (+10/-8) client/client_test.go (+33/-33) client/session/session.go (+36/-10) client/session/session_test.go (+86/-12) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/unicast-in-session |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+219529@code.launchpad.net |
Commit message
first pass of unicast support in session
Description of the change
first pass of unicast support in session
(persistence for recent message ids will come in the next branch)
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'client/client.go' |
2 | --- client/client.go 2014-05-14 12:28:56 +0000 |
3 | +++ client/client.go 2014-05-14 13:37:29 +0000 |
4 | @@ -235,7 +235,7 @@ |
5 | } |
6 | } |
7 | |
8 | -// filterNotification finds out if the notification is about an actual |
9 | +// filterBroadcastNotification finds out if the notification is about an actual |
10 | // upgrade for the device. It expects msg.Decoded entries to look |
11 | // like: |
12 | // |
13 | @@ -243,7 +243,7 @@ |
14 | // "IMAGE-CHANNEL/DEVICE-MODEL": [BUILD-NUMBER, CHANNEL-ALIAS] |
15 | // ... |
16 | // } |
17 | -func (client *PushClient) filterNotification(msg *session.Notification) bool { |
18 | +func (client *PushClient) filterBroadcastNotification(msg *session.BroadcastNotification) bool { |
19 | n := len(msg.Decoded) |
20 | if n == 0 { |
21 | return false |
22 | @@ -279,8 +279,8 @@ |
23 | } |
24 | |
25 | // handleBroadcastNotification deals with receiving a broadcast notification |
26 | -func (client *PushClient) handleNotification(msg *session.Notification) error { |
27 | - if !client.filterNotification(msg) { |
28 | +func (client *PushClient) handleBroadcastNotification(msg *session.BroadcastNotification) error { |
29 | + if !client.filterBroadcastNotification(msg) { |
30 | return nil |
31 | } |
32 | action_id := ACTION_ID_SNOWFLAKE |
33 | @@ -317,15 +317,17 @@ |
34 | } |
35 | |
36 | // doLoop connects events with their handlers |
37 | -func (client *PushClient) doLoop(connhandler func(bool), clickhandler func(string) error, notifhandler func(*session.Notification) error, errhandler func(error)) { |
38 | +func (client *PushClient) doLoop(connhandler func(bool), clickhandler func(string) error, bcasthandler func(*session.BroadcastNotification) error, errhandler func(error)) { |
39 | for { |
40 | select { |
41 | case state := <-client.connCh: |
42 | connhandler(state) |
43 | case action := <-client.actionsCh: |
44 | clickhandler(action.ActionId) |
45 | - case msg := <-client.session.MsgCh: |
46 | - notifhandler(msg) |
47 | + case bcast := <-client.session.BroadcastCh: |
48 | + bcasthandler(bcast) |
49 | + case _ = <-client.session.NotificationsCh: |
50 | + // xxx implement me |
51 | case err := <-client.session.ErrCh: |
52 | errhandler(err) |
53 | case count := <-client.sessionConnectedCh: |
54 | @@ -348,7 +350,7 @@ |
55 | // Loop calls doLoop with the "real" handlers |
56 | func (client *PushClient) Loop() { |
57 | client.doLoop(client.handleConnState, client.handleClick, |
58 | - client.handleNotification, client.handleErr) |
59 | + client.handleBroadcastNotification, client.handleErr) |
60 | } |
61 | |
62 | func (client *PushClient) startService() error { |
63 | |
64 | === modified file 'client/client_test.go' |
65 | --- client/client_test.go 2014-05-14 12:28:56 +0000 |
66 | +++ client/client_test.go 2014-05-14 13:37:29 +0000 |
67 | @@ -491,7 +491,7 @@ |
68 | } |
69 | |
70 | /***************************************************************** |
71 | - filterNotification tests |
72 | + filterBroadcastNotification tests |
73 | ******************************************************************/ |
74 | |
75 | var siInfoRes = &systemimage.InfoResult{ |
76 | @@ -501,23 +501,23 @@ |
77 | LastUpdate: "Unknown", |
78 | } |
79 | |
80 | -func (cs *clientSuite) TestFilterNotification(c *C) { |
81 | +func (cs *clientSuite) TestFilterBroadcastNotification(c *C) { |
82 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
83 | cli.systemImageInfo = siInfoRes |
84 | // empty |
85 | - msg := &session.Notification{} |
86 | - c.Check(cli.filterNotification(msg), Equals, false) |
87 | + msg := &session.BroadcastNotification{} |
88 | + c.Check(cli.filterBroadcastNotification(msg), Equals, false) |
89 | // same build number |
90 | - msg = &session.Notification{ |
91 | + msg = &session.BroadcastNotification{ |
92 | Decoded: []map[string]interface{}{ |
93 | map[string]interface{}{ |
94 | "daily/mako": []interface{}{float64(102), "tubular"}, |
95 | }, |
96 | }, |
97 | } |
98 | - c.Check(cli.filterNotification(msg), Equals, false) |
99 | + c.Check(cli.filterBroadcastNotification(msg), Equals, false) |
100 | // higher build number and pick last |
101 | - msg = &session.Notification{ |
102 | + msg = &session.BroadcastNotification{ |
103 | Decoded: []map[string]interface{}{ |
104 | map[string]interface{}{ |
105 | "daily/mako": []interface{}{float64(102), "tubular"}, |
106 | @@ -527,9 +527,9 @@ |
107 | }, |
108 | }, |
109 | } |
110 | - c.Check(cli.filterNotification(msg), Equals, true) |
111 | + c.Check(cli.filterBroadcastNotification(msg), Equals, true) |
112 | // going backward by a margin, assume switch of alias |
113 | - msg = &session.Notification{ |
114 | + msg = &session.BroadcastNotification{ |
115 | Decoded: []map[string]interface{}{ |
116 | map[string]interface{}{ |
117 | "daily/mako": []interface{}{float64(102), "tubular"}, |
118 | @@ -539,47 +539,47 @@ |
119 | }, |
120 | }, |
121 | } |
122 | - c.Check(cli.filterNotification(msg), Equals, true) |
123 | + c.Check(cli.filterBroadcastNotification(msg), Equals, true) |
124 | } |
125 | |
126 | -func (cs *clientSuite) TestFilterNotificationRobust(c *C) { |
127 | +func (cs *clientSuite) TestFilterBroadcastNotificationRobust(c *C) { |
128 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
129 | cli.systemImageInfo = siInfoRes |
130 | - msg := &session.Notification{ |
131 | + msg := &session.BroadcastNotification{ |
132 | Decoded: []map[string]interface{}{ |
133 | map[string]interface{}{}, |
134 | }, |
135 | } |
136 | - c.Check(cli.filterNotification(msg), Equals, false) |
137 | + c.Check(cli.filterBroadcastNotification(msg), Equals, false) |
138 | for _, broken := range []interface{}{ |
139 | 5, |
140 | []interface{}{}, |
141 | []interface{}{55}, |
142 | } { |
143 | - msg := &session.Notification{ |
144 | + msg := &session.BroadcastNotification{ |
145 | Decoded: []map[string]interface{}{ |
146 | map[string]interface{}{ |
147 | "daily/mako": broken, |
148 | }, |
149 | }, |
150 | } |
151 | - c.Check(cli.filterNotification(msg), Equals, false) |
152 | + c.Check(cli.filterBroadcastNotification(msg), Equals, false) |
153 | } |
154 | } |
155 | |
156 | /***************************************************************** |
157 | - handleNotification tests |
158 | + handleBroadcastNotification tests |
159 | ******************************************************************/ |
160 | |
161 | var ( |
162 | - positiveNotification = &session.Notification{ |
163 | + positiveBroadcastNotification = &session.BroadcastNotification{ |
164 | Decoded: []map[string]interface{}{ |
165 | map[string]interface{}{ |
166 | "daily/mako": []interface{}{float64(103), "tubular"}, |
167 | }, |
168 | }, |
169 | } |
170 | - negativeNotification = &session.Notification{ |
171 | + negativeBroadcastNotification = &session.BroadcastNotification{ |
172 | Decoded: []map[string]interface{}{ |
173 | map[string]interface{}{ |
174 | "daily/mako": []interface{}{float64(102), "tubular"}, |
175 | @@ -588,13 +588,13 @@ |
176 | } |
177 | ) |
178 | |
179 | -func (cs *clientSuite) TestHandleNotification(c *C) { |
180 | +func (cs *clientSuite) TestHandleBroadcastNotification(c *C) { |
181 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
182 | cli.systemImageInfo = siInfoRes |
183 | endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1)) |
184 | cli.notificationsEndp = endp |
185 | cli.log = cs.log |
186 | - c.Check(cli.handleNotification(positiveNotification), IsNil) |
187 | + c.Check(cli.handleBroadcastNotification(positiveBroadcastNotification), IsNil) |
188 | // check we sent the notification |
189 | args := testibus.GetCallArgs(endp) |
190 | c.Assert(args, HasLen, 1) |
191 | @@ -602,26 +602,26 @@ |
192 | c.Check(cs.log.Captured(), Matches, `.* got notification id \d+\s*`) |
193 | } |
194 | |
195 | -func (cs *clientSuite) TestHandleNotificationNothingToDo(c *C) { |
196 | +func (cs *clientSuite) TestHandleBroadcastNotificationNothingToDo(c *C) { |
197 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
198 | cli.systemImageInfo = siInfoRes |
199 | endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1)) |
200 | cli.notificationsEndp = endp |
201 | cli.log = cs.log |
202 | - c.Check(cli.handleNotification(negativeNotification), IsNil) |
203 | + c.Check(cli.handleBroadcastNotification(negativeBroadcastNotification), IsNil) |
204 | // check we did not send any notification |
205 | args := testibus.GetCallArgs(endp) |
206 | c.Assert(args, HasLen, 0) |
207 | c.Check(cs.log.Captured(), Matches, "") |
208 | } |
209 | |
210 | -func (cs *clientSuite) TestHandleNotificationFail(c *C) { |
211 | +func (cs *clientSuite) TestHandleBroadcastNotificationFail(c *C) { |
212 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
213 | cli.systemImageInfo = siInfoRes |
214 | cli.log = cs.log |
215 | endp := testibus.NewTestingEndpoint(nil, condition.Work(false)) |
216 | cli.notificationsEndp = endp |
217 | - c.Check(cli.handleNotification(positiveNotification), NotNil) |
218 | + c.Check(cli.handleBroadcastNotification(positiveBroadcastNotification), NotNil) |
219 | } |
220 | |
221 | /***************************************************************** |
222 | @@ -660,7 +660,7 @@ |
223 | c.Assert(cli.initSession(), IsNil) |
224 | |
225 | ch := make(chan bool, 1) |
226 | - go cli.doLoop(func(bool) { ch <- true }, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) {}) |
227 | + go cli.doLoop(func(bool) { ch <- true }, func(_ string) error { return nil }, func(_ *session.BroadcastNotification) error { return nil }, func(error) {}) |
228 | c.Check(takeNextBool(ch), Equals, true) |
229 | } |
230 | |
231 | @@ -674,7 +674,7 @@ |
232 | cli.actionsCh = aCh |
233 | |
234 | ch := make(chan bool, 1) |
235 | - go cli.doLoop(func(bool) {}, func(_ string) error { ch <- true; return nil }, func(_ *session.Notification) error { return nil }, func(error) {}) |
236 | + go cli.doLoop(func(bool) {}, func(_ string) error { ch <- true; return nil }, func(_ *session.BroadcastNotification) error { return nil }, func(error) {}) |
237 | c.Check(takeNextBool(ch), Equals, true) |
238 | } |
239 | |
240 | @@ -683,11 +683,11 @@ |
241 | cli.log = cs.log |
242 | cli.systemImageInfo = siInfoRes |
243 | c.Assert(cli.initSession(), IsNil) |
244 | - cli.session.MsgCh = make(chan *session.Notification, 1) |
245 | - cli.session.MsgCh <- &session.Notification{} |
246 | + cli.session.BroadcastCh = make(chan *session.BroadcastNotification, 1) |
247 | + cli.session.BroadcastCh <- &session.BroadcastNotification{} |
248 | |
249 | ch := make(chan bool, 1) |
250 | - go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { ch <- true; return nil }, func(error) {}) |
251 | + go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.BroadcastNotification) error { ch <- true; return nil }, func(error) {}) |
252 | c.Check(takeNextBool(ch), Equals, true) |
253 | } |
254 | |
255 | @@ -700,7 +700,7 @@ |
256 | cli.session.ErrCh <- nil |
257 | |
258 | ch := make(chan bool, 1) |
259 | - go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) { ch <- true }) |
260 | + go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.BroadcastNotification) error { return nil }, func(error) { ch <- true }) |
261 | c.Check(takeNextBool(ch), Equals, true) |
262 | } |
263 | |
264 | @@ -750,7 +750,7 @@ |
265 | cli.systemImageInfo = siInfoRes |
266 | c.Assert(cli.initSession(), IsNil) |
267 | |
268 | - cli.session.MsgCh = make(chan *session.Notification) |
269 | + cli.session.BroadcastCh = make(chan *session.BroadcastNotification) |
270 | cli.session.ErrCh = make(chan error) |
271 | |
272 | // we use tick() to make sure things have been through the |
273 | @@ -783,8 +783,8 @@ |
274 | tick() |
275 | c.Check(cli.hasConnectivity, Equals, false) |
276 | |
277 | - // * session.MsgCh to the notifications handler |
278 | - cli.session.MsgCh <- positiveNotification |
279 | + // * session.BroadcastCh to the notifications handler |
280 | + cli.session.BroadcastCh <- positiveBroadcastNotification |
281 | tick() |
282 | nargs := testibus.GetCallArgs(cli.notificationsEndp) |
283 | c.Check(nargs, HasLen, 1) |
284 | |
285 | === modified file 'client/session/session.go' |
286 | --- client/session/session.go 2014-05-02 10:42:16 +0000 |
287 | +++ client/session/session.go 2014-05-14 13:37:29 +0000 |
288 | @@ -43,7 +43,7 @@ |
289 | wireVersionBytes = []byte{protocol.ProtocolWireVersion} |
290 | ) |
291 | |
292 | -type Notification struct { |
293 | +type BroadcastNotification struct { |
294 | TopLevel int64 |
295 | Decoded []map[string]interface{} |
296 | } |
297 | @@ -116,9 +116,10 @@ |
298 | pingInterval time.Duration |
299 | retrier util.AutoRedialer |
300 | // status |
301 | - stateP *uint32 |
302 | - ErrCh chan error |
303 | - MsgCh chan *Notification |
304 | + stateP *uint32 |
305 | + ErrCh chan error |
306 | + BroadcastCh chan *BroadcastNotification |
307 | + NotificationsCh chan *protocol.Notification |
308 | // authorization |
309 | auth string |
310 | // autoredial knobs |
311 | @@ -370,7 +371,7 @@ |
312 | return err |
313 | } |
314 | |
315 | -func (sess *ClientSession) decodeBroadcast(bcast *serverMsg) *Notification { |
316 | +func (sess *ClientSession) decodeBroadcast(bcast *serverMsg) *BroadcastNotification { |
317 | decoded := make([]map[string]interface{}, 0) |
318 | for _, p := range bcast.Payloads { |
319 | var v map[string]interface{} |
320 | @@ -381,7 +382,7 @@ |
321 | } |
322 | decoded = append(decoded, v) |
323 | } |
324 | - return &Notification{ |
325 | + return &BroadcastNotification{ |
326 | TopLevel: bcast.TopLevel, |
327 | Decoded: decoded, |
328 | } |
329 | @@ -409,15 +410,37 @@ |
330 | bcast.ChanId, bcast.AppId, bcast.TopLevel, bcast.Payloads) |
331 | if bcast.ChanId == protocol.SystemChannelId { |
332 | // the system channel id, the only one we care about for now |
333 | - sess.Log.Debugf("sending it over") |
334 | - sess.MsgCh <- sess.decodeBroadcast(bcast) |
335 | - sess.Log.Debugf("sent it over") |
336 | + sess.Log.Debugf("sending bcast over") |
337 | + sess.BroadcastCh <- sess.decodeBroadcast(bcast) |
338 | + sess.Log.Debugf("sent bcast over") |
339 | } else { |
340 | sess.Log.Debugf("what is this weird channel, %#v?", bcast.ChanId) |
341 | } |
342 | return nil |
343 | } |
344 | |
345 | +// handle "notifications" messages |
346 | +func (sess *ClientSession) handleNotifications(ucast *serverMsg) error { |
347 | + // the server assumes if we ack the notifications, we've updated |
348 | + // our state. Hence the order. |
349 | + err := sess.proto.WriteMessage(protocol.AckMsg{"ack"}) |
350 | + if err != nil { |
351 | + sess.setState(Error) |
352 | + sess.Log.Errorf("unable to ack notifications: %s", err) |
353 | + return err |
354 | + } |
355 | + sess.clearShouldDelay() |
356 | + for i := range ucast.Notifications { |
357 | + notif := &ucast.Notifications[i] |
358 | + sess.Log.Debugf("unicast app:%v msg:%s payload:%s", |
359 | + notif.AppId, notif.MsgId, notif.Payload) |
360 | + sess.Log.Debugf("sending ucast over") |
361 | + sess.NotificationsCh <- notif |
362 | + sess.Log.Debugf("sent ucast over") |
363 | + } |
364 | + return nil |
365 | +} |
366 | + |
367 | // handle "connbroken" messages |
368 | func (sess *ClientSession) handleConnBroken(connBroken *serverMsg) error { |
369 | sess.setState(Error) |
370 | @@ -449,6 +472,8 @@ |
371 | err = sess.handlePing() |
372 | case "broadcast": |
373 | err = sess.handleBroadcast(&recv) |
374 | + case "notifications": |
375 | + err = sess.handleNotifications(&recv) |
376 | case "connbroken": |
377 | err = sess.handleConnBroken(&recv) |
378 | } |
379 | @@ -534,7 +559,8 @@ |
380 | err = starter() |
381 | if err == nil { |
382 | sess.ErrCh = make(chan error, 1) |
383 | - sess.MsgCh = make(chan *Notification) |
384 | + sess.BroadcastCh = make(chan *BroadcastNotification) |
385 | + sess.NotificationsCh = make(chan *protocol.Notification) |
386 | go func() { sess.ErrCh <- looper() }() |
387 | } |
388 | } |
389 | |
390 | === modified file 'client/session/session_test.go' |
391 | --- client/session/session_test.go 2014-05-02 10:42:16 +0000 |
392 | +++ client/session/session_test.go 2014-05-14 13:37:29 +0000 |
393 | @@ -645,7 +645,8 @@ |
394 | s.downCh = make(chan interface{}, 5) |
395 | s.sess.proto = &testProtocol{up: s.upCh, down: s.downCh} |
396 | // make the message channel buffered |
397 | - s.sess.MsgCh = make(chan *Notification, 5) |
398 | + s.sess.BroadcastCh = make(chan *BroadcastNotification, 5) |
399 | + s.sess.NotificationsCh = make(chan *protocol.Notification, 5) |
400 | } |
401 | |
402 | func (s *msgSuite) TestHandlePingWorks(c *C) { |
403 | @@ -704,8 +705,8 @@ |
404 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
405 | s.upCh <- nil // ack ok |
406 | c.Check(<-s.errCh, Equals, nil) |
407 | - c.Assert(len(s.sess.MsgCh), Equals, 1) |
408 | - c.Check(<-s.sess.MsgCh, DeepEquals, &Notification{ |
409 | + c.Assert(len(s.sess.BroadcastCh), Equals, 1) |
410 | + c.Check(<-s.sess.BroadcastCh, DeepEquals, &BroadcastNotification{ |
411 | TopLevel: 2, |
412 | Decoded: []map[string]interface{}{ |
413 | map[string]interface{}{ |
414 | @@ -752,7 +753,7 @@ |
415 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
416 | s.upCh <- nil // ack ok |
417 | c.Check(<-s.errCh, IsNil) |
418 | - c.Check(len(s.sess.MsgCh), Equals, 0) |
419 | + c.Check(len(s.sess.BroadcastCh), Equals, 0) |
420 | } |
421 | |
422 | func (s *msgSuite) TestHandleBroadcastWrongBrokenLevelmap(c *C) { |
423 | @@ -770,7 +771,7 @@ |
424 | // start returns with error |
425 | c.Check(<-s.errCh, Not(Equals), nil) |
426 | // no message sent out |
427 | - c.Check(len(s.sess.MsgCh), Equals, 0) |
428 | + c.Check(len(s.sess.BroadcastCh), Equals, 0) |
429 | // and nak'ed it |
430 | c.Check(len(s.downCh), Equals, 1) |
431 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"nak"}) |
432 | @@ -803,6 +804,59 @@ |
433 | } |
434 | |
435 | /**************************************************************** |
436 | + handleNotifications() tests |
437 | +****************************************************************/ |
438 | + |
439 | +func (s *msgSuite) TestHandleNotificationsWorks(c *C) { |
440 | + s.sess.setShouldDelay() |
441 | + n1 := protocol.Notification{ |
442 | + AppId: "app1", |
443 | + MsgId: "a", |
444 | + Payload: json.RawMessage(`{"m": 1}`), |
445 | + } |
446 | + n2 := protocol.Notification{ |
447 | + AppId: "app2", |
448 | + MsgId: "b", |
449 | + Payload: json.RawMessage(`{"m": 2}`), |
450 | + } |
451 | + msg := serverMsg{"notifications", |
452 | + protocol.BroadcastMsg{}, |
453 | + protocol.NotificationsMsg{ |
454 | + Notifications: []protocol.Notification{n1, n2}, |
455 | + }, protocol.ConnBrokenMsg{}} |
456 | + go func() { s.errCh <- s.sess.handleNotifications(&msg) }() |
457 | + c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
458 | + s.upCh <- nil // ack ok |
459 | + c.Check(<-s.errCh, Equals, nil) |
460 | + c.Check(s.sess.ShouldDelay(), Equals, false) |
461 | + c.Assert(len(s.sess.NotificationsCh), Equals, 2) |
462 | + c.Check(<-s.sess.NotificationsCh, DeepEquals, &n1) |
463 | + c.Check(<-s.sess.NotificationsCh, DeepEquals, &n2) |
464 | +} |
465 | + |
466 | +func (s *msgSuite) TestHandleNotificationsBadAckWrite(c *C) { |
467 | + s.sess.setShouldDelay() |
468 | + n1 := protocol.Notification{ |
469 | + AppId: "app1", |
470 | + MsgId: "a", |
471 | + Payload: json.RawMessage(`{"m": 1}`), |
472 | + } |
473 | + msg := serverMsg{"notifications", |
474 | + protocol.BroadcastMsg{}, |
475 | + protocol.NotificationsMsg{ |
476 | + Notifications: []protocol.Notification{n1}, |
477 | + }, protocol.ConnBrokenMsg{}} |
478 | + go func() { s.errCh <- s.sess.handleNotifications(&msg) }() |
479 | + c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
480 | + failure := errors.New("ACK ACK ACK") |
481 | + s.upCh <- failure |
482 | + c.Assert(<-s.errCh, Equals, failure) |
483 | + c.Check(s.sess.State(), Equals, Error) |
484 | + // didn't get to clear |
485 | + c.Check(s.sess.ShouldDelay(), Equals, true) |
486 | +} |
487 | + |
488 | +/**************************************************************** |
489 | handleConnBroken() tests |
490 | ****************************************************************/ |
491 | |
492 | @@ -897,6 +951,26 @@ |
493 | c.Check(<-s.errCh, Equals, failure) |
494 | } |
495 | |
496 | +func (s *loopSuite) TestLoopNotifications(c *C) { |
497 | + c.Check(s.sess.State(), Equals, Running) |
498 | + |
499 | + n1 := protocol.Notification{ |
500 | + AppId: "app1", |
501 | + MsgId: "a", |
502 | + Payload: json.RawMessage(`{"m": 1}`), |
503 | + } |
504 | + msg := &protocol.NotificationsMsg{ |
505 | + Type: "notifications", |
506 | + Notifications: []protocol.Notification{n1}, |
507 | + } |
508 | + c.Check(takeNext(s.downCh), Equals, "deadline 1ms") |
509 | + s.upCh <- msg |
510 | + c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
511 | + failure := errors.New("ack") |
512 | + s.upCh <- failure |
513 | + c.Check(<-s.errCh, Equals, failure) |
514 | +} |
515 | + |
516 | func (s *loopSuite) TestLoopConnBroken(c *C) { |
517 | c.Check(s.sess.State(), Equals, Running) |
518 | broken := protocol.ConnBrokenMsg{ |
519 | @@ -1155,24 +1229,24 @@ |
520 | func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) { |
521 | sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
522 | c.Assert(err, IsNil) |
523 | - // just to make a point: until here we haven't set ErrCh & MsgCh (no |
524 | + // just to make a point: until here we haven't set ErrCh & BroadcastCh (no |
525 | // biggie if this stops being true) |
526 | c.Check(sess.ErrCh, IsNil) |
527 | - c.Check(sess.MsgCh, IsNil) |
528 | + c.Check(sess.BroadcastCh, IsNil) |
529 | failureCh := make(chan error) // must be unbuffered |
530 | - notf := &Notification{} |
531 | + notf := &BroadcastNotification{} |
532 | err = sess.run( |
533 | func() {}, |
534 | func() error { return nil }, |
535 | func() error { return nil }, |
536 | func() error { return nil }, |
537 | func() error { return nil }, |
538 | - func() error { sess.MsgCh <- notf; return <-failureCh }) |
539 | + func() error { sess.BroadcastCh <- notf; return <-failureCh }) |
540 | c.Check(err, Equals, nil) |
541 | // if run doesn't error it sets up the channels |
542 | c.Assert(sess.ErrCh, NotNil) |
543 | - c.Assert(sess.MsgCh, NotNil) |
544 | - c.Check(<-sess.MsgCh, Equals, notf) |
545 | + c.Assert(sess.BroadcastCh, NotNil) |
546 | + c.Check(<-sess.BroadcastCh, Equals, notf) |
547 | failure := errors.New("TestRunRunsEvenIfLoopFails") |
548 | failureCh <- failure |
549 | c.Check(<-sess.ErrCh, Equals, failure) |
550 | @@ -1375,7 +1449,7 @@ |
551 | c.Check(takeNext(downCh), Equals, protocol.AckMsg{"ack"}) |
552 | upCh <- nil |
553 | // ...get bubbled up, |
554 | - c.Check(<-sess.MsgCh, NotNil) |
555 | + c.Check(<-sess.BroadcastCh, NotNil) |
556 | // and their TopLevel remembered |
557 | levels, err := sess.Levels.GetAll() |
558 | c.Check(err, IsNil) |
Lovely.