Merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg into lp:ubuntu-push/automatic

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 156
Merged at revision: 156
Proposed branch: lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg
Merge into: lp:ubuntu-push/automatic
Diff against target: 495 lines (+208/-73)
8 files modified
server/acceptance/suites/unicast.go (+21/-0)
server/broker/broker.go (+4/-0)
server/broker/exchanges.go (+38/-2)
server/broker/exchanges_test.go (+93/-3)
server/broker/simple/simple.go (+10/-26)
server/broker/simple/simple_test.go (+3/-39)
server/broker/testing/impls.go (+8/-0)
server/broker/testsuite/suite.go (+31/-3)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+219710@code.launchpad.net

Commit message

support feeding pending unicasts at session start, together with/through some related reorg

Description of the change

support feeding pending unicasts at session start, together with/through some related reorg

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'server/acceptance/suites/unicast.go'
--- server/acceptance/suites/unicast.go 2014-05-02 11:20:55 +0000
+++ server/acceptance/suites/unicast.go 2014-05-15 15:14:22 +0000
@@ -94,3 +94,24 @@
94 c.Check(len(errCh1), Equals, 0)94 c.Check(len(errCh1), Equals, 0)
95 c.Check(len(errCh2), Equals, 0)95 c.Check(len(errCh2), Equals, 0)
96}96}
97
98func (s *UnicastAcceptanceSuite) TestUnicastPending(c *C) {
99 // send unicast that will be pending
100 userId, auth := s.associatedAuth("DEV1")
101 got, err := s.PostRequest("/notify", &api.Unicast{
102 UserId: userId,
103 DeviceId: "DEV1",
104 AppId: "app1",
105 ExpireOn: future,
106 Data: json.RawMessage(`{"a": 42}`),
107 })
108 c.Assert(err, IsNil)
109 c.Assert(got, Matches, ".*ok.*")
110
111 // get pending on connect
112 events, errCh, stop := s.StartClientAuth(c, "DEV1", nil, auth)
113 c.Check(NextEvent(events, errCh), Equals, `unicast app:app1 payload:{"a":42};`)
114 stop()
115 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
116 c.Check(len(errCh), Equals, 0)
117}
97118
=== modified file 'server/broker/broker.go'
--- server/broker/broker.go 2014-05-01 18:56:07 +0000
+++ server/broker/broker.go 2014-05-15 15:14:22 +0000
@@ -87,6 +87,10 @@
87 Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error)87 Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error)
88 // DropByMsgId drops notifications from the channel chanId by message id.88 // DropByMsgId drops notifications from the channel chanId by message id.
89 DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error89 DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error
90 // Feed feeds exchange into the session.
91 Feed(Exchange)
92 // InternalChannelId() returns the channel id corresponding to the session.
93 InternalChannelId() store.InternalChannelId
90}94}
9195
92// Session aborted error.96// Session aborted error.
9397
=== modified file 'server/broker/exchanges.go'
--- server/broker/exchanges.go 2014-05-01 18:56:07 +0000
+++ server/broker/exchanges.go 2014-05-15 15:14:22 +0000
@@ -19,6 +19,7 @@
19import (19import (
20 "encoding/json"20 "encoding/json"
21 "fmt"21 "fmt"
22 "time"
2223
23 "launchpad.net/ubuntu-push/protocol"24 "launchpad.net/ubuntu-push/protocol"
24 "launchpad.net/ubuntu-push/server/store"25 "launchpad.net/ubuntu-push/server/store"
@@ -33,6 +34,10 @@
33 ackMsg protocol.AckMsg34 ackMsg protocol.AckMsg
34}35}
3536
37type BaseExchange struct {
38 Timestamp time.Time
39}
40
36// BroadcastExchange leads a session through delivering a BROADCAST.41// BroadcastExchange leads a session through delivering a BROADCAST.
37// For simplicity it is fully public.42// For simplicity it is fully public.
38type BroadcastExchange struct {43type BroadcastExchange struct {
@@ -40,6 +45,7 @@
40 TopLevel int6445 TopLevel int64
41 Notifications []protocol.Notification46 Notifications []protocol.Notification
42 Decoded []map[string]interface{}47 Decoded []map[string]interface{}
48 BaseExchange
43}49}
4450
45// check interface already here51// check interface already here
@@ -140,7 +146,9 @@
140// UnicastExchange leads a session through delivering a NOTIFICATIONS message.146// UnicastExchange leads a session through delivering a NOTIFICATIONS message.
141// For simplicity it is fully public.147// For simplicity it is fully public.
142type UnicastExchange struct {148type UnicastExchange struct {
143 ChanId store.InternalChannelId149 ChanId store.InternalChannelId
150 CachedOk bool
151 BaseExchange
144}152}
145153
146// check interface already here154// check interface already here
@@ -148,10 +156,13 @@
148156
149// Prepare session for a NOTIFICATIONS.157// Prepare session for a NOTIFICATIONS.
150func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {158func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
151 _, notifs, err := sess.Get(sue.ChanId, false)159 _, notifs, err := sess.Get(sue.ChanId, sue.CachedOk)
152 if err != nil {160 if err != nil {
153 return nil, nil, err161 return nil, nil, err
154 }162 }
163 if len(notifs) == 0 {
164 return nil, nil, ErrNop
165 }
155 scratchArea := sess.ExchangeScratchArea()166 scratchArea := sess.ExchangeScratchArea()
156 scratchArea.notificationsMsg.Reset()167 scratchArea.notificationsMsg.Reset()
157 scratchArea.notificationsMsg.Type = "notifications"168 scratchArea.notificationsMsg.Type = "notifications"
@@ -171,3 +182,28 @@
171 }182 }
172 return nil183 return nil
173}184}
185
186// FeedPending feeds exchanges covering pending notifications into the session.
187func FeedPending(sess BrokerSession) error {
188 // find relevant channels, for now only system
189 channels := []store.InternalChannelId{store.SystemInternalChannelId}
190 for _, chanId := range channels {
191 topLevel, notifications, err := sess.Get(chanId, true)
192 if err != nil {
193 // next broadcast will try again
194 continue
195 }
196 clientLevel := sess.Levels()[chanId]
197 if clientLevel != topLevel {
198 broadcastExchg := &BroadcastExchange{
199 ChanId: chanId,
200 TopLevel: topLevel,
201 Notifications: notifications,
202 }
203 broadcastExchg.Init()
204 sess.Feed(broadcastExchg)
205 }
206 }
207 sess.Feed(&UnicastExchange{ChanId: sess.InternalChannelId(), CachedOk: true})
208 return nil
209}
174210
=== modified file 'server/broker/exchanges_test.go'
--- server/broker/exchanges_test.go 2014-05-01 18:56:07 +0000
+++ server/broker/exchanges_test.go 2014-05-15 15:14:22 +0000
@@ -295,7 +295,7 @@
295 return nil295 return nil
296 },296 },
297 }297 }
298 exchg := &broker.UnicastExchange{chanId1}298 exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: false}
299 outMsg, inMsg, err := exchg.Prepare(sess)299 outMsg, inMsg, err := exchg.Prepare(sess)
300 c.Assert(err, IsNil)300 c.Assert(err, IsNil)
301 // check301 // check
@@ -311,7 +311,7 @@
311}311}
312312
313func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) {313func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) {
314 notifs := []protocol.Notification{}314 notifs := []protocol.Notification{protocol.Notification{}}
315 dropped := make(chan []protocol.Notification, 2)315 dropped := make(chan []protocol.Notification, 2)
316 sess := &testing.TestBrokerSession{316 sess := &testing.TestBrokerSession{
317 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {317 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
@@ -344,8 +344,22 @@
344 c.Assert(err, Equals, fail)344 c.Assert(err, Equals, fail)
345}345}
346346
347func (s *exchangesSuite) TestUnicastExchangeCachedOkNop(c *C) {
348 chanId1 := store.UnicastInternalChannelId("u1", "d1")
349 sess := &testing.TestBrokerSession{
350 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
351 c.Check(chanId, Equals, chanId1)
352 c.Check(cachedOk, Equals, true)
353 return 0, nil, nil
354 },
355 }
356 exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: true}
357 _, _, err := exchg.Prepare(sess)
358 c.Assert(err, Equals, broker.ErrNop)
359}
360
347func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) {361func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) {
348 notifs := []protocol.Notification{}362 notifs := []protocol.Notification{protocol.Notification{}}
349 fail := errors.New("fail")363 fail := errors.New("fail")
350 sess := &testing.TestBrokerSession{364 sess := &testing.TestBrokerSession{
351 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {365 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
@@ -363,3 +377,79 @@
363 err = exchg.Acked(sess, true)377 err = exchg.Acked(sess, true)
364 c.Assert(err, Equals, fail)378 c.Assert(err, Equals, fail)
365}379}
380
381func (s *exchangesSuite) TestFeedPending(c *C) {
382 bcast1 := json.RawMessage(`{"m": "M"}`)
383 decoded1 := map[string]interface{}{"m": "M"}
384 sess := &testing.TestBrokerSession{
385 Exchanges: make(chan broker.Exchange, 5),
386 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
387 switch chanId {
388 case store.SystemInternalChannelId:
389 return 1, help.Ns(bcast1), nil
390 default:
391 return 0, nil, nil
392 }
393 },
394 }
395 err := broker.FeedPending(sess)
396 c.Assert(err, IsNil)
397 c.Assert(len(sess.Exchanges), Equals, 2)
398 exchg1 := <-sess.Exchanges
399 c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{
400 ChanId: store.SystemInternalChannelId,
401 TopLevel: 1,
402 Notifications: help.Ns(bcast1),
403 Decoded: []map[string]interface{}{decoded1},
404 })
405 exchg2 := <-sess.Exchanges
406 c.Check(exchg2, DeepEquals, &broker.UnicastExchange{
407 ChanId: sess.InternalChannelId(),
408 CachedOk: true,
409 })
410}
411
412func (s *exchangesSuite) TestFeedPendingSystemChanNop(c *C) {
413 bcast1 := json.RawMessage(`{"m": "M"}`)
414 sess := &testing.TestBrokerSession{
415 LevelsMap: map[store.InternalChannelId]int64{
416 store.SystemInternalChannelId: 1,
417 },
418 Exchanges: make(chan broker.Exchange, 5),
419 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
420 switch chanId {
421 case store.SystemInternalChannelId:
422 return 1, help.Ns(bcast1), nil
423 default:
424 return 0, nil, nil
425 }
426 },
427 }
428 err := broker.FeedPending(sess)
429 c.Assert(err, IsNil)
430 c.Check(len(sess.Exchanges), Equals, 1)
431 exchg1 := <-sess.Exchanges
432 c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{})
433}
434
435func (s *exchangesSuite) TestFeedPendingSystemChanFail(c *C) {
436 sess := &testing.TestBrokerSession{
437 LevelsMap: map[store.InternalChannelId]int64{
438 store.SystemInternalChannelId: 1,
439 },
440 Exchanges: make(chan broker.Exchange, 5),
441 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
442 switch chanId {
443 case store.SystemInternalChannelId:
444 return 0, nil, errors.New("fail")
445 default:
446 return 0, nil, nil
447 }
448 },
449 }
450 err := broker.FeedPending(sess)
451 c.Assert(err, IsNil)
452 c.Check(len(sess.Exchanges), Equals, 1)
453 exchg1 := <-sess.Exchanges
454 c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{})
455}
366456
=== modified file 'server/broker/simple/simple.go'
--- server/broker/simple/simple.go 2014-05-01 18:56:07 +0000
+++ server/broker/simple/simple.go 2014-05-15 15:14:22 +0000
@@ -103,6 +103,14 @@
103 return sess.broker.drop(chanId, targets)103 return sess.broker.drop(chanId, targets)
104}104}
105105
106func (sess *simpleBrokerSession) Feed(exchg broker.Exchange) {
107 sess.exchanges <- exchg
108}
109
110func (sess *simpleBrokerSession) InternalChannelId() store.InternalChannelId {
111 return store.UnicastInternalChannelId(sess.deviceId, sess.deviceId)
112}
113
106// NewSimpleBroker makes a new SimpleBroker.114// NewSimpleBroker makes a new SimpleBroker.
107func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker {115func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker {
108 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())116 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())
@@ -150,30 +158,6 @@
150 return b.running158 return b.running
151}159}
152160
153func (b *SimpleBroker) feedPending(sess *simpleBrokerSession) error {
154 // find relevant channels, for now only system
155 channels := []store.InternalChannelId{store.SystemInternalChannelId}
156 for _, chanId := range channels {
157 topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId)
158 if err != nil {
159 // next broadcast will try again
160 b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err)
161 continue
162 }
163 clientLevel := sess.levels[chanId]
164 if clientLevel != topLevel {
165 broadcastExchg := &broker.BroadcastExchange{
166 ChanId: chanId,
167 TopLevel: topLevel,
168 Notifications: notifications,
169 }
170 broadcastExchg.Init()
171 sess.exchanges <- broadcastExchg
172 }
173 }
174 return nil
175}
176
177// Register registers a session with the broker. It feeds the session161// Register registers a session with the broker. It feeds the session
178// pending notifications as well.162// pending notifications as well.
179func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) {163func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) {
@@ -205,7 +189,7 @@
205 }189 }
206 b.sessionCh <- sess190 b.sessionCh <- sess
207 <-sess.done191 <-sess.done
208 err = b.feedPending(sess)192 err = broker.FeedPending(sess)
209 if err != nil {193 if err != nil {
210 return nil, err194 return nil, err
211 }195 }
@@ -281,7 +265,7 @@
281 _, devId := chanId.UnicastUserAndDevice()265 _, devId := chanId.UnicastUserAndDevice()
282 sess := b.registry[devId]266 sess := b.registry[devId]
283 if sess != nil {267 if sess != nil {
284 sess.exchanges <- &broker.UnicastExchange{chanId}268 sess.exchanges <- &broker.UnicastExchange{ChanId: chanId, CachedOk: false}
285 }269 }
286 }270 }
287 }271 }
288272
=== modified file 'server/broker/simple/simple_test.go'
--- server/broker/simple/simple_test.go 2014-04-29 15:29:04 +0000
+++ server/broker/simple/simple_test.go 2014-05-15 15:14:22 +0000
@@ -17,16 +17,12 @@
17package simple17package simple
1818
19import (19import (
20 "encoding/json"
21 stdtesting "testing"20 stdtesting "testing"
22 "time"
2321
24 . "launchpad.net/gocheck"22 . "launchpad.net/gocheck"
2523
26 "launchpad.net/ubuntu-push/server/broker"
27 "launchpad.net/ubuntu-push/server/broker/testing"24 "launchpad.net/ubuntu-push/server/broker/testing"
28 "launchpad.net/ubuntu-push/server/store"25 "launchpad.net/ubuntu-push/server/store"
29 help "launchpad.net/ubuntu-push/testing"
30)26)
3127
32func TestSimple(t *stdtesting.T) { TestingT(t) }28func TestSimple(t *stdtesting.T) { TestingT(t) }
@@ -45,39 +41,7 @@
45 c.Check(b.sto, Equals, sto)41 c.Check(b.sto, Equals, sto)
46}42}
4743
48func (s *simpleSuite) TestFeedPending(c *C) {44func (s *simpleSuite) TestSessionInternalChannelId(c *C) {
49 sto := store.NewInMemoryPendingStore()45 sess := &simpleBrokerSession{deviceId: "dev21"}
50 muchLater := time.Now().Add(10 * time.Minute)46 c.Check(sess.InternalChannelId(), Equals, store.UnicastInternalChannelId("dev21", "dev21"))
51 notification1 := json.RawMessage(`{"m": "M"}`)
52 decoded1 := map[string]interface{}{"m": "M"}
53 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
54 b := NewSimpleBroker(sto, testBrokerConfig, nil)
55 sess := &simpleBrokerSession{
56 exchanges: make(chan broker.Exchange, 1),
57 }
58 b.feedPending(sess)
59 c.Assert(len(sess.exchanges), Equals, 1)
60 exchg1 := <-sess.exchanges
61 c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{
62 ChanId: store.SystemInternalChannelId,
63 TopLevel: 1,
64 Notifications: help.Ns(notification1),
65 Decoded: []map[string]interface{}{decoded1},
66 })
67}
68
69func (s *simpleSuite) TestFeedPendingNop(c *C) {
70 sto := store.NewInMemoryPendingStore()
71 muchLater := time.Now().Add(10 * time.Minute)
72 notification1 := json.RawMessage(`{"m": "M"}`)
73 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
74 b := NewSimpleBroker(sto, testBrokerConfig, nil)
75 sess := &simpleBrokerSession{
76 exchanges: make(chan broker.Exchange, 1),
77 levels: map[store.InternalChannelId]int64{
78 store.SystemInternalChannelId: 1,
79 },
80 }
81 b.feedPending(sess)
82 c.Assert(len(sess.exchanges), Equals, 0)
83}47}
8448
=== modified file 'server/broker/testing/impls.go'
--- server/broker/testing/impls.go 2014-05-01 18:56:07 +0000
+++ server/broker/testing/impls.go 2014-05-15 15:14:22 +0000
@@ -68,6 +68,14 @@
68 return tbs.DoDropByMsgId(chanId, targets)68 return tbs.DoDropByMsgId(chanId, targets)
69}69}
7070
71func (tbs *TestBrokerSession) Feed(exchg broker.Exchange) {
72 tbs.Exchanges <- exchg
73}
74
75func (tbs *TestBrokerSession) InternalChannelId() store.InternalChannelId {
76 return store.UnicastInternalChannelId(tbs.DeviceId, tbs.DeviceId)
77}
78
71// Test implementation of BrokerConfig.79// Test implementation of BrokerConfig.
72type TestBrokerConfig struct {80type TestBrokerConfig struct {
73 ConfigSessionQueueSize uint81 ConfigSessionQueueSize uint
7482
=== modified file 'server/broker/testsuite/suite.go'
--- server/broker/testsuite/suite.go 2014-05-01 18:56:07 +0000
+++ server/broker/testsuite/suite.go 2014-05-15 15:14:22 +0000
@@ -143,7 +143,7 @@
143 defer b.Stop()143 defer b.Stop()
144 sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")144 sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
145 c.Assert(err, IsNil)145 c.Assert(err, IsNil)
146 c.Check(len(sess.SessionChannel()), Equals, 1)146 c.Check(len(sess.SessionChannel()), Equals, 2)
147}147}
148148
149func (s *CommonBrokerSuite) TestRegistrationFeedPendingError(c *C) {149func (s *CommonBrokerSuite) TestRegistrationFeedPendingError(c *C) {
@@ -154,7 +154,12 @@
154 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")154 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
155 c.Assert(err, IsNil)155 c.Assert(err, IsNil)
156 // but156 // but
157 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n")157 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for 0 \\(cachedOk=true\\): get channel snapshot fail\n")
158}
159
160func clearOfPending(c *C, sess broker.BrokerSession) {
161 c.Assert(len(sess.SessionChannel()) >= 1, Equals, true)
162 <-sess.SessionChannel()
158}163}
159164
160func (s *CommonBrokerSuite) TestRegistrationLastWins(c *C) {165func (s *CommonBrokerSuite) TestRegistrationLastWins(c *C) {
@@ -164,6 +169,7 @@
164 defer b.Stop()169 defer b.Stop()
165 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")170 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
166 c.Assert(err, IsNil)171 c.Assert(err, IsNil)
172 clearOfPending(c, sess1)
167 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2")173 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2")
168 c.Assert(err, IsNil)174 c.Assert(err, IsNil)
169 // previous session got signaled by sending nil on its channel175 // previous session got signaled by sending nil on its channel
@@ -194,8 +200,10 @@
194 defer b.Stop()200 defer b.Stop()
195 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")201 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
196 c.Assert(err, IsNil)202 c.Assert(err, IsNil)
203 clearOfPending(c, sess1)
197 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2")204 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2")
198 c.Assert(err, IsNil)205 c.Assert(err, IsNil)
206 clearOfPending(c, sess2)
199 // add notification to channel *after* the registrations207 // add notification to channel *after* the registrations
200 muchLater := time.Now().Add(10 * time.Minute)208 muchLater := time.Now().Add(10 * time.Minute)
201 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)209 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
@@ -250,8 +258,9 @@
250 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)258 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
251 b.Start()259 b.Start()
252 defer b.Stop()260 defer b.Stop()
253 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")261 sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
254 c.Assert(err, IsNil)262 c.Assert(err, IsNil)
263 clearOfPending(c, sess)
255 b.Broadcast(store.SystemInternalChannelId)264 b.Broadcast(store.SystemInternalChannelId)
256 select {265 select {
257 case <-time.After(5 * time.Second):266 case <-time.After(5 * time.Second):
@@ -272,8 +281,10 @@
272 defer b.Stop()281 defer b.Stop()
273 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1")282 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1")
274 c.Assert(err, IsNil)283 c.Assert(err, IsNil)
284 clearOfPending(c, sess1)
275 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2")285 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2")
276 c.Assert(err, IsNil)286 c.Assert(err, IsNil)
287 clearOfPending(c, sess2)
277 // add notification to channel *after* the registrations288 // add notification to channel *after* the registrations
278 muchLater := time.Now().Add(10 * time.Minute)289 muchLater := time.Now().Add(10 * time.Minute)
279 sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)290 sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
@@ -338,3 +349,20 @@
338 c.Assert(err, ErrorMatches, "drop fail")349 c.Assert(err, ErrorMatches, "drop fail")
339 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n")350 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n")
340}351}
352
353func (s *CommonBrokerSuite) TestSessionFeed(c *C) {
354 sto := store.NewInMemoryPendingStore()
355 b := s.MakeBroker(sto, testBrokerConfig, nil)
356 b.Start()
357 defer b.Stop()
358 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
359 c.Assert(err, IsNil)
360 clearOfPending(c, sess1)
361 bcast := &broker.BroadcastExchange{ChanId: store.SystemInternalChannelId, TopLevel: 99}
362 sess1.Feed(bcast)
363 c.Check(s.RevealBroadcastExchange(<-sess1.SessionChannel()), DeepEquals, bcast)
364
365 ucast := &broker.UnicastExchange{ChanId: store.UnicastInternalChannelId("dev21", "dev21"), CachedOk: true}
366 sess1.Feed(ucast)
367 c.Check(s.RevealUnicastExchange(<-sess1.SessionChannel()), DeepEquals, ucast)
368}

Subscribers

People subscribed via source and target branches