Merge lp:~pedronis/ubuntu-push/unicast-broker into lp:ubuntu-push/automatic
- unicast-broker
- Merge into automatic
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 146 |
Merged at revision: | 144 |
Proposed branch: | lp:~pedronis/ubuntu-push/unicast-broker |
Merge into: | lp:ubuntu-push/automatic |
Prerequisite: | lp:~pedronis/ubuntu-push/unicast-store |
Diff against target: |
506 lines (+308/-5) 9 files modified
protocol/messages.go (+11/-0) server/api/handlers_test.go (+8/-0) server/broker/broker.go (+6/-0) server/broker/exchanges.go (+38/-2) server/broker/exchanges_test.go (+97/-0) server/broker/simple/simple.go (+47/-2) server/broker/simple/suite_test.go (+3/-0) server/broker/testing/impls.go (+13/-0) server/broker/testsuite/suite.go (+85/-1) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/unicast-broker |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+218044@code.launchpad.net |
Commit message
unicast in broker and introduce UnicastExchange, some reorg
Description of the change
unicast in broker and introduce UnicastExchange, some reorg
Left for later branches:
- deal with pending unicast messages
- support splitting large NOTIFICATIONS messages
To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) : | # |
review:
Approve
- 146. By Samuele Pedroni: Merged unicast-store into unicast-broker.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'protocol/messages.go' |
2 | --- protocol/messages.go 2014-05-02 15:13:38 +0000 |
3 | +++ protocol/messages.go 2014-05-02 15:13:38 +0000 |
4 | @@ -154,6 +154,17 @@ |
5 | Notifications []Notification |
6 | } |
7 | |
8 | +// Reset resets the splitting state if the message storage is to be |
9 | +// reused. |
10 | +func (m *NotificationsMsg) Reset() { |
11 | + // xxx |
12 | +} |
13 | + |
14 | +func (m *NotificationsMsg) Split() bool { |
15 | + // xxx |
16 | + return true |
17 | +} |
18 | + |
19 | // A single unicast notification |
20 | type Notification struct { |
21 | AppId string `json:"A"` |
22 | |
23 | === modified file 'server/api/handlers_test.go' |
24 | --- server/api/handlers_test.go 2014-05-02 15:13:38 +0000 |
25 | +++ server/api/handlers_test.go 2014-05-02 15:13:38 +0000 |
26 | @@ -150,6 +150,10 @@ |
27 | cbsend.payloads = protocol.ExtractPayloads(notifications) |
28 | } |
29 | |
30 | +func (cbsend *checkBrokerSending) Unicast(chanIds ...store.InternalChannelId) { |
31 | + // xxx later |
32 | +} |
33 | + |
34 | func (s *handlersSuite) TestDoBroadcast(c *C) { |
35 | sto := store.NewInMemoryPendingStore() |
36 | bsend := &checkBrokerSending{store: sto} |
37 | @@ -269,6 +273,10 @@ |
38 | bsend.chanId <- chanId |
39 | } |
40 | |
41 | +func (bsend testBrokerSending) Unicast(chanIds ...store.InternalChannelId) { |
42 | + // xxx later |
43 | +} |
44 | + |
45 | func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) { |
46 | sto := store.NewInMemoryPendingStore() |
47 | stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) { |
48 | |
49 | === modified file 'server/broker/broker.go' |
50 | --- server/broker/broker.go 2014-04-17 15:42:27 +0000 |
51 | +++ server/broker/broker.go 2014-05-02 15:13:38 +0000 |
52 | @@ -39,6 +39,8 @@ |
53 | type BrokerSending interface { |
54 | // Broadcast channel. |
55 | Broadcast(chanId store.InternalChannelId) |
56 | + // Unicast over channels. |
57 | + Unicast(chanIds ...store.InternalChannelId) |
58 | } |
59 | |
60 | // Exchange leads the session through performing an exchange, typically delivery. |
61 | @@ -81,6 +83,10 @@ |
62 | Levels() LevelsMap |
63 | // ExchangeScratchArea returns the scratch area for exchanges. |
64 | ExchangeScratchArea() *ExchangesScratchArea |
65 | + // Get gets the content of the channel with chanId. |
66 | + Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) |
67 | + // DropByMsgId drops notifications from the channel chanId by message id. |
68 | + DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error |
69 | } |
70 | |
71 | // Session aborted error. |
72 | |
73 | === modified file 'server/broker/exchanges.go' |
74 | --- server/broker/exchanges.go 2014-05-02 15:13:38 +0000 |
75 | +++ server/broker/exchanges.go 2014-05-02 15:13:38 +0000 |
76 | @@ -28,8 +28,9 @@ |
77 | |
78 | // Scratch area for exchanges, sessions should hold one of these. |
79 | type ExchangesScratchArea struct { |
80 | - broadcastMsg protocol.BroadcastMsg |
81 | - ackMsg protocol.AckMsg |
82 | + broadcastMsg protocol.BroadcastMsg |
83 | + notificationsMsg protocol.NotificationsMsg |
84 | + ackMsg protocol.AckMsg |
85 | } |
86 | |
87 | // BroadcastExchange leads a session through delivering a BROADCAST. |
88 | @@ -135,3 +136,38 @@ |
89 | func (cbe *ConnMetaExchange) Acked(sess BrokerSession, done bool) error { |
90 | panic("Acked should not get invoked on ConnMetaExchange") |
91 | } |
92 | + |
93 | +// UnicastExchange leads a session through delivering a NOTIFICATIONS message. |
94 | +// For simplicity it is fully public. |
95 | +type UnicastExchange struct { |
96 | + ChanId store.InternalChannelId |
97 | +} |
98 | + |
99 | +// check interface already here |
100 | +var _ Exchange = (*UnicastExchange)(nil) |
101 | + |
102 | +// Prepare session for a NOTIFICATIONS. |
103 | +func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
104 | + _, notifs, err := sess.Get(sue.ChanId, false) |
105 | + if err != nil { |
106 | + return nil, nil, err |
107 | + } |
108 | + scratchArea := sess.ExchangeScratchArea() |
109 | + scratchArea.notificationsMsg.Reset() |
110 | + scratchArea.notificationsMsg.Type = "notifications" |
111 | + scratchArea.notificationsMsg.Notifications = notifs |
112 | + return &scratchArea.notificationsMsg, &scratchArea.ackMsg, nil |
113 | +} |
114 | + |
115 | +// Acked deals with an ACK for a NOTIFICATIONS. |
116 | +func (sue *UnicastExchange) Acked(sess BrokerSession, done bool) error { |
117 | + scratchArea := sess.ExchangeScratchArea() |
118 | + if scratchArea.ackMsg.Type != "ack" { |
119 | + return &ErrAbort{"expected ACK message"} |
120 | + } |
121 | + err := sess.DropByMsgId(sue.ChanId, scratchArea.notificationsMsg.Notifications) |
122 | + if err != nil { |
123 | + return err |
124 | + } |
125 | + return nil |
126 | +} |
127 | |
128 | === modified file 'server/broker/exchanges_test.go' |
129 | --- server/broker/exchanges_test.go 2014-05-02 15:13:38 +0000 |
130 | +++ server/broker/exchanges_test.go 2014-05-02 15:13:38 +0000 |
131 | @@ -18,6 +18,7 @@ |
132 | |
133 | import ( |
134 | "encoding/json" |
135 | + "errors" |
136 | "fmt" |
137 | "strings" |
138 | stdtesting "testing" |
139 | @@ -266,3 +267,99 @@ |
140 | |
141 | c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnMetaExchange") |
142 | } |
143 | + |
144 | +func (s *exchangesSuite) TestUnicastExchange(c *C) { |
145 | + chanId1 := store.UnicastInternalChannelId("u1", "d1") |
146 | + notifs := []protocol.Notification{ |
147 | + protocol.Notification{ |
148 | + MsgId: "msg1", |
149 | + AppId: "app1", |
150 | + Payload: json.RawMessage(`{"m": 1}`), |
151 | + }, |
152 | + protocol.Notification{ |
153 | + MsgId: "msg2", |
154 | + AppId: "app2", |
155 | + Payload: json.RawMessage(`{"m": 2}`), |
156 | + }, |
157 | + } |
158 | + dropped := make(chan []protocol.Notification, 2) |
159 | + sess := &testing.TestBrokerSession{ |
160 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
161 | + c.Check(chanId, Equals, chanId1) |
162 | + c.Check(cachedOk, Equals, false) |
163 | + return 0, notifs, nil |
164 | + }, |
165 | + DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error { |
166 | + c.Check(chanId, Equals, chanId1) |
167 | + dropped <- targets |
168 | + return nil |
169 | + }, |
170 | + } |
171 | + exchg := &broker.UnicastExchange{chanId1} |
172 | + outMsg, inMsg, err := exchg.Prepare(sess) |
173 | + c.Assert(err, IsNil) |
174 | + // check |
175 | + marshalled, err := json.Marshal(outMsg) |
176 | + c.Assert(err, IsNil) |
177 | + c.Check(string(marshalled), Equals, `{"T":"notifications","Notifications":[{"A":"app1","M":"msg1","P":{"m":1}},{"A":"app2","M":"msg2","P":{"m":2}}]}`) |
178 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
179 | + c.Assert(err, IsNil) |
180 | + err = exchg.Acked(sess, true) |
181 | + c.Assert(err, IsNil) |
182 | + c.Check(dropped, HasLen, 1) |
183 | + c.Check(<-dropped, DeepEquals, notifs) |
184 | +} |
185 | + |
186 | +func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) { |
187 | + notifs := []protocol.Notification{} |
188 | + dropped := make(chan []protocol.Notification, 2) |
189 | + sess := &testing.TestBrokerSession{ |
190 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
191 | + return 0, notifs, nil |
192 | + }, |
193 | + DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error { |
194 | + dropped <- targets |
195 | + return nil |
196 | + }, |
197 | + } |
198 | + exchg := &broker.UnicastExchange{} |
199 | + _, inMsg, err := exchg.Prepare(sess) |
200 | + c.Assert(err, IsNil) |
201 | + err = json.Unmarshal([]byte(`{}`), inMsg) |
202 | + c.Assert(err, IsNil) |
203 | + err = exchg.Acked(sess, true) |
204 | + c.Assert(err, Not(IsNil)) |
205 | + c.Check(dropped, HasLen, 0) |
206 | +} |
207 | + |
208 | +func (s *exchangesSuite) TestUnicastExchangeErrorOnPrepare(c *C) { |
209 | + fail := errors.New("fail") |
210 | + sess := &testing.TestBrokerSession{ |
211 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
212 | + return 0, nil, fail |
213 | + }, |
214 | + } |
215 | + exchg := &broker.UnicastExchange{} |
216 | + _, _, err := exchg.Prepare(sess) |
217 | + c.Assert(err, Equals, fail) |
218 | +} |
219 | + |
220 | +func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) { |
221 | + notifs := []protocol.Notification{} |
222 | + fail := errors.New("fail") |
223 | + sess := &testing.TestBrokerSession{ |
224 | + DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
225 | + return 0, notifs, nil |
226 | + }, |
227 | + DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error { |
228 | + return fail |
229 | + }, |
230 | + } |
231 | + exchg := &broker.UnicastExchange{} |
232 | + _, inMsg, err := exchg.Prepare(sess) |
233 | + c.Assert(err, IsNil) |
234 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
235 | + c.Assert(err, IsNil) |
236 | + err = exchg.Acked(sess, true) |
237 | + c.Assert(err, Equals, fail) |
238 | +} |
239 | |
240 | === modified file 'server/broker/simple/simple.go' |
241 | --- server/broker/simple/simple.go 2014-05-02 15:13:38 +0000 |
242 | +++ server/broker/simple/simple.go 2014-05-02 15:13:38 +0000 |
243 | @@ -46,6 +46,7 @@ |
244 | |
245 | // simpleBrokerSession represents a session in the broker. |
246 | type simpleBrokerSession struct { |
247 | + broker *SimpleBroker |
248 | registered bool |
249 | deviceId string |
250 | model string |
251 | @@ -61,6 +62,7 @@ |
252 | |
253 | const ( |
254 | broadcastDelivery deliveryKind = iota |
255 | + unicastDelivery |
256 | ) |
257 | |
258 | // delivery holds all the information to request a delivery |
259 | @@ -93,6 +95,14 @@ |
260 | return &sess.exchgScratch |
261 | } |
262 | |
263 | +func (sess *simpleBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
264 | + return sess.broker.get(chanId, cachedOk) |
265 | +} |
266 | + |
267 | +func (sess *simpleBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error { |
268 | + return sess.broker.drop(chanId, targets) |
269 | +} |
270 | + |
271 | // NewSimpleBroker makes a new SimpleBroker. |
272 | func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker { |
273 | sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) |
274 | @@ -185,6 +195,7 @@ |
275 | levels[id] = v |
276 | } |
277 | sess := &simpleBrokerSession{ |
278 | + broker: b, |
279 | deviceId: connect.DeviceId, |
280 | model: model, |
281 | imageChannel: imageChannel, |
282 | @@ -207,6 +218,24 @@ |
283 | b.sessionCh <- sess |
284 | } |
285 | |
286 | +func (b *SimpleBroker) get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
287 | + topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId) |
288 | + if err != nil { |
289 | + b.logger.Errorf("unsuccessful, get channel snapshot for %v (cachedOk=%v): %v", chanId, cachedOk, err) |
290 | + } |
291 | + return topLevel, notifications, err |
292 | + |
293 | +} |
294 | + |
295 | +func (b *SimpleBroker) drop(chanId store.InternalChannelId, targets []protocol.Notification) error { |
296 | + err := b.sto.DropByMsgId(chanId, targets) |
297 | + if err != nil { |
298 | + b.logger.Errorf("unsuccessful, drop from channel %v: %v", chanId, err) |
299 | + } |
300 | + return err |
301 | + |
302 | +} |
303 | + |
304 | // run runs the agent logic of the broker. |
305 | func (b *SimpleBroker) run() { |
306 | Loop: |
307 | @@ -233,10 +262,9 @@ |
308 | case delivery := <-b.deliveryCh: |
309 | switch delivery.kind { |
310 | case broadcastDelivery: |
311 | - topLevel, notifications, err := b.sto.GetChannelSnapshot(delivery.chanId) |
312 | + topLevel, notifications, err := b.get(delivery.chanId, false) |
313 | if err != nil { |
314 | // next broadcast will try again |
315 | - b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err) |
316 | continue Loop |
317 | } |
318 | broadcastExchg := &broker.BroadcastExchange{ |
319 | @@ -248,6 +276,13 @@ |
320 | for _, sess := range b.registry { |
321 | sess.exchanges <- broadcastExchg |
322 | } |
323 | + case unicastDelivery: |
324 | + chanId := delivery.chanId |
325 | + _, devId := chanId.UnicastUserAndDevice() |
326 | + sess := b.registry[devId] |
327 | + if sess != nil { |
328 | + sess.exchanges <- &broker.UnicastExchange{chanId} |
329 | + } |
330 | } |
331 | } |
332 | } |
333 | @@ -260,3 +295,13 @@ |
334 | chanId: chanId, |
335 | } |
336 | } |
337 | + |
338 | +// Unicast requests unicast for the channels. |
339 | +func (b *SimpleBroker) Unicast(chanIds ...store.InternalChannelId) { |
340 | + for _, chanId := range chanIds { |
341 | + b.deliveryCh <- &delivery{ |
342 | + kind: unicastDelivery, |
343 | + chanId: chanId, |
344 | + } |
345 | + } |
346 | +} |
347 | |
348 | === modified file 'server/broker/simple/suite_test.go' |
349 | --- server/broker/simple/suite_test.go 2014-02-10 23:19:08 +0000 |
350 | +++ server/broker/simple/suite_test.go 2014-05-02 15:13:38 +0000 |
351 | @@ -42,4 +42,7 @@ |
352 | RevealBroadcastExchange: func(exchg broker.Exchange) *broker.BroadcastExchange { |
353 | return exchg.(*broker.BroadcastExchange) |
354 | }, |
355 | + RevealUnicastExchange: func(exchg broker.Exchange) *broker.UnicastExchange { |
356 | + return exchg.(*broker.UnicastExchange) |
357 | + }, |
358 | }}) |
359 | |
360 | === modified file 'server/broker/testing/impls.go' |
361 | --- server/broker/testing/impls.go 2014-04-03 14:31:10 +0000 |
362 | +++ server/broker/testing/impls.go 2014-05-02 15:13:38 +0000 |
363 | @@ -18,7 +18,9 @@ |
364 | package testing |
365 | |
366 | import ( |
367 | + "launchpad.net/ubuntu-push/protocol" |
368 | "launchpad.net/ubuntu-push/server/broker" |
369 | + "launchpad.net/ubuntu-push/server/store" |
370 | ) |
371 | |
372 | // Test implementation of BrokerSession. |
373 | @@ -29,6 +31,9 @@ |
374 | Exchanges chan broker.Exchange |
375 | LevelsMap broker.LevelsMap |
376 | exchgScratch broker.ExchangesScratchArea |
377 | + // hooks |
378 | + DoGet func(store.InternalChannelId, bool) (int64, []protocol.Notification, error) |
379 | + DoDropByMsgId func(store.InternalChannelId, []protocol.Notification) error |
380 | } |
381 | |
382 | func (tbs *TestBrokerSession) DeviceIdentifier() string { |
383 | @@ -55,6 +60,14 @@ |
384 | return &tbs.exchgScratch |
385 | } |
386 | |
387 | +func (tbs *TestBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) { |
388 | + return tbs.DoGet(chanId, cachedOk) |
389 | +} |
390 | + |
391 | +func (tbs *TestBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error { |
392 | + return tbs.DoDropByMsgId(chanId, targets) |
393 | +} |
394 | + |
395 | // Test implementation of BrokerConfig. |
396 | type TestBrokerConfig struct { |
397 | ConfigSessionQueueSize uint |
398 | |
399 | === modified file 'server/broker/testsuite/suite.go' |
400 | --- server/broker/testsuite/suite.go 2014-05-02 15:13:38 +0000 |
401 | +++ server/broker/testsuite/suite.go 2014-05-02 15:13:38 +0000 |
402 | @@ -50,6 +50,8 @@ |
403 | RevealSession func(broker.Broker, string) broker.BrokerSession |
404 | // Let us get to a broker.BroadcastExchange from an Exchange. |
405 | RevealBroadcastExchange func(broker.Exchange) *broker.BroadcastExchange |
406 | + // Let us get to a broker.UnicastExchange from an Exchange. |
407 | + RevealUnicastExchange func(broker.Exchange) *broker.UnicastExchange |
408 | // private |
409 | testlog *help.TestLogger |
410 | } |
411 | @@ -235,6 +237,10 @@ |
412 | return 0, nil, nil |
413 | } |
414 | |
415 | +func (sto *testFailingStore) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error { |
416 | + return errors.New("drop fail") |
417 | +} |
418 | + |
419 | func (s *CommonBrokerSuite) TestBroadcastFail(c *C) { |
420 | logged := make(chan bool, 1) |
421 | s.testlog.SetLogEventCb(func(string) { |
422 | @@ -252,5 +258,83 @@ |
423 | c.Fatal("taking too long to log error") |
424 | case <-logged: |
425 | } |
426 | - c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful broadcast, get channel snapshot for 0: get channel snapshot fail\n") |
427 | + c.Check(s.testlog.Captured(), Matches, "ERROR.*: get channel snapshot fail\n") |
428 | +} |
429 | + |
430 | +func (s *CommonBrokerSuite) TestUnicast(c *C) { |
431 | + sto := store.NewInMemoryPendingStore() |
432 | + notification1 := json.RawMessage(`{"m": "M1"}`) |
433 | + notification2 := json.RawMessage(`{"m": "M2"}`) |
434 | + chanId1 := store.UnicastInternalChannelId("dev1", "dev1") |
435 | + chanId2 := store.UnicastInternalChannelId("dev2", "dev2") |
436 | + b := s.MakeBroker(sto, testBrokerConfig, nil) |
437 | + b.Start() |
438 | + defer b.Stop() |
439 | + sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1") |
440 | + c.Assert(err, IsNil) |
441 | + sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2") |
442 | + c.Assert(err, IsNil) |
443 | + // add notification to channel *after* the registrations |
444 | + muchLater := time.Now().Add(10 * time.Minute) |
445 | + sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater) |
446 | + sto.AppendToUnicastChannel(chanId2, "app2", notification2, "msg2", muchLater) |
447 | + b.Unicast(chanId2, chanId1) |
448 | + select { |
449 | + case <-time.After(5 * time.Second): |
450 | + c.Fatal("taking too long to get unicast exchange") |
451 | + case exchg1 := <-sess1.SessionChannel(): |
452 | + u1 := s.RevealUnicastExchange(exchg1) |
453 | + c.Check(u1.ChanId, Equals, chanId1) |
454 | + } |
455 | + select { |
456 | + case <-time.After(5 * time.Second): |
457 | + c.Fatal("taking too long to get unicast exchange") |
458 | + case exchg2 := <-sess2.SessionChannel(): |
459 | + u2 := s.RevealUnicastExchange(exchg2) |
460 | + c.Check(u2.ChanId, Equals, chanId2) |
461 | + } |
462 | +} |
463 | + |
464 | +func (s *CommonBrokerSuite) TestGetAndDrop(c *C) { |
465 | + sto := store.NewInMemoryPendingStore() |
466 | + notification1 := json.RawMessage(`{"m": "M1"}`) |
467 | + chanId1 := store.UnicastInternalChannelId("dev3", "dev3") |
468 | + b := s.MakeBroker(sto, testBrokerConfig, nil) |
469 | + b.Start() |
470 | + defer b.Stop() |
471 | + sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1") |
472 | + c.Assert(err, IsNil) |
473 | + muchLater := time.Now().Add(10 * time.Minute) |
474 | + sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater) |
475 | + _, expected, err := sto.GetChannelSnapshot(chanId1) |
476 | + c.Assert(err, IsNil) |
477 | + _, notifs, err := sess1.Get(chanId1, false) |
478 | + c.Check(notifs, HasLen, 1) |
479 | + c.Check(notifs, DeepEquals, expected) |
480 | + err = sess1.DropByMsgId(chanId1, notifs) |
481 | + c.Assert(err, IsNil) |
482 | + _, notifs, err = sess1.Get(chanId1, true) |
483 | + c.Check(notifs, HasLen, 0) |
484 | + _, expected, err = sto.GetChannelSnapshot(chanId1) |
485 | + c.Assert(err, IsNil) |
486 | + c.Check(expected, HasLen, 0) |
487 | + |
488 | +} |
489 | + |
490 | +func (s *CommonBrokerSuite) TestGetAndDropErrors(c *C) { |
491 | + chanId1 := store.UnicastInternalChannelId("dev3", "dev3") |
492 | + sto := &testFailingStore{countdownToFail: 1} |
493 | + b := s.MakeBroker(sto, testBrokerConfig, s.testlog) |
494 | + b.Start() |
495 | + defer b.Stop() |
496 | + sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1") |
497 | + c.Assert(err, IsNil) |
498 | + _, _, err = sess1.Get(chanId1, false) |
499 | + c.Assert(err, ErrorMatches, "get channel snapshot fail") |
500 | + c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for Udev3:dev3 \\(cachedOk=false\\): get channel snapshot fail\n") |
501 | + s.testlog.ResetCapture() |
502 | + |
503 | + err = sess1.DropByMsgId(chanId1, nil) |
504 | + c.Assert(err, ErrorMatches, "drop fail") |
505 | + c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n") |
506 | } |