Merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg into lp:ubuntu-push/automatic

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 156
Merged at revision: 156
Proposed branch: lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg
Merge into: lp:ubuntu-push/automatic
Diff against target: 495 lines (+208/-73)
8 files modified
server/acceptance/suites/unicast.go (+21/-0)
server/broker/broker.go (+4/-0)
server/broker/exchanges.go (+38/-2)
server/broker/exchanges_test.go (+93/-3)
server/broker/simple/simple.go (+10/-26)
server/broker/simple/simple_test.go (+3/-39)
server/broker/testing/impls.go (+8/-0)
server/broker/testsuite/suite.go (+31/-3)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+219710@code.launchpad.net

Commit message

support feeding pending unicasts at session start, together with/through some related reorg

Description of the change

support feeding pending unicasts at session start, together with/through some related reorg

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/acceptance/suites/unicast.go'
2--- server/acceptance/suites/unicast.go 2014-05-02 11:20:55 +0000
3+++ server/acceptance/suites/unicast.go 2014-05-15 15:14:22 +0000
4@@ -94,3 +94,24 @@
5 c.Check(len(errCh1), Equals, 0)
6 c.Check(len(errCh2), Equals, 0)
7 }
8+
9+func (s *UnicastAcceptanceSuite) TestUnicastPending(c *C) {
10+ // send unicast that will be pending
11+ userId, auth := s.associatedAuth("DEV1")
12+ got, err := s.PostRequest("/notify", &api.Unicast{
13+ UserId: userId,
14+ DeviceId: "DEV1",
15+ AppId: "app1",
16+ ExpireOn: future,
17+ Data: json.RawMessage(`{"a": 42}`),
18+ })
19+ c.Assert(err, IsNil)
20+ c.Assert(got, Matches, ".*ok.*")
21+
22+ // get pending on connect
23+ events, errCh, stop := s.StartClientAuth(c, "DEV1", nil, auth)
24+ c.Check(NextEvent(events, errCh), Equals, `unicast app:app1 payload:{"a":42};`)
25+ stop()
26+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
27+ c.Check(len(errCh), Equals, 0)
28+}
29
30=== modified file 'server/broker/broker.go'
31--- server/broker/broker.go 2014-05-01 18:56:07 +0000
32+++ server/broker/broker.go 2014-05-15 15:14:22 +0000
33@@ -87,6 +87,10 @@
34 Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error)
35 // DropByMsgId drops notifications from the channel chanId by message id.
36 DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error
37+ // Feed feeds exchange into the session.
38+ Feed(Exchange)
39+ // InternalChannelId() returns the channel id corresponding to the session.
40+ InternalChannelId() store.InternalChannelId
41 }
42
43 // Session aborted error.
44
45=== modified file 'server/broker/exchanges.go'
46--- server/broker/exchanges.go 2014-05-01 18:56:07 +0000
47+++ server/broker/exchanges.go 2014-05-15 15:14:22 +0000
48@@ -19,6 +19,7 @@
49 import (
50 "encoding/json"
51 "fmt"
52+ "time"
53
54 "launchpad.net/ubuntu-push/protocol"
55 "launchpad.net/ubuntu-push/server/store"
56@@ -33,6 +34,10 @@
57 ackMsg protocol.AckMsg
58 }
59
60+type BaseExchange struct {
61+ Timestamp time.Time
62+}
63+
64 // BroadcastExchange leads a session through delivering a BROADCAST.
65 // For simplicity it is fully public.
66 type BroadcastExchange struct {
67@@ -40,6 +45,7 @@
68 TopLevel int64
69 Notifications []protocol.Notification
70 Decoded []map[string]interface{}
71+ BaseExchange
72 }
73
74 // check interface already here
75@@ -140,7 +146,9 @@
76 // UnicastExchange leads a session through delivering a NOTIFICATIONS message.
77 // For simplicity it is fully public.
78 type UnicastExchange struct {
79- ChanId store.InternalChannelId
80+ ChanId store.InternalChannelId
81+ CachedOk bool
82+ BaseExchange
83 }
84
85 // check interface already here
86@@ -148,10 +156,13 @@
87
88 // Prepare session for a NOTIFICATIONS.
89 func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
90- _, notifs, err := sess.Get(sue.ChanId, false)
91+ _, notifs, err := sess.Get(sue.ChanId, sue.CachedOk)
92 if err != nil {
93 return nil, nil, err
94 }
95+ if len(notifs) == 0 {
96+ return nil, nil, ErrNop
97+ }
98 scratchArea := sess.ExchangeScratchArea()
99 scratchArea.notificationsMsg.Reset()
100 scratchArea.notificationsMsg.Type = "notifications"
101@@ -171,3 +182,28 @@
102 }
103 return nil
104 }
105+
106+// FeedPending feeds exchanges covering pending notifications into the session.
107+func FeedPending(sess BrokerSession) error {
108+ // find relevant channels, for now only system
109+ channels := []store.InternalChannelId{store.SystemInternalChannelId}
110+ for _, chanId := range channels {
111+ topLevel, notifications, err := sess.Get(chanId, true)
112+ if err != nil {
113+ // next broadcast will try again
114+ continue
115+ }
116+ clientLevel := sess.Levels()[chanId]
117+ if clientLevel != topLevel {
118+ broadcastExchg := &BroadcastExchange{
119+ ChanId: chanId,
120+ TopLevel: topLevel,
121+ Notifications: notifications,
122+ }
123+ broadcastExchg.Init()
124+ sess.Feed(broadcastExchg)
125+ }
126+ }
127+ sess.Feed(&UnicastExchange{ChanId: sess.InternalChannelId(), CachedOk: true})
128+ return nil
129+}
130
131=== modified file 'server/broker/exchanges_test.go'
132--- server/broker/exchanges_test.go 2014-05-01 18:56:07 +0000
133+++ server/broker/exchanges_test.go 2014-05-15 15:14:22 +0000
134@@ -295,7 +295,7 @@
135 return nil
136 },
137 }
138- exchg := &broker.UnicastExchange{chanId1}
139+ exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: false}
140 outMsg, inMsg, err := exchg.Prepare(sess)
141 c.Assert(err, IsNil)
142 // check
143@@ -311,7 +311,7 @@
144 }
145
146 func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) {
147- notifs := []protocol.Notification{}
148+ notifs := []protocol.Notification{protocol.Notification{}}
149 dropped := make(chan []protocol.Notification, 2)
150 sess := &testing.TestBrokerSession{
151 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
152@@ -344,8 +344,22 @@
153 c.Assert(err, Equals, fail)
154 }
155
156+func (s *exchangesSuite) TestUnicastExchangeCachedOkNop(c *C) {
157+ chanId1 := store.UnicastInternalChannelId("u1", "d1")
158+ sess := &testing.TestBrokerSession{
159+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
160+ c.Check(chanId, Equals, chanId1)
161+ c.Check(cachedOk, Equals, true)
162+ return 0, nil, nil
163+ },
164+ }
165+ exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: true}
166+ _, _, err := exchg.Prepare(sess)
167+ c.Assert(err, Equals, broker.ErrNop)
168+}
169+
170 func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) {
171- notifs := []protocol.Notification{}
172+ notifs := []protocol.Notification{protocol.Notification{}}
173 fail := errors.New("fail")
174 sess := &testing.TestBrokerSession{
175 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
176@@ -363,3 +377,79 @@
177 err = exchg.Acked(sess, true)
178 c.Assert(err, Equals, fail)
179 }
180+
181+func (s *exchangesSuite) TestFeedPending(c *C) {
182+ bcast1 := json.RawMessage(`{"m": "M"}`)
183+ decoded1 := map[string]interface{}{"m": "M"}
184+ sess := &testing.TestBrokerSession{
185+ Exchanges: make(chan broker.Exchange, 5),
186+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
187+ switch chanId {
188+ case store.SystemInternalChannelId:
189+ return 1, help.Ns(bcast1), nil
190+ default:
191+ return 0, nil, nil
192+ }
193+ },
194+ }
195+ err := broker.FeedPending(sess)
196+ c.Assert(err, IsNil)
197+ c.Assert(len(sess.Exchanges), Equals, 2)
198+ exchg1 := <-sess.Exchanges
199+ c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{
200+ ChanId: store.SystemInternalChannelId,
201+ TopLevel: 1,
202+ Notifications: help.Ns(bcast1),
203+ Decoded: []map[string]interface{}{decoded1},
204+ })
205+ exchg2 := <-sess.Exchanges
206+ c.Check(exchg2, DeepEquals, &broker.UnicastExchange{
207+ ChanId: sess.InternalChannelId(),
208+ CachedOk: true,
209+ })
210+}
211+
212+func (s *exchangesSuite) TestFeedPendingSystemChanNop(c *C) {
213+ bcast1 := json.RawMessage(`{"m": "M"}`)
214+ sess := &testing.TestBrokerSession{
215+ LevelsMap: map[store.InternalChannelId]int64{
216+ store.SystemInternalChannelId: 1,
217+ },
218+ Exchanges: make(chan broker.Exchange, 5),
219+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
220+ switch chanId {
221+ case store.SystemInternalChannelId:
222+ return 1, help.Ns(bcast1), nil
223+ default:
224+ return 0, nil, nil
225+ }
226+ },
227+ }
228+ err := broker.FeedPending(sess)
229+ c.Assert(err, IsNil)
230+ c.Check(len(sess.Exchanges), Equals, 1)
231+ exchg1 := <-sess.Exchanges
232+ c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{})
233+}
234+
235+func (s *exchangesSuite) TestFeedPendingSystemChanFail(c *C) {
236+ sess := &testing.TestBrokerSession{
237+ LevelsMap: map[store.InternalChannelId]int64{
238+ store.SystemInternalChannelId: 1,
239+ },
240+ Exchanges: make(chan broker.Exchange, 5),
241+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
242+ switch chanId {
243+ case store.SystemInternalChannelId:
244+ return 0, nil, errors.New("fail")
245+ default:
246+ return 0, nil, nil
247+ }
248+ },
249+ }
250+ err := broker.FeedPending(sess)
251+ c.Assert(err, IsNil)
252+ c.Check(len(sess.Exchanges), Equals, 1)
253+ exchg1 := <-sess.Exchanges
254+ c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{})
255+}
256
257=== modified file 'server/broker/simple/simple.go'
258--- server/broker/simple/simple.go 2014-05-01 18:56:07 +0000
259+++ server/broker/simple/simple.go 2014-05-15 15:14:22 +0000
260@@ -103,6 +103,14 @@
261 return sess.broker.drop(chanId, targets)
262 }
263
264+func (sess *simpleBrokerSession) Feed(exchg broker.Exchange) {
265+ sess.exchanges <- exchg
266+}
267+
268+func (sess *simpleBrokerSession) InternalChannelId() store.InternalChannelId {
269+ return store.UnicastInternalChannelId(sess.deviceId, sess.deviceId)
270+}
271+
272 // NewSimpleBroker makes a new SimpleBroker.
273 func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker {
274 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())
275@@ -150,30 +158,6 @@
276 return b.running
277 }
278
279-func (b *SimpleBroker) feedPending(sess *simpleBrokerSession) error {
280- // find relevant channels, for now only system
281- channels := []store.InternalChannelId{store.SystemInternalChannelId}
282- for _, chanId := range channels {
283- topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId)
284- if err != nil {
285- // next broadcast will try again
286- b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err)
287- continue
288- }
289- clientLevel := sess.levels[chanId]
290- if clientLevel != topLevel {
291- broadcastExchg := &broker.BroadcastExchange{
292- ChanId: chanId,
293- TopLevel: topLevel,
294- Notifications: notifications,
295- }
296- broadcastExchg.Init()
297- sess.exchanges <- broadcastExchg
298- }
299- }
300- return nil
301-}
302-
303 // Register registers a session with the broker. It feeds the session
304 // pending notifications as well.
305 func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) {
306@@ -205,7 +189,7 @@
307 }
308 b.sessionCh <- sess
309 <-sess.done
310- err = b.feedPending(sess)
311+ err = broker.FeedPending(sess)
312 if err != nil {
313 return nil, err
314 }
315@@ -281,7 +265,7 @@
316 _, devId := chanId.UnicastUserAndDevice()
317 sess := b.registry[devId]
318 if sess != nil {
319- sess.exchanges <- &broker.UnicastExchange{chanId}
320+ sess.exchanges <- &broker.UnicastExchange{ChanId: chanId, CachedOk: false}
321 }
322 }
323 }
324
325=== modified file 'server/broker/simple/simple_test.go'
326--- server/broker/simple/simple_test.go 2014-04-29 15:29:04 +0000
327+++ server/broker/simple/simple_test.go 2014-05-15 15:14:22 +0000
328@@ -17,16 +17,12 @@
329 package simple
330
331 import (
332- "encoding/json"
333 stdtesting "testing"
334- "time"
335
336 . "launchpad.net/gocheck"
337
338- "launchpad.net/ubuntu-push/server/broker"
339 "launchpad.net/ubuntu-push/server/broker/testing"
340 "launchpad.net/ubuntu-push/server/store"
341- help "launchpad.net/ubuntu-push/testing"
342 )
343
344 func TestSimple(t *stdtesting.T) { TestingT(t) }
345@@ -45,39 +41,7 @@
346 c.Check(b.sto, Equals, sto)
347 }
348
349-func (s *simpleSuite) TestFeedPending(c *C) {
350- sto := store.NewInMemoryPendingStore()
351- muchLater := time.Now().Add(10 * time.Minute)
352- notification1 := json.RawMessage(`{"m": "M"}`)
353- decoded1 := map[string]interface{}{"m": "M"}
354- sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
355- b := NewSimpleBroker(sto, testBrokerConfig, nil)
356- sess := &simpleBrokerSession{
357- exchanges: make(chan broker.Exchange, 1),
358- }
359- b.feedPending(sess)
360- c.Assert(len(sess.exchanges), Equals, 1)
361- exchg1 := <-sess.exchanges
362- c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{
363- ChanId: store.SystemInternalChannelId,
364- TopLevel: 1,
365- Notifications: help.Ns(notification1),
366- Decoded: []map[string]interface{}{decoded1},
367- })
368-}
369-
370-func (s *simpleSuite) TestFeedPendingNop(c *C) {
371- sto := store.NewInMemoryPendingStore()
372- muchLater := time.Now().Add(10 * time.Minute)
373- notification1 := json.RawMessage(`{"m": "M"}`)
374- sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
375- b := NewSimpleBroker(sto, testBrokerConfig, nil)
376- sess := &simpleBrokerSession{
377- exchanges: make(chan broker.Exchange, 1),
378- levels: map[store.InternalChannelId]int64{
379- store.SystemInternalChannelId: 1,
380- },
381- }
382- b.feedPending(sess)
383- c.Assert(len(sess.exchanges), Equals, 0)
384+func (s *simpleSuite) TestSessionInternalChannelId(c *C) {
385+ sess := &simpleBrokerSession{deviceId: "dev21"}
386+ c.Check(sess.InternalChannelId(), Equals, store.UnicastInternalChannelId("dev21", "dev21"))
387 }
388
389=== modified file 'server/broker/testing/impls.go'
390--- server/broker/testing/impls.go 2014-05-01 18:56:07 +0000
391+++ server/broker/testing/impls.go 2014-05-15 15:14:22 +0000
392@@ -68,6 +68,14 @@
393 return tbs.DoDropByMsgId(chanId, targets)
394 }
395
396+func (tbs *TestBrokerSession) Feed(exchg broker.Exchange) {
397+ tbs.Exchanges <- exchg
398+}
399+
400+func (tbs *TestBrokerSession) InternalChannelId() store.InternalChannelId {
401+ return store.UnicastInternalChannelId(tbs.DeviceId, tbs.DeviceId)
402+}
403+
404 // Test implementation of BrokerConfig.
405 type TestBrokerConfig struct {
406 ConfigSessionQueueSize uint
407
408=== modified file 'server/broker/testsuite/suite.go'
409--- server/broker/testsuite/suite.go 2014-05-01 18:56:07 +0000
410+++ server/broker/testsuite/suite.go 2014-05-15 15:14:22 +0000
411@@ -143,7 +143,7 @@
412 defer b.Stop()
413 sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
414 c.Assert(err, IsNil)
415- c.Check(len(sess.SessionChannel()), Equals, 1)
416+ c.Check(len(sess.SessionChannel()), Equals, 2)
417 }
418
419 func (s *CommonBrokerSuite) TestRegistrationFeedPendingError(c *C) {
420@@ -154,7 +154,12 @@
421 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
422 c.Assert(err, IsNil)
423 // but
424- c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n")
425+ c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for 0 \\(cachedOk=true\\): get channel snapshot fail\n")
426+}
427+
428+func clearOfPending(c *C, sess broker.BrokerSession) {
429+ c.Assert(len(sess.SessionChannel()) >= 1, Equals, true)
430+ <-sess.SessionChannel()
431 }
432
433 func (s *CommonBrokerSuite) TestRegistrationLastWins(c *C) {
434@@ -164,6 +169,7 @@
435 defer b.Stop()
436 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
437 c.Assert(err, IsNil)
438+ clearOfPending(c, sess1)
439 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2")
440 c.Assert(err, IsNil)
441 // previous session got signaled by sending nil on its channel
442@@ -194,8 +200,10 @@
443 defer b.Stop()
444 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
445 c.Assert(err, IsNil)
446+ clearOfPending(c, sess1)
447 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2")
448 c.Assert(err, IsNil)
449+ clearOfPending(c, sess2)
450 // add notification to channel *after* the registrations
451 muchLater := time.Now().Add(10 * time.Minute)
452 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
453@@ -250,8 +258,9 @@
454 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
455 b.Start()
456 defer b.Stop()
457- _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
458+ sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
459 c.Assert(err, IsNil)
460+ clearOfPending(c, sess)
461 b.Broadcast(store.SystemInternalChannelId)
462 select {
463 case <-time.After(5 * time.Second):
464@@ -272,8 +281,10 @@
465 defer b.Stop()
466 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1")
467 c.Assert(err, IsNil)
468+ clearOfPending(c, sess1)
469 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2")
470 c.Assert(err, IsNil)
471+ clearOfPending(c, sess2)
472 // add notification to channel *after* the registrations
473 muchLater := time.Now().Add(10 * time.Minute)
474 sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
475@@ -338,3 +349,20 @@
476 c.Assert(err, ErrorMatches, "drop fail")
477 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n")
478 }
479+
480+func (s *CommonBrokerSuite) TestSessionFeed(c *C) {
481+ sto := store.NewInMemoryPendingStore()
482+ b := s.MakeBroker(sto, testBrokerConfig, nil)
483+ b.Start()
484+ defer b.Stop()
485+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
486+ c.Assert(err, IsNil)
487+ clearOfPending(c, sess1)
488+ bcast := &broker.BroadcastExchange{ChanId: store.SystemInternalChannelId, TopLevel: 99}
489+ sess1.Feed(bcast)
490+ c.Check(s.RevealBroadcastExchange(<-sess1.SessionChannel()), DeepEquals, bcast)
491+
492+ ucast := &broker.UnicastExchange{ChanId: store.UnicastInternalChannelId("dev21", "dev21"), CachedOk: true}
493+ sess1.Feed(ucast)
494+ c.Check(s.RevealUnicastExchange(<-sess1.SessionChannel()), DeepEquals, ucast)
495+}

Subscribers

People subscribed via source and target branches