Merge lp:~pedronis/ubuntu-push/unicast-broker into lp:ubuntu-push/automatic

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 146
Merged at revision: 144
Proposed branch: lp:~pedronis/ubuntu-push/unicast-broker
Merge into: lp:ubuntu-push/automatic
Prerequisite: lp:~pedronis/ubuntu-push/unicast-store
Diff against target: 506 lines (+308/-5)
9 files modified
protocol/messages.go (+11/-0)
server/api/handlers_test.go (+8/-0)
server/broker/broker.go (+6/-0)
server/broker/exchanges.go (+38/-2)
server/broker/exchanges_test.go (+97/-0)
server/broker/simple/simple.go (+47/-2)
server/broker/simple/suite_test.go (+3/-0)
server/broker/testing/impls.go (+13/-0)
server/broker/testsuite/suite.go (+85/-1)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/unicast-broker
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+218044@code.launchpad.net

Commit message

unicast in broker and introduce UnicastExchange, some reorg

Description of the change

unicast in broker and introduce UnicastExchange, some reorg

for later branches:

- deal with pending unicast messages
- support splitting large NOTIFICATIONS messages

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) :
review: Approve
146. By Samuele Pedroni

Merged unicast-store into unicast-broker.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'protocol/messages.go'
2--- protocol/messages.go 2014-05-02 15:13:38 +0000
3+++ protocol/messages.go 2014-05-02 15:13:38 +0000
4@@ -154,6 +154,17 @@
5 Notifications []Notification
6 }
7
8+// Reset resets the splitting state if the message storage is to be
9+// reused.
10+func (m *NotificationsMsg) Reset() {
11+ // xxx
12+}
13+
14+func (m *NotificationsMsg) Split() bool {
15+ // xxx
16+ return true
17+}
18+
19 // A single unicast notification
20 type Notification struct {
21 AppId string `json:"A"`
22
23=== modified file 'server/api/handlers_test.go'
24--- server/api/handlers_test.go 2014-05-02 15:13:38 +0000
25+++ server/api/handlers_test.go 2014-05-02 15:13:38 +0000
26@@ -150,6 +150,10 @@
27 cbsend.payloads = protocol.ExtractPayloads(notifications)
28 }
29
30+func (cbsend *checkBrokerSending) Unicast(chanIds ...store.InternalChannelId) {
31+ // xxx later
32+}
33+
34 func (s *handlersSuite) TestDoBroadcast(c *C) {
35 sto := store.NewInMemoryPendingStore()
36 bsend := &checkBrokerSending{store: sto}
37@@ -269,6 +273,10 @@
38 bsend.chanId <- chanId
39 }
40
41+func (bsend testBrokerSending) Unicast(chanIds ...store.InternalChannelId) {
42+ // xxx later
43+}
44+
45 func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) {
46 sto := store.NewInMemoryPendingStore()
47 stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
48
49=== modified file 'server/broker/broker.go'
50--- server/broker/broker.go 2014-04-17 15:42:27 +0000
51+++ server/broker/broker.go 2014-05-02 15:13:38 +0000
52@@ -39,6 +39,8 @@
53 type BrokerSending interface {
54 // Broadcast channel.
55 Broadcast(chanId store.InternalChannelId)
56+ // Unicast over channels.
57+ Unicast(chanIds ...store.InternalChannelId)
58 }
59
60 // Exchange leads the session through performing an exchange, typically delivery.
61@@ -81,6 +83,10 @@
62 Levels() LevelsMap
63 // ExchangeScratchArea returns the scratch area for exchanges.
64 ExchangeScratchArea() *ExchangesScratchArea
65+ // Get gets the content of the channel with chanId.
66+ Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error)
67+ // DropByMsgId drops notifications from the channel chanId by message id.
68+ DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error
69 }
70
71 // Session aborted error.
72
73=== modified file 'server/broker/exchanges.go'
74--- server/broker/exchanges.go 2014-05-02 15:13:38 +0000
75+++ server/broker/exchanges.go 2014-05-02 15:13:38 +0000
76@@ -28,8 +28,9 @@
77
78 // Scratch area for exchanges, sessions should hold one of these.
79 type ExchangesScratchArea struct {
80- broadcastMsg protocol.BroadcastMsg
81- ackMsg protocol.AckMsg
82+ broadcastMsg protocol.BroadcastMsg
83+ notificationsMsg protocol.NotificationsMsg
84+ ackMsg protocol.AckMsg
85 }
86
87 // BroadcastExchange leads a session through delivering a BROADCAST.
88@@ -135,3 +136,38 @@
89 func (cbe *ConnMetaExchange) Acked(sess BrokerSession, done bool) error {
90 panic("Acked should not get invoked on ConnMetaExchange")
91 }
92+
93+// UnicastExchange leads a session through delivering a NOTIFICATIONS message.
94+// For simplicity it is fully public.
95+type UnicastExchange struct {
96+ ChanId store.InternalChannelId
97+}
98+
99+// check interface already here
100+var _ Exchange = (*UnicastExchange)(nil)
101+
102+// Prepare session for a NOTIFICATIONS.
103+func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
104+ _, notifs, err := sess.Get(sue.ChanId, false)
105+ if err != nil {
106+ return nil, nil, err
107+ }
108+ scratchArea := sess.ExchangeScratchArea()
109+ scratchArea.notificationsMsg.Reset()
110+ scratchArea.notificationsMsg.Type = "notifications"
111+ scratchArea.notificationsMsg.Notifications = notifs
112+ return &scratchArea.notificationsMsg, &scratchArea.ackMsg, nil
113+}
114+
115+// Acked deals with an ACK for a NOTIFICATIONS.
116+func (sue *UnicastExchange) Acked(sess BrokerSession, done bool) error {
117+ scratchArea := sess.ExchangeScratchArea()
118+ if scratchArea.ackMsg.Type != "ack" {
119+ return &ErrAbort{"expected ACK message"}
120+ }
121+ err := sess.DropByMsgId(sue.ChanId, scratchArea.notificationsMsg.Notifications)
122+ if err != nil {
123+ return err
124+ }
125+ return nil
126+}
127
128=== modified file 'server/broker/exchanges_test.go'
129--- server/broker/exchanges_test.go 2014-05-02 15:13:38 +0000
130+++ server/broker/exchanges_test.go 2014-05-02 15:13:38 +0000
131@@ -18,6 +18,7 @@
132
133 import (
134 "encoding/json"
135+ "errors"
136 "fmt"
137 "strings"
138 stdtesting "testing"
139@@ -266,3 +267,99 @@
140
141 c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnMetaExchange")
142 }
143+
144+func (s *exchangesSuite) TestUnicastExchange(c *C) {
145+ chanId1 := store.UnicastInternalChannelId("u1", "d1")
146+ notifs := []protocol.Notification{
147+ protocol.Notification{
148+ MsgId: "msg1",
149+ AppId: "app1",
150+ Payload: json.RawMessage(`{"m": 1}`),
151+ },
152+ protocol.Notification{
153+ MsgId: "msg2",
154+ AppId: "app2",
155+ Payload: json.RawMessage(`{"m": 2}`),
156+ },
157+ }
158+ dropped := make(chan []protocol.Notification, 2)
159+ sess := &testing.TestBrokerSession{
160+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
161+ c.Check(chanId, Equals, chanId1)
162+ c.Check(cachedOk, Equals, false)
163+ return 0, notifs, nil
164+ },
165+ DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
166+ c.Check(chanId, Equals, chanId1)
167+ dropped <- targets
168+ return nil
169+ },
170+ }
171+ exchg := &broker.UnicastExchange{chanId1}
172+ outMsg, inMsg, err := exchg.Prepare(sess)
173+ c.Assert(err, IsNil)
174+ // check
175+ marshalled, err := json.Marshal(outMsg)
176+ c.Assert(err, IsNil)
177+ c.Check(string(marshalled), Equals, `{"T":"notifications","Notifications":[{"A":"app1","M":"msg1","P":{"m":1}},{"A":"app2","M":"msg2","P":{"m":2}}]}`)
178+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
179+ c.Assert(err, IsNil)
180+ err = exchg.Acked(sess, true)
181+ c.Assert(err, IsNil)
182+ c.Check(dropped, HasLen, 1)
183+ c.Check(<-dropped, DeepEquals, notifs)
184+}
185+
186+func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) {
187+ notifs := []protocol.Notification{}
188+ dropped := make(chan []protocol.Notification, 2)
189+ sess := &testing.TestBrokerSession{
190+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
191+ return 0, notifs, nil
192+ },
193+ DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
194+ dropped <- targets
195+ return nil
196+ },
197+ }
198+ exchg := &broker.UnicastExchange{}
199+ _, inMsg, err := exchg.Prepare(sess)
200+ c.Assert(err, IsNil)
201+ err = json.Unmarshal([]byte(`{}`), inMsg)
202+ c.Assert(err, IsNil)
203+ err = exchg.Acked(sess, true)
204+ c.Assert(err, Not(IsNil))
205+ c.Check(dropped, HasLen, 0)
206+}
207+
208+func (s *exchangesSuite) TestUnicastExchangeErrorOnPrepare(c *C) {
209+ fail := errors.New("fail")
210+ sess := &testing.TestBrokerSession{
211+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
212+ return 0, nil, fail
213+ },
214+ }
215+ exchg := &broker.UnicastExchange{}
216+ _, _, err := exchg.Prepare(sess)
217+ c.Assert(err, Equals, fail)
218+}
219+
220+func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) {
221+ notifs := []protocol.Notification{}
222+ fail := errors.New("fail")
223+ sess := &testing.TestBrokerSession{
224+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
225+ return 0, notifs, nil
226+ },
227+ DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
228+ return fail
229+ },
230+ }
231+ exchg := &broker.UnicastExchange{}
232+ _, inMsg, err := exchg.Prepare(sess)
233+ c.Assert(err, IsNil)
234+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
235+ c.Assert(err, IsNil)
236+ err = exchg.Acked(sess, true)
237+ c.Assert(err, Equals, fail)
238+}
239
240=== modified file 'server/broker/simple/simple.go'
241--- server/broker/simple/simple.go 2014-05-02 15:13:38 +0000
242+++ server/broker/simple/simple.go 2014-05-02 15:13:38 +0000
243@@ -46,6 +46,7 @@
244
245 // simpleBrokerSession represents a session in the broker.
246 type simpleBrokerSession struct {
247+ broker *SimpleBroker
248 registered bool
249 deviceId string
250 model string
251@@ -61,6 +62,7 @@
252
253 const (
254 broadcastDelivery deliveryKind = iota
255+ unicastDelivery
256 )
257
258 // delivery holds all the information to request a delivery
259@@ -93,6 +95,14 @@
260 return &sess.exchgScratch
261 }
262
263+func (sess *simpleBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
264+ return sess.broker.get(chanId, cachedOk)
265+}
266+
267+func (sess *simpleBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
268+ return sess.broker.drop(chanId, targets)
269+}
270+
271 // NewSimpleBroker makes a new SimpleBroker.
272 func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker {
273 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())
274@@ -185,6 +195,7 @@
275 levels[id] = v
276 }
277 sess := &simpleBrokerSession{
278+ broker: b,
279 deviceId: connect.DeviceId,
280 model: model,
281 imageChannel: imageChannel,
282@@ -207,6 +218,24 @@
283 b.sessionCh <- sess
284 }
285
286+func (b *SimpleBroker) get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
287+ topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId)
288+ if err != nil {
289+ b.logger.Errorf("unsuccessful, get channel snapshot for %v (cachedOk=%v): %v", chanId, cachedOk, err)
290+ }
291+ return topLevel, notifications, err
292+
293+}
294+
295+func (b *SimpleBroker) drop(chanId store.InternalChannelId, targets []protocol.Notification) error {
296+ err := b.sto.DropByMsgId(chanId, targets)
297+ if err != nil {
298+ b.logger.Errorf("unsuccessful, drop from channel %v: %v", chanId, err)
299+ }
300+ return err
301+
302+}
303+
304 // run runs the agent logic of the broker.
305 func (b *SimpleBroker) run() {
306 Loop:
307@@ -233,10 +262,9 @@
308 case delivery := <-b.deliveryCh:
309 switch delivery.kind {
310 case broadcastDelivery:
311- topLevel, notifications, err := b.sto.GetChannelSnapshot(delivery.chanId)
312+ topLevel, notifications, err := b.get(delivery.chanId, false)
313 if err != nil {
314 // next broadcast will try again
315- b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err)
316 continue Loop
317 }
318 broadcastExchg := &broker.BroadcastExchange{
319@@ -248,6 +276,13 @@
320 for _, sess := range b.registry {
321 sess.exchanges <- broadcastExchg
322 }
323+ case unicastDelivery:
324+ chanId := delivery.chanId
325+ _, devId := chanId.UnicastUserAndDevice()
326+ sess := b.registry[devId]
327+ if sess != nil {
328+ sess.exchanges <- &broker.UnicastExchange{chanId}
329+ }
330 }
331 }
332 }
333@@ -260,3 +295,13 @@
334 chanId: chanId,
335 }
336 }
337+
338+// Unicast requests unicast for the channels.
339+func (b *SimpleBroker) Unicast(chanIds ...store.InternalChannelId) {
340+ for _, chanId := range chanIds {
341+ b.deliveryCh <- &delivery{
342+ kind: unicastDelivery,
343+ chanId: chanId,
344+ }
345+ }
346+}
347
348=== modified file 'server/broker/simple/suite_test.go'
349--- server/broker/simple/suite_test.go 2014-02-10 23:19:08 +0000
350+++ server/broker/simple/suite_test.go 2014-05-02 15:13:38 +0000
351@@ -42,4 +42,7 @@
352 RevealBroadcastExchange: func(exchg broker.Exchange) *broker.BroadcastExchange {
353 return exchg.(*broker.BroadcastExchange)
354 },
355+ RevealUnicastExchange: func(exchg broker.Exchange) *broker.UnicastExchange {
356+ return exchg.(*broker.UnicastExchange)
357+ },
358 }})
359
360=== modified file 'server/broker/testing/impls.go'
361--- server/broker/testing/impls.go 2014-04-03 14:31:10 +0000
362+++ server/broker/testing/impls.go 2014-05-02 15:13:38 +0000
363@@ -18,7 +18,9 @@
364 package testing
365
366 import (
367+ "launchpad.net/ubuntu-push/protocol"
368 "launchpad.net/ubuntu-push/server/broker"
369+ "launchpad.net/ubuntu-push/server/store"
370 )
371
372 // Test implementation of BrokerSession.
373@@ -29,6 +31,9 @@
374 Exchanges chan broker.Exchange
375 LevelsMap broker.LevelsMap
376 exchgScratch broker.ExchangesScratchArea
377+ // hooks
378+ DoGet func(store.InternalChannelId, bool) (int64, []protocol.Notification, error)
379+ DoDropByMsgId func(store.InternalChannelId, []protocol.Notification) error
380 }
381
382 func (tbs *TestBrokerSession) DeviceIdentifier() string {
383@@ -55,6 +60,14 @@
384 return &tbs.exchgScratch
385 }
386
387+func (tbs *TestBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
388+ return tbs.DoGet(chanId, cachedOk)
389+}
390+
391+func (tbs *TestBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
392+ return tbs.DoDropByMsgId(chanId, targets)
393+}
394+
395 // Test implementation of BrokerConfig.
396 type TestBrokerConfig struct {
397 ConfigSessionQueueSize uint
398
399=== modified file 'server/broker/testsuite/suite.go'
400--- server/broker/testsuite/suite.go 2014-05-02 15:13:38 +0000
401+++ server/broker/testsuite/suite.go 2014-05-02 15:13:38 +0000
402@@ -50,6 +50,8 @@
403 RevealSession func(broker.Broker, string) broker.BrokerSession
404 // Let us get to a broker.BroadcastExchange from an Exchange.
405 RevealBroadcastExchange func(broker.Exchange) *broker.BroadcastExchange
406+ // Let us get to a broker.UnicastExchange from an Exchange.
407+ RevealUnicastExchange func(broker.Exchange) *broker.UnicastExchange
408 // private
409 testlog *help.TestLogger
410 }
411@@ -235,6 +237,10 @@
412 return 0, nil, nil
413 }
414
415+func (sto *testFailingStore) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
416+ return errors.New("drop fail")
417+}
418+
419 func (s *CommonBrokerSuite) TestBroadcastFail(c *C) {
420 logged := make(chan bool, 1)
421 s.testlog.SetLogEventCb(func(string) {
422@@ -252,5 +258,83 @@
423 c.Fatal("taking too long to log error")
424 case <-logged:
425 }
426- c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful broadcast, get channel snapshot for 0: get channel snapshot fail\n")
427+ c.Check(s.testlog.Captured(), Matches, "ERROR.*: get channel snapshot fail\n")
428+}
429+
430+func (s *CommonBrokerSuite) TestUnicast(c *C) {
431+ sto := store.NewInMemoryPendingStore()
432+ notification1 := json.RawMessage(`{"m": "M1"}`)
433+ notification2 := json.RawMessage(`{"m": "M2"}`)
434+ chanId1 := store.UnicastInternalChannelId("dev1", "dev1")
435+ chanId2 := store.UnicastInternalChannelId("dev2", "dev2")
436+ b := s.MakeBroker(sto, testBrokerConfig, nil)
437+ b.Start()
438+ defer b.Stop()
439+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1")
440+ c.Assert(err, IsNil)
441+ sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2")
442+ c.Assert(err, IsNil)
443+ // add notification to channel *after* the registrations
444+ muchLater := time.Now().Add(10 * time.Minute)
445+ sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
446+ sto.AppendToUnicastChannel(chanId2, "app2", notification2, "msg2", muchLater)
447+ b.Unicast(chanId2, chanId1)
448+ select {
449+ case <-time.After(5 * time.Second):
450+ c.Fatal("taking too long to get unicast exchange")
451+ case exchg1 := <-sess1.SessionChannel():
452+ u1 := s.RevealUnicastExchange(exchg1)
453+ c.Check(u1.ChanId, Equals, chanId1)
454+ }
455+ select {
456+ case <-time.After(5 * time.Second):
457+ c.Fatal("taking too long to get unicast exchange")
458+ case exchg2 := <-sess2.SessionChannel():
459+ u2 := s.RevealUnicastExchange(exchg2)
460+ c.Check(u2.ChanId, Equals, chanId2)
461+ }
462+}
463+
464+func (s *CommonBrokerSuite) TestGetAndDrop(c *C) {
465+ sto := store.NewInMemoryPendingStore()
466+ notification1 := json.RawMessage(`{"m": "M1"}`)
467+ chanId1 := store.UnicastInternalChannelId("dev3", "dev3")
468+ b := s.MakeBroker(sto, testBrokerConfig, nil)
469+ b.Start()
470+ defer b.Stop()
471+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
472+ c.Assert(err, IsNil)
473+ muchLater := time.Now().Add(10 * time.Minute)
474+ sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
475+ _, expected, err := sto.GetChannelSnapshot(chanId1)
476+ c.Assert(err, IsNil)
477+ _, notifs, err := sess1.Get(chanId1, false)
478+ c.Check(notifs, HasLen, 1)
479+ c.Check(notifs, DeepEquals, expected)
480+ err = sess1.DropByMsgId(chanId1, notifs)
481+ c.Assert(err, IsNil)
482+ _, notifs, err = sess1.Get(chanId1, true)
483+ c.Check(notifs, HasLen, 0)
484+ _, expected, err = sto.GetChannelSnapshot(chanId1)
485+ c.Assert(err, IsNil)
486+ c.Check(expected, HasLen, 0)
487+
488+}
489+
490+func (s *CommonBrokerSuite) TestGetAndDropErrors(c *C) {
491+ chanId1 := store.UnicastInternalChannelId("dev3", "dev3")
492+ sto := &testFailingStore{countdownToFail: 1}
493+ b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
494+ b.Start()
495+ defer b.Stop()
496+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
497+ c.Assert(err, IsNil)
498+ _, _, err = sess1.Get(chanId1, false)
499+ c.Assert(err, ErrorMatches, "get channel snapshot fail")
500+ c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for Udev3:dev3 \\(cachedOk=false\\): get channel snapshot fail\n")
501+ s.testlog.ResetCapture()
502+
503+ err = sess1.DropByMsgId(chanId1, nil)
504+ c.Assert(err, ErrorMatches, "drop fail")
505+ c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n")
506 }

Subscribers

People subscribed via source and target branches