Merge lp:~pedronis/ubuntu-push/freelance-exchanges into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 21
Merged at revision: 20
Proposed branch: lp:~pedronis/ubuntu-push/freelance-exchanges
Merge into: lp:ubuntu-push
Prerequisite: lp:~pedronis/ubuntu-push/server-bits-in-server
Diff against target: 526 lines (+244/-161)
6 files modified
server/broker/broker.go (+7/-0)
server/broker/exchanges.go (+74/-0)
server/broker/exchanges_test.go (+123/-0)
server/broker/simple.go (+18/-52)
server/broker/simple_test.go (+14/-109)
server/session/session_test.go (+8/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/freelance-exchanges
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+203000@code.launchpad.net

Commit message

exchanges don't need to be so tied to a particular broker

Description of the change

With a bit of reorganization, exchanges no longer need to be so tied to a particular broker.

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

Only issues I'd have with this one are fixed in the next one. Good job.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/broker/broker.go'
2--- server/broker/broker.go 2014-01-14 15:35:20 +0000
3+++ server/broker/broker.go 2014-01-24 07:56:28 +0000
4@@ -44,6 +44,9 @@
5 Acked(BrokerSession) error
6 }
7
8+// LevelsMap is the type for holding levels for session.
9+type LevelsMap map[store.InternalChannelId]int64
10+
11 // BrokerSession holds broker session state.
12 type BrokerSession interface {
13 // SessionChannel returns the session control channel
14@@ -51,6 +54,10 @@
15 SessionChannel() <-chan Exchange
16 // DeviceId returns the device id string.
17 DeviceId() string
18+ // Levels returns the current channel levels for the session
19+ Levels() LevelsMap
20+ // ExchangeScratchArea returns the scratch area for exchanges.
21+ ExchangeScratchArea() *ExchangesScratchArea
22 }
23
24 // Session aborted error.
25
26=== added file 'server/broker/exchanges.go'
27--- server/broker/exchanges.go 1970-01-01 00:00:00 +0000
28+++ server/broker/exchanges.go 2014-01-24 07:56:28 +0000
29@@ -0,0 +1,74 @@
30+/*
31+ Copyright 2013-2014 Canonical Ltd.
32+
33+ This program is free software: you can redistribute it and/or modify it
34+ under the terms of the GNU General Public License version 3, as published
35+ by the Free Software Foundation.
36+
37+ This program is distributed in the hope that it will be useful, but
38+ WITHOUT ANY WARRANTY; without even the implied warranties of
39+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
40+ PURPOSE. See the GNU General Public License for more details.
41+
42+ You should have received a copy of the GNU General Public License along
43+ with this program. If not, see <http://www.gnu.org/licenses/>.
44+*/
45+
46+package broker
47+
48+import (
49+ "encoding/json"
50+ "launchpad.net/ubuntu-push/protocol"
51+ "launchpad.net/ubuntu-push/server/store"
52+ // "log"
53+)
54+
55+// Exchanges
56+
57+// Scratch area for exchanges; sessions should have one of these.
58+type ExchangesScratchArea struct {
59+ broadcastMsg protocol.BroadcastMsg
60+ ackMsg protocol.AckMsg
61+}
62+
63+// BroadcastExchange leads a session through delivering a BROADCAST.
64+// For simplicity it's fully public.
65+type BroadcastExchange struct {
66+ ChanId store.InternalChannelId
67+ TopLevel int64
68+ NotificationPayloads []json.RawMessage
69+}
70+
71+func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
72+ c := int64(len(payloads))
73+ delta := topLevel - clientLevel
74+ if delta < c {
75+ return payloads[c-delta:]
76+ } else {
77+ return payloads
78+ }
79+}
80+
81+// Prepare session for a BROADCAST.
82+func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
83+ scratchArea := sess.ExchangeScratchArea()
84+ scratchArea.broadcastMsg.Type = "broadcast"
85+ clientLevel := sess.Levels()[sbe.ChanId]
86+ payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
87+ // xxx need an AppId as well, later
88+ scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId)
89+ scratchArea.broadcastMsg.TopLevel = sbe.TopLevel
90+ scratchArea.broadcastMsg.Payloads = payloads
91+ return &scratchArea.broadcastMsg, &scratchArea.ackMsg, nil
92+}
93+
94+// Acked deals with an ACK for a BROADCAST.
95+func (sbe *BroadcastExchange) Acked(sess BrokerSession) error {
96+ scratchArea := sess.ExchangeScratchArea()
97+ if scratchArea.ackMsg.Type != "ack" {
98+ return &ErrAbort{"expected ACK message"}
99+ }
100+ // update levels
101+ sess.Levels()[sbe.ChanId] = sbe.TopLevel
102+ return nil
103+}
104
105=== added file 'server/broker/exchanges_test.go'
106--- server/broker/exchanges_test.go 1970-01-01 00:00:00 +0000
107+++ server/broker/exchanges_test.go 2014-01-24 07:56:28 +0000
108@@ -0,0 +1,123 @@
109+/*
110+ Copyright 2013-2014 Canonical Ltd.
111+
112+ This program is free software: you can redistribute it and/or modify it
113+ under the terms of the GNU General Public License version 3, as published
114+ by the Free Software Foundation.
115+
116+ This program is distributed in the hope that it will be useful, but
117+ WITHOUT ANY WARRANTY; without even the implied warranties of
118+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
119+ PURPOSE. See the GNU General Public License for more details.
120+
121+ You should have received a copy of the GNU General Public License along
122+ with this program. If not, see <http://www.gnu.org/licenses/>.
123+*/
124+
125+package broker
126+
127+import (
128+ "encoding/json"
129+ . "launchpad.net/gocheck"
130+ "launchpad.net/ubuntu-push/server/store"
131+ // "log"
132+)
133+
134+type exchangesSuite struct{}
135+
136+var _ = Suite(&exchangesSuite{})
137+
138+func (s *exchangesSuite) TestBroadcastExchange(c *C) {
139+ sess := &simpleBrokerSession{
140+ levels: map[store.InternalChannelId]int64{},
141+ }
142+ exchg := &BroadcastExchange{
143+ ChanId: store.SystemInternalChannelId,
144+ TopLevel: 3,
145+ NotificationPayloads: []json.RawMessage{
146+ json.RawMessage(`{"a":"x"}`),
147+ json.RawMessage(`{"a":"y"}`),
148+ },
149+ }
150+ inMsg, outMsg, err := exchg.Prepare(sess)
151+ c.Assert(err, IsNil)
152+ // check
153+ marshalled, err := json.Marshal(inMsg)
154+ c.Assert(err, IsNil)
155+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
156+ err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
157+ c.Assert(err, IsNil)
158+ err = exchg.Acked(sess)
159+ c.Assert(err, IsNil)
160+ c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3))
161+}
162+
163+func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
164+ sess := &simpleBrokerSession{
165+ levels: map[store.InternalChannelId]int64{},
166+ }
167+ exchg := &BroadcastExchange{
168+ ChanId: store.SystemInternalChannelId,
169+ TopLevel: 3,
170+ NotificationPayloads: []json.RawMessage{
171+ json.RawMessage(`{"a":"y"}`),
172+ },
173+ }
174+ inMsg, outMsg, err := exchg.Prepare(sess)
175+ c.Assert(err, IsNil)
176+ // check
177+ marshalled, err := json.Marshal(inMsg)
178+ c.Assert(err, IsNil)
179+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
180+ err = json.Unmarshal([]byte(`{}`), outMsg)
181+ c.Assert(err, IsNil)
182+ err = exchg.Acked(sess)
183+ c.Assert(err, Not(IsNil))
184+ c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0))
185+}
186+
187+func (s *exchangesSuite) TestFilterByLevel(c *C) {
188+ payloads := []json.RawMessage{
189+ json.RawMessage(`{"a": 3}`),
190+ json.RawMessage(`{"a": 4}`),
191+ json.RawMessage(`{"a": 5}`),
192+ }
193+ res := filterByLevel(5, 5, payloads)
194+ c.Check(len(res), Equals, 0)
195+ res = filterByLevel(4, 5, payloads)
196+ c.Check(len(res), Equals, 1)
197+ c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))
198+ res = filterByLevel(3, 5, payloads)
199+ c.Check(len(res), Equals, 2)
200+ c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`))
201+ res = filterByLevel(2, 5, payloads)
202+ c.Check(len(res), Equals, 3)
203+ res = filterByLevel(1, 5, payloads)
204+ c.Check(len(res), Equals, 3)
205+}
206+
207+func (s *exchangesSuite) TestBroadcastExchangeFilterByLevel(c *C) {
208+ sess := &simpleBrokerSession{
209+ levels: map[store.InternalChannelId]int64{
210+ store.SystemInternalChannelId: 2,
211+ },
212+ }
213+ exchg := &BroadcastExchange{
214+ ChanId: store.SystemInternalChannelId,
215+ TopLevel: 3,
216+ NotificationPayloads: []json.RawMessage{
217+ json.RawMessage(`{"a":"x"}`),
218+ json.RawMessage(`{"a":"y"}`),
219+ },
220+ }
221+ inMsg, outMsg, err := exchg.Prepare(sess)
222+ c.Assert(err, IsNil)
223+ // check
224+ marshalled, err := json.Marshal(inMsg)
225+ c.Assert(err, IsNil)
226+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
227+ err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
228+ c.Assert(err, IsNil)
229+ err = exchg.Acked(sess)
230+ c.Assert(err, IsNil)
231+}
232
233=== modified file 'server/broker/simple.go'
234--- server/broker/simple.go 2014-01-14 15:35:20 +0000
235+++ server/broker/simple.go 2014-01-24 07:56:28 +0000
236@@ -17,7 +17,6 @@
237 package broker
238
239 import (
240- "encoding/json"
241 "launchpad.net/ubuntu-push/logger"
242 "launchpad.net/ubuntu-push/protocol"
243 "launchpad.net/ubuntu-push/server/store"
244@@ -48,10 +47,9 @@
245 deviceId string
246 done chan bool
247 exchanges chan Exchange
248- levels map[store.InternalChannelId]int64
249+ levels LevelsMap
250 // for exchanges
251- broadcastMsg protocol.BroadcastMsg
252- ackMsg protocol.AckMsg
253+ exchgScratch ExchangesScratchArea
254 }
255
256 type deliveryKind int
257@@ -74,6 +72,14 @@
258 return sess.deviceId
259 }
260
261+func (sess *simpleBrokerSession) Levels() LevelsMap {
262+ return sess.levels
263+}
264+
265+func (sess *simpleBrokerSession) ExchangeScratchArea() *ExchangesScratchArea {
266+ return &sess.exchgScratch
267+}
268+
269 // NewSimpleBroker makes a new SimpleBroker.
270 func NewSimpleBroker(sto store.PendingStore, cfg BrokerConfig, logger logger.Logger) *SimpleBroker {
271 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())
272@@ -126,10 +132,10 @@
273 }
274 clientLevel := sess.levels[chanId]
275 if clientLevel != topLevel {
276- broadcastExchg := &simpleBroadcastExchange{
277- chanId: chanId,
278- topLevel: topLevel,
279- notificationPayloads: payloads,
280+ broadcastExchg := &BroadcastExchange{
281+ ChanId: chanId,
282+ TopLevel: topLevel,
283+ NotificationPayloads: payloads,
284 }
285 sess.exchanges <- broadcastExchg
286 }
287@@ -198,10 +204,10 @@
288 b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err)
289 continue Loop
290 }
291- broadcastExchg := &simpleBroadcastExchange{
292- chanId: delivery.chanId,
293- topLevel: topLevel,
294- notificationPayloads: payloads,
295+ broadcastExchg := &BroadcastExchange{
296+ ChanId: delivery.chanId,
297+ TopLevel: topLevel,
298+ NotificationPayloads: payloads,
299 }
300 for _, sess := range b.registry {
301 sess.exchanges <- broadcastExchg
302@@ -218,43 +224,3 @@
303 chanId: chanId,
304 }
305 }
306-
307-// Exchanges
308-
309-type simpleBroadcastExchange struct {
310- chanId store.InternalChannelId
311- topLevel int64
312- notificationPayloads []json.RawMessage
313-}
314-
315-func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
316- c := int64(len(payloads))
317- delta := topLevel - clientLevel
318- if delta < c {
319- return payloads[c-delta:]
320- } else {
321- return payloads
322- }
323-}
324-
325-func (sbe *simpleBroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
326- simpleSess := sess.(*simpleBrokerSession)
327- simpleSess.broadcastMsg.Type = "broadcast"
328- clientLevel := simpleSess.levels[sbe.chanId]
329- payloads := filterByLevel(clientLevel, sbe.topLevel, sbe.notificationPayloads)
330- // xxx need an AppId as well, later
331- simpleSess.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.chanId)
332- simpleSess.broadcastMsg.TopLevel = sbe.topLevel
333- simpleSess.broadcastMsg.Payloads = payloads
334- return &simpleSess.broadcastMsg, &simpleSess.ackMsg, nil
335-}
336-
337-func (sbe *simpleBroadcastExchange) Acked(sess BrokerSession) error {
338- simpleSess := sess.(*simpleBrokerSession)
339- if simpleSess.ackMsg.Type != "ack" {
340- return &ErrAbort{"expected ACK message"}
341- }
342- // update levels
343- simpleSess.levels[sbe.chanId] = sbe.topLevel
344- return nil
345-}
346
347=== modified file 'server/broker/simple_test.go'
348--- server/broker/simple_test.go 2014-01-14 15:35:20 +0000
349+++ server/broker/simple_test.go 2014-01-24 07:56:28 +0000
350@@ -69,9 +69,9 @@
351 c.Assert(err, IsNil)
352 c.Assert(b.registry["dev-1"], Equals, sess)
353 c.Assert(sess.DeviceId(), Equals, "dev-1")
354- c.Check(sess.(*simpleBrokerSession).levels, DeepEquals, map[store.InternalChannelId]int64{
355+ c.Check(sess.Levels(), DeepEquals, LevelsMap(map[store.InternalChannelId]int64{
356 store.SystemInternalChannelId: 5,
357- })
358+ }))
359 b.Unregister(sess)
360 // just to make sure the unregister was processed
361 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""})
362@@ -99,10 +99,10 @@
363 b.feedPending(sess)
364 c.Assert(len(sess.exchanges), Equals, 1)
365 exchg1 := <-sess.exchanges
366- c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{
367- chanId: store.SystemInternalChannelId,
368- topLevel: 1,
369- notificationPayloads: []json.RawMessage{notification1},
370+ c.Check(exchg1, DeepEquals, &BroadcastExchange{
371+ ChanId: store.SystemInternalChannelId,
372+ TopLevel: 1,
373+ NotificationPayloads: []json.RawMessage{notification1},
374 })
375 }
376
377@@ -163,101 +163,6 @@
378 c.Check(b.registry["dev-1"], Equals, sess2)
379 }
380
381-func (s *simpleSuite) TestBroadcastExchange(c *C) {
382- sess := &simpleBrokerSession{
383- levels: map[store.InternalChannelId]int64{},
384- }
385- exchg := &simpleBroadcastExchange{
386- chanId: store.SystemInternalChannelId,
387- topLevel: 3,
388- notificationPayloads: []json.RawMessage{
389- json.RawMessage(`{"a":"x"}`),
390- json.RawMessage(`{"a":"y"}`),
391- },
392- }
393- inMsg, outMsg, err := exchg.Prepare(sess)
394- c.Assert(err, IsNil)
395- // check
396- marshalled, err := json.Marshal(inMsg)
397- c.Assert(err, IsNil)
398- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
399- err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
400- c.Assert(err, IsNil)
401- err = exchg.Acked(sess)
402- c.Assert(err, IsNil)
403- c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3))
404-}
405-
406-func (s *simpleSuite) TestBroadcastExchangeAckMismatch(c *C) {
407- sess := &simpleBrokerSession{
408- levels: map[store.InternalChannelId]int64{},
409- }
410- exchg := &simpleBroadcastExchange{
411- chanId: store.SystemInternalChannelId,
412- topLevel: 3,
413- notificationPayloads: []json.RawMessage{
414- json.RawMessage(`{"a":"y"}`),
415- },
416- }
417- inMsg, outMsg, err := exchg.Prepare(sess)
418- c.Assert(err, IsNil)
419- // check
420- marshalled, err := json.Marshal(inMsg)
421- c.Assert(err, IsNil)
422- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
423- err = json.Unmarshal([]byte(`{}`), outMsg)
424- c.Assert(err, IsNil)
425- err = exchg.Acked(sess)
426- c.Assert(err, Not(IsNil))
427- c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0))
428-}
429-
430-func (s *simpleSuite) TestFilterByLevel(c *C) {
431- payloads := []json.RawMessage{
432- json.RawMessage(`{"a": 3}`),
433- json.RawMessage(`{"a": 4}`),
434- json.RawMessage(`{"a": 5}`),
435- }
436- res := filterByLevel(5, 5, payloads)
437- c.Check(len(res), Equals, 0)
438- res = filterByLevel(4, 5, payloads)
439- c.Check(len(res), Equals, 1)
440- c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))
441- res = filterByLevel(3, 5, payloads)
442- c.Check(len(res), Equals, 2)
443- c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`))
444- res = filterByLevel(2, 5, payloads)
445- c.Check(len(res), Equals, 3)
446- res = filterByLevel(1, 5, payloads)
447- c.Check(len(res), Equals, 3)
448-}
449-
450-func (s *simpleSuite) TestBroadcastExchangeFilterByLevel(c *C) {
451- sess := &simpleBrokerSession{
452- levels: map[store.InternalChannelId]int64{
453- store.SystemInternalChannelId: 2,
454- },
455- }
456- exchg := &simpleBroadcastExchange{
457- chanId: store.SystemInternalChannelId,
458- topLevel: 3,
459- notificationPayloads: []json.RawMessage{
460- json.RawMessage(`{"a":"x"}`),
461- json.RawMessage(`{"a":"y"}`),
462- },
463- }
464- inMsg, outMsg, err := exchg.Prepare(sess)
465- c.Assert(err, IsNil)
466- // check
467- marshalled, err := json.Marshal(inMsg)
468- c.Assert(err, IsNil)
469- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
470- err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
471- c.Assert(err, IsNil)
472- err = exchg.Acked(sess)
473- c.Assert(err, IsNil)
474-}
475-
476 func (s *simpleSuite) TestBroadcast(c *C) {
477 sto := store.NewInMemoryPendingStore()
478 notification1 := json.RawMessage(`{"m": "M"}`)
479@@ -274,20 +179,20 @@
480 case <-time.After(5 * time.Second):
481 c.Fatal("taking too long to get broadcast exchange")
482 case exchg1 := <-sess1.SessionChannel():
483- c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{
484- chanId: store.SystemInternalChannelId,
485- topLevel: 1,
486- notificationPayloads: []json.RawMessage{notification1},
487+ c.Check(exchg1, DeepEquals, &BroadcastExchange{
488+ ChanId: store.SystemInternalChannelId,
489+ TopLevel: 1,
490+ NotificationPayloads: []json.RawMessage{notification1},
491 })
492 }
493 select {
494 case <-time.After(5 * time.Second):
495 c.Fatal("taking too long to get broadcast exchange")
496 case exchg2 := <-sess2.SessionChannel():
497- c.Check(exchg2, DeepEquals, &simpleBroadcastExchange{
498- chanId: store.SystemInternalChannelId,
499- topLevel: 1,
500- notificationPayloads: []json.RawMessage{notification1},
501+ c.Check(exchg2, DeepEquals, &BroadcastExchange{
502+ ChanId: store.SystemInternalChannelId,
503+ TopLevel: 1,
504+ NotificationPayloads: []json.RawMessage{notification1},
505 })
506 }
507 }
508
509=== modified file 'server/session/session_test.go'
510--- server/session/session_test.go 2014-01-15 15:54:20 +0000
511+++ server/session/session_test.go 2014-01-24 07:56:28 +0000
512@@ -136,6 +136,14 @@
513 return tbs.deviceId
514 }
515
516+func (tbs *testBrokerSession) Levels() broker.LevelsMap {
517+ return nil
518+}
519+
520+func (tbs *testBrokerSession) ExchangeScratchArea() *broker.ExchangesScratchArea {
521+ return nil
522+}
523+
524 func (tb *testBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {
525 tb.registration <- "register " + connect.DeviceId
526 return &testBrokerSession{connect.DeviceId, nil}, tb.err

Subscribers

People subscribed via source and target branches