Merge lp:~pedronis/ubuntu-push/unicast-store into lp:ubuntu-push/automatic

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 144
Merged at revision: 143
Proposed branch: lp:~pedronis/ubuntu-push/unicast-store
Merge into: lp:ubuntu-push/automatic
Prerequisite: lp:~pedronis/ubuntu-push/unicast-preps
Diff against target: 378 lines (+231/-19)
4 files modified
server/store/inmemory.go (+48/-16)
server/store/inmemory_test.go (+66/-0)
server/store/store.go (+71/-2)
server/store/store_test.go (+46/-1)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/unicast-store
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+217889@code.launchpad.net

Commit message

add support for unicast channels to the pending store

Description of the change

add support for unicast channels to the pending store

add the concept/support for unicast chan identifiers, plus some settling/clarifications

To post a comment you must log in.
143. By Samuele Pedroni

Merged unicast-preps-2 into unicast-store.

Revision history for this message
John Lenton (chipaca) wrote :

Good.

review: Approve
144. By Samuele Pedroni

rename the function DropByMsgId -> FilterOutByMsgId

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/store/inmemory.go'
2--- server/store/inmemory.go 2014-05-02 15:13:24 +0000
3+++ server/store/inmemory.go 2014-05-02 15:13:24 +0000
4@@ -24,16 +24,11 @@
5 "launchpad.net/ubuntu-push/protocol"
6 )
7
8-// one stored notification
9-type notification struct {
10- protocol.Notification
11- expiration time.Time
12-}
13-
14 // one stored channel
15 type channel struct {
16 topLevel int64
17- notifications []notification
18+ notifications []protocol.Notification
19+ expirations []time.Time
20 }
21
22 // InMemoryPendingStore is a basic in-memory pending notification store.
23@@ -56,22 +51,34 @@
24 return InternalChannelId(""), ErrUnknownChannel
25 }
26
27-func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error {
28+func (sto *InMemoryPendingStore) appendToChannel(chanId InternalChannelId, newNotification protocol.Notification, inc int64, expiration time.Time) error {
29 sto.lock.Lock()
30 defer sto.lock.Unlock()
31 prev := sto.store[chanId]
32 if prev == nil {
33 prev = &channel{}
34 }
35- prev.topLevel++
36- prev.notifications = append(prev.notifications, notification{
37- Notification: protocol.Notification{Payload: notificationPayload},
38- expiration: expiration,
39- })
40+ prev.topLevel += inc
41+ prev.notifications = append(prev.notifications, newNotification)
42+ prev.expirations = append(prev.expirations, expiration)
43 sto.store[chanId] = prev
44 return nil
45 }
46
47+func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error {
48+ newNotification := protocol.Notification{Payload: notificationPayload}
49+ return sto.appendToChannel(chanId, newNotification, 1, expiration)
50+}
51+
52+func (sto *InMemoryPendingStore) AppendToUnicastChannel(chanId InternalChannelId, appId string, notificationPayload json.RawMessage, msgId string, expiration time.Time) error {
53+ newNotification := protocol.Notification{
54+ Payload: notificationPayload,
55+ AppId: appId,
56+ MsgId: msgId,
57+ }
58+ return sto.appendToChannel(chanId, newNotification, 0, expiration)
59+}
60+
61 func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []protocol.Notification, error) {
62 sto.lock.Lock()
63 defer sto.lock.Unlock()
64@@ -82,13 +89,18 @@
65 topLevel := channel.topLevel
66 n := len(channel.notifications)
67 res := make([]protocol.Notification, 0, n)
68+ exps := make([]time.Time, 0, n)
69 now := time.Now()
70- for _, notification := range channel.notifications {
71- if notification.expiration.Before(now) {
72+ for i, expiration := range channel.expirations {
73+ if expiration.Before(now) {
74 continue
75 }
76- res = append(res, notification.Notification)
77+ res = append(res, channel.notifications[i])
78+ exps = append(exps, expiration)
79 }
80+ // store as well
81+ channel.notifications = res
82+ channel.expirations = exps
83 return topLevel, res, nil
84 }
85
86@@ -96,5 +108,25 @@
87 // ignored
88 }
89
90+func (sto *InMemoryPendingStore) DropByMsgId(chanId InternalChannelId, targets []protocol.Notification) error {
91+ sto.lock.Lock()
92+ defer sto.lock.Unlock()
93+ channel, ok := sto.store[chanId]
94+ if !ok {
95+ return nil
96+ }
97+ expById := make(map[string]time.Time, len(channel.notifications))
98+ for i, notif := range channel.notifications {
99+ expById[notif.MsgId] = channel.expirations[i]
100+ }
101+ channel.notifications = FilterOutByMsgId(channel.notifications, targets)
102+ exps := make([]time.Time, len(channel.notifications))
103+ for i, notif := range channel.notifications {
104+ exps[i] = expById[notif.MsgId]
105+ }
106+ channel.expirations = exps
107+ return nil
108+}
109+
110 // sanity check we implement the interface
111 var _ PendingStore = (*InMemoryPendingStore)(nil)
112
113=== modified file 'server/store/inmemory_test.go'
114--- server/store/inmemory_test.go 2014-05-02 15:13:24 +0000
115+++ server/store/inmemory_test.go 2014-05-02 15:13:24 +0000
116@@ -67,6 +67,32 @@
117 c.Check(res, DeepEquals, help.Ns(notification1, notification2))
118 }
119
120+func (s *inMemorySuite) TestAppendToUnicastChannelAndGetChannelSnapshot(c *C) {
121+ sto := NewInMemoryPendingStore()
122+
123+ chanId := UnicastInternalChannelId("user", "dev1")
124+ notification1 := json.RawMessage(`{"a":1}`)
125+ notification2 := json.RawMessage(`{"b":2}`)
126+ app1 := "app1"
127+ app2 := "app2"
128+ msg1 := "msg1"
129+ msg2 := "msg2"
130+
131+ muchLater := time.Now().Add(time.Minute)
132+
133+ err := sto.AppendToUnicastChannel(chanId, app1, notification1, msg1, muchLater)
134+ c.Assert(err, IsNil)
135+ err = sto.AppendToUnicastChannel(chanId, app2, notification2, msg2, muchLater)
136+ c.Assert(err, IsNil)
137+ top, res, err := sto.GetChannelSnapshot(chanId)
138+ c.Assert(err, IsNil)
139+ c.Check(res, DeepEquals, []protocol.Notification{
140+ protocol.Notification{Payload: notification1, AppId: app1, MsgId: msg1},
141+ protocol.Notification{Payload: notification2, AppId: app2, MsgId: msg2},
142+ })
143+ c.Check(top, Equals, int64(0))
144+}
145+
146 func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshotWithExpiration(c *C) {
147 sto := NewInMemoryPendingStore()
148
149@@ -86,3 +112,43 @@
150 c.Check(top, Equals, int64(2))
151 c.Check(res, DeepEquals, help.Ns(notification1))
152 }
153+
154+func (s *inMemorySuite) TestDropByMsgId(c *C) {
155+ sto := NewInMemoryPendingStore()
156+
157+ chanId := UnicastInternalChannelId("user", "dev2")
158+
159+ // nothing to do is fine
160+ err := sto.DropByMsgId(chanId, nil)
161+ c.Assert(err, IsNil)
162+
163+ notification1 := json.RawMessage(`{"a":1}`)
164+ notification2 := json.RawMessage(`{"b":2}`)
165+ notification3 := json.RawMessage(`{"a":2}`)
166+ app1 := "app1"
167+ app2 := "app2"
168+ msg1 := "msg1"
169+ msg2 := "msg2"
170+ msg3 := "msg3"
171+
172+ muchLater := time.Now().Add(time.Minute)
173+
174+ err = sto.AppendToUnicastChannel(chanId, app1, notification1, msg1, muchLater)
175+ c.Assert(err, IsNil)
176+ err = sto.AppendToUnicastChannel(chanId, app2, notification2, msg2, muchLater)
177+ c.Assert(err, IsNil)
178+ err = sto.AppendToUnicastChannel(chanId, app1, notification3, msg3, muchLater)
179+ c.Assert(err, IsNil)
180+ _, res, err := sto.GetChannelSnapshot(chanId)
181+ c.Assert(err, IsNil)
182+
183+ err = sto.DropByMsgId(chanId, res[:2])
184+ c.Assert(err, IsNil)
185+
186+ _, res, err = sto.GetChannelSnapshot(chanId)
187+ c.Assert(err, IsNil)
188+ c.Check(res, HasLen, 1)
189+ c.Check(res, DeepEquals, []protocol.Notification{
190+ protocol.Notification{Payload: notification3, AppId: app1, MsgId: msg3},
191+ })
192+}
193
194=== modified file 'server/store/store.go'
195--- server/store/store.go 2014-05-02 15:13:24 +0000
196+++ server/store/store.go 2014-05-02 15:13:24 +0000
197@@ -21,6 +21,8 @@
198 "encoding/hex"
199 "encoding/json"
200 "errors"
201+ "fmt"
202+ "strings"
203 "time"
204
205 "launchpad.net/ubuntu-push/protocol"
206@@ -28,6 +30,27 @@
207
208 type InternalChannelId string
209
210+// BroadcastChannel returns whether the id represents a broadcast channel.
211+func (icid InternalChannelId) BroadcastChannel() bool {
212+ marker := icid[0]
213+ return marker == 'B' || marker == '0'
214+}
215+
216+// UnicastChannel returns whether the id represents a unicast channel.
217+func (icid InternalChannelId) UnicastChannel() bool {
218+ marker := icid[0]
219+ return marker == 'U'
220+}
221+
222+// UnicastUserAndDevice returns the user and device ids of a unicast channel.
223+func (icid InternalChannelId) UnicastUserAndDevice() (userId, deviceId string) {
224+ if !icid.UnicastChannel() {
225+ panic("UnicastUserAndDevice is for unicast channels")
226+ }
227+ parts := strings.SplitN(string(icid)[1:], ":", 2)
228+ return parts[0], parts[1]
229+}
230+
231 var ErrUnknownChannel = errors.New("unknown channel name")
232 var ErrFull = errors.New("channel is full")
233 var ErrExpected128BitsHexRepr = errors.New("expected 128 bits hex repr")
234@@ -38,7 +61,10 @@
235 if chanId == SystemInternalChannelId {
236 return "0"
237 }
238- panic("general InternalChannelIdToHex not implemeted yet")
239+ if !chanId.BroadcastChannel() {
240+ panic("InternalChannelIdToHex is for broadcast channels")
241+ }
242+ return string(chanId)[1:]
243 }
244
245 var zero128 [16]byte
246@@ -60,7 +86,14 @@
247 if idbytes == zero128 {
248 return SystemInternalChannelId, nil
249 }
250- return InternalChannelId(idbytes[:]), nil
251+ // mark with B(roadcast) prefix
252+ s := "B" + hexRepr
253+ return InternalChannelId(s), nil
254+}
255+
256+// UnicastInternalChannelId builds a channel id for the userId, deviceId pair.
257+func UnicastInternalChannelId(userId, deviceId string) InternalChannelId {
258+ return InternalChannelId(fmt.Sprintf("U%s:%s", userId, deviceId))
259 }
260
261 // PendingStore let store notifications into channels.
262@@ -70,9 +103,45 @@
263 GetInternalChannelId(name string) (InternalChannelId, error)
264 // AppendToChannel appends a notification to the channel.
265 AppendToChannel(chanId InternalChannelId, notification json.RawMessage, expiration time.Time) error
266+ // AppendToUnicastChannel appends a notification to the unicast channel.
267 // GetChannelSnapshot gets all the current notifications and
268+ AppendToUnicastChannel(chanId InternalChannelId, appId string, notification json.RawMessage, msgId string, expiration time.Time) error
269 // current top level in the channel.
270 GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, notifications []protocol.Notification, err error)
271+ // DropByMsgId drops notifications from a unicast channel based on message ids.
272+ DropByMsgId(chanId InternalChannelId, targets []protocol.Notification) error
273 // Close is to be called when done with the store.
274 Close()
275 }
276+
277+// FilterOutByMsgId returns the notifications from orig whose msg id is not
278+// mentioned in targets.
279+func FilterOutByMsgId(orig, targets []protocol.Notification) []protocol.Notification {
280+ n := len(orig)
281+ t := len(targets)
282+ // common case, removing the continuous head
283+ if t > 0 && n >= t {
284+ if targets[0].MsgId == orig[0].MsgId {
285+ for i := t - 1; i >= 0; i-- {
286+ if i == 0 {
287+ return orig[t:]
288+ }
289+ if targets[i].MsgId != orig[i].MsgId {
290+ break
291+ }
292+ }
293+ }
294+ }
295+ // slow way
296+ ids := make(map[string]bool, t)
297+ for _, target := range targets {
298+ ids[target.MsgId] = true
299+ }
300+ acc := make([]protocol.Notification, 0, n)
301+ for _, notif := range orig {
302+ if !ids[notif.MsgId] {
303+ acc = append(acc, notif)
304+ }
305+ }
306+ return acc
307+}
308
309=== modified file 'server/store/store_test.go'
310--- server/store/store_test.go 2014-02-10 23:19:08 +0000
311+++ server/store/store_test.go 2014-05-02 15:13:24 +0000
312@@ -33,6 +33,8 @@
313
314 func (s *storeSuite) TestInternalChannelIdToHex(c *C) {
315 c.Check(InternalChannelIdToHex(SystemInternalChannelId), Equals, protocol.SystemChannelId)
316+ c.Check(InternalChannelIdToHex(InternalChannelId("Bf1c9bf7096084cb2a154979ce00c7f50")), Equals, "f1c9bf7096084cb2a154979ce00c7f50")
317+ c.Check(func() { InternalChannelIdToHex(InternalChannelId("U")) }, PanicMatches, "InternalChannelIdToHex is for broadcast channels")
318 }
319
320 func (s *storeSuite) TestHexToInternalChannelId(c *C) {
321@@ -42,9 +44,11 @@
322 i1, err := HexToInternalChannelId("00000000000000000000000000000000")
323 c.Check(err, IsNil)
324 c.Check(i1, Equals, SystemInternalChannelId)
325+ c.Check(i1.BroadcastChannel(), Equals, true)
326 i2, err := HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50")
327 c.Check(err, IsNil)
328- c.Check(i2, Equals, InternalChannelId("\xf1\xc9\xbf\x70\x96\x08\x4c\xb2\xa1\x54\x97\x9c\xe0\x0c\x7f\x50"))
329+ c.Check(i2.BroadcastChannel(), Equals, true)
330+ c.Check(i2, Equals, InternalChannelId("Bf1c9bf7096084cb2a154979ce00c7f50"))
331 _, err = HexToInternalChannelId("01")
332 c.Check(err, Equals, ErrExpected128BitsHexRepr)
333 _, err = HexToInternalChannelId("abceddddddddddddddddzeeeeeeeeeee")
334@@ -52,3 +56,44 @@
335 _, err = HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50ff")
336 c.Check(err, Equals, ErrExpected128BitsHexRepr)
337 }
338+
339+func (s *storeSuite) TestUnicastInternalChannelId(c *C) {
340+ chanId := UnicastInternalChannelId("user1", "dev2")
341+ c.Check(chanId.BroadcastChannel(), Equals, false)
342+ c.Check(chanId.UnicastChannel(), Equals, true)
343+ u, d := chanId.UnicastUserAndDevice()
344+ c.Check(u, Equals, "user1")
345+ c.Check(d, Equals, "dev2")
346+ c.Check(func() { SystemInternalChannelId.UnicastUserAndDevice() }, PanicMatches, "UnicastUserAndDevice is for unicast channels")
347+}
348+
349+func (s *storeSuite) TestFilterOutByMsgId(c *C) {
350+ orig := []protocol.Notification{
351+ protocol.Notification{MsgId: "a"},
352+ protocol.Notification{MsgId: "b"},
353+ protocol.Notification{MsgId: "c"},
354+ protocol.Notification{MsgId: "d"},
355+ }
356+ // removing the continuous head
357+ res := FilterOutByMsgId(orig, orig[:3])
358+ c.Check(res, DeepEquals, orig[3:])
359+
360+ // random removal
361+ res = FilterOutByMsgId(orig, orig[1:2])
362+ c.Check(res, DeepEquals, []protocol.Notification{
363+ protocol.Notification{MsgId: "a"},
364+ protocol.Notification{MsgId: "c"},
365+ protocol.Notification{MsgId: "d"},
366+ })
367+
368+ // looks like removing the continuous head, but it isn't
369+ res = FilterOutByMsgId(orig, []protocol.Notification{
370+ protocol.Notification{MsgId: "a"},
371+ protocol.Notification{MsgId: "c"},
372+ protocol.Notification{MsgId: "d"},
373+ })
374+ c.Check(res, DeepEquals, []protocol.Notification{
375+ protocol.Notification{MsgId: "b"},
376+ })
377+
378+}

Subscribers

People subscribed via source and target branches