Merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg into lp:ubuntu-push/automatic
- unicast-feed-pending-and-reorg
- Merge into automatic
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 156 |
Merged at revision: | 156 |
Proposed branch: | lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg |
Merge into: | lp:ubuntu-push/automatic |
Diff against target: |
495 lines (+208/-73) 8 files modified
server/acceptance/suites/unicast.go (+21/-0) server/broker/broker.go (+4/-0) server/broker/exchanges.go (+38/-2) server/broker/exchanges_test.go (+93/-3) server/broker/simple/simple.go (+10/-26) server/broker/simple/simple_test.go (+3/-39) server/broker/testing/impls.go (+8/-0) server/broker/testsuite/suite.go (+31/-3) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+219710@code.launchpad.net |
Commit message
Support feeding pending unicasts at session start, together with the related reorganization that makes it possible.
Description of the change
Support feeding pending unicasts at session start, together with the related reorganization that makes it possible.
To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'server/acceptance/suites/unicast.go' |
2 | --- server/acceptance/suites/unicast.go 2014-05-02 11:20:55 +0000 |
3 | +++ server/acceptance/suites/unicast.go 2014-05-15 15:14:22 +0000 |
4 | @@ -94,3 +94,24 @@ |
5 | c.Check(len(errCh1), Equals, 0) |
6 | c.Check(len(errCh2), Equals, 0) |
7 | } |
8 | + |
9 | +func (s *UnicastAcceptanceSuite) TestUnicastPending(c *C) { |
10 | + // send unicast that will be pending |
11 | + userId, auth := s.associatedAuth("DEV1") |
12 | + got, err := s.PostRequest("/notify", &api.Unicast{ |
13 | + UserId: userId, |
14 | + DeviceId: "DEV1", |
15 | + AppId: "app1", |
16 | + ExpireOn: future, |
17 | + Data: json.RawMessage(`{"a": 42}`), |
18 | + }) |
19 | + c.Assert(err, IsNil) |
20 | + c.Assert(got, Matches, ".*ok.*") |
21 | + |
22 | + // get pending on connect |
23 | + events, errCh, stop := s.StartClientAuth(c, "DEV1", nil, auth) |
24 | + c.Check(NextEvent(events, errCh), Equals, `unicast app:app1 payload:{"a":42};`) |
25 | + stop() |
26 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
27 | + c.Check(len(errCh), Equals, 0) |
28 | +} |
29 | |
30 | === modified file 'server/broker/broker.go' |
31 | --- server/broker/broker.go 2014-05-01 18:56:07 +0000 |
32 | +++ server/broker/broker.go 2014-05-15 15:14:22 +0000 |
33 | @@ -87,6 +87,10 @@ |
34 | Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) |
35 | // DropByMsgId drops notifications from the channel chanId by message id. |
36 | DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error |
37 | + // Feed feeds exchange into the session. |
38 | + Feed(Exchange) |
39 | + // InternalChannelId() returns the channel id corresponding to the session. |
40 | + InternalChannelId() store.InternalChannelId |
41 | } |
42 | |
43 | // Session aborted error. |
44 | |
45 | === modified file 'server/broker/exchanges.go' |
46 | --- server/broker/exchanges.go 2014-05-01 18:56:07 +0000 |
47 | +++ server/broker/exchanges.go 2014-05-15 15:14:22 +0000 |
48 | @@ -19,6 +19,7 @@ |
49 | import ( |
50 | "encoding/json" |
51 | "fmt" |
52 | + "time" |
53 | |
54 | "launchpad.net/ubuntu-push/protocol" |
55 | "launchpad.net/ubuntu-push/server/store" |
56 | @@ -33,6 +34,10 @@ |
57 | ackMsg protocol.AckMsg |
58 | } |
59 | |
60 | +type BaseExchange struct { |
61 | + Timestamp time.Time |
62 | +} |
63 | + |
64 | // BroadcastExchange leads a session through delivering a BROADCAST. |
65 | // For simplicity it is fully public. |
66 | type BroadcastExchange struct { |
67 | @@ -40,6 +45,7 @@ |
68 | TopLevel int64 |
69 | Notifications []protocol.Notification |
70 | Decoded []map[string]interface{} |
71 | + BaseExchange |
72 | } |
73 | |
74 | // check interface already here |
75 | @@ -140,7 +146,9 @@ |
76 | // UnicastExchange leads a session through delivering a NOTIFICATIONS message. |
77 | // For simplicity it is fully public. |
78 | type UnicastExchange struct { |
79 | - ChanId store.InternalChannelId |
80 | + ChanId store.InternalChannelId |
81 | + CachedOk bool |
82 | + BaseExchange |
83 | } |
84 | |
85 | // check interface already here |
86 | @@ -148,10 +156,13 @@ |
87 | |
88 | // Prepare session for a NOTIFICATIONS. |
89 | func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
90 | - _, notifs, err := sess.Get(sue.ChanId, false) |
91 | + _, notifs, err := sess.Get(sue.ChanId, sue.CachedOk) |
92 | if err != nil { |
93 | return nil, nil, err |
94 | } |
95 | + if len(notifs) == 0 { |
96 | + return nil, nil, ErrNop |
97 | + } |
98 | scratchArea := sess.ExchangeScratchArea() |
99 | scratchArea.notificationsMsg.Reset() |
100 | scratchArea.notificationsMsg.Type = "notifications" |
101 | @@ -171,3 +182,28 @@ |
102 | } |
103 | return nil |
104 | } |
105 | + |
106 | +// FeedPending feeds exchanges covering pending notifications into the session. |
107 | +func FeedPending(sess BrokerSession) error { |
108 | + // find relevant channels, for now only system |
109 | + channels := []store.InternalChannelId{store.SystemInternalChannelId} |
110 | + for _, chanId := range channels { |
111 | + topLevel, notifications, err := sess.Get(chanId, true) |
112 | + if err != nil { |
113 | + // next broadcast will try again |
114 | + continue |
115 | + } |
116 | + clientLevel := sess.Levels()[chanId] |
117 | + if clientLevel != topLevel { |
118 | + broadcastExchg := &BroadcastExchange{ |
119 | + ChanId: chanId, |
120 | + TopLevel: topLevel, |
121 | + Notifications: notifications, |
122 | + } |
123 | + broadcastExchg.Init() |
124 | + sess.Feed(broadcastExchg) |
125 | + } |
126 | + } |
127 | + sess.Feed(&UnicastExchange{ChanId: sess.InternalChannelId(), CachedOk: true}) |
128 | + return nil |
129 | +} |
130 | |
131 | === modified file 'server/broker/exchanges_test.go' |
132 | --- server/broker/exchanges_test.go 2014-05-01 18:56:07 +0000 |
133 | +++ server/broker/exchanges_test.go 2014-05-15 15:14:22 +0000 |
134 | @@ -295,7 +295,7 @@ |
135 | return nil |
136 | }, |
137 | } |
138 | - exchg := &broker.UnicastExchange{chanId1} |
139 | + exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: false} |
140 | outMsg, inMsg, err := exchg.Prepare(sess) |
141 | c.Assert(err, IsNil) |
142 | // check |
143 | @@ -311,7 +311,7 @@ |
144 | } |
145 | |
146 | func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) { |
147 | - notifs := []protocol.Notification{} |
148 | + notifs := []protocol.Notification{protocol.Notification{}} |
149 | dropped := make(chan []protocol.Notification, 2) |
150 | sess := &testing.TestBrokerSession{ |
151 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
152 | @@ -344,8 +344,22 @@ |
153 | c.Assert(err, Equals, fail) |
154 | } |
155 | |
156 | +func (s *exchangesSuite) TestUnicastExchangeCachedOkNop(c *C) { |
157 | + chanId1 := store.UnicastInternalChannelId("u1", "d1") |
158 | + sess := &testing.TestBrokerSession{ |
159 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
160 | + c.Check(chanId, Equals, chanId1) |
161 | + c.Check(cachedOk, Equals, true) |
162 | + return 0, nil, nil |
163 | + }, |
164 | + } |
165 | + exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: true} |
166 | + _, _, err := exchg.Prepare(sess) |
167 | + c.Assert(err, Equals, broker.ErrNop) |
168 | +} |
169 | + |
170 | func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) { |
171 | - notifs := []protocol.Notification{} |
172 | + notifs := []protocol.Notification{protocol.Notification{}} |
173 | fail := errors.New("fail") |
174 | sess := &testing.TestBrokerSession{ |
175 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
176 | @@ -363,3 +377,79 @@ |
177 | err = exchg.Acked(sess, true) |
178 | c.Assert(err, Equals, fail) |
179 | } |
180 | + |
181 | +func (s *exchangesSuite) TestFeedPending(c *C) { |
182 | + bcast1 := json.RawMessage(`{"m": "M"}`) |
183 | + decoded1 := map[string]interface{}{"m": "M"} |
184 | + sess := &testing.TestBrokerSession{ |
185 | + Exchanges: make(chan broker.Exchange, 5), |
186 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
187 | + switch chanId { |
188 | + case store.SystemInternalChannelId: |
189 | + return 1, help.Ns(bcast1), nil |
190 | + default: |
191 | + return 0, nil, nil |
192 | + } |
193 | + }, |
194 | + } |
195 | + err := broker.FeedPending(sess) |
196 | + c.Assert(err, IsNil) |
197 | + c.Assert(len(sess.Exchanges), Equals, 2) |
198 | + exchg1 := <-sess.Exchanges |
199 | + c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{ |
200 | + ChanId: store.SystemInternalChannelId, |
201 | + TopLevel: 1, |
202 | + Notifications: help.Ns(bcast1), |
203 | + Decoded: []map[string]interface{}{decoded1}, |
204 | + }) |
205 | + exchg2 := <-sess.Exchanges |
206 | + c.Check(exchg2, DeepEquals, &broker.UnicastExchange{ |
207 | + ChanId: sess.InternalChannelId(), |
208 | + CachedOk: true, |
209 | + }) |
210 | +} |
211 | + |
212 | +func (s *exchangesSuite) TestFeedPendingSystemChanNop(c *C) { |
213 | + bcast1 := json.RawMessage(`{"m": "M"}`) |
214 | + sess := &testing.TestBrokerSession{ |
215 | + LevelsMap: map[store.InternalChannelId]int64{ |
216 | + store.SystemInternalChannelId: 1, |
217 | + }, |
218 | + Exchanges: make(chan broker.Exchange, 5), |
219 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
220 | + switch chanId { |
221 | + case store.SystemInternalChannelId: |
222 | + return 1, help.Ns(bcast1), nil |
223 | + default: |
224 | + return 0, nil, nil |
225 | + } |
226 | + }, |
227 | + } |
228 | + err := broker.FeedPending(sess) |
229 | + c.Assert(err, IsNil) |
230 | + c.Check(len(sess.Exchanges), Equals, 1) |
231 | + exchg1 := <-sess.Exchanges |
232 | + c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{}) |
233 | +} |
234 | + |
235 | +func (s *exchangesSuite) TestFeedPendingSystemChanFail(c *C) { |
236 | + sess := &testing.TestBrokerSession{ |
237 | + LevelsMap: map[store.InternalChannelId]int64{ |
238 | + store.SystemInternalChannelId: 1, |
239 | + }, |
240 | + Exchanges: make(chan broker.Exchange, 5), |
241 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
242 | + switch chanId { |
243 | + case store.SystemInternalChannelId: |
244 | + return 0, nil, errors.New("fail") |
245 | + default: |
246 | + return 0, nil, nil |
247 | + } |
248 | + }, |
249 | + } |
250 | + err := broker.FeedPending(sess) |
251 | + c.Assert(err, IsNil) |
252 | + c.Check(len(sess.Exchanges), Equals, 1) |
253 | + exchg1 := <-sess.Exchanges |
254 | + c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{}) |
255 | +} |
256 | |
257 | === modified file 'server/broker/simple/simple.go' |
258 | --- server/broker/simple/simple.go 2014-05-01 18:56:07 +0000 |
259 | +++ server/broker/simple/simple.go 2014-05-15 15:14:22 +0000 |
260 | @@ -103,6 +103,14 @@ |
261 | return sess.broker.drop(chanId, targets) |
262 | } |
263 | |
264 | +func (sess *simpleBrokerSession) Feed(exchg broker.Exchange) { |
265 | + sess.exchanges <- exchg |
266 | +} |
267 | + |
268 | +func (sess *simpleBrokerSession) InternalChannelId() store.InternalChannelId { |
269 | + return store.UnicastInternalChannelId(sess.deviceId, sess.deviceId) |
270 | +} |
271 | + |
272 | // NewSimpleBroker makes a new SimpleBroker. |
273 | func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker { |
274 | sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) |
275 | @@ -150,30 +158,6 @@ |
276 | return b.running |
277 | } |
278 | |
279 | -func (b *SimpleBroker) feedPending(sess *simpleBrokerSession) error { |
280 | - // find relevant channels, for now only system |
281 | - channels := []store.InternalChannelId{store.SystemInternalChannelId} |
282 | - for _, chanId := range channels { |
283 | - topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId) |
284 | - if err != nil { |
285 | - // next broadcast will try again |
286 | - b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err) |
287 | - continue |
288 | - } |
289 | - clientLevel := sess.levels[chanId] |
290 | - if clientLevel != topLevel { |
291 | - broadcastExchg := &broker.BroadcastExchange{ |
292 | - ChanId: chanId, |
293 | - TopLevel: topLevel, |
294 | - Notifications: notifications, |
295 | - } |
296 | - broadcastExchg.Init() |
297 | - sess.exchanges <- broadcastExchg |
298 | - } |
299 | - } |
300 | - return nil |
301 | -} |
302 | - |
303 | // Register registers a session with the broker. It feeds the session |
304 | // pending notifications as well. |
305 | func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) { |
306 | @@ -205,7 +189,7 @@ |
307 | } |
308 | b.sessionCh <- sess |
309 | <-sess.done |
310 | - err = b.feedPending(sess) |
311 | + err = broker.FeedPending(sess) |
312 | if err != nil { |
313 | return nil, err |
314 | } |
315 | @@ -281,7 +265,7 @@ |
316 | _, devId := chanId.UnicastUserAndDevice() |
317 | sess := b.registry[devId] |
318 | if sess != nil { |
319 | - sess.exchanges <- &broker.UnicastExchange{chanId} |
320 | + sess.exchanges <- &broker.UnicastExchange{ChanId: chanId, CachedOk: false} |
321 | } |
322 | } |
323 | } |
324 | |
325 | === modified file 'server/broker/simple/simple_test.go' |
326 | --- server/broker/simple/simple_test.go 2014-04-29 15:29:04 +0000 |
327 | +++ server/broker/simple/simple_test.go 2014-05-15 15:14:22 +0000 |
328 | @@ -17,16 +17,12 @@ |
329 | package simple |
330 | |
331 | import ( |
332 | - "encoding/json" |
333 | stdtesting "testing" |
334 | - "time" |
335 | |
336 | . "launchpad.net/gocheck" |
337 | |
338 | - "launchpad.net/ubuntu-push/server/broker" |
339 | "launchpad.net/ubuntu-push/server/broker/testing" |
340 | "launchpad.net/ubuntu-push/server/store" |
341 | - help "launchpad.net/ubuntu-push/testing" |
342 | ) |
343 | |
344 | func TestSimple(t *stdtesting.T) { TestingT(t) } |
345 | @@ -45,39 +41,7 @@ |
346 | c.Check(b.sto, Equals, sto) |
347 | } |
348 | |
349 | -func (s *simpleSuite) TestFeedPending(c *C) { |
350 | - sto := store.NewInMemoryPendingStore() |
351 | - muchLater := time.Now().Add(10 * time.Minute) |
352 | - notification1 := json.RawMessage(`{"m": "M"}`) |
353 | - decoded1 := map[string]interface{}{"m": "M"} |
354 | - sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
355 | - b := NewSimpleBroker(sto, testBrokerConfig, nil) |
356 | - sess := &simpleBrokerSession{ |
357 | - exchanges: make(chan broker.Exchange, 1), |
358 | - } |
359 | - b.feedPending(sess) |
360 | - c.Assert(len(sess.exchanges), Equals, 1) |
361 | - exchg1 := <-sess.exchanges |
362 | - c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{ |
363 | - ChanId: store.SystemInternalChannelId, |
364 | - TopLevel: 1, |
365 | - Notifications: help.Ns(notification1), |
366 | - Decoded: []map[string]interface{}{decoded1}, |
367 | - }) |
368 | -} |
369 | - |
370 | -func (s *simpleSuite) TestFeedPendingNop(c *C) { |
371 | - sto := store.NewInMemoryPendingStore() |
372 | - muchLater := time.Now().Add(10 * time.Minute) |
373 | - notification1 := json.RawMessage(`{"m": "M"}`) |
374 | - sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
375 | - b := NewSimpleBroker(sto, testBrokerConfig, nil) |
376 | - sess := &simpleBrokerSession{ |
377 | - exchanges: make(chan broker.Exchange, 1), |
378 | - levels: map[store.InternalChannelId]int64{ |
379 | - store.SystemInternalChannelId: 1, |
380 | - }, |
381 | - } |
382 | - b.feedPending(sess) |
383 | - c.Assert(len(sess.exchanges), Equals, 0) |
384 | +func (s *simpleSuite) TestSessionInternalChannelId(c *C) { |
385 | + sess := &simpleBrokerSession{deviceId: "dev21"} |
386 | + c.Check(sess.InternalChannelId(), Equals, store.UnicastInternalChannelId("dev21", "dev21")) |
387 | } |
388 | |
389 | === modified file 'server/broker/testing/impls.go' |
390 | --- server/broker/testing/impls.go 2014-05-01 18:56:07 +0000 |
391 | +++ server/broker/testing/impls.go 2014-05-15 15:14:22 +0000 |
392 | @@ -68,6 +68,14 @@ |
393 | return tbs.DoDropByMsgId(chanId, targets) |
394 | } |
395 | |
396 | +func (tbs *TestBrokerSession) Feed(exchg broker.Exchange) { |
397 | + tbs.Exchanges <- exchg |
398 | +} |
399 | + |
400 | +func (tbs *TestBrokerSession) InternalChannelId() store.InternalChannelId { |
401 | + return store.UnicastInternalChannelId(tbs.DeviceId, tbs.DeviceId) |
402 | +} |
403 | + |
404 | // Test implementation of BrokerConfig. |
405 | type TestBrokerConfig struct { |
406 | ConfigSessionQueueSize uint |
407 | |
408 | === modified file 'server/broker/testsuite/suite.go' |
409 | --- server/broker/testsuite/suite.go 2014-05-01 18:56:07 +0000 |
410 | +++ server/broker/testsuite/suite.go 2014-05-15 15:14:22 +0000 |
411 | @@ -143,7 +143,7 @@ |
412 | defer b.Stop() |
413 | sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
414 | c.Assert(err, IsNil) |
415 | - c.Check(len(sess.SessionChannel()), Equals, 1) |
416 | + c.Check(len(sess.SessionChannel()), Equals, 2) |
417 | } |
418 | |
419 | func (s *CommonBrokerSuite) TestRegistrationFeedPendingError(c *C) { |
420 | @@ -154,7 +154,12 @@ |
421 | _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
422 | c.Assert(err, IsNil) |
423 | // but |
424 | - c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n") |
425 | + c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for 0 \\(cachedOk=true\\): get channel snapshot fail\n") |
426 | +} |
427 | + |
428 | +func clearOfPending(c *C, sess broker.BrokerSession) { |
429 | + c.Assert(len(sess.SessionChannel()) >= 1, Equals, true) |
430 | + <-sess.SessionChannel() |
431 | } |
432 | |
433 | func (s *CommonBrokerSuite) TestRegistrationLastWins(c *C) { |
434 | @@ -164,6 +169,7 @@ |
435 | defer b.Stop() |
436 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
437 | c.Assert(err, IsNil) |
438 | + clearOfPending(c, sess1) |
439 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2") |
440 | c.Assert(err, IsNil) |
441 | // previous session got signaled by sending nil on its channel |
442 | @@ -194,8 +200,10 @@ |
443 | defer b.Stop() |
444 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
445 | c.Assert(err, IsNil) |
446 | + clearOfPending(c, sess1) |
447 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2") |
448 | c.Assert(err, IsNil) |
449 | + clearOfPending(c, sess2) |
450 | // add notification to channel *after* the registrations |
451 | muchLater := time.Now().Add(10 * time.Minute) |
452 | sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
453 | @@ -250,8 +258,9 @@ |
454 | b := s.MakeBroker(sto, testBrokerConfig, s.testlog) |
455 | b.Start() |
456 | defer b.Stop() |
457 | - _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
458 | + sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
459 | c.Assert(err, IsNil) |
460 | + clearOfPending(c, sess) |
461 | b.Broadcast(store.SystemInternalChannelId) |
462 | select { |
463 | case <-time.After(5 * time.Second): |
464 | @@ -272,8 +281,10 @@ |
465 | defer b.Stop() |
466 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1") |
467 | c.Assert(err, IsNil) |
468 | + clearOfPending(c, sess1) |
469 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2") |
470 | c.Assert(err, IsNil) |
471 | + clearOfPending(c, sess2) |
472 | // add notification to channel *after* the registrations |
473 | muchLater := time.Now().Add(10 * time.Minute) |
474 | sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater) |
475 | @@ -338,3 +349,20 @@ |
476 | c.Assert(err, ErrorMatches, "drop fail") |
477 | c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n") |
478 | } |
479 | + |
480 | +func (s *CommonBrokerSuite) TestSessionFeed(c *C) { |
481 | + sto := store.NewInMemoryPendingStore() |
482 | + b := s.MakeBroker(sto, testBrokerConfig, nil) |
483 | + b.Start() |
484 | + defer b.Stop() |
485 | + sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1") |
486 | + c.Assert(err, IsNil) |
487 | + clearOfPending(c, sess1) |
488 | + bcast := &broker.BroadcastExchange{ChanId: store.SystemInternalChannelId, TopLevel: 99} |
489 | + sess1.Feed(bcast) |
490 | + c.Check(s.RevealBroadcastExchange(<-sess1.SessionChannel()), DeepEquals, bcast) |
491 | + |
492 | + ucast := &broker.UnicastExchange{ChanId: store.UnicastInternalChannelId("dev21", "dev21"), CachedOk: true} |
493 | + sess1.Feed(ucast) |
494 | + c.Check(s.RevealUnicastExchange(<-sess1.SessionChannel()), DeepEquals, ucast) |
495 | +} |