Merge lp:~pedronis/ubuntu-push/freelance-exchanges into lp:ubuntu-push
- freelance-exchanges
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 21 |
Merged at revision: | 20 |
Proposed branch: | lp:~pedronis/ubuntu-push/freelance-exchanges |
Merge into: | lp:ubuntu-push |
Prerequisite: | lp:~pedronis/ubuntu-push/server-bits-in-server |
Diff against target: |
526 lines (+244/-161) 6 files modified
server/broker/broker.go (+7/-0) server/broker/exchanges.go (+74/-0) server/broker/exchanges_test.go (+123/-0) server/broker/simple.go (+18/-52) server/broker/simple_test.go (+14/-109) server/session/session_test.go (+8/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/freelance-exchanges |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email:
|
Commit message
exchanges don't require to be so tied to a particular broker
Description of the change
with a bit of reorg exchanges don't require to be so tied to a particular broker
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'server/broker/broker.go' |
2 | --- server/broker/broker.go 2014-01-14 15:35:20 +0000 |
3 | +++ server/broker/broker.go 2014-01-24 07:56:28 +0000 |
4 | @@ -44,6 +44,9 @@ |
5 | Acked(BrokerSession) error |
6 | } |
7 | |
8 | +// LevelsMap is the type for holding levels for session. |
9 | +type LevelsMap map[store.InternalChannelId]int64 |
10 | + |
11 | // BrokerSession holds broker session state. |
12 | type BrokerSession interface { |
13 | // SessionChannel returns the session control channel |
14 | @@ -51,6 +54,10 @@ |
15 | SessionChannel() <-chan Exchange |
16 | // DeviceId returns the device id string. |
17 | DeviceId() string |
18 | + // Levels returns the current channel levels for the session |
19 | + Levels() LevelsMap |
20 | + // ExchangeScratchArea returns the scratch area for exchanges. |
21 | + ExchangeScratchArea() *ExchangesScratchArea |
22 | } |
23 | |
24 | // Session aborted error. |
25 | |
26 | === added file 'server/broker/exchanges.go' |
27 | --- server/broker/exchanges.go 1970-01-01 00:00:00 +0000 |
28 | +++ server/broker/exchanges.go 2014-01-24 07:56:28 +0000 |
29 | @@ -0,0 +1,74 @@ |
30 | +/* |
31 | + Copyright 2013-2014 Canonical Ltd. |
32 | + |
33 | + This program is free software: you can redistribute it and/or modify it |
34 | + under the terms of the GNU General Public License version 3, as published |
35 | + by the Free Software Foundation. |
36 | + |
37 | + This program is distributed in the hope that it will be useful, but |
38 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
39 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
40 | + PURPOSE. See the GNU General Public License for more details. |
41 | + |
42 | + You should have received a copy of the GNU General Public License along |
43 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
44 | +*/ |
45 | + |
46 | +package broker |
47 | + |
48 | +import ( |
49 | + "encoding/json" |
50 | + "launchpad.net/ubuntu-push/protocol" |
51 | + "launchpad.net/ubuntu-push/server/store" |
52 | + // "log" |
53 | +) |
54 | + |
55 | +// Exchanges |
56 | + |
57 | +// Scratch area for exchanges, sessions should one of these. |
58 | +type ExchangesScratchArea struct { |
59 | + broadcastMsg protocol.BroadcastMsg |
60 | + ackMsg protocol.AckMsg |
61 | +} |
62 | + |
63 | +// BroadcastExchange leads a session through delivering a BROADCAST. |
64 | +// For simplicity its fully public. |
65 | +type BroadcastExchange struct { |
66 | + ChanId store.InternalChannelId |
67 | + TopLevel int64 |
68 | + NotificationPayloads []json.RawMessage |
69 | +} |
70 | + |
71 | +func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { |
72 | + c := int64(len(payloads)) |
73 | + delta := topLevel - clientLevel |
74 | + if delta < c { |
75 | + return payloads[c-delta:] |
76 | + } else { |
77 | + return payloads |
78 | + } |
79 | +} |
80 | + |
81 | +// Prepare session for a BROADCAST. |
82 | +func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
83 | + scratchArea := sess.ExchangeScratchArea() |
84 | + scratchArea.broadcastMsg.Type = "broadcast" |
85 | + clientLevel := sess.Levels()[sbe.ChanId] |
86 | + payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) |
87 | + // xxx need an AppId as well, later |
88 | + scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId) |
89 | + scratchArea.broadcastMsg.TopLevel = sbe.TopLevel |
90 | + scratchArea.broadcastMsg.Payloads = payloads |
91 | + return &scratchArea.broadcastMsg, &scratchArea.ackMsg, nil |
92 | +} |
93 | + |
94 | +// Acked deals with an ACK for a BROADCAST. |
95 | +func (sbe *BroadcastExchange) Acked(sess BrokerSession) error { |
96 | + scratchArea := sess.ExchangeScratchArea() |
97 | + if scratchArea.ackMsg.Type != "ack" { |
98 | + return &ErrAbort{"expected ACK message"} |
99 | + } |
100 | + // update levels |
101 | + sess.Levels()[sbe.ChanId] = sbe.TopLevel |
102 | + return nil |
103 | +} |
104 | |
105 | === added file 'server/broker/exchanges_test.go' |
106 | --- server/broker/exchanges_test.go 1970-01-01 00:00:00 +0000 |
107 | +++ server/broker/exchanges_test.go 2014-01-24 07:56:28 +0000 |
108 | @@ -0,0 +1,123 @@ |
109 | +/* |
110 | + Copyright 2013-2014 Canonical Ltd. |
111 | + |
112 | + This program is free software: you can redistribute it and/or modify it |
113 | + under the terms of the GNU General Public License version 3, as published |
114 | + by the Free Software Foundation. |
115 | + |
116 | + This program is distributed in the hope that it will be useful, but |
117 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
118 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
119 | + PURPOSE. See the GNU General Public License for more details. |
120 | + |
121 | + You should have received a copy of the GNU General Public License along |
122 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
123 | +*/ |
124 | + |
125 | +package broker |
126 | + |
127 | +import ( |
128 | + "encoding/json" |
129 | + . "launchpad.net/gocheck" |
130 | + "launchpad.net/ubuntu-push/server/store" |
131 | + // "log" |
132 | +) |
133 | + |
134 | +type exchangesSuite struct{} |
135 | + |
136 | +var _ = Suite(&exchangesSuite{}) |
137 | + |
138 | +func (s *exchangesSuite) TestBroadcastExchange(c *C) { |
139 | + sess := &simpleBrokerSession{ |
140 | + levels: map[store.InternalChannelId]int64{}, |
141 | + } |
142 | + exchg := &BroadcastExchange{ |
143 | + ChanId: store.SystemInternalChannelId, |
144 | + TopLevel: 3, |
145 | + NotificationPayloads: []json.RawMessage{ |
146 | + json.RawMessage(`{"a":"x"}`), |
147 | + json.RawMessage(`{"a":"y"}`), |
148 | + }, |
149 | + } |
150 | + inMsg, outMsg, err := exchg.Prepare(sess) |
151 | + c.Assert(err, IsNil) |
152 | + // check |
153 | + marshalled, err := json.Marshal(inMsg) |
154 | + c.Assert(err, IsNil) |
155 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) |
156 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
157 | + c.Assert(err, IsNil) |
158 | + err = exchg.Acked(sess) |
159 | + c.Assert(err, IsNil) |
160 | + c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3)) |
161 | +} |
162 | + |
163 | +func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) { |
164 | + sess := &simpleBrokerSession{ |
165 | + levels: map[store.InternalChannelId]int64{}, |
166 | + } |
167 | + exchg := &BroadcastExchange{ |
168 | + ChanId: store.SystemInternalChannelId, |
169 | + TopLevel: 3, |
170 | + NotificationPayloads: []json.RawMessage{ |
171 | + json.RawMessage(`{"a":"y"}`), |
172 | + }, |
173 | + } |
174 | + inMsg, outMsg, err := exchg.Prepare(sess) |
175 | + c.Assert(err, IsNil) |
176 | + // check |
177 | + marshalled, err := json.Marshal(inMsg) |
178 | + c.Assert(err, IsNil) |
179 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
180 | + err = json.Unmarshal([]byte(`{}`), outMsg) |
181 | + c.Assert(err, IsNil) |
182 | + err = exchg.Acked(sess) |
183 | + c.Assert(err, Not(IsNil)) |
184 | + c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0)) |
185 | +} |
186 | + |
187 | +func (s *exchangesSuite) TestFilterByLevel(c *C) { |
188 | + payloads := []json.RawMessage{ |
189 | + json.RawMessage(`{"a": 3}`), |
190 | + json.RawMessage(`{"a": 4}`), |
191 | + json.RawMessage(`{"a": 5}`), |
192 | + } |
193 | + res := filterByLevel(5, 5, payloads) |
194 | + c.Check(len(res), Equals, 0) |
195 | + res = filterByLevel(4, 5, payloads) |
196 | + c.Check(len(res), Equals, 1) |
197 | + c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`)) |
198 | + res = filterByLevel(3, 5, payloads) |
199 | + c.Check(len(res), Equals, 2) |
200 | + c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`)) |
201 | + res = filterByLevel(2, 5, payloads) |
202 | + c.Check(len(res), Equals, 3) |
203 | + res = filterByLevel(1, 5, payloads) |
204 | + c.Check(len(res), Equals, 3) |
205 | +} |
206 | + |
207 | +func (s *exchangesSuite) TestBroadcastExchangeFilterByLevel(c *C) { |
208 | + sess := &simpleBrokerSession{ |
209 | + levels: map[store.InternalChannelId]int64{ |
210 | + store.SystemInternalChannelId: 2, |
211 | + }, |
212 | + } |
213 | + exchg := &BroadcastExchange{ |
214 | + ChanId: store.SystemInternalChannelId, |
215 | + TopLevel: 3, |
216 | + NotificationPayloads: []json.RawMessage{ |
217 | + json.RawMessage(`{"a":"x"}`), |
218 | + json.RawMessage(`{"a":"y"}`), |
219 | + }, |
220 | + } |
221 | + inMsg, outMsg, err := exchg.Prepare(sess) |
222 | + c.Assert(err, IsNil) |
223 | + // check |
224 | + marshalled, err := json.Marshal(inMsg) |
225 | + c.Assert(err, IsNil) |
226 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
227 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
228 | + c.Assert(err, IsNil) |
229 | + err = exchg.Acked(sess) |
230 | + c.Assert(err, IsNil) |
231 | +} |
232 | |
233 | === modified file 'server/broker/simple.go' |
234 | --- server/broker/simple.go 2014-01-14 15:35:20 +0000 |
235 | +++ server/broker/simple.go 2014-01-24 07:56:28 +0000 |
236 | @@ -17,7 +17,6 @@ |
237 | package broker |
238 | |
239 | import ( |
240 | - "encoding/json" |
241 | "launchpad.net/ubuntu-push/logger" |
242 | "launchpad.net/ubuntu-push/protocol" |
243 | "launchpad.net/ubuntu-push/server/store" |
244 | @@ -48,10 +47,9 @@ |
245 | deviceId string |
246 | done chan bool |
247 | exchanges chan Exchange |
248 | - levels map[store.InternalChannelId]int64 |
249 | + levels LevelsMap |
250 | // for exchanges |
251 | - broadcastMsg protocol.BroadcastMsg |
252 | - ackMsg protocol.AckMsg |
253 | + exchgScratch ExchangesScratchArea |
254 | } |
255 | |
256 | type deliveryKind int |
257 | @@ -74,6 +72,14 @@ |
258 | return sess.deviceId |
259 | } |
260 | |
261 | +func (sess *simpleBrokerSession) Levels() LevelsMap { |
262 | + return sess.levels |
263 | +} |
264 | + |
265 | +func (sess *simpleBrokerSession) ExchangeScratchArea() *ExchangesScratchArea { |
266 | + return &sess.exchgScratch |
267 | +} |
268 | + |
269 | // NewSimpleBroker makes a new SimpleBroker. |
270 | func NewSimpleBroker(sto store.PendingStore, cfg BrokerConfig, logger logger.Logger) *SimpleBroker { |
271 | sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) |
272 | @@ -126,10 +132,10 @@ |
273 | } |
274 | clientLevel := sess.levels[chanId] |
275 | if clientLevel != topLevel { |
276 | - broadcastExchg := &simpleBroadcastExchange{ |
277 | - chanId: chanId, |
278 | - topLevel: topLevel, |
279 | - notificationPayloads: payloads, |
280 | + broadcastExchg := &BroadcastExchange{ |
281 | + ChanId: chanId, |
282 | + TopLevel: topLevel, |
283 | + NotificationPayloads: payloads, |
284 | } |
285 | sess.exchanges <- broadcastExchg |
286 | } |
287 | @@ -198,10 +204,10 @@ |
288 | b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err) |
289 | continue Loop |
290 | } |
291 | - broadcastExchg := &simpleBroadcastExchange{ |
292 | - chanId: delivery.chanId, |
293 | - topLevel: topLevel, |
294 | - notificationPayloads: payloads, |
295 | + broadcastExchg := &BroadcastExchange{ |
296 | + ChanId: delivery.chanId, |
297 | + TopLevel: topLevel, |
298 | + NotificationPayloads: payloads, |
299 | } |
300 | for _, sess := range b.registry { |
301 | sess.exchanges <- broadcastExchg |
302 | @@ -218,43 +224,3 @@ |
303 | chanId: chanId, |
304 | } |
305 | } |
306 | - |
307 | -// Exchanges |
308 | - |
309 | -type simpleBroadcastExchange struct { |
310 | - chanId store.InternalChannelId |
311 | - topLevel int64 |
312 | - notificationPayloads []json.RawMessage |
313 | -} |
314 | - |
315 | -func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { |
316 | - c := int64(len(payloads)) |
317 | - delta := topLevel - clientLevel |
318 | - if delta < c { |
319 | - return payloads[c-delta:] |
320 | - } else { |
321 | - return payloads |
322 | - } |
323 | -} |
324 | - |
325 | -func (sbe *simpleBroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
326 | - simpleSess := sess.(*simpleBrokerSession) |
327 | - simpleSess.broadcastMsg.Type = "broadcast" |
328 | - clientLevel := simpleSess.levels[sbe.chanId] |
329 | - payloads := filterByLevel(clientLevel, sbe.topLevel, sbe.notificationPayloads) |
330 | - // xxx need an AppId as well, later |
331 | - simpleSess.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.chanId) |
332 | - simpleSess.broadcastMsg.TopLevel = sbe.topLevel |
333 | - simpleSess.broadcastMsg.Payloads = payloads |
334 | - return &simpleSess.broadcastMsg, &simpleSess.ackMsg, nil |
335 | -} |
336 | - |
337 | -func (sbe *simpleBroadcastExchange) Acked(sess BrokerSession) error { |
338 | - simpleSess := sess.(*simpleBrokerSession) |
339 | - if simpleSess.ackMsg.Type != "ack" { |
340 | - return &ErrAbort{"expected ACK message"} |
341 | - } |
342 | - // update levels |
343 | - simpleSess.levels[sbe.chanId] = sbe.topLevel |
344 | - return nil |
345 | -} |
346 | |
347 | === modified file 'server/broker/simple_test.go' |
348 | --- server/broker/simple_test.go 2014-01-14 15:35:20 +0000 |
349 | +++ server/broker/simple_test.go 2014-01-24 07:56:28 +0000 |
350 | @@ -69,9 +69,9 @@ |
351 | c.Assert(err, IsNil) |
352 | c.Assert(b.registry["dev-1"], Equals, sess) |
353 | c.Assert(sess.DeviceId(), Equals, "dev-1") |
354 | - c.Check(sess.(*simpleBrokerSession).levels, DeepEquals, map[store.InternalChannelId]int64{ |
355 | + c.Check(sess.Levels(), DeepEquals, LevelsMap(map[store.InternalChannelId]int64{ |
356 | store.SystemInternalChannelId: 5, |
357 | - }) |
358 | + })) |
359 | b.Unregister(sess) |
360 | // just to make sure the unregister was processed |
361 | _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}) |
362 | @@ -99,10 +99,10 @@ |
363 | b.feedPending(sess) |
364 | c.Assert(len(sess.exchanges), Equals, 1) |
365 | exchg1 := <-sess.exchanges |
366 | - c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{ |
367 | - chanId: store.SystemInternalChannelId, |
368 | - topLevel: 1, |
369 | - notificationPayloads: []json.RawMessage{notification1}, |
370 | + c.Check(exchg1, DeepEquals, &BroadcastExchange{ |
371 | + ChanId: store.SystemInternalChannelId, |
372 | + TopLevel: 1, |
373 | + NotificationPayloads: []json.RawMessage{notification1}, |
374 | }) |
375 | } |
376 | |
377 | @@ -163,101 +163,6 @@ |
378 | c.Check(b.registry["dev-1"], Equals, sess2) |
379 | } |
380 | |
381 | -func (s *simpleSuite) TestBroadcastExchange(c *C) { |
382 | - sess := &simpleBrokerSession{ |
383 | - levels: map[store.InternalChannelId]int64{}, |
384 | - } |
385 | - exchg := &simpleBroadcastExchange{ |
386 | - chanId: store.SystemInternalChannelId, |
387 | - topLevel: 3, |
388 | - notificationPayloads: []json.RawMessage{ |
389 | - json.RawMessage(`{"a":"x"}`), |
390 | - json.RawMessage(`{"a":"y"}`), |
391 | - }, |
392 | - } |
393 | - inMsg, outMsg, err := exchg.Prepare(sess) |
394 | - c.Assert(err, IsNil) |
395 | - // check |
396 | - marshalled, err := json.Marshal(inMsg) |
397 | - c.Assert(err, IsNil) |
398 | - c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) |
399 | - err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
400 | - c.Assert(err, IsNil) |
401 | - err = exchg.Acked(sess) |
402 | - c.Assert(err, IsNil) |
403 | - c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3)) |
404 | -} |
405 | - |
406 | -func (s *simpleSuite) TestBroadcastExchangeAckMismatch(c *C) { |
407 | - sess := &simpleBrokerSession{ |
408 | - levels: map[store.InternalChannelId]int64{}, |
409 | - } |
410 | - exchg := &simpleBroadcastExchange{ |
411 | - chanId: store.SystemInternalChannelId, |
412 | - topLevel: 3, |
413 | - notificationPayloads: []json.RawMessage{ |
414 | - json.RawMessage(`{"a":"y"}`), |
415 | - }, |
416 | - } |
417 | - inMsg, outMsg, err := exchg.Prepare(sess) |
418 | - c.Assert(err, IsNil) |
419 | - // check |
420 | - marshalled, err := json.Marshal(inMsg) |
421 | - c.Assert(err, IsNil) |
422 | - c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
423 | - err = json.Unmarshal([]byte(`{}`), outMsg) |
424 | - c.Assert(err, IsNil) |
425 | - err = exchg.Acked(sess) |
426 | - c.Assert(err, Not(IsNil)) |
427 | - c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0)) |
428 | -} |
429 | - |
430 | -func (s *simpleSuite) TestFilterByLevel(c *C) { |
431 | - payloads := []json.RawMessage{ |
432 | - json.RawMessage(`{"a": 3}`), |
433 | - json.RawMessage(`{"a": 4}`), |
434 | - json.RawMessage(`{"a": 5}`), |
435 | - } |
436 | - res := filterByLevel(5, 5, payloads) |
437 | - c.Check(len(res), Equals, 0) |
438 | - res = filterByLevel(4, 5, payloads) |
439 | - c.Check(len(res), Equals, 1) |
440 | - c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`)) |
441 | - res = filterByLevel(3, 5, payloads) |
442 | - c.Check(len(res), Equals, 2) |
443 | - c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`)) |
444 | - res = filterByLevel(2, 5, payloads) |
445 | - c.Check(len(res), Equals, 3) |
446 | - res = filterByLevel(1, 5, payloads) |
447 | - c.Check(len(res), Equals, 3) |
448 | -} |
449 | - |
450 | -func (s *simpleSuite) TestBroadcastExchangeFilterByLevel(c *C) { |
451 | - sess := &simpleBrokerSession{ |
452 | - levels: map[store.InternalChannelId]int64{ |
453 | - store.SystemInternalChannelId: 2, |
454 | - }, |
455 | - } |
456 | - exchg := &simpleBroadcastExchange{ |
457 | - chanId: store.SystemInternalChannelId, |
458 | - topLevel: 3, |
459 | - notificationPayloads: []json.RawMessage{ |
460 | - json.RawMessage(`{"a":"x"}`), |
461 | - json.RawMessage(`{"a":"y"}`), |
462 | - }, |
463 | - } |
464 | - inMsg, outMsg, err := exchg.Prepare(sess) |
465 | - c.Assert(err, IsNil) |
466 | - // check |
467 | - marshalled, err := json.Marshal(inMsg) |
468 | - c.Assert(err, IsNil) |
469 | - c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
470 | - err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
471 | - c.Assert(err, IsNil) |
472 | - err = exchg.Acked(sess) |
473 | - c.Assert(err, IsNil) |
474 | -} |
475 | - |
476 | func (s *simpleSuite) TestBroadcast(c *C) { |
477 | sto := store.NewInMemoryPendingStore() |
478 | notification1 := json.RawMessage(`{"m": "M"}`) |
479 | @@ -274,20 +179,20 @@ |
480 | case <-time.After(5 * time.Second): |
481 | c.Fatal("taking too long to get broadcast exchange") |
482 | case exchg1 := <-sess1.SessionChannel(): |
483 | - c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{ |
484 | - chanId: store.SystemInternalChannelId, |
485 | - topLevel: 1, |
486 | - notificationPayloads: []json.RawMessage{notification1}, |
487 | + c.Check(exchg1, DeepEquals, &BroadcastExchange{ |
488 | + ChanId: store.SystemInternalChannelId, |
489 | + TopLevel: 1, |
490 | + NotificationPayloads: []json.RawMessage{notification1}, |
491 | }) |
492 | } |
493 | select { |
494 | case <-time.After(5 * time.Second): |
495 | c.Fatal("taking too long to get broadcast exchange") |
496 | case exchg2 := <-sess2.SessionChannel(): |
497 | - c.Check(exchg2, DeepEquals, &simpleBroadcastExchange{ |
498 | - chanId: store.SystemInternalChannelId, |
499 | - topLevel: 1, |
500 | - notificationPayloads: []json.RawMessage{notification1}, |
501 | + c.Check(exchg2, DeepEquals, &BroadcastExchange{ |
502 | + ChanId: store.SystemInternalChannelId, |
503 | + TopLevel: 1, |
504 | + NotificationPayloads: []json.RawMessage{notification1}, |
505 | }) |
506 | } |
507 | } |
508 | |
509 | === modified file 'server/session/session_test.go' |
510 | --- server/session/session_test.go 2014-01-15 15:54:20 +0000 |
511 | +++ server/session/session_test.go 2014-01-24 07:56:28 +0000 |
512 | @@ -136,6 +136,14 @@ |
513 | return tbs.deviceId |
514 | } |
515 | |
516 | +func (tbs *testBrokerSession) Levels() broker.LevelsMap { |
517 | + return nil |
518 | +} |
519 | + |
520 | +func (tbs *testBrokerSession) ExchangeScratchArea() *broker.ExchangesScratchArea { |
521 | + return nil |
522 | +} |
523 | + |
524 | func (tb *testBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) { |
525 | tb.registration <- "register " + connect.DeviceId |
526 | return &testBrokerSession{connect.DeviceId, nil}, tb.err |
Only issues I'd have with this one are fixed in the next one. Good job.