Merge lp:~pedronis/ubuntu-push/unicast-preps into lp:ubuntu-push/automatic
- unicast-preps
- Merge into automatic
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 139 |
Merged at revision: | 142 |
Proposed branch: | lp:~pedronis/ubuntu-push/unicast-preps |
Merge into: | lp:ubuntu-push/automatic |
Diff against target: |
645 lines (+137/-93) 13 files modified
protocol/messages.go (+13/-0) protocol/messages_test.go (+8/-0) server/api/handlers_test.go (+3/-2) server/broker/exchanges.go (+19/-19) server/broker/exchanges_test.go (+22/-21) server/broker/exchg_impl_test.go (+19/-17) server/broker/simple/simple.go (+8/-8) server/broker/simple/simple_test.go (+5/-4) server/broker/testsuite/suite.go (+12/-12) server/store/inmemory.go (+8/-6) server/store/inmemory_test.go (+6/-3) server/store/store.go (+3/-1) testing/helpers.go (+11/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/unicast-preps |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+217656@code.launchpad.net |
Commit message
Reorganize things so that GetChannelSnapshot returns a slice of Notification values (with optional AppId and MsgId), so that it can be used, together with the underlying store, to store unicast notifications in user/device channels.
Description of the change
Reorganize things so that GetChannelSnapshot returns a slice of Notification values (with optional AppId and MsgId), so that it can be used, together with the underlying store, to store unicast notifications in user/device channels.
these are the main changes:
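608 - GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, payloads []json.RawMessage, err error)
609 + GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, notifications []protocol.Notification, err error)
73 type BroadcastExchange struct {
74 - ChanId store.InternalChannelId
75 - TopLevel int64
76 - NotificationPayloads []json.RawMessage
77 - Decoded []map[string]interface{}
78 + ChanId store.InternalChannelId
79 + TopLevel int64
80 + Notifications []protocol.Notification
81 + Decoded []map[string]interface{}
82 }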
and the rest follows from these
John Lenton (chipaca) : | # |
Preview Diff
1 | === modified file 'protocol/messages.go' |
2 | --- protocol/messages.go 2014-04-16 12:27:57 +0000 |
3 | +++ protocol/messages.go 2014-04-29 18:02:51 +0000 |
4 | @@ -162,6 +162,19 @@ |
5 | Payload json.RawMessage `json:"P"` |
6 | } |
7 | |
8 | +// ExtractPayloads gets only the payloads out of a slice of notifications. |
9 | +func ExtractPayloads(notifications []Notification) []json.RawMessage { |
10 | + n := len(notifications) |
11 | + if n == 0 { |
12 | + return nil |
13 | + } |
14 | + payloads := make([]json.RawMessage, n) |
15 | + for i := 0; i < n; i++ { |
16 | + payloads[i] = notifications[i].Payload |
17 | + } |
18 | + return payloads |
19 | +} |
20 | + |
21 | // ACKnowledgement message |
22 | type AckMsg struct { |
23 | Type string `json:"T"` |
24 | |
25 | === modified file 'protocol/messages_test.go' |
26 | --- protocol/messages_test.go 2014-04-16 12:22:56 +0000 |
27 | +++ protocol/messages_test.go 2014-04-29 18:02:51 +0000 |
28 | @@ -115,3 +115,11 @@ |
29 | c.Check(m.Split(), Equals, true) |
30 | c.Check(m.OnewayContinue(), Equals, true) |
31 | } |
32 | + |
33 | +func (s *messagesSuite) TestExtractPayloads(c *C) { |
34 | + c.Check(ExtractPayloads(nil), IsNil) |
35 | + p1 := json.RawMessage(`{"a":1}`) |
36 | + p2 := json.RawMessage(`{"b":2}`) |
37 | + ns := []Notification{Notification{Payload: p1}, Notification{Payload: p2}} |
38 | + c.Check(ExtractPayloads(ns), DeepEquals, []json.RawMessage{p1, p2}) |
39 | +} |
40 | |
41 | === modified file 'server/api/handlers_test.go' |
42 | --- server/api/handlers_test.go 2014-02-21 11:32:38 +0000 |
43 | +++ server/api/handlers_test.go 2014-04-29 18:02:51 +0000 |
44 | @@ -30,6 +30,7 @@ |
45 | |
46 | . "launchpad.net/gocheck" |
47 | |
48 | + "launchpad.net/ubuntu-push/protocol" |
49 | "launchpad.net/ubuntu-push/server/store" |
50 | helpers "launchpad.net/ubuntu-push/testing" |
51 | ) |
52 | @@ -142,11 +143,11 @@ |
53 | } |
54 | |
55 | func (cbsend *checkBrokerSending) Broadcast(chanId store.InternalChannelId) { |
56 | - top, payloads, err := cbsend.store.GetChannelSnapshot(chanId) |
57 | + top, notifications, err := cbsend.store.GetChannelSnapshot(chanId) |
58 | cbsend.err = err |
59 | cbsend.chanId = chanId |
60 | cbsend.top = top |
61 | - cbsend.payloads = payloads |
62 | + cbsend.payloads = protocol.ExtractPayloads(notifications) |
63 | } |
64 | |
65 | func (s *handlersSuite) TestDoBroadcast(c *C) { |
66 | |
67 | === modified file 'server/broker/exchanges.go' |
68 | --- server/broker/exchanges.go 2014-04-16 13:29:03 +0000 |
69 | +++ server/broker/exchanges.go 2014-04-29 18:02:51 +0000 |
70 | @@ -35,10 +35,10 @@ |
71 | // BroadcastExchange leads a session through delivering a BROADCAST. |
72 | // For simplicity it is fully public. |
73 | type BroadcastExchange struct { |
74 | - ChanId store.InternalChannelId |
75 | - TopLevel int64 |
76 | - NotificationPayloads []json.RawMessage |
77 | - Decoded []map[string]interface{} |
78 | + ChanId store.InternalChannelId |
79 | + TopLevel int64 |
80 | + Notifications []protocol.Notification |
81 | + Decoded []map[string]interface{} |
82 | } |
83 | |
84 | // check interface already here |
85 | @@ -46,18 +46,18 @@ |
86 | |
87 | // Init ensures the BroadcastExchange is fully initialized for the sessions. |
88 | func (sbe *BroadcastExchange) Init() { |
89 | - decoded := make([]map[string]interface{}, len(sbe.NotificationPayloads)) |
90 | + decoded := make([]map[string]interface{}, len(sbe.Notifications)) |
91 | sbe.Decoded = decoded |
92 | - for i, p := range sbe.NotificationPayloads { |
93 | - err := json.Unmarshal(p, &decoded[i]) |
94 | + for i, notif := range sbe.Notifications { |
95 | + err := json.Unmarshal(notif.Payload, &decoded[i]) |
96 | if err != nil { |
97 | decoded[i] = nil |
98 | } |
99 | } |
100 | } |
101 | |
102 | -func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { |
103 | - c := int64(len(payloads)) |
104 | +func filterByLevel(clientLevel, topLevel int64, notifs []protocol.Notification) []protocol.Notification { |
105 | + c := int64(len(notifs)) |
106 | if c == 0 { |
107 | return nil |
108 | } |
109 | @@ -66,32 +66,32 @@ |
110 | delta = 1 |
111 | } |
112 | if delta < c { |
113 | - return payloads[c-delta:] |
114 | + return notifs[c-delta:] |
115 | } else { |
116 | - return payloads |
117 | + return notifs |
118 | } |
119 | } |
120 | |
121 | -func channelFilter(tag string, chanId store.InternalChannelId, payloads []json.RawMessage, decoded []map[string]interface{}) []json.RawMessage { |
122 | - if len(payloads) != 0 && chanId == store.SystemInternalChannelId { |
123 | - decoded := decoded[len(decoded)-len(payloads):] |
124 | +func channelFilter(tag string, chanId store.InternalChannelId, notifs []protocol.Notification, decoded []map[string]interface{}) []json.RawMessage { |
125 | + if len(notifs) != 0 && chanId == store.SystemInternalChannelId { |
126 | + decoded := decoded[len(decoded)-len(notifs):] |
127 | filtered := make([]json.RawMessage, 0) |
128 | for i, decoded1 := range decoded { |
129 | if _, ok := decoded1[tag]; ok { |
130 | - filtered = append(filtered, payloads[i]) |
131 | + filtered = append(filtered, notifs[i].Payload) |
132 | } |
133 | } |
134 | - payloads = filtered |
135 | + return filtered |
136 | } |
137 | - return payloads |
138 | + return protocol.ExtractPayloads(notifs) |
139 | } |
140 | |
141 | // Prepare session for a BROADCAST. |
142 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
143 | clientLevel := sess.Levels()[sbe.ChanId] |
144 | - payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) |
145 | + notifs := filterByLevel(clientLevel, sbe.TopLevel, sbe.Notifications) |
146 | tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel()) |
147 | - payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded) |
148 | + payloads := channelFilter(tag, sbe.ChanId, notifs, sbe.Decoded) |
149 | if len(payloads) == 0 && sbe.TopLevel >= clientLevel { |
150 | // empty and don't need to force resync => do nothing |
151 | return nil, nil, ErrNop |
152 | |
153 | === modified file 'server/broker/exchanges_test.go' |
154 | --- server/broker/exchanges_test.go 2014-04-16 12:22:56 +0000 |
155 | +++ server/broker/exchanges_test.go 2014-04-29 18:02:51 +0000 |
156 | @@ -28,6 +28,7 @@ |
157 | "launchpad.net/ubuntu-push/server/broker" |
158 | "launchpad.net/ubuntu-push/server/broker/testing" |
159 | "launchpad.net/ubuntu-push/server/store" |
160 | + help "launchpad.net/ubuntu-push/testing" |
161 | ) |
162 | |
163 | func TestBroker(t *stdtesting.T) { TestingT(t) } |
164 | @@ -40,11 +41,11 @@ |
165 | exchg := &broker.BroadcastExchange{ |
166 | ChanId: store.SystemInternalChannelId, |
167 | TopLevel: 3, |
168 | - NotificationPayloads: []json.RawMessage{ |
169 | + Notifications: help.Ns( |
170 | json.RawMessage(`{"a":"x"}`), |
171 | json.RawMessage(`[]`), |
172 | json.RawMessage(`{"a":"y"}`), |
173 | - }, |
174 | + ), |
175 | } |
176 | exchg.Init() |
177 | c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{ |
178 | @@ -63,10 +64,10 @@ |
179 | exchg := &broker.BroadcastExchange{ |
180 | ChanId: store.SystemInternalChannelId, |
181 | TopLevel: 3, |
182 | - NotificationPayloads: []json.RawMessage{ |
183 | + Notifications: help.Ns( |
184 | json.RawMessage(`{"img1/m1":100}`), |
185 | json.RawMessage(`{"img2/m2":200}`), |
186 | - }, |
187 | + ), |
188 | } |
189 | exchg.Init() |
190 | outMsg, inMsg, err := exchg.Prepare(sess) |
191 | @@ -89,9 +90,9 @@ |
192 | ImageChannel: "img1", |
193 | } |
194 | exchg := &broker.BroadcastExchange{ |
195 | - ChanId: store.SystemInternalChannelId, |
196 | - TopLevel: 3, |
197 | - NotificationPayloads: []json.RawMessage{}, |
198 | + ChanId: store.SystemInternalChannelId, |
199 | + TopLevel: 3, |
200 | + Notifications: []protocol.Notification{}, |
201 | } |
202 | exchg.Init() |
203 | outMsg, inMsg, err := exchg.Prepare(sess) |
204 | @@ -109,9 +110,9 @@ |
205 | ImageChannel: "img1", |
206 | } |
207 | exchg := &broker.BroadcastExchange{ |
208 | - ChanId: store.SystemInternalChannelId, |
209 | - TopLevel: 3, |
210 | - NotificationPayloads: []json.RawMessage{}, |
211 | + ChanId: store.SystemInternalChannelId, |
212 | + TopLevel: 3, |
213 | + Notifications: []protocol.Notification{}, |
214 | } |
215 | exchg.Init() |
216 | outMsg, inMsg, err := exchg.Prepare(sess) |
217 | @@ -134,9 +135,9 @@ |
218 | |
219 | topLevel := int64(len(needsSplitting)) |
220 | exchg := &broker.BroadcastExchange{ |
221 | - ChanId: store.SystemInternalChannelId, |
222 | - TopLevel: topLevel, |
223 | - NotificationPayloads: needsSplitting, |
224 | + ChanId: store.SystemInternalChannelId, |
225 | + TopLevel: topLevel, |
226 | + Notifications: help.Ns(needsSplitting...), |
227 | } |
228 | exchg.Init() |
229 | outMsg, _, err := exchg.Prepare(sess) |
230 | @@ -153,10 +154,10 @@ |
231 | exchg = &broker.BroadcastExchange{ |
232 | ChanId: store.SystemInternalChannelId, |
233 | TopLevel: topLevel + 2, |
234 | - NotificationPayloads: []json.RawMessage{ |
235 | + Notifications: help.Ns( |
236 | json.RawMessage(`{"img1/m1":"x"}`), |
237 | json.RawMessage(`{"img1/m1":"y"}`), |
238 | - }, |
239 | + ), |
240 | } |
241 | exchg.Init() |
242 | outMsg, _, err = exchg.Prepare(sess) |
243 | @@ -174,9 +175,9 @@ |
244 | exchg := &broker.BroadcastExchange{ |
245 | ChanId: store.SystemInternalChannelId, |
246 | TopLevel: 3, |
247 | - NotificationPayloads: []json.RawMessage{ |
248 | + Notifications: help.Ns( |
249 | json.RawMessage(`{"img2/m1":1}`), |
250 | - }, |
251 | + ), |
252 | } |
253 | exchg.Init() |
254 | outMsg, inMsg, err := exchg.Prepare(sess) |
255 | @@ -203,10 +204,10 @@ |
256 | exchg := &broker.BroadcastExchange{ |
257 | ChanId: store.SystemInternalChannelId, |
258 | TopLevel: 3, |
259 | - NotificationPayloads: []json.RawMessage{ |
260 | + Notifications: help.Ns( |
261 | json.RawMessage(`{"img1/m1":100}`), |
262 | json.RawMessage(`{"img1/m1":101}`), |
263 | - }, |
264 | + ), |
265 | } |
266 | exchg.Init() |
267 | outMsg, inMsg, err := exchg.Prepare(sess) |
268 | @@ -230,11 +231,11 @@ |
269 | exchg := &broker.BroadcastExchange{ |
270 | ChanId: store.SystemInternalChannelId, |
271 | TopLevel: 5, |
272 | - NotificationPayloads: []json.RawMessage{ |
273 | + Notifications: help.Ns( |
274 | json.RawMessage(`{"img1/m1":100}`), |
275 | json.RawMessage(`{"img2/m2":200}`), |
276 | json.RawMessage(`{"img1/m1":101}`), |
277 | - }, |
278 | + ), |
279 | } |
280 | exchg.Init() |
281 | outMsg, inMsg, err := exchg.Prepare(sess) |
282 | |
283 | === modified file 'server/broker/exchg_impl_test.go' |
284 | --- server/broker/exchg_impl_test.go 2014-04-03 16:00:53 +0000 |
285 | +++ server/broker/exchg_impl_test.go 2014-04-29 18:02:51 +0000 |
286 | @@ -22,6 +22,7 @@ |
287 | . "launchpad.net/gocheck" |
288 | |
289 | "launchpad.net/ubuntu-push/server/store" |
290 | + help "launchpad.net/ubuntu-push/testing" |
291 | ) |
292 | |
293 | type exchangesImplSuite struct{} |
294 | @@ -29,27 +30,27 @@ |
295 | var _ = Suite(&exchangesImplSuite{}) |
296 | |
297 | func (s *exchangesImplSuite) TestFilterByLevel(c *C) { |
298 | - payloads := []json.RawMessage{ |
299 | + notifs := help.Ns( |
300 | json.RawMessage(`{"a": 3}`), |
301 | json.RawMessage(`{"a": 4}`), |
302 | json.RawMessage(`{"a": 5}`), |
303 | - } |
304 | - res := filterByLevel(5, 5, payloads) |
305 | + ) |
306 | + res := filterByLevel(5, 5, notifs) |
307 | c.Check(len(res), Equals, 0) |
308 | - res = filterByLevel(4, 5, payloads) |
309 | + res = filterByLevel(4, 5, notifs) |
310 | c.Check(len(res), Equals, 1) |
311 | - c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`)) |
312 | - res = filterByLevel(3, 5, payloads) |
313 | + c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 5}`)) |
314 | + res = filterByLevel(3, 5, notifs) |
315 | c.Check(len(res), Equals, 2) |
316 | - c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`)) |
317 | - res = filterByLevel(2, 5, payloads) |
318 | + c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 4}`)) |
319 | + res = filterByLevel(2, 5, notifs) |
320 | c.Check(len(res), Equals, 3) |
321 | - res = filterByLevel(1, 5, payloads) |
322 | + res = filterByLevel(1, 5, notifs) |
323 | c.Check(len(res), Equals, 3) |
324 | // too ahead, pick only last |
325 | - res = filterByLevel(10, 5, payloads) |
326 | + res = filterByLevel(10, 5, notifs) |
327 | c.Check(len(res), Equals, 1) |
328 | - c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`)) |
329 | + c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 5}`)) |
330 | } |
331 | |
332 | func (s *exchangesImplSuite) TestFilterByLevelEmpty(c *C) { |
333 | @@ -71,18 +72,19 @@ |
334 | err := json.Unmarshal(p, &decoded[i]) |
335 | c.Assert(err, IsNil) |
336 | } |
337 | + notifs := help.Ns(payloads...) |
338 | |
339 | other := store.InternalChannelId("1") |
340 | |
341 | c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil) |
342 | - c.Check(channelFilter("", other, payloads[1:], decoded), DeepEquals, payloads[1:]) |
343 | + c.Check(channelFilter("", other, notifs[1:], decoded), DeepEquals, payloads[1:]) |
344 | |
345 | // use tag when channel is the system channel |
346 | |
347 | - c.Check(channelFilter("c/z", store.SystemInternalChannelId, payloads, decoded), HasLen, 0) |
348 | - |
349 | - c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]}) |
350 | - |
351 | - c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]}) |
352 | + c.Check(channelFilter("c/z", store.SystemInternalChannelId, notifs, decoded), HasLen, 0) |
353 | + |
354 | + c.Check(channelFilter("a/x", store.SystemInternalChannelId, notifs, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]}) |
355 | + |
356 | + c.Check(channelFilter("a/x", store.SystemInternalChannelId, notifs[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]}) |
357 | |
358 | } |
359 | |
360 | === modified file 'server/broker/simple/simple.go' |
361 | --- server/broker/simple/simple.go 2014-04-17 15:42:27 +0000 |
362 | +++ server/broker/simple/simple.go 2014-04-29 18:02:51 +0000 |
363 | @@ -144,7 +144,7 @@ |
364 | // find relevant channels, for now only system |
365 | channels := []store.InternalChannelId{store.SystemInternalChannelId} |
366 | for _, chanId := range channels { |
367 | - topLevel, payloads, err := b.sto.GetChannelSnapshot(chanId) |
368 | + topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId) |
369 | if err != nil { |
370 | // next broadcast will try again |
371 | b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err) |
372 | @@ -153,9 +153,9 @@ |
373 | clientLevel := sess.levels[chanId] |
374 | if clientLevel != topLevel { |
375 | broadcastExchg := &broker.BroadcastExchange{ |
376 | - ChanId: chanId, |
377 | - TopLevel: topLevel, |
378 | - NotificationPayloads: payloads, |
379 | + ChanId: chanId, |
380 | + TopLevel: topLevel, |
381 | + Notifications: notifications, |
382 | } |
383 | broadcastExchg.Init() |
384 | sess.exchanges <- broadcastExchg |
385 | @@ -233,16 +233,16 @@ |
386 | case delivery := <-b.deliveryCh: |
387 | switch delivery.kind { |
388 | case broadcastDelivery: |
389 | - topLevel, payloads, err := b.sto.GetChannelSnapshot(delivery.chanId) |
390 | + topLevel, notifications, err := b.sto.GetChannelSnapshot(delivery.chanId) |
391 | if err != nil { |
392 | // next broadcast will try again |
393 | b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err) |
394 | continue Loop |
395 | } |
396 | broadcastExchg := &broker.BroadcastExchange{ |
397 | - ChanId: delivery.chanId, |
398 | - TopLevel: topLevel, |
399 | - NotificationPayloads: payloads, |
400 | + ChanId: delivery.chanId, |
401 | + TopLevel: topLevel, |
402 | + Notifications: notifications, |
403 | } |
404 | broadcastExchg.Init() |
405 | for _, sess := range b.registry { |
406 | |
407 | === modified file 'server/broker/simple/simple_test.go' |
408 | --- server/broker/simple/simple_test.go 2014-04-03 16:00:53 +0000 |
409 | +++ server/broker/simple/simple_test.go 2014-04-29 18:02:51 +0000 |
410 | @@ -26,6 +26,7 @@ |
411 | "launchpad.net/ubuntu-push/server/broker" |
412 | "launchpad.net/ubuntu-push/server/broker/testing" |
413 | "launchpad.net/ubuntu-push/server/store" |
414 | + help "launchpad.net/ubuntu-push/testing" |
415 | ) |
416 | |
417 | func TestSimple(t *stdtesting.T) { TestingT(t) } |
418 | @@ -58,10 +59,10 @@ |
419 | c.Assert(len(sess.exchanges), Equals, 1) |
420 | exchg1 := <-sess.exchanges |
421 | c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{ |
422 | - ChanId: store.SystemInternalChannelId, |
423 | - TopLevel: 1, |
424 | - NotificationPayloads: []json.RawMessage{notification1}, |
425 | - Decoded: []map[string]interface{}{decoded1}, |
426 | + ChanId: store.SystemInternalChannelId, |
427 | + TopLevel: 1, |
428 | + Notifications: help.Ns(notification1), |
429 | + Decoded: []map[string]interface{}{decoded1}, |
430 | }) |
431 | } |
432 | |
433 | |
434 | === modified file 'server/broker/testsuite/suite.go' |
435 | --- server/broker/testsuite/suite.go 2014-04-17 15:42:27 +0000 |
436 | +++ server/broker/testsuite/suite.go 2014-04-29 18:02:51 +0000 |
437 | @@ -30,7 +30,7 @@ |
438 | "launchpad.net/ubuntu-push/server/broker" |
439 | "launchpad.net/ubuntu-push/server/broker/testing" |
440 | "launchpad.net/ubuntu-push/server/store" |
441 | - helpers "launchpad.net/ubuntu-push/testing" |
442 | + help "launchpad.net/ubuntu-push/testing" |
443 | ) |
444 | |
445 | // The expected interface for tested brokers. |
446 | @@ -51,11 +51,11 @@ |
447 | // Let us get to a broker.BroadcastExchange from an Exchange. |
448 | RevealBroadcastExchange func(broker.Exchange) *broker.BroadcastExchange |
449 | // private |
450 | - testlog *helpers.TestLogger |
451 | + testlog *help.TestLogger |
452 | } |
453 | |
454 | func (s *CommonBrokerSuite) SetUpTest(c *C) { |
455 | - s.testlog = helpers.NewTestLogger(c, "error") |
456 | + s.testlog = help.NewTestLogger(c, "error") |
457 | } |
458 | |
459 | var testBrokerConfig = &testing.TestBrokerConfig{10, 5} |
460 | @@ -203,10 +203,10 @@ |
461 | c.Fatal("taking too long to get broadcast exchange") |
462 | case exchg1 := <-sess1.SessionChannel(): |
463 | c.Check(s.RevealBroadcastExchange(exchg1), DeepEquals, &broker.BroadcastExchange{ |
464 | - ChanId: store.SystemInternalChannelId, |
465 | - TopLevel: 1, |
466 | - NotificationPayloads: []json.RawMessage{notification1}, |
467 | - Decoded: []map[string]interface{}{decoded1}, |
468 | + ChanId: store.SystemInternalChannelId, |
469 | + TopLevel: 1, |
470 | + Notifications: help.Ns(notification1), |
471 | + Decoded: []map[string]interface{}{decoded1}, |
472 | }) |
473 | } |
474 | select { |
475 | @@ -214,10 +214,10 @@ |
476 | c.Fatal("taking too long to get broadcast exchange") |
477 | case exchg2 := <-sess2.SessionChannel(): |
478 | c.Check(s.RevealBroadcastExchange(exchg2), DeepEquals, &broker.BroadcastExchange{ |
479 | - ChanId: store.SystemInternalChannelId, |
480 | - TopLevel: 1, |
481 | - NotificationPayloads: []json.RawMessage{notification1}, |
482 | - Decoded: []map[string]interface{}{decoded1}, |
483 | + ChanId: store.SystemInternalChannelId, |
484 | + TopLevel: 1, |
485 | + Notifications: help.Ns(notification1), |
486 | + Decoded: []map[string]interface{}{decoded1}, |
487 | }) |
488 | } |
489 | } |
490 | @@ -227,7 +227,7 @@ |
491 | countdownToFail int |
492 | } |
493 | |
494 | -func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []json.RawMessage, error) { |
495 | +func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []protocol.Notification, error) { |
496 | if sto.countdownToFail == 0 { |
497 | return 0, nil, errors.New("get channel snapshot fail") |
498 | } |
499 | |
500 | === modified file 'server/store/inmemory.go' |
501 | --- server/store/inmemory.go 2014-02-18 14:19:05 +0000 |
502 | +++ server/store/inmemory.go 2014-04-29 18:02:51 +0000 |
503 | @@ -20,11 +20,13 @@ |
504 | "encoding/json" |
505 | "sync" |
506 | "time" |
507 | + |
508 | + "launchpad.net/ubuntu-push/protocol" |
509 | ) |
510 | |
511 | // one stored notification |
512 | type notification struct { |
513 | - payload json.RawMessage |
514 | + protocol.Notification |
515 | expiration time.Time |
516 | } |
517 | |
518 | @@ -63,14 +65,14 @@ |
519 | } |
520 | prev.topLevel++ |
521 | prev.notifications = append(prev.notifications, notification{ |
522 | - payload: notificationPayload, |
523 | - expiration: expiration, |
524 | + Notification: protocol.Notification{Payload: notificationPayload}, |
525 | + expiration: expiration, |
526 | }) |
527 | sto.store[chanId] = prev |
528 | return nil |
529 | } |
530 | |
531 | -func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []json.RawMessage, error) { |
532 | +func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []protocol.Notification, error) { |
533 | sto.lock.Lock() |
534 | defer sto.lock.Unlock() |
535 | channel, ok := sto.store[chanId] |
536 | @@ -79,13 +81,13 @@ |
537 | } |
538 | topLevel := channel.topLevel |
539 | n := len(channel.notifications) |
540 | - res := make([]json.RawMessage, 0, n) |
541 | + res := make([]protocol.Notification, 0, n) |
542 | now := time.Now() |
543 | for _, notification := range channel.notifications { |
544 | if notification.expiration.Before(now) { |
545 | continue |
546 | } |
547 | - res = append(res, notification.payload) |
548 | + res = append(res, notification.Notification) |
549 | } |
550 | return topLevel, res, nil |
551 | } |
552 | |
553 | === modified file 'server/store/inmemory_test.go' |
554 | --- server/store/inmemory_test.go 2014-02-14 12:38:38 +0000 |
555 | +++ server/store/inmemory_test.go 2014-04-29 18:02:51 +0000 |
556 | @@ -21,6 +21,9 @@ |
557 | "time" |
558 | |
559 | . "launchpad.net/gocheck" |
560 | + |
561 | + "launchpad.net/ubuntu-push/protocol" |
562 | + help "launchpad.net/ubuntu-push/testing" |
563 | ) |
564 | |
565 | type inMemorySuite struct{} |
566 | @@ -45,7 +48,7 @@ |
567 | top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId) |
568 | c.Assert(err, IsNil) |
569 | c.Check(top, Equals, int64(0)) |
570 | - c.Check(res, DeepEquals, []json.RawMessage(nil)) |
571 | + c.Check(res, DeepEquals, []protocol.Notification(nil)) |
572 | } |
573 | |
574 | func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshot(c *C) { |
575 | @@ -61,7 +64,7 @@ |
576 | top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId) |
577 | c.Assert(err, IsNil) |
578 | c.Check(top, Equals, int64(2)) |
579 | - c.Check(res, DeepEquals, []json.RawMessage{notification1, notification2}) |
580 | + c.Check(res, DeepEquals, help.Ns(notification1, notification2)) |
581 | } |
582 | |
583 | func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshotWithExpiration(c *C) { |
584 | @@ -81,5 +84,5 @@ |
585 | top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId) |
586 | c.Assert(err, IsNil) |
587 | c.Check(top, Equals, int64(2)) |
588 | - c.Check(res, DeepEquals, []json.RawMessage{notification1}) |
589 | + c.Check(res, DeepEquals, help.Ns(notification1)) |
590 | } |
591 | |
592 | === modified file 'server/store/store.go' |
593 | --- server/store/store.go 2014-02-18 13:43:07 +0000 |
594 | +++ server/store/store.go 2014-04-29 18:02:51 +0000 |
595 | @@ -22,6 +22,8 @@ |
596 | "encoding/json" |
597 | "errors" |
598 | "time" |
599 | + |
600 | + "launchpad.net/ubuntu-push/protocol" |
601 | ) |
602 | |
603 | type InternalChannelId string |
604 | @@ -70,7 +72,7 @@ |
605 | AppendToChannel(chanId InternalChannelId, notification json.RawMessage, expiration time.Time) error |
606 | // GetChannelSnapshot gets all the current notifications and |
607 | // current top level in the channel. |
608 | - GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, payloads []json.RawMessage, err error) |
609 | + GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, notifications []protocol.Notification, err error) |
610 | // Close is to be called when done with the store. |
611 | Close() |
612 | } |
613 | |
614 | === modified file 'testing/helpers.go' |
615 | --- testing/helpers.go 2014-02-21 16:04:44 +0000 |
616 | +++ testing/helpers.go 2014-04-29 18:02:51 +0000 |
617 | @@ -18,6 +18,7 @@ |
618 | package testing |
619 | |
620 | import ( |
621 | + "encoding/json" |
622 | "fmt" |
623 | "os" |
624 | "path/filepath" |
625 | @@ -26,6 +27,7 @@ |
626 | "sync" |
627 | |
628 | "launchpad.net/ubuntu-push/logger" |
629 | + "launchpad.net/ubuntu-push/protocol" |
630 | ) |
631 | |
632 | type captureHelper struct { |
633 | @@ -122,3 +124,12 @@ |
634 | } |
635 | return filepath.Join(dir, relativePath) |
636 | } |
637 | + |
638 | +// Ns makes a []Notification from just payloads. |
639 | +func Ns(payloads ...json.RawMessage) []protocol.Notification { |
640 | + res := make([]protocol.Notification, len(payloads)) |
641 | + for i := 0; i < len(payloads); i++ { |
642 | + res[i].Payload = payloads[i] |
643 | + } |
644 | + return res |
645 | +} |