Merge lp:~pedronis/ubuntu-push/freelance-exchanges into lp:ubuntu-push
- freelance-exchanges
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 21 |
Merged at revision: | 20 |
Proposed branch: | lp:~pedronis/ubuntu-push/freelance-exchanges |
Merge into: | lp:ubuntu-push |
Prerequisite: | lp:~pedronis/ubuntu-push/server-bits-in-server |
Diff against target: |
526 lines (+244/-161) 6 files modified
server/broker/broker.go (+7/-0) server/broker/exchanges.go (+74/-0) server/broker/exchanges_test.go (+123/-0) server/broker/simple.go (+18/-52) server/broker/simple_test.go (+14/-109) server/session/session_test.go (+8/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/freelance-exchanges |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+203000@code.launchpad.net |
Commit message
exchanges don't need to be so tied to a particular broker
Description of the change
With a bit of reorganization, exchanges don't need to be so tied to a particular broker.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'server/broker/broker.go' | |||
2 | --- server/broker/broker.go 2014-01-14 15:35:20 +0000 | |||
3 | +++ server/broker/broker.go 2014-01-24 07:56:28 +0000 | |||
4 | @@ -44,6 +44,9 @@ | |||
5 | 44 | Acked(BrokerSession) error | 44 | Acked(BrokerSession) error |
6 | 45 | } | 45 | } |
7 | 46 | 46 | ||
8 | 47 | // LevelsMap is the type for holding levels for session. | ||
9 | 48 | type LevelsMap map[store.InternalChannelId]int64 | ||
10 | 49 | |||
11 | 47 | // BrokerSession holds broker session state. | 50 | // BrokerSession holds broker session state. |
12 | 48 | type BrokerSession interface { | 51 | type BrokerSession interface { |
13 | 49 | // SessionChannel returns the session control channel | 52 | // SessionChannel returns the session control channel |
14 | @@ -51,6 +54,10 @@ | |||
15 | 51 | SessionChannel() <-chan Exchange | 54 | SessionChannel() <-chan Exchange |
16 | 52 | // DeviceId returns the device id string. | 55 | // DeviceId returns the device id string. |
17 | 53 | DeviceId() string | 56 | DeviceId() string |
18 | 57 | // Levels returns the current channel levels for the session | ||
19 | 58 | Levels() LevelsMap | ||
20 | 59 | // ExchangeScratchArea returns the scratch area for exchanges. | ||
21 | 60 | ExchangeScratchArea() *ExchangesScratchArea | ||
22 | 54 | } | 61 | } |
23 | 55 | 62 | ||
24 | 56 | // Session aborted error. | 63 | // Session aborted error. |
25 | 57 | 64 | ||
26 | === added file 'server/broker/exchanges.go' | |||
27 | --- server/broker/exchanges.go 1970-01-01 00:00:00 +0000 | |||
28 | +++ server/broker/exchanges.go 2014-01-24 07:56:28 +0000 | |||
29 | @@ -0,0 +1,74 @@ | |||
30 | 1 | /* | ||
31 | 2 | Copyright 2013-2014 Canonical Ltd. | ||
32 | 3 | |||
33 | 4 | This program is free software: you can redistribute it and/or modify it | ||
34 | 5 | under the terms of the GNU General Public License version 3, as published | ||
35 | 6 | by the Free Software Foundation. | ||
36 | 7 | |||
37 | 8 | This program is distributed in the hope that it will be useful, but | ||
38 | 9 | WITHOUT ANY WARRANTY; without even the implied warranties of | ||
39 | 10 | MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | ||
40 | 11 | PURPOSE. See the GNU General Public License for more details. | ||
41 | 12 | |||
42 | 13 | You should have received a copy of the GNU General Public License along | ||
43 | 14 | with this program. If not, see <http://www.gnu.org/licenses/>. | ||
44 | 15 | */ | ||
45 | 16 | |||
46 | 17 | package broker | ||
47 | 18 | |||
48 | 19 | import ( | ||
49 | 20 | "encoding/json" | ||
50 | 21 | "launchpad.net/ubuntu-push/protocol" | ||
51 | 22 | "launchpad.net/ubuntu-push/server/store" | ||
52 | 23 | // "log" | ||
53 | 24 | ) | ||
54 | 25 | |||
55 | 26 | // Exchanges | ||
56 | 27 | |||
57 | 28 | // Scratch area for exchanges, sessions should hold one of these. | |
58 | 29 | type ExchangesScratchArea struct { | ||
59 | 30 | broadcastMsg protocol.BroadcastMsg | ||
60 | 31 | ackMsg protocol.AckMsg | ||
61 | 32 | } | ||
62 | 33 | |||
63 | 34 | // BroadcastExchange leads a session through delivering a BROADCAST. | ||
64 | 35 | // For simplicity it's fully public. | |
65 | 36 | type BroadcastExchange struct { | ||
66 | 37 | ChanId store.InternalChannelId | ||
67 | 38 | TopLevel int64 | ||
68 | 39 | NotificationPayloads []json.RawMessage | ||
69 | 40 | } | ||
70 | 41 | |||
71 | 42 | func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { | ||
72 | 43 | c := int64(len(payloads)) | ||
73 | 44 | delta := topLevel - clientLevel | ||
74 | 45 | if delta < c { | ||
75 | 46 | return payloads[c-delta:] | ||
76 | 47 | } else { | ||
77 | 48 | return payloads | ||
78 | 49 | } | ||
79 | 50 | } | ||
80 | 51 | |||
81 | 52 | // Prepare session for a BROADCAST. | ||
82 | 53 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { | ||
83 | 54 | scratchArea := sess.ExchangeScratchArea() | ||
84 | 55 | scratchArea.broadcastMsg.Type = "broadcast" | ||
85 | 56 | clientLevel := sess.Levels()[sbe.ChanId] | ||
86 | 57 | payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) | ||
87 | 58 | // xxx need an AppId as well, later | ||
88 | 59 | scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId) | ||
89 | 60 | scratchArea.broadcastMsg.TopLevel = sbe.TopLevel | ||
90 | 61 | scratchArea.broadcastMsg.Payloads = payloads | ||
91 | 62 | return &scratchArea.broadcastMsg, &scratchArea.ackMsg, nil | ||
92 | 63 | } | ||
93 | 64 | |||
94 | 65 | // Acked deals with an ACK for a BROADCAST. | ||
95 | 66 | func (sbe *BroadcastExchange) Acked(sess BrokerSession) error { | ||
96 | 67 | scratchArea := sess.ExchangeScratchArea() | ||
97 | 68 | if scratchArea.ackMsg.Type != "ack" { | ||
98 | 69 | return &ErrAbort{"expected ACK message"} | ||
99 | 70 | } | ||
100 | 71 | // update levels | ||
101 | 72 | sess.Levels()[sbe.ChanId] = sbe.TopLevel | ||
102 | 73 | return nil | ||
103 | 74 | } | ||
104 | 0 | 75 | ||
105 | === added file 'server/broker/exchanges_test.go' | |||
106 | --- server/broker/exchanges_test.go 1970-01-01 00:00:00 +0000 | |||
107 | +++ server/broker/exchanges_test.go 2014-01-24 07:56:28 +0000 | |||
108 | @@ -0,0 +1,123 @@ | |||
109 | 1 | /* | ||
110 | 2 | Copyright 2013-2014 Canonical Ltd. | ||
111 | 3 | |||
112 | 4 | This program is free software: you can redistribute it and/or modify it | ||
113 | 5 | under the terms of the GNU General Public License version 3, as published | ||
114 | 6 | by the Free Software Foundation. | ||
115 | 7 | |||
116 | 8 | This program is distributed in the hope that it will be useful, but | ||
117 | 9 | WITHOUT ANY WARRANTY; without even the implied warranties of | ||
118 | 10 | MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | ||
119 | 11 | PURPOSE. See the GNU General Public License for more details. | ||
120 | 12 | |||
121 | 13 | You should have received a copy of the GNU General Public License along | ||
122 | 14 | with this program. If not, see <http://www.gnu.org/licenses/>. | ||
123 | 15 | */ | ||
124 | 16 | |||
125 | 17 | package broker | ||
126 | 18 | |||
127 | 19 | import ( | ||
128 | 20 | "encoding/json" | ||
129 | 21 | . "launchpad.net/gocheck" | ||
130 | 22 | "launchpad.net/ubuntu-push/server/store" | ||
131 | 23 | // "log" | ||
132 | 24 | ) | ||
133 | 25 | |||
134 | 26 | type exchangesSuite struct{} | ||
135 | 27 | |||
136 | 28 | var _ = Suite(&exchangesSuite{}) | ||
137 | 29 | |||
138 | 30 | func (s *exchangesSuite) TestBroadcastExchange(c *C) { | ||
139 | 31 | sess := &simpleBrokerSession{ | ||
140 | 32 | levels: map[store.InternalChannelId]int64{}, | ||
141 | 33 | } | ||
142 | 34 | exchg := &BroadcastExchange{ | ||
143 | 35 | ChanId: store.SystemInternalChannelId, | ||
144 | 36 | TopLevel: 3, | ||
145 | 37 | NotificationPayloads: []json.RawMessage{ | ||
146 | 38 | json.RawMessage(`{"a":"x"}`), | ||
147 | 39 | json.RawMessage(`{"a":"y"}`), | ||
148 | 40 | }, | ||
149 | 41 | } | ||
150 | 42 | inMsg, outMsg, err := exchg.Prepare(sess) | ||
151 | 43 | c.Assert(err, IsNil) | ||
152 | 44 | // check | ||
153 | 45 | marshalled, err := json.Marshal(inMsg) | ||
154 | 46 | c.Assert(err, IsNil) | ||
155 | 47 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) | ||
156 | 48 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | ||
157 | 49 | c.Assert(err, IsNil) | ||
158 | 50 | err = exchg.Acked(sess) | ||
159 | 51 | c.Assert(err, IsNil) | ||
160 | 52 | c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3)) | ||
161 | 53 | } | ||
162 | 54 | |||
163 | 55 | func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) { | ||
164 | 56 | sess := &simpleBrokerSession{ | ||
165 | 57 | levels: map[store.InternalChannelId]int64{}, | ||
166 | 58 | } | ||
167 | 59 | exchg := &BroadcastExchange{ | ||
168 | 60 | ChanId: store.SystemInternalChannelId, | ||
169 | 61 | TopLevel: 3, | ||
170 | 62 | NotificationPayloads: []json.RawMessage{ | ||
171 | 63 | json.RawMessage(`{"a":"y"}`), | ||
172 | 64 | }, | ||
173 | 65 | } | ||
174 | 66 | inMsg, outMsg, err := exchg.Prepare(sess) | ||
175 | 67 | c.Assert(err, IsNil) | ||
176 | 68 | // check | ||
177 | 69 | marshalled, err := json.Marshal(inMsg) | ||
178 | 70 | c.Assert(err, IsNil) | ||
179 | 71 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | ||
180 | 72 | err = json.Unmarshal([]byte(`{}`), outMsg) | ||
181 | 73 | c.Assert(err, IsNil) | ||
182 | 74 | err = exchg.Acked(sess) | ||
183 | 75 | c.Assert(err, Not(IsNil)) | ||
184 | 76 | c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0)) | ||
185 | 77 | } | ||
186 | 78 | |||
187 | 79 | func (s *exchangesSuite) TestFilterByLevel(c *C) { | ||
188 | 80 | payloads := []json.RawMessage{ | ||
189 | 81 | json.RawMessage(`{"a": 3}`), | ||
190 | 82 | json.RawMessage(`{"a": 4}`), | ||
191 | 83 | json.RawMessage(`{"a": 5}`), | ||
192 | 84 | } | ||
193 | 85 | res := filterByLevel(5, 5, payloads) | ||
194 | 86 | c.Check(len(res), Equals, 0) | ||
195 | 87 | res = filterByLevel(4, 5, payloads) | ||
196 | 88 | c.Check(len(res), Equals, 1) | ||
197 | 89 | c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`)) | ||
198 | 90 | res = filterByLevel(3, 5, payloads) | ||
199 | 91 | c.Check(len(res), Equals, 2) | ||
200 | 92 | c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`)) | ||
201 | 93 | res = filterByLevel(2, 5, payloads) | ||
202 | 94 | c.Check(len(res), Equals, 3) | ||
203 | 95 | res = filterByLevel(1, 5, payloads) | ||
204 | 96 | c.Check(len(res), Equals, 3) | ||
205 | 97 | } | ||
206 | 98 | |||
207 | 99 | func (s *exchangesSuite) TestBroadcastExchangeFilterByLevel(c *C) { | ||
208 | 100 | sess := &simpleBrokerSession{ | ||
209 | 101 | levels: map[store.InternalChannelId]int64{ | ||
210 | 102 | store.SystemInternalChannelId: 2, | ||
211 | 103 | }, | ||
212 | 104 | } | ||
213 | 105 | exchg := &BroadcastExchange{ | ||
214 | 106 | ChanId: store.SystemInternalChannelId, | ||
215 | 107 | TopLevel: 3, | ||
216 | 108 | NotificationPayloads: []json.RawMessage{ | ||
217 | 109 | json.RawMessage(`{"a":"x"}`), | ||
218 | 110 | json.RawMessage(`{"a":"y"}`), | ||
219 | 111 | }, | ||
220 | 112 | } | ||
221 | 113 | inMsg, outMsg, err := exchg.Prepare(sess) | ||
222 | 114 | c.Assert(err, IsNil) | ||
223 | 115 | // check | ||
224 | 116 | marshalled, err := json.Marshal(inMsg) | ||
225 | 117 | c.Assert(err, IsNil) | ||
226 | 118 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | ||
227 | 119 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | ||
228 | 120 | c.Assert(err, IsNil) | ||
229 | 121 | err = exchg.Acked(sess) | ||
230 | 122 | c.Assert(err, IsNil) | ||
231 | 123 | } | ||
232 | 0 | 124 | ||
233 | === modified file 'server/broker/simple.go' | |||
234 | --- server/broker/simple.go 2014-01-14 15:35:20 +0000 | |||
235 | +++ server/broker/simple.go 2014-01-24 07:56:28 +0000 | |||
236 | @@ -17,7 +17,6 @@ | |||
237 | 17 | package broker | 17 | package broker |
238 | 18 | 18 | ||
239 | 19 | import ( | 19 | import ( |
240 | 20 | "encoding/json" | ||
241 | 21 | "launchpad.net/ubuntu-push/logger" | 20 | "launchpad.net/ubuntu-push/logger" |
242 | 22 | "launchpad.net/ubuntu-push/protocol" | 21 | "launchpad.net/ubuntu-push/protocol" |
243 | 23 | "launchpad.net/ubuntu-push/server/store" | 22 | "launchpad.net/ubuntu-push/server/store" |
244 | @@ -48,10 +47,9 @@ | |||
245 | 48 | deviceId string | 47 | deviceId string |
246 | 49 | done chan bool | 48 | done chan bool |
247 | 50 | exchanges chan Exchange | 49 | exchanges chan Exchange |
249 | 51 | levels map[store.InternalChannelId]int64 | 50 | levels LevelsMap |
250 | 52 | // for exchanges | 51 | // for exchanges |
253 | 53 | broadcastMsg protocol.BroadcastMsg | 52 | exchgScratch ExchangesScratchArea |
252 | 54 | ackMsg protocol.AckMsg | ||
254 | 55 | } | 53 | } |
255 | 56 | 54 | ||
256 | 57 | type deliveryKind int | 55 | type deliveryKind int |
257 | @@ -74,6 +72,14 @@ | |||
258 | 74 | return sess.deviceId | 72 | return sess.deviceId |
259 | 75 | } | 73 | } |
260 | 76 | 74 | ||
261 | 75 | func (sess *simpleBrokerSession) Levels() LevelsMap { | ||
262 | 76 | return sess.levels | ||
263 | 77 | } | ||
264 | 78 | |||
265 | 79 | func (sess *simpleBrokerSession) ExchangeScratchArea() *ExchangesScratchArea { | ||
266 | 80 | return &sess.exchgScratch | ||
267 | 81 | } | ||
268 | 82 | |||
269 | 77 | // NewSimpleBroker makes a new SimpleBroker. | 83 | // NewSimpleBroker makes a new SimpleBroker. |
270 | 78 | func NewSimpleBroker(sto store.PendingStore, cfg BrokerConfig, logger logger.Logger) *SimpleBroker { | 84 | func NewSimpleBroker(sto store.PendingStore, cfg BrokerConfig, logger logger.Logger) *SimpleBroker { |
271 | 79 | sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) | 85 | sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) |
272 | @@ -126,10 +132,10 @@ | |||
273 | 126 | } | 132 | } |
274 | 127 | clientLevel := sess.levels[chanId] | 133 | clientLevel := sess.levels[chanId] |
275 | 128 | if clientLevel != topLevel { | 134 | if clientLevel != topLevel { |
280 | 129 | broadcastExchg := &simpleBroadcastExchange{ | 135 | broadcastExchg := &BroadcastExchange{ |
281 | 130 | chanId: chanId, | 136 | ChanId: chanId, |
282 | 131 | topLevel: topLevel, | 137 | TopLevel: topLevel, |
283 | 132 | notificationPayloads: payloads, | 138 | NotificationPayloads: payloads, |
284 | 133 | } | 139 | } |
285 | 134 | sess.exchanges <- broadcastExchg | 140 | sess.exchanges <- broadcastExchg |
286 | 135 | } | 141 | } |
287 | @@ -198,10 +204,10 @@ | |||
288 | 198 | b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err) | 204 | b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err) |
289 | 199 | continue Loop | 205 | continue Loop |
290 | 200 | } | 206 | } |
295 | 201 | broadcastExchg := &simpleBroadcastExchange{ | 207 | broadcastExchg := &BroadcastExchange{ |
296 | 202 | chanId: delivery.chanId, | 208 | ChanId: delivery.chanId, |
297 | 203 | topLevel: topLevel, | 209 | TopLevel: topLevel, |
298 | 204 | notificationPayloads: payloads, | 210 | NotificationPayloads: payloads, |
299 | 205 | } | 211 | } |
300 | 206 | for _, sess := range b.registry { | 212 | for _, sess := range b.registry { |
301 | 207 | sess.exchanges <- broadcastExchg | 213 | sess.exchanges <- broadcastExchg |
302 | @@ -218,43 +224,3 @@ | |||
303 | 218 | chanId: chanId, | 224 | chanId: chanId, |
304 | 219 | } | 225 | } |
305 | 220 | } | 226 | } |
306 | 221 | |||
307 | 222 | // Exchanges | ||
308 | 223 | |||
309 | 224 | type simpleBroadcastExchange struct { | ||
310 | 225 | chanId store.InternalChannelId | ||
311 | 226 | topLevel int64 | ||
312 | 227 | notificationPayloads []json.RawMessage | ||
313 | 228 | } | ||
314 | 229 | |||
315 | 230 | func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { | ||
316 | 231 | c := int64(len(payloads)) | ||
317 | 232 | delta := topLevel - clientLevel | ||
318 | 233 | if delta < c { | ||
319 | 234 | return payloads[c-delta:] | ||
320 | 235 | } else { | ||
321 | 236 | return payloads | ||
322 | 237 | } | ||
323 | 238 | } | ||
324 | 239 | |||
325 | 240 | func (sbe *simpleBroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { | ||
326 | 241 | simpleSess := sess.(*simpleBrokerSession) | ||
327 | 242 | simpleSess.broadcastMsg.Type = "broadcast" | ||
328 | 243 | clientLevel := simpleSess.levels[sbe.chanId] | ||
329 | 244 | payloads := filterByLevel(clientLevel, sbe.topLevel, sbe.notificationPayloads) | ||
330 | 245 | // xxx need an AppId as well, later | ||
331 | 246 | simpleSess.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.chanId) | ||
332 | 247 | simpleSess.broadcastMsg.TopLevel = sbe.topLevel | ||
333 | 248 | simpleSess.broadcastMsg.Payloads = payloads | ||
334 | 249 | return &simpleSess.broadcastMsg, &simpleSess.ackMsg, nil | ||
335 | 250 | } | ||
336 | 251 | |||
337 | 252 | func (sbe *simpleBroadcastExchange) Acked(sess BrokerSession) error { | ||
338 | 253 | simpleSess := sess.(*simpleBrokerSession) | ||
339 | 254 | if simpleSess.ackMsg.Type != "ack" { | ||
340 | 255 | return &ErrAbort{"expected ACK message"} | ||
341 | 256 | } | ||
342 | 257 | // update levels | ||
343 | 258 | simpleSess.levels[sbe.chanId] = sbe.topLevel | ||
344 | 259 | return nil | ||
345 | 260 | } | ||
346 | 261 | 227 | ||
347 | === modified file 'server/broker/simple_test.go' | |||
348 | --- server/broker/simple_test.go 2014-01-14 15:35:20 +0000 | |||
349 | +++ server/broker/simple_test.go 2014-01-24 07:56:28 +0000 | |||
350 | @@ -69,9 +69,9 @@ | |||
351 | 69 | c.Assert(err, IsNil) | 69 | c.Assert(err, IsNil) |
352 | 70 | c.Assert(b.registry["dev-1"], Equals, sess) | 70 | c.Assert(b.registry["dev-1"], Equals, sess) |
353 | 71 | c.Assert(sess.DeviceId(), Equals, "dev-1") | 71 | c.Assert(sess.DeviceId(), Equals, "dev-1") |
355 | 72 | c.Check(sess.(*simpleBrokerSession).levels, DeepEquals, map[store.InternalChannelId]int64{ | 72 | c.Check(sess.Levels(), DeepEquals, LevelsMap(map[store.InternalChannelId]int64{ |
356 | 73 | store.SystemInternalChannelId: 5, | 73 | store.SystemInternalChannelId: 5, |
358 | 74 | }) | 74 | })) |
359 | 75 | b.Unregister(sess) | 75 | b.Unregister(sess) |
360 | 76 | // just to make sure the unregister was processed | 76 | // just to make sure the unregister was processed |
361 | 77 | _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}) | 77 | _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}) |
362 | @@ -99,10 +99,10 @@ | |||
363 | 99 | b.feedPending(sess) | 99 | b.feedPending(sess) |
364 | 100 | c.Assert(len(sess.exchanges), Equals, 1) | 100 | c.Assert(len(sess.exchanges), Equals, 1) |
365 | 101 | exchg1 := <-sess.exchanges | 101 | exchg1 := <-sess.exchanges |
370 | 102 | c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{ | 102 | c.Check(exchg1, DeepEquals, &BroadcastExchange{ |
371 | 103 | chanId: store.SystemInternalChannelId, | 103 | ChanId: store.SystemInternalChannelId, |
372 | 104 | topLevel: 1, | 104 | TopLevel: 1, |
373 | 105 | notificationPayloads: []json.RawMessage{notification1}, | 105 | NotificationPayloads: []json.RawMessage{notification1}, |
374 | 106 | }) | 106 | }) |
375 | 107 | } | 107 | } |
376 | 108 | 108 | ||
377 | @@ -163,101 +163,6 @@ | |||
378 | 163 | c.Check(b.registry["dev-1"], Equals, sess2) | 163 | c.Check(b.registry["dev-1"], Equals, sess2) |
379 | 164 | } | 164 | } |
380 | 165 | 165 | ||
381 | 166 | func (s *simpleSuite) TestBroadcastExchange(c *C) { | ||
382 | 167 | sess := &simpleBrokerSession{ | ||
383 | 168 | levels: map[store.InternalChannelId]int64{}, | ||
384 | 169 | } | ||
385 | 170 | exchg := &simpleBroadcastExchange{ | ||
386 | 171 | chanId: store.SystemInternalChannelId, | ||
387 | 172 | topLevel: 3, | ||
388 | 173 | notificationPayloads: []json.RawMessage{ | ||
389 | 174 | json.RawMessage(`{"a":"x"}`), | ||
390 | 175 | json.RawMessage(`{"a":"y"}`), | ||
391 | 176 | }, | ||
392 | 177 | } | ||
393 | 178 | inMsg, outMsg, err := exchg.Prepare(sess) | ||
394 | 179 | c.Assert(err, IsNil) | ||
395 | 180 | // check | ||
396 | 181 | marshalled, err := json.Marshal(inMsg) | ||
397 | 182 | c.Assert(err, IsNil) | ||
398 | 183 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) | ||
399 | 184 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | ||
400 | 185 | c.Assert(err, IsNil) | ||
401 | 186 | err = exchg.Acked(sess) | ||
402 | 187 | c.Assert(err, IsNil) | ||
403 | 188 | c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3)) | ||
404 | 189 | } | ||
405 | 190 | |||
406 | 191 | func (s *simpleSuite) TestBroadcastExchangeAckMismatch(c *C) { | ||
407 | 192 | sess := &simpleBrokerSession{ | ||
408 | 193 | levels: map[store.InternalChannelId]int64{}, | ||
409 | 194 | } | ||
410 | 195 | exchg := &simpleBroadcastExchange{ | ||
411 | 196 | chanId: store.SystemInternalChannelId, | ||
412 | 197 | topLevel: 3, | ||
413 | 198 | notificationPayloads: []json.RawMessage{ | ||
414 | 199 | json.RawMessage(`{"a":"y"}`), | ||
415 | 200 | }, | ||
416 | 201 | } | ||
417 | 202 | inMsg, outMsg, err := exchg.Prepare(sess) | ||
418 | 203 | c.Assert(err, IsNil) | ||
419 | 204 | // check | ||
420 | 205 | marshalled, err := json.Marshal(inMsg) | ||
421 | 206 | c.Assert(err, IsNil) | ||
422 | 207 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | ||
423 | 208 | err = json.Unmarshal([]byte(`{}`), outMsg) | ||
424 | 209 | c.Assert(err, IsNil) | ||
425 | 210 | err = exchg.Acked(sess) | ||
426 | 211 | c.Assert(err, Not(IsNil)) | ||
427 | 212 | c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0)) | ||
428 | 213 | } | ||
429 | 214 | |||
430 | 215 | func (s *simpleSuite) TestFilterByLevel(c *C) { | ||
431 | 216 | payloads := []json.RawMessage{ | ||
432 | 217 | json.RawMessage(`{"a": 3}`), | ||
433 | 218 | json.RawMessage(`{"a": 4}`), | ||
434 | 219 | json.RawMessage(`{"a": 5}`), | ||
435 | 220 | } | ||
436 | 221 | res := filterByLevel(5, 5, payloads) | ||
437 | 222 | c.Check(len(res), Equals, 0) | ||
438 | 223 | res = filterByLevel(4, 5, payloads) | ||
439 | 224 | c.Check(len(res), Equals, 1) | ||
440 | 225 | c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`)) | ||
441 | 226 | res = filterByLevel(3, 5, payloads) | ||
442 | 227 | c.Check(len(res), Equals, 2) | ||
443 | 228 | c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`)) | ||
444 | 229 | res = filterByLevel(2, 5, payloads) | ||
445 | 230 | c.Check(len(res), Equals, 3) | ||
446 | 231 | res = filterByLevel(1, 5, payloads) | ||
447 | 232 | c.Check(len(res), Equals, 3) | ||
448 | 233 | } | ||
449 | 234 | |||
450 | 235 | func (s *simpleSuite) TestBroadcastExchangeFilterByLevel(c *C) { | ||
451 | 236 | sess := &simpleBrokerSession{ | ||
452 | 237 | levels: map[store.InternalChannelId]int64{ | ||
453 | 238 | store.SystemInternalChannelId: 2, | ||
454 | 239 | }, | ||
455 | 240 | } | ||
456 | 241 | exchg := &simpleBroadcastExchange{ | ||
457 | 242 | chanId: store.SystemInternalChannelId, | ||
458 | 243 | topLevel: 3, | ||
459 | 244 | notificationPayloads: []json.RawMessage{ | ||
460 | 245 | json.RawMessage(`{"a":"x"}`), | ||
461 | 246 | json.RawMessage(`{"a":"y"}`), | ||
462 | 247 | }, | ||
463 | 248 | } | ||
464 | 249 | inMsg, outMsg, err := exchg.Prepare(sess) | ||
465 | 250 | c.Assert(err, IsNil) | ||
466 | 251 | // check | ||
467 | 252 | marshalled, err := json.Marshal(inMsg) | ||
468 | 253 | c.Assert(err, IsNil) | ||
469 | 254 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | ||
470 | 255 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | ||
471 | 256 | c.Assert(err, IsNil) | ||
472 | 257 | err = exchg.Acked(sess) | ||
473 | 258 | c.Assert(err, IsNil) | ||
474 | 259 | } | ||
475 | 260 | |||
476 | 261 | func (s *simpleSuite) TestBroadcast(c *C) { | 166 | func (s *simpleSuite) TestBroadcast(c *C) { |
477 | 262 | sto := store.NewInMemoryPendingStore() | 167 | sto := store.NewInMemoryPendingStore() |
478 | 263 | notification1 := json.RawMessage(`{"m": "M"}`) | 168 | notification1 := json.RawMessage(`{"m": "M"}`) |
479 | @@ -274,20 +179,20 @@ | |||
480 | 274 | case <-time.After(5 * time.Second): | 179 | case <-time.After(5 * time.Second): |
481 | 275 | c.Fatal("taking too long to get broadcast exchange") | 180 | c.Fatal("taking too long to get broadcast exchange") |
482 | 276 | case exchg1 := <-sess1.SessionChannel(): | 181 | case exchg1 := <-sess1.SessionChannel(): |
487 | 277 | c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{ | 182 | c.Check(exchg1, DeepEquals, &BroadcastExchange{ |
488 | 278 | chanId: store.SystemInternalChannelId, | 183 | ChanId: store.SystemInternalChannelId, |
489 | 279 | topLevel: 1, | 184 | TopLevel: 1, |
490 | 280 | notificationPayloads: []json.RawMessage{notification1}, | 185 | NotificationPayloads: []json.RawMessage{notification1}, |
491 | 281 | }) | 186 | }) |
492 | 282 | } | 187 | } |
493 | 283 | select { | 188 | select { |
494 | 284 | case <-time.After(5 * time.Second): | 189 | case <-time.After(5 * time.Second): |
495 | 285 | c.Fatal("taking too long to get broadcast exchange") | 190 | c.Fatal("taking too long to get broadcast exchange") |
496 | 286 | case exchg2 := <-sess2.SessionChannel(): | 191 | case exchg2 := <-sess2.SessionChannel(): |
501 | 287 | c.Check(exchg2, DeepEquals, &simpleBroadcastExchange{ | 192 | c.Check(exchg2, DeepEquals, &BroadcastExchange{ |
502 | 288 | chanId: store.SystemInternalChannelId, | 193 | ChanId: store.SystemInternalChannelId, |
503 | 289 | topLevel: 1, | 194 | TopLevel: 1, |
504 | 290 | notificationPayloads: []json.RawMessage{notification1}, | 195 | NotificationPayloads: []json.RawMessage{notification1}, |
505 | 291 | }) | 196 | }) |
506 | 292 | } | 197 | } |
507 | 293 | } | 198 | } |
508 | 294 | 199 | ||
509 | === modified file 'server/session/session_test.go' | |||
510 | --- server/session/session_test.go 2014-01-15 15:54:20 +0000 | |||
511 | +++ server/session/session_test.go 2014-01-24 07:56:28 +0000 | |||
512 | @@ -136,6 +136,14 @@ | |||
513 | 136 | return tbs.deviceId | 136 | return tbs.deviceId |
514 | 137 | } | 137 | } |
515 | 138 | 138 | ||
516 | 139 | func (tbs *testBrokerSession) Levels() broker.LevelsMap { | ||
517 | 140 | return nil | ||
518 | 141 | } | ||
519 | 142 | |||
520 | 143 | func (tbs *testBrokerSession) ExchangeScratchArea() *broker.ExchangesScratchArea { | ||
521 | 144 | return nil | ||
522 | 145 | } | ||
523 | 146 | |||
524 | 139 | func (tb *testBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) { | 147 | func (tb *testBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) { |
525 | 140 | tb.registration <- "register " + connect.DeviceId | 148 | tb.registration <- "register " + connect.DeviceId |
526 | 141 | return &testBrokerSession{connect.DeviceId, nil}, tb.err | 149 | return &testBrokerSession{connect.DeviceId, nil}, tb.err |
Only issues I'd have with this one are fixed in the next one. Good job.