Merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg into lp:ubuntu-push/automatic
- unicast-feed-pending-and-reorg
- Merge into automatic
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 156 |
Merged at revision: | 156 |
Proposed branch: | lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg |
Merge into: | lp:ubuntu-push/automatic |
Diff against target: |
495 lines (+208/-73) 8 files modified
server/acceptance/suites/unicast.go (+21/-0) server/broker/broker.go (+4/-0) server/broker/exchanges.go (+38/-2) server/broker/exchanges_test.go (+93/-3) server/broker/simple/simple.go (+10/-26) server/broker/simple/simple_test.go (+3/-39) server/broker/testing/impls.go (+8/-0) server/broker/testsuite/suite.go (+31/-3) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/unicast-feed-pending-and-reorg |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+219710@code.launchpad.net |
Commit message
support feeding pending unicasts at session start, together with/through some related reorg
Description of the change
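Support feeding pending unicast notifications into a session when it starts, together with the related reorganization that enables it: FeedPending moves from the simple broker into the shared broker package, and broker sessions gain Feed and InternalChannelId methods.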
To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'server/acceptance/suites/unicast.go' | |||
2 | --- server/acceptance/suites/unicast.go 2014-05-02 11:20:55 +0000 | |||
3 | +++ server/acceptance/suites/unicast.go 2014-05-15 15:14:22 +0000 | |||
4 | @@ -94,3 +94,24 @@ | |||
5 | 94 | c.Check(len(errCh1), Equals, 0) | 94 | c.Check(len(errCh1), Equals, 0) |
6 | 95 | c.Check(len(errCh2), Equals, 0) | 95 | c.Check(len(errCh2), Equals, 0) |
7 | 96 | } | 96 | } |
8 | 97 | |||
9 | 98 | func (s *UnicastAcceptanceSuite) TestUnicastPending(c *C) { | ||
10 | 99 | // send unicast that will be pending | ||
11 | 100 | userId, auth := s.associatedAuth("DEV1") | ||
12 | 101 | got, err := s.PostRequest("/notify", &api.Unicast{ | ||
13 | 102 | UserId: userId, | ||
14 | 103 | DeviceId: "DEV1", | ||
15 | 104 | AppId: "app1", | ||
16 | 105 | ExpireOn: future, | ||
17 | 106 | Data: json.RawMessage(`{"a": 42}`), | ||
18 | 107 | }) | ||
19 | 108 | c.Assert(err, IsNil) | ||
20 | 109 | c.Assert(got, Matches, ".*ok.*") | ||
21 | 110 | |||
22 | 111 | // get pending on connect | ||
23 | 112 | events, errCh, stop := s.StartClientAuth(c, "DEV1", nil, auth) | ||
24 | 113 | c.Check(NextEvent(events, errCh), Equals, `unicast app:app1 payload:{"a":42};`) | ||
25 | 114 | stop() | ||
26 | 115 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) | ||
27 | 116 | c.Check(len(errCh), Equals, 0) | ||
28 | 117 | } | ||
29 | 97 | 118 | ||
30 | === modified file 'server/broker/broker.go' | |||
31 | --- server/broker/broker.go 2014-05-01 18:56:07 +0000 | |||
32 | +++ server/broker/broker.go 2014-05-15 15:14:22 +0000 | |||
33 | @@ -87,6 +87,10 @@ | |||
34 | 87 | Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) | 87 | Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) |
35 | 88 | // DropByMsgId drops notifications from the channel chanId by message id. | 88 | // DropByMsgId drops notifications from the channel chanId by message id. |
36 | 89 | DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error | 89 | DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error |
37 | 90 | // Feed feeds exchange into the session. | ||
38 | 91 | Feed(Exchange) | ||
39 | 92 | // InternalChannelId() returns the channel id corresponding to the session. | ||
40 | 93 | InternalChannelId() store.InternalChannelId | ||
41 | 90 | } | 94 | } |
42 | 91 | 95 | ||
43 | 92 | // Session aborted error. | 96 | // Session aborted error. |
44 | 93 | 97 | ||
45 | === modified file 'server/broker/exchanges.go' | |||
46 | --- server/broker/exchanges.go 2014-05-01 18:56:07 +0000 | |||
47 | +++ server/broker/exchanges.go 2014-05-15 15:14:22 +0000 | |||
48 | @@ -19,6 +19,7 @@ | |||
49 | 19 | import ( | 19 | import ( |
50 | 20 | "encoding/json" | 20 | "encoding/json" |
51 | 21 | "fmt" | 21 | "fmt" |
52 | 22 | "time" | ||
53 | 22 | 23 | ||
54 | 23 | "launchpad.net/ubuntu-push/protocol" | 24 | "launchpad.net/ubuntu-push/protocol" |
55 | 24 | "launchpad.net/ubuntu-push/server/store" | 25 | "launchpad.net/ubuntu-push/server/store" |
56 | @@ -33,6 +34,10 @@ | |||
57 | 33 | ackMsg protocol.AckMsg | 34 | ackMsg protocol.AckMsg |
58 | 34 | } | 35 | } |
59 | 35 | 36 | ||
60 | 37 | type BaseExchange struct { | ||
61 | 38 | Timestamp time.Time | ||
62 | 39 | } | ||
63 | 40 | |||
64 | 36 | // BroadcastExchange leads a session through delivering a BROADCAST. | 41 | // BroadcastExchange leads a session through delivering a BROADCAST. |
65 | 37 | // For simplicity it is fully public. | 42 | // For simplicity it is fully public. |
66 | 38 | type BroadcastExchange struct { | 43 | type BroadcastExchange struct { |
67 | @@ -40,6 +45,7 @@ | |||
68 | 40 | TopLevel int64 | 45 | TopLevel int64 |
69 | 41 | Notifications []protocol.Notification | 46 | Notifications []protocol.Notification |
70 | 42 | Decoded []map[string]interface{} | 47 | Decoded []map[string]interface{} |
71 | 48 | BaseExchange | ||
72 | 43 | } | 49 | } |
73 | 44 | 50 | ||
74 | 45 | // check interface already here | 51 | // check interface already here |
75 | @@ -140,7 +146,9 @@ | |||
76 | 140 | // UnicastExchange leads a session through delivering a NOTIFICATIONS message. | 146 | // UnicastExchange leads a session through delivering a NOTIFICATIONS message. |
77 | 141 | // For simplicity it is fully public. | 147 | // For simplicity it is fully public. |
78 | 142 | type UnicastExchange struct { | 148 | type UnicastExchange struct { |
80 | 143 | ChanId store.InternalChannelId | 149 | ChanId store.InternalChannelId |
81 | 150 | CachedOk bool | ||
82 | 151 | BaseExchange | ||
83 | 144 | } | 152 | } |
84 | 145 | 153 | ||
85 | 146 | // check interface already here | 154 | // check interface already here |
86 | @@ -148,10 +156,13 @@ | |||
87 | 148 | 156 | ||
88 | 149 | // Prepare session for a NOTIFICATIONS. | 157 | // Prepare session for a NOTIFICATIONS. |
89 | 150 | func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { | 158 | func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
91 | 151 | _, notifs, err := sess.Get(sue.ChanId, false) | 159 | _, notifs, err := sess.Get(sue.ChanId, sue.CachedOk) |
92 | 152 | if err != nil { | 160 | if err != nil { |
93 | 153 | return nil, nil, err | 161 | return nil, nil, err |
94 | 154 | } | 162 | } |
95 | 163 | if len(notifs) == 0 { | ||
96 | 164 | return nil, nil, ErrNop | ||
97 | 165 | } | ||
98 | 155 | scratchArea := sess.ExchangeScratchArea() | 166 | scratchArea := sess.ExchangeScratchArea() |
99 | 156 | scratchArea.notificationsMsg.Reset() | 167 | scratchArea.notificationsMsg.Reset() |
100 | 157 | scratchArea.notificationsMsg.Type = "notifications" | 168 | scratchArea.notificationsMsg.Type = "notifications" |
101 | @@ -171,3 +182,28 @@ | |||
102 | 171 | } | 182 | } |
103 | 172 | return nil | 183 | return nil |
104 | 173 | } | 184 | } |
105 | 185 | |||
106 | 186 | // FeedPending feeds exchanges covering pending notifications into the session. | ||
107 | 187 | func FeedPending(sess BrokerSession) error { | ||
108 | 188 | // find relevant channels, for now only system | ||
109 | 189 | channels := []store.InternalChannelId{store.SystemInternalChannelId} | ||
110 | 190 | for _, chanId := range channels { | ||
111 | 191 | topLevel, notifications, err := sess.Get(chanId, true) | ||
112 | 192 | if err != nil { | ||
113 | 193 | // next broadcast will try again | ||
114 | 194 | continue | ||
115 | 195 | } | ||
116 | 196 | clientLevel := sess.Levels()[chanId] | ||
117 | 197 | if clientLevel != topLevel { | ||
118 | 198 | broadcastExchg := &BroadcastExchange{ | ||
119 | 199 | ChanId: chanId, | ||
120 | 200 | TopLevel: topLevel, | ||
121 | 201 | Notifications: notifications, | ||
122 | 202 | } | ||
123 | 203 | broadcastExchg.Init() | ||
124 | 204 | sess.Feed(broadcastExchg) | ||
125 | 205 | } | ||
126 | 206 | } | ||
127 | 207 | sess.Feed(&UnicastExchange{ChanId: sess.InternalChannelId(), CachedOk: true}) | ||
128 | 208 | return nil | ||
129 | 209 | } | ||
130 | 174 | 210 | ||
131 | === modified file 'server/broker/exchanges_test.go' | |||
132 | --- server/broker/exchanges_test.go 2014-05-01 18:56:07 +0000 | |||
133 | +++ server/broker/exchanges_test.go 2014-05-15 15:14:22 +0000 | |||
134 | @@ -295,7 +295,7 @@ | |||
135 | 295 | return nil | 295 | return nil |
136 | 296 | }, | 296 | }, |
137 | 297 | } | 297 | } |
139 | 298 | exchg := &broker.UnicastExchange{chanId1} | 298 | exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: false} |
140 | 299 | outMsg, inMsg, err := exchg.Prepare(sess) | 299 | outMsg, inMsg, err := exchg.Prepare(sess) |
141 | 300 | c.Assert(err, IsNil) | 300 | c.Assert(err, IsNil) |
142 | 301 | // check | 301 | // check |
143 | @@ -311,7 +311,7 @@ | |||
144 | 311 | } | 311 | } |
145 | 312 | 312 | ||
146 | 313 | func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) { | 313 | func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) { |
148 | 314 | notifs := []protocol.Notification{} | 314 | notifs := []protocol.Notification{protocol.Notification{}} |
149 | 315 | dropped := make(chan []protocol.Notification, 2) | 315 | dropped := make(chan []protocol.Notification, 2) |
150 | 316 | sess := &testing.TestBrokerSession{ | 316 | sess := &testing.TestBrokerSession{ |
151 | 317 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { | 317 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
152 | @@ -344,8 +344,22 @@ | |||
153 | 344 | c.Assert(err, Equals, fail) | 344 | c.Assert(err, Equals, fail) |
154 | 345 | } | 345 | } |
155 | 346 | 346 | ||
156 | 347 | func (s *exchangesSuite) TestUnicastExchangeCachedOkNop(c *C) { | ||
157 | 348 | chanId1 := store.UnicastInternalChannelId("u1", "d1") | ||
158 | 349 | sess := &testing.TestBrokerSession{ | ||
159 | 350 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { | ||
160 | 351 | c.Check(chanId, Equals, chanId1) | ||
161 | 352 | c.Check(cachedOk, Equals, true) | ||
162 | 353 | return 0, nil, nil | ||
163 | 354 | }, | ||
164 | 355 | } | ||
165 | 356 | exchg := &broker.UnicastExchange{ChanId: chanId1, CachedOk: true} | ||
166 | 357 | _, _, err := exchg.Prepare(sess) | ||
167 | 358 | c.Assert(err, Equals, broker.ErrNop) | ||
168 | 359 | } | ||
169 | 360 | |||
170 | 347 | func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) { | 361 | func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) { |
172 | 348 | notifs := []protocol.Notification{} | 362 | notifs := []protocol.Notification{protocol.Notification{}} |
173 | 349 | fail := errors.New("fail") | 363 | fail := errors.New("fail") |
174 | 350 | sess := &testing.TestBrokerSession{ | 364 | sess := &testing.TestBrokerSession{ |
175 | 351 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { | 365 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
176 | @@ -363,3 +377,79 @@ | |||
177 | 363 | err = exchg.Acked(sess, true) | 377 | err = exchg.Acked(sess, true) |
178 | 364 | c.Assert(err, Equals, fail) | 378 | c.Assert(err, Equals, fail) |
179 | 365 | } | 379 | } |
180 | 380 | |||
181 | 381 | func (s *exchangesSuite) TestFeedPending(c *C) { | ||
182 | 382 | bcast1 := json.RawMessage(`{"m": "M"}`) | ||
183 | 383 | decoded1 := map[string]interface{}{"m": "M"} | ||
184 | 384 | sess := &testing.TestBrokerSession{ | ||
185 | 385 | Exchanges: make(chan broker.Exchange, 5), | ||
186 | 386 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { | ||
187 | 387 | switch chanId { | ||
188 | 388 | case store.SystemInternalChannelId: | ||
189 | 389 | return 1, help.Ns(bcast1), nil | ||
190 | 390 | default: | ||
191 | 391 | return 0, nil, nil | ||
192 | 392 | } | ||
193 | 393 | }, | ||
194 | 394 | } | ||
195 | 395 | err := broker.FeedPending(sess) | ||
196 | 396 | c.Assert(err, IsNil) | ||
197 | 397 | c.Assert(len(sess.Exchanges), Equals, 2) | ||
198 | 398 | exchg1 := <-sess.Exchanges | ||
199 | 399 | c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{ | ||
200 | 400 | ChanId: store.SystemInternalChannelId, | ||
201 | 401 | TopLevel: 1, | ||
202 | 402 | Notifications: help.Ns(bcast1), | ||
203 | 403 | Decoded: []map[string]interface{}{decoded1}, | ||
204 | 404 | }) | ||
205 | 405 | exchg2 := <-sess.Exchanges | ||
206 | 406 | c.Check(exchg2, DeepEquals, &broker.UnicastExchange{ | ||
207 | 407 | ChanId: sess.InternalChannelId(), | ||
208 | 408 | CachedOk: true, | ||
209 | 409 | }) | ||
210 | 410 | } | ||
211 | 411 | |||
212 | 412 | func (s *exchangesSuite) TestFeedPendingSystemChanNop(c *C) { | ||
213 | 413 | bcast1 := json.RawMessage(`{"m": "M"}`) | ||
214 | 414 | sess := &testing.TestBrokerSession{ | ||
215 | 415 | LevelsMap: map[store.InternalChannelId]int64{ | ||
216 | 416 | store.SystemInternalChannelId: 1, | ||
217 | 417 | }, | ||
218 | 418 | Exchanges: make(chan broker.Exchange, 5), | ||
219 | 419 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { | ||
220 | 420 | switch chanId { | ||
221 | 421 | case store.SystemInternalChannelId: | ||
222 | 422 | return 1, help.Ns(bcast1), nil | ||
223 | 423 | default: | ||
224 | 424 | return 0, nil, nil | ||
225 | 425 | } | ||
226 | 426 | }, | ||
227 | 427 | } | ||
228 | 428 | err := broker.FeedPending(sess) | ||
229 | 429 | c.Assert(err, IsNil) | ||
230 | 430 | c.Check(len(sess.Exchanges), Equals, 1) | ||
231 | 431 | exchg1 := <-sess.Exchanges | ||
232 | 432 | c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{}) | ||
233 | 433 | } | ||
234 | 434 | |||
235 | 435 | func (s *exchangesSuite) TestFeedPendingSystemChanFail(c *C) { | ||
236 | 436 | sess := &testing.TestBrokerSession{ | ||
237 | 437 | LevelsMap: map[store.InternalChannelId]int64{ | ||
238 | 438 | store.SystemInternalChannelId: 1, | ||
239 | 439 | }, | ||
240 | 440 | Exchanges: make(chan broker.Exchange, 5), | ||
241 | 441 | DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { | ||
242 | 442 | switch chanId { | ||
243 | 443 | case store.SystemInternalChannelId: | ||
244 | 444 | return 0, nil, errors.New("fail") | ||
245 | 445 | default: | ||
246 | 446 | return 0, nil, nil | ||
247 | 447 | } | ||
248 | 448 | }, | ||
249 | 449 | } | ||
250 | 450 | err := broker.FeedPending(sess) | ||
251 | 451 | c.Assert(err, IsNil) | ||
252 | 452 | c.Check(len(sess.Exchanges), Equals, 1) | ||
253 | 453 | exchg1 := <-sess.Exchanges | ||
254 | 454 | c.Check(exchg1, FitsTypeOf, &broker.UnicastExchange{}) | ||
255 | 455 | } | ||
256 | 366 | 456 | ||
257 | === modified file 'server/broker/simple/simple.go' | |||
258 | --- server/broker/simple/simple.go 2014-05-01 18:56:07 +0000 | |||
259 | +++ server/broker/simple/simple.go 2014-05-15 15:14:22 +0000 | |||
260 | @@ -103,6 +103,14 @@ | |||
261 | 103 | return sess.broker.drop(chanId, targets) | 103 | return sess.broker.drop(chanId, targets) |
262 | 104 | } | 104 | } |
263 | 105 | 105 | ||
264 | 106 | func (sess *simpleBrokerSession) Feed(exchg broker.Exchange) { | ||
265 | 107 | sess.exchanges <- exchg | ||
266 | 108 | } | ||
267 | 109 | |||
268 | 110 | func (sess *simpleBrokerSession) InternalChannelId() store.InternalChannelId { | ||
269 | 111 | return store.UnicastInternalChannelId(sess.deviceId, sess.deviceId) | ||
270 | 112 | } | ||
271 | 113 | |||
272 | 106 | // NewSimpleBroker makes a new SimpleBroker. | 114 | // NewSimpleBroker makes a new SimpleBroker. |
273 | 107 | func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker { | 115 | func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker { |
274 | 108 | sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) | 116 | sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) |
275 | @@ -150,30 +158,6 @@ | |||
276 | 150 | return b.running | 158 | return b.running |
277 | 151 | } | 159 | } |
278 | 152 | 160 | ||
279 | 153 | func (b *SimpleBroker) feedPending(sess *simpleBrokerSession) error { | ||
280 | 154 | // find relevant channels, for now only system | ||
281 | 155 | channels := []store.InternalChannelId{store.SystemInternalChannelId} | ||
282 | 156 | for _, chanId := range channels { | ||
283 | 157 | topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId) | ||
284 | 158 | if err != nil { | ||
285 | 159 | // next broadcast will try again | ||
286 | 160 | b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err) | ||
287 | 161 | continue | ||
288 | 162 | } | ||
289 | 163 | clientLevel := sess.levels[chanId] | ||
290 | 164 | if clientLevel != topLevel { | ||
291 | 165 | broadcastExchg := &broker.BroadcastExchange{ | ||
292 | 166 | ChanId: chanId, | ||
293 | 167 | TopLevel: topLevel, | ||
294 | 168 | Notifications: notifications, | ||
295 | 169 | } | ||
296 | 170 | broadcastExchg.Init() | ||
297 | 171 | sess.exchanges <- broadcastExchg | ||
298 | 172 | } | ||
299 | 173 | } | ||
300 | 174 | return nil | ||
301 | 175 | } | ||
302 | 176 | |||
303 | 177 | // Register registers a session with the broker. It feeds the session | 161 | // Register registers a session with the broker. It feeds the session |
304 | 178 | // pending notifications as well. | 162 | // pending notifications as well. |
305 | 179 | func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) { | 163 | func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) { |
306 | @@ -205,7 +189,7 @@ | |||
307 | 205 | } | 189 | } |
308 | 206 | b.sessionCh <- sess | 190 | b.sessionCh <- sess |
309 | 207 | <-sess.done | 191 | <-sess.done |
311 | 208 | err = b.feedPending(sess) | 192 | err = broker.FeedPending(sess) |
312 | 209 | if err != nil { | 193 | if err != nil { |
313 | 210 | return nil, err | 194 | return nil, err |
314 | 211 | } | 195 | } |
315 | @@ -281,7 +265,7 @@ | |||
316 | 281 | _, devId := chanId.UnicastUserAndDevice() | 265 | _, devId := chanId.UnicastUserAndDevice() |
317 | 282 | sess := b.registry[devId] | 266 | sess := b.registry[devId] |
318 | 283 | if sess != nil { | 267 | if sess != nil { |
320 | 284 | sess.exchanges <- &broker.UnicastExchange{chanId} | 268 | sess.exchanges <- &broker.UnicastExchange{ChanId: chanId, CachedOk: false} |
321 | 285 | } | 269 | } |
322 | 286 | } | 270 | } |
323 | 287 | } | 271 | } |
324 | 288 | 272 | ||
325 | === modified file 'server/broker/simple/simple_test.go' | |||
326 | --- server/broker/simple/simple_test.go 2014-04-29 15:29:04 +0000 | |||
327 | +++ server/broker/simple/simple_test.go 2014-05-15 15:14:22 +0000 | |||
328 | @@ -17,16 +17,12 @@ | |||
329 | 17 | package simple | 17 | package simple |
330 | 18 | 18 | ||
331 | 19 | import ( | 19 | import ( |
332 | 20 | "encoding/json" | ||
333 | 21 | stdtesting "testing" | 20 | stdtesting "testing" |
334 | 22 | "time" | ||
335 | 23 | 21 | ||
336 | 24 | . "launchpad.net/gocheck" | 22 | . "launchpad.net/gocheck" |
337 | 25 | 23 | ||
338 | 26 | "launchpad.net/ubuntu-push/server/broker" | ||
339 | 27 | "launchpad.net/ubuntu-push/server/broker/testing" | 24 | "launchpad.net/ubuntu-push/server/broker/testing" |
340 | 28 | "launchpad.net/ubuntu-push/server/store" | 25 | "launchpad.net/ubuntu-push/server/store" |
341 | 29 | help "launchpad.net/ubuntu-push/testing" | ||
342 | 30 | ) | 26 | ) |
343 | 31 | 27 | ||
344 | 32 | func TestSimple(t *stdtesting.T) { TestingT(t) } | 28 | func TestSimple(t *stdtesting.T) { TestingT(t) } |
345 | @@ -45,39 +41,7 @@ | |||
346 | 45 | c.Check(b.sto, Equals, sto) | 41 | c.Check(b.sto, Equals, sto) |
347 | 46 | } | 42 | } |
348 | 47 | 43 | ||
384 | 48 | func (s *simpleSuite) TestFeedPending(c *C) { | 44 | func (s *simpleSuite) TestSessionInternalChannelId(c *C) { |
385 | 49 | sto := store.NewInMemoryPendingStore() | 45 | sess := &simpleBrokerSession{deviceId: "dev21"} |
386 | 50 | muchLater := time.Now().Add(10 * time.Minute) | 46 | c.Check(sess.InternalChannelId(), Equals, store.UnicastInternalChannelId("dev21", "dev21")) |
352 | 51 | notification1 := json.RawMessage(`{"m": "M"}`) | ||
353 | 52 | decoded1 := map[string]interface{}{"m": "M"} | ||
354 | 53 | sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) | ||
355 | 54 | b := NewSimpleBroker(sto, testBrokerConfig, nil) | ||
356 | 55 | sess := &simpleBrokerSession{ | ||
357 | 56 | exchanges: make(chan broker.Exchange, 1), | ||
358 | 57 | } | ||
359 | 58 | b.feedPending(sess) | ||
360 | 59 | c.Assert(len(sess.exchanges), Equals, 1) | ||
361 | 60 | exchg1 := <-sess.exchanges | ||
362 | 61 | c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{ | ||
363 | 62 | ChanId: store.SystemInternalChannelId, | ||
364 | 63 | TopLevel: 1, | ||
365 | 64 | Notifications: help.Ns(notification1), | ||
366 | 65 | Decoded: []map[string]interface{}{decoded1}, | ||
367 | 66 | }) | ||
368 | 67 | } | ||
369 | 68 | |||
370 | 69 | func (s *simpleSuite) TestFeedPendingNop(c *C) { | ||
371 | 70 | sto := store.NewInMemoryPendingStore() | ||
372 | 71 | muchLater := time.Now().Add(10 * time.Minute) | ||
373 | 72 | notification1 := json.RawMessage(`{"m": "M"}`) | ||
374 | 73 | sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) | ||
375 | 74 | b := NewSimpleBroker(sto, testBrokerConfig, nil) | ||
376 | 75 | sess := &simpleBrokerSession{ | ||
377 | 76 | exchanges: make(chan broker.Exchange, 1), | ||
378 | 77 | levels: map[store.InternalChannelId]int64{ | ||
379 | 78 | store.SystemInternalChannelId: 1, | ||
380 | 79 | }, | ||
381 | 80 | } | ||
382 | 81 | b.feedPending(sess) | ||
383 | 82 | c.Assert(len(sess.exchanges), Equals, 0) | ||
387 | 83 | } | 47 | } |
388 | 84 | 48 | ||
389 | === modified file 'server/broker/testing/impls.go' | |||
390 | --- server/broker/testing/impls.go 2014-05-01 18:56:07 +0000 | |||
391 | +++ server/broker/testing/impls.go 2014-05-15 15:14:22 +0000 | |||
392 | @@ -68,6 +68,14 @@ | |||
393 | 68 | return tbs.DoDropByMsgId(chanId, targets) | 68 | return tbs.DoDropByMsgId(chanId, targets) |
394 | 69 | } | 69 | } |
395 | 70 | 70 | ||
396 | 71 | func (tbs *TestBrokerSession) Feed(exchg broker.Exchange) { | ||
397 | 72 | tbs.Exchanges <- exchg | ||
398 | 73 | } | ||
399 | 74 | |||
400 | 75 | func (tbs *TestBrokerSession) InternalChannelId() store.InternalChannelId { | ||
401 | 76 | return store.UnicastInternalChannelId(tbs.DeviceId, tbs.DeviceId) | ||
402 | 77 | } | ||
403 | 78 | |||
404 | 71 | // Test implementation of BrokerConfig. | 79 | // Test implementation of BrokerConfig. |
405 | 72 | type TestBrokerConfig struct { | 80 | type TestBrokerConfig struct { |
406 | 73 | ConfigSessionQueueSize uint | 81 | ConfigSessionQueueSize uint |
407 | 74 | 82 | ||
408 | === modified file 'server/broker/testsuite/suite.go' | |||
409 | --- server/broker/testsuite/suite.go 2014-05-01 18:56:07 +0000 | |||
410 | +++ server/broker/testsuite/suite.go 2014-05-15 15:14:22 +0000 | |||
411 | @@ -143,7 +143,7 @@ | |||
412 | 143 | defer b.Stop() | 143 | defer b.Stop() |
413 | 144 | sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") | 144 | sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
414 | 145 | c.Assert(err, IsNil) | 145 | c.Assert(err, IsNil) |
416 | 146 | c.Check(len(sess.SessionChannel()), Equals, 1) | 146 | c.Check(len(sess.SessionChannel()), Equals, 2) |
417 | 147 | } | 147 | } |
418 | 148 | 148 | ||
419 | 149 | func (s *CommonBrokerSuite) TestRegistrationFeedPendingError(c *C) { | 149 | func (s *CommonBrokerSuite) TestRegistrationFeedPendingError(c *C) { |
420 | @@ -154,7 +154,12 @@ | |||
421 | 154 | _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") | 154 | _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
422 | 155 | c.Assert(err, IsNil) | 155 | c.Assert(err, IsNil) |
423 | 156 | // but | 156 | // but |
425 | 157 | c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n") | 157 | c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for 0 \\(cachedOk=true\\): get channel snapshot fail\n") |
426 | 158 | } | ||
427 | 159 | |||
428 | 160 | func clearOfPending(c *C, sess broker.BrokerSession) { | ||
429 | 161 | c.Assert(len(sess.SessionChannel()) >= 1, Equals, true) | ||
430 | 162 | <-sess.SessionChannel() | ||
431 | 158 | } | 163 | } |
432 | 159 | 164 | ||
433 | 160 | func (s *CommonBrokerSuite) TestRegistrationLastWins(c *C) { | 165 | func (s *CommonBrokerSuite) TestRegistrationLastWins(c *C) { |
434 | @@ -164,6 +169,7 @@ | |||
435 | 164 | defer b.Stop() | 169 | defer b.Stop() |
436 | 165 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") | 170 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
437 | 166 | c.Assert(err, IsNil) | 171 | c.Assert(err, IsNil) |
438 | 172 | clearOfPending(c, sess1) | ||
439 | 167 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2") | 173 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2") |
440 | 168 | c.Assert(err, IsNil) | 174 | c.Assert(err, IsNil) |
441 | 169 | // previous session got signaled by sending nil on its channel | 175 | // previous session got signaled by sending nil on its channel |
442 | @@ -194,8 +200,10 @@ | |||
443 | 194 | defer b.Stop() | 200 | defer b.Stop() |
444 | 195 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") | 201 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
445 | 196 | c.Assert(err, IsNil) | 202 | c.Assert(err, IsNil) |
446 | 203 | clearOfPending(c, sess1) | ||
447 | 197 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2") | 204 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2") |
448 | 198 | c.Assert(err, IsNil) | 205 | c.Assert(err, IsNil) |
449 | 206 | clearOfPending(c, sess2) | ||
450 | 199 | // add notification to channel *after* the registrations | 207 | // add notification to channel *after* the registrations |
451 | 200 | muchLater := time.Now().Add(10 * time.Minute) | 208 | muchLater := time.Now().Add(10 * time.Minute) |
452 | 201 | sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) | 209 | sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
453 | @@ -250,8 +258,9 @@ | |||
454 | 250 | b := s.MakeBroker(sto, testBrokerConfig, s.testlog) | 258 | b := s.MakeBroker(sto, testBrokerConfig, s.testlog) |
455 | 251 | b.Start() | 259 | b.Start() |
456 | 252 | defer b.Stop() | 260 | defer b.Stop() |
458 | 253 | _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") | 261 | sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1") |
459 | 254 | c.Assert(err, IsNil) | 262 | c.Assert(err, IsNil) |
460 | 263 | clearOfPending(c, sess) | ||
461 | 255 | b.Broadcast(store.SystemInternalChannelId) | 264 | b.Broadcast(store.SystemInternalChannelId) |
462 | 256 | select { | 265 | select { |
463 | 257 | case <-time.After(5 * time.Second): | 266 | case <-time.After(5 * time.Second): |
464 | @@ -272,8 +281,10 @@ | |||
465 | 272 | defer b.Stop() | 281 | defer b.Stop() |
466 | 273 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1") | 282 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1") |
467 | 274 | c.Assert(err, IsNil) | 283 | c.Assert(err, IsNil) |
468 | 284 | clearOfPending(c, sess1) | ||
469 | 275 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2") | 285 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2") |
470 | 276 | c.Assert(err, IsNil) | 286 | c.Assert(err, IsNil) |
471 | 287 | clearOfPending(c, sess2) | ||
472 | 277 | // add notification to channel *after* the registrations | 288 | // add notification to channel *after* the registrations |
473 | 278 | muchLater := time.Now().Add(10 * time.Minute) | 289 | muchLater := time.Now().Add(10 * time.Minute) |
474 | 279 | sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater) | 290 | sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater) |
475 | @@ -338,3 +349,20 @@ | |||
476 | 338 | c.Assert(err, ErrorMatches, "drop fail") | 349 | c.Assert(err, ErrorMatches, "drop fail") |
477 | 339 | c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n") | 350 | c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n") |
478 | 340 | } | 351 | } |
479 | 352 | |||
480 | 353 | func (s *CommonBrokerSuite) TestSessionFeed(c *C) { | ||
481 | 354 | sto := store.NewInMemoryPendingStore() | ||
482 | 355 | b := s.MakeBroker(sto, testBrokerConfig, nil) | ||
483 | 356 | b.Start() | ||
484 | 357 | defer b.Stop() | ||
485 | 358 | sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1") | ||
486 | 359 | c.Assert(err, IsNil) | ||
487 | 360 | clearOfPending(c, sess1) | ||
488 | 361 | bcast := &broker.BroadcastExchange{ChanId: store.SystemInternalChannelId, TopLevel: 99} | ||
489 | 362 | sess1.Feed(bcast) | ||
490 | 363 | c.Check(s.RevealBroadcastExchange(<-sess1.SessionChannel()), DeepEquals, bcast) | ||
491 | 364 | |||
492 | 365 | ucast := &broker.UnicastExchange{ChanId: store.UnicastInternalChannelId("dev21", "dev21"), CachedOk: true} | ||
493 | 366 | sess1.Feed(ucast) | ||
494 | 367 | c.Check(s.RevealUnicastExchange(<-sess1.SessionChannel()), DeepEquals, ucast) | ||
495 | 368 | } |