Merge lp:~pedronis/ubuntu-push/use-info into lp:ubuntu-push/automatic
- use-info
- Merge into automatic
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 109 |
Merged at revision: | 108 |
Proposed branch: | lp:~pedronis/ubuntu-push/use-info |
Merge into: | lp:ubuntu-push/automatic |
Diff against target: |
900 lines (+325/-67) 14 files modified
server/acceptance/acceptanceclient.go (+6/-0) server/acceptance/cmd/acceptanceclient.go (+7/-2) server/acceptance/suites/broadcast.go (+48/-25) server/acceptance/suites/pingpong.go (+2/-2) server/acceptance/suites/suite.go (+4/-2) server/broker/broker.go (+20/-0) server/broker/broker_test.go (+18/-0) server/broker/exchanges.go (+31/-0) server/broker/exchanges_test.go (+85/-25) server/broker/exchg_impl_test.go (+30/-0) server/broker/simple/simple.go (+32/-10) server/broker/simple/simple_test.go (+2/-0) server/broker/testing/impls.go (+10/-0) server/broker/testsuite/suite.go (+30/-1) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/use-info |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+214121@code.launchpad.net |
Commit message
use image informations sent by the client to filter the system channel broadcasts
Description of the change
use image informations sent by the client to filter the system channel broadcasts,
assume there will be an entry in them with key "IMAGE-
Ubuntu One Auto Pilot (otto-pilot) wrote : | # |
The attempt to merge lp:~pedronis/ubuntu-push/use-info into lp:ubuntu-push/automatic failed. Below is the output from the failed tests.
mkdir -p /mnt/tarmac/
mkdir -p /mnt/tarmac/
go get -u launchpad.
go get -d -u launchpad.
/mnt/tarmac/
"/mnt/tarmac/
go install launchpad.
go test launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
? launchpad.
? launc...
Preview Diff
1 | === modified file 'server/acceptance/acceptanceclient.go' |
2 | --- server/acceptance/acceptanceclient.go 2014-02-21 16:17:28 +0000 |
3 | +++ server/acceptance/acceptanceclient.go 2014-04-04 09:57:13 +0000 |
4 | @@ -35,6 +35,8 @@ |
5 | type ClientSession struct { |
6 | // configuration |
7 | DeviceId string |
8 | + Model string |
9 | + ImageChannel string |
10 | ServerAddr string |
11 | ExchangeTimeout time.Duration |
12 | CertPEMBlock []byte |
13 | @@ -86,6 +88,10 @@ |
14 | Type: "connect", |
15 | DeviceId: sess.DeviceId, |
16 | Levels: sess.Levels, |
17 | + Info: map[string]interface{}{ |
18 | + "device": sess.Model, |
19 | + "channel": sess.ImageChannel, |
20 | + }, |
21 | }) |
22 | if err != nil { |
23 | return err |
24 | |
25 | === modified file 'server/acceptance/cmd/acceptanceclient.go' |
26 | --- server/acceptance/cmd/acceptanceclient.go 2014-02-21 16:17:28 +0000 |
27 | +++ server/acceptance/cmd/acceptanceclient.go 2014-04-04 09:57:13 +0000 |
28 | @@ -30,6 +30,8 @@ |
29 | var ( |
30 | insecureFlag = flag.Bool("insecure", false, "disable checking of server certificate and hostname") |
31 | reportPingsFlag = flag.Bool("reportPings", true, "report each Ping from the server") |
32 | + deviceModel = flag.String("model", "?", "device image model") |
33 | + imageChannel = flag.String("imageChannel", "?", "image channel") |
34 | ) |
35 | |
36 | type configuration struct { |
37 | @@ -64,9 +66,12 @@ |
38 | ServerAddr: cfg.Addr.HostPort(), |
39 | DeviceId: flag.Arg(1), |
40 | // flags |
41 | - ReportPings: *reportPingsFlag, |
42 | - Insecure: *insecureFlag, |
43 | + Model: *deviceModel, |
44 | + ImageChannel: *imageChannel, |
45 | + ReportPings: *reportPingsFlag, |
46 | + Insecure: *insecureFlag, |
47 | } |
48 | + log.Printf("with: %#v", session) |
49 | session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName)) |
50 | if err != nil { |
51 | log.Fatalf("reading CertPEMFile: %v", err) |
52 | |
53 | === modified file 'server/acceptance/suites/broadcast.go' |
54 | --- server/acceptance/suites/broadcast.go 2014-02-21 21:39:54 +0000 |
55 | +++ server/acceptance/suites/broadcast.go 2014-04-04 09:57:13 +0000 |
56 | @@ -41,11 +41,34 @@ |
57 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
58 | Channel: "system", |
59 | ExpireOn: future, |
60 | - Data: json.RawMessage(`{"n": 42}`), |
61 | - }) |
62 | - c.Assert(err, IsNil) |
63 | - c.Assert(got, Matches, ".*ok.*") |
64 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
65 | + Data: json.RawMessage(`{"img1/m1": 42}`), |
66 | + }) |
67 | + c.Assert(err, IsNil) |
68 | + c.Assert(got, Matches, ".*ok.*") |
69 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`) |
70 | + stop() |
71 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
72 | + c.Check(len(errCh), Equals, 0) |
73 | +} |
74 | + |
75 | +func (s *BroadcastAcceptanceSuite) TestBroadcastToConnectedChannelFilter(c *C) { |
76 | + events, errCh, stop := s.StartClient(c, "DEVB", nil) |
77 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
78 | + Channel: "system", |
79 | + ExpireOn: future, |
80 | + Data: json.RawMessage(`{"img1/m2": 10}`), |
81 | + }) |
82 | + c.Assert(err, IsNil) |
83 | + got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
84 | + Channel: "system", |
85 | + ExpireOn: future, |
86 | + Data: json.RawMessage(`{"img1/m1": 20}`), |
87 | + }) |
88 | + c.Assert(err, IsNil) |
89 | + c.Assert(got, Matches, ".*ok.*") |
90 | + // xxx don't send this one |
91 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`) |
92 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`) |
93 | stop() |
94 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
95 | c.Check(len(errCh), Equals, 0) |
96 | @@ -56,14 +79,14 @@ |
97 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
98 | Channel: "system", |
99 | ExpireOn: future, |
100 | - Data: json.RawMessage(`{"b": 1}`), |
101 | + Data: json.RawMessage(`{"img1/m1": 1}`), |
102 | }) |
103 | c.Assert(err, IsNil) |
104 | c.Assert(got, Matches, ".*ok.*") |
105 | |
106 | events, errCh, stop := s.StartClient(c, "DEVB", nil) |
107 | // gettting pending on connect |
108 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
109 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`) |
110 | stop() |
111 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
112 | c.Check(len(errCh), Equals, 0) |
113 | @@ -71,7 +94,7 @@ |
114 | |
115 | func (s *BroadcastAcceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) { |
116 | // send bunch of broadcasts that will be pending |
117 | - payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
118 | + payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
119 | for i := 0; i < 32; i++ { |
120 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
121 | Channel: "system", |
122 | @@ -84,7 +107,7 @@ |
123 | |
124 | events, errCh, stop := s.StartClient(c, "DEVC", nil) |
125 | // gettting pending on connect |
126 | - c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`) |
127 | + c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"img1/m1":0,.*`) |
128 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`) |
129 | stop() |
130 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
131 | @@ -100,12 +123,12 @@ |
132 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
133 | Channel: "system", |
134 | ExpireOn: future, |
135 | - Data: json.RawMessage(`{"n": 42}`), |
136 | + Data: json.RawMessage(`{"img1/m1": 42}`), |
137 | }) |
138 | c.Assert(err, IsNil) |
139 | c.Assert(got, Matches, ".*ok.*") |
140 | - c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
141 | - c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
142 | + c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`) |
143 | + c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`) |
144 | stop1() |
145 | stop2() |
146 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
147 | @@ -119,11 +142,11 @@ |
148 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
149 | Channel: "system", |
150 | ExpireOn: future, |
151 | - Data: json.RawMessage(`{"b": 1}`), |
152 | + Data: json.RawMessage(`{"img1/m1": 1}`), |
153 | }) |
154 | c.Assert(err, IsNil) |
155 | c.Assert(got, Matches, ".*ok.*") |
156 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
157 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`) |
158 | stop() |
159 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
160 | c.Check(len(errCh), Equals, 0) |
161 | @@ -131,7 +154,7 @@ |
162 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
163 | Channel: "system", |
164 | ExpireOn: future, |
165 | - Data: json.RawMessage(`{"b": 2}`), |
166 | + Data: json.RawMessage(`{"img1/m1": 2}`), |
167 | }) |
168 | c.Assert(err, IsNil) |
169 | c.Assert(got, Matches, ".*ok.*") |
170 | @@ -139,7 +162,7 @@ |
171 | events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{ |
172 | protocol.SystemChannelId: 1, |
173 | }) |
174 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
175 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`) |
176 | stop() |
177 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
178 | c.Check(len(errCh), Equals, 0) |
179 | @@ -150,14 +173,14 @@ |
180 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
181 | Channel: "system", |
182 | ExpireOn: future, |
183 | - Data: json.RawMessage(`{"b": 1}`), |
184 | + Data: json.RawMessage(`{"img1/m1": 1}`), |
185 | }) |
186 | c.Assert(err, IsNil) |
187 | c.Assert(got, Matches, ".*ok.*") |
188 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
189 | Channel: "system", |
190 | ExpireOn: future, |
191 | - Data: json.RawMessage(`{"b": 2}`), |
192 | + Data: json.RawMessage(`{"img1/m1": 2}`), |
193 | }) |
194 | c.Assert(err, IsNil) |
195 | c.Assert(got, Matches, ".*ok.*") |
196 | @@ -166,7 +189,7 @@ |
197 | protocol.SystemChannelId: 10, |
198 | }) |
199 | // gettting last one pending on connect |
200 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
201 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`) |
202 | stop() |
203 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
204 | c.Check(len(errCh), Equals, 0) |
205 | @@ -189,14 +212,14 @@ |
206 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
207 | Channel: "system", |
208 | ExpireOn: future, |
209 | - Data: json.RawMessage(`{"b": 1}`), |
210 | + Data: json.RawMessage(`{"img1/m1": 1}`), |
211 | }) |
212 | c.Assert(err, IsNil) |
213 | c.Assert(got, Matches, ".*ok.*") |
214 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
215 | Channel: "system", |
216 | ExpireOn: future, |
217 | - Data: json.RawMessage(`{"b": 2}`), |
218 | + Data: json.RawMessage(`{"img1/m1": 2}`), |
219 | }) |
220 | c.Assert(err, IsNil) |
221 | c.Assert(got, Matches, ".*ok.*") |
222 | @@ -205,7 +228,7 @@ |
223 | protocol.SystemChannelId: -10, |
224 | }) |
225 | // gettting pending on connect |
226 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`) |
227 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1},{"img1/m1":2}]`) |
228 | stop() |
229 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
230 | c.Check(len(errCh), Equals, 0) |
231 | @@ -216,14 +239,14 @@ |
232 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
233 | Channel: "system", |
234 | ExpireOn: future, |
235 | - Data: json.RawMessage(`{"b": 1}`), |
236 | + Data: json.RawMessage(`{"img1/m1": 1}`), |
237 | }) |
238 | c.Assert(err, IsNil) |
239 | c.Assert(got, Matches, ".*ok.*") |
240 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
241 | Channel: "system", |
242 | ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339), |
243 | - Data: json.RawMessage(`{"b": 2}`), |
244 | + Data: json.RawMessage(`{"img1/m1": 2}`), |
245 | }) |
246 | c.Assert(err, IsNil) |
247 | c.Assert(got, Matches, ".*ok.*") |
248 | @@ -233,7 +256,7 @@ |
249 | |
250 | events, errCh, stop := s.StartClient(c, "DEVB", nil) |
251 | // gettting pending on connect |
252 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`) |
253 | + c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1}]`) |
254 | stop() |
255 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
256 | c.Check(len(errCh), Equals, 0) |
257 | |
258 | === modified file 'server/acceptance/suites/pingpong.go' |
259 | --- server/acceptance/suites/pingpong.go 2014-02-21 20:29:16 +0000 |
260 | +++ server/acceptance/suites/pingpong.go 2014-04-04 09:57:13 +0000 |
261 | @@ -34,7 +34,7 @@ |
262 | func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) { |
263 | errCh := make(chan error, 1) |
264 | events := make(chan string, 10) |
265 | - sess := testClientSession(s.ServerAddr, "DEVA", true) |
266 | + sess := testClientSession(s.ServerAddr, "DEVA", "m1", "img1", true) |
267 | err := sess.Dial() |
268 | c.Assert(err, IsNil) |
269 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
270 | @@ -68,7 +68,7 @@ |
271 | func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) { |
272 | errCh := make(chan error, 1) |
273 | events := make(chan string, 10) |
274 | - sess := testClientSession(s.ServerAddr, "DEVB", true) |
275 | + sess := testClientSession(s.ServerAddr, "DEVB", "m1", "img1", true) |
276 | err := sess.Dial() |
277 | c.Assert(err, IsNil) |
278 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
279 | |
280 | === modified file 'server/acceptance/suites/suite.go' |
281 | --- server/acceptance/suites/suite.go 2014-03-31 16:43:15 +0000 |
282 | +++ server/acceptance/suites/suite.go 2014-04-04 09:57:13 +0000 |
283 | @@ -46,7 +46,7 @@ |
284 | func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) { |
285 | errCh := make(chan error, 1) |
286 | cliEvents := make(chan string, 10) |
287 | - sess := testClientSession(h.ServerAddr, devId, false) |
288 | + sess := testClientSession(h.ServerAddr, devId, "m1", "img1", false) |
289 | sess.Levels = levels |
290 | err := sess.Dial() |
291 | c.Assert(err, IsNil) |
292 | @@ -127,7 +127,7 @@ |
293 | return string(body), err |
294 | } |
295 | |
296 | -func testClientSession(addr string, deviceId string, reportPings bool) *acceptance.ClientSession { |
297 | +func testClientSession(addr string, deviceId, model, imageChannel string, reportPings bool) *acceptance.ClientSession { |
298 | certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../ssl/testing.cert")) |
299 | if err != nil { |
300 | panic(fmt.Sprintf("could not read ssl/testing.cert: %v", err)) |
301 | @@ -137,6 +137,8 @@ |
302 | ServerAddr: addr, |
303 | CertPEMBlock: certPEMBlock, |
304 | DeviceId: deviceId, |
305 | + Model: model, |
306 | + ImageChannel: imageChannel, |
307 | ReportPings: reportPings, |
308 | } |
309 | } |
310 | |
311 | === modified file 'server/broker/broker.go' |
312 | --- server/broker/broker.go 2014-02-21 16:04:44 +0000 |
313 | +++ server/broker/broker.go 2014-04-04 09:57:13 +0000 |
314 | @@ -49,6 +49,19 @@ |
315 | // LevelsMap is the type for holding channel levels for session. |
316 | type LevelsMap map[store.InternalChannelId]int64 |
317 | |
318 | +// GetInfoString helps retrivieng a string out of a protocol.ConnectMsg.Info |
319 | +func GetInfoString(msg *protocol.ConnectMsg, name, defaultVal string) (string, error) { |
320 | + v, ok := msg.Info[name] |
321 | + if !ok { |
322 | + return defaultVal, nil |
323 | + } |
324 | + s, ok := v.(string) |
325 | + if !ok { |
326 | + return "", ErrUnexpectedValue |
327 | + } |
328 | + return s, nil |
329 | +} |
330 | + |
331 | // BrokerSession holds broker session state. |
332 | type BrokerSession interface { |
333 | // SessionChannel returns the session control channel |
334 | @@ -56,6 +69,10 @@ |
335 | SessionChannel() <-chan Exchange |
336 | // DeviceIdentifier returns the device id string. |
337 | DeviceIdentifier() string |
338 | + // DeviceImageModel returns the device model. |
339 | + DeviceImageModel() string |
340 | + // DeviceImageChannel returns the device system image channel. |
341 | + DeviceImageChannel() string |
342 | // Levels returns the current channel levels for the session |
343 | Levels() LevelsMap |
344 | // ExchangeScratchArea returns the scratch area for exchanges. |
345 | @@ -71,6 +88,9 @@ |
346 | return fmt.Sprintf("session aborted (%s)", ea.Reason) |
347 | } |
348 | |
349 | +// Unexpect value in message |
350 | +var ErrUnexpectedValue = &ErrAbort{"unexpected value in message"} |
351 | + |
352 | // BrokerConfig gives access to the typical broker configuration. |
353 | type BrokerConfig interface { |
354 | // SessionQueueSize gives the session queue size. |
355 | |
356 | === modified file 'server/broker/broker_test.go' |
357 | --- server/broker/broker_test.go 2014-02-10 23:19:08 +0000 |
358 | +++ server/broker/broker_test.go 2014-04-04 09:57:13 +0000 |
359 | @@ -20,6 +20,8 @@ |
360 | "fmt" |
361 | |
362 | . "launchpad.net/gocheck" |
363 | + |
364 | + "launchpad.net/ubuntu-push/protocol" |
365 | ) |
366 | |
367 | type brokerSuite struct{} |
368 | @@ -30,3 +32,19 @@ |
369 | err := &ErrAbort{"expected FOO"} |
370 | c.Check(fmt.Sprintf("%s", err), Equals, "session aborted (expected FOO)") |
371 | } |
372 | + |
373 | +func (s *brokerSuite) TestGetInfoString(c *C) { |
374 | + connectMsg := &protocol.ConnectMsg{} |
375 | + v, err := GetInfoString(connectMsg, "foo", "?") |
376 | + c.Check(err, IsNil) |
377 | + c.Check(v, Equals, "?") |
378 | + |
379 | + connectMsg.Info = map[string]interface{}{"foo": "yay"} |
380 | + v, err = GetInfoString(connectMsg, "foo", "?") |
381 | + c.Check(err, IsNil) |
382 | + c.Check(v, Equals, "yay") |
383 | + |
384 | + connectMsg.Info["foo"] = 33 |
385 | + v, err = GetInfoString(connectMsg, "foo", "?") |
386 | + c.Check(err, Equals, ErrUnexpectedValue) |
387 | +} |
388 | |
389 | === modified file 'server/broker/exchanges.go' |
390 | --- server/broker/exchanges.go 2014-02-26 16:04:57 +0000 |
391 | +++ server/broker/exchanges.go 2014-04-04 09:57:13 +0000 |
392 | @@ -18,6 +18,7 @@ |
393 | |
394 | import ( |
395 | "encoding/json" |
396 | + "fmt" |
397 | |
398 | "launchpad.net/ubuntu-push/protocol" |
399 | "launchpad.net/ubuntu-push/server/store" |
400 | @@ -37,11 +38,24 @@ |
401 | ChanId store.InternalChannelId |
402 | TopLevel int64 |
403 | NotificationPayloads []json.RawMessage |
404 | + Decoded []map[string]interface{} |
405 | } |
406 | |
407 | // check interface already here |
408 | var _ Exchange = &BroadcastExchange{} |
409 | |
410 | +// Init ensures the BroadcastExchange is fully initialized for the sessions. |
411 | +func (sbe *BroadcastExchange) Init() { |
412 | + decoded := make([]map[string]interface{}, len(sbe.NotificationPayloads)) |
413 | + sbe.Decoded = decoded |
414 | + for i, p := range sbe.NotificationPayloads { |
415 | + err := json.Unmarshal(p, &decoded[i]) |
416 | + if err != nil { |
417 | + decoded[i] = nil |
418 | + } |
419 | + } |
420 | +} |
421 | + |
422 | func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { |
423 | c := int64(len(payloads)) |
424 | if c == 0 { |
425 | @@ -58,6 +72,20 @@ |
426 | } |
427 | } |
428 | |
429 | +func channelFilter(tag string, chanId store.InternalChannelId, payloads []json.RawMessage, decoded []map[string]interface{}) []json.RawMessage { |
430 | + if len(payloads) != 0 && chanId == store.SystemInternalChannelId { |
431 | + decoded := decoded[len(decoded)-len(payloads):] |
432 | + filtered := make([]json.RawMessage, 0) |
433 | + for i, decoded1 := range decoded { |
434 | + if _, ok := decoded1[tag]; ok { |
435 | + filtered = append(filtered, payloads[i]) |
436 | + } |
437 | + } |
438 | + payloads = filtered |
439 | + } |
440 | + return payloads |
441 | +} |
442 | + |
443 | // Prepare session for a BROADCAST. |
444 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
445 | scratchArea := sess.ExchangeScratchArea() |
446 | @@ -65,6 +93,9 @@ |
447 | scratchArea.broadcastMsg.Type = "broadcast" |
448 | clientLevel := sess.Levels()[sbe.ChanId] |
449 | payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) |
450 | + tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel()) |
451 | + payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded) |
452 | + |
453 | // xxx need an AppId as well, later |
454 | scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId) |
455 | scratchArea.broadcastMsg.TopLevel = sbe.TopLevel |
456 | |
457 | === modified file 'server/broker/exchanges_test.go' |
458 | --- server/broker/exchanges_test.go 2014-02-26 16:04:57 +0000 |
459 | +++ server/broker/exchanges_test.go 2014-04-04 09:57:13 +0000 |
460 | @@ -35,24 +35,45 @@ |
461 | |
462 | var _ = Suite(&exchangesSuite{}) |
463 | |
464 | +func (s *exchangesSuite) TestBroadcastExchangeInit(c *C) { |
465 | + exchg := &broker.BroadcastExchange{ |
466 | + ChanId: store.SystemInternalChannelId, |
467 | + TopLevel: 3, |
468 | + NotificationPayloads: []json.RawMessage{ |
469 | + json.RawMessage(`{"a":"x"}`), |
470 | + json.RawMessage(`[]`), |
471 | + json.RawMessage(`{"a":"y"}`), |
472 | + }, |
473 | + } |
474 | + exchg.Init() |
475 | + c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{ |
476 | + map[string]interface{}{"a": "x"}, |
477 | + nil, |
478 | + map[string]interface{}{"a": "y"}, |
479 | + }) |
480 | +} |
481 | + |
482 | func (s *exchangesSuite) TestBroadcastExchange(c *C) { |
483 | sess := &testing.TestBrokerSession{ |
484 | - LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
485 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
486 | + Model: "m1", |
487 | + ImageChannel: "img1", |
488 | } |
489 | exchg := &broker.BroadcastExchange{ |
490 | ChanId: store.SystemInternalChannelId, |
491 | TopLevel: 3, |
492 | NotificationPayloads: []json.RawMessage{ |
493 | - json.RawMessage(`{"a":"x"}`), |
494 | - json.RawMessage(`{"a":"y"}`), |
495 | + json.RawMessage(`{"img1/m1":100}`), |
496 | + json.RawMessage(`{"img2/m2":200}`), |
497 | }, |
498 | } |
499 | + exchg.Init() |
500 | outMsg, inMsg, err := exchg.Prepare(sess) |
501 | c.Assert(err, IsNil) |
502 | // check |
503 | marshalled, err := json.Marshal(outMsg) |
504 | c.Assert(err, IsNil) |
505 | - c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) |
506 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":100}]}`) |
507 | err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
508 | c.Assert(err, IsNil) |
509 | err = exchg.Acked(sess, true) |
510 | @@ -62,9 +83,11 @@ |
511 | |
512 | func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) { |
513 | sess := &testing.TestBrokerSession{ |
514 | - LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
515 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
516 | + Model: "m1", |
517 | + ImageChannel: "img1", |
518 | } |
519 | - payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
520 | + payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
521 | needsSplitting := make([]json.RawMessage, 32) |
522 | for i := 0; i < 32; i++ { |
523 | needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i)) |
524 | @@ -76,6 +99,7 @@ |
525 | TopLevel: topLevel, |
526 | NotificationPayloads: needsSplitting, |
527 | } |
528 | + exchg.Init() |
529 | outMsg, _, err := exchg.Prepare(sess) |
530 | c.Assert(err, IsNil) |
531 | parts := 0 |
532 | @@ -91,10 +115,11 @@ |
533 | ChanId: store.SystemInternalChannelId, |
534 | TopLevel: topLevel + 2, |
535 | NotificationPayloads: []json.RawMessage{ |
536 | - json.RawMessage(`{"a":"x"}`), |
537 | - json.RawMessage(`{"a":"y"}`), |
538 | + json.RawMessage(`{"img1/m1":"x"}`), |
539 | + json.RawMessage(`{"img1/m1":"y"}`), |
540 | }, |
541 | } |
542 | + exchg.Init() |
543 | outMsg, _, err = exchg.Prepare(sess) |
544 | c.Assert(err, IsNil) |
545 | done := outMsg.Split() // shouldn't panic |
546 | @@ -103,21 +128,24 @@ |
547 | |
548 | func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) { |
549 | sess := &testing.TestBrokerSession{ |
550 | - LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
551 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
552 | + Model: "m1", |
553 | + ImageChannel: "img2", |
554 | } |
555 | exchg := &broker.BroadcastExchange{ |
556 | ChanId: store.SystemInternalChannelId, |
557 | TopLevel: 3, |
558 | NotificationPayloads: []json.RawMessage{ |
559 | - json.RawMessage(`{"a":"y"}`), |
560 | + json.RawMessage(`{"img2/m1":1}`), |
561 | }, |
562 | } |
563 | + exchg.Init() |
564 | outMsg, inMsg, err := exchg.Prepare(sess) |
565 | c.Assert(err, IsNil) |
566 | // check |
567 | marshalled, err := json.Marshal(outMsg) |
568 | c.Assert(err, IsNil) |
569 | - c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
570 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img2/m1":1}]}`) |
571 | err = json.Unmarshal([]byte(`{}`), inMsg) |
572 | c.Assert(err, IsNil) |
573 | err = exchg.Acked(sess, true) |
574 | @@ -130,23 +158,55 @@ |
575 | LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{ |
576 | store.SystemInternalChannelId: 2, |
577 | }), |
578 | + Model: "m1", |
579 | + ImageChannel: "img1", |
580 | } |
581 | exchg := &broker.BroadcastExchange{ |
582 | ChanId: store.SystemInternalChannelId, |
583 | TopLevel: 3, |
584 | NotificationPayloads: []json.RawMessage{ |
585 | - json.RawMessage(`{"a":"x"}`), |
586 | - json.RawMessage(`{"a":"y"}`), |
587 | - }, |
588 | - } |
589 | - outMsg, inMsg, err := exchg.Prepare(sess) |
590 | - c.Assert(err, IsNil) |
591 | - // check |
592 | - marshalled, err := json.Marshal(outMsg) |
593 | - c.Assert(err, IsNil) |
594 | - c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
595 | - err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
596 | - c.Assert(err, IsNil) |
597 | - err = exchg.Acked(sess, true) |
598 | - c.Assert(err, IsNil) |
599 | + json.RawMessage(`{"img1/m1":100}`), |
600 | + json.RawMessage(`{"img1/m1":101}`), |
601 | + }, |
602 | + } |
603 | + exchg.Init() |
604 | + outMsg, inMsg, err := exchg.Prepare(sess) |
605 | + c.Assert(err, IsNil) |
606 | + // check |
607 | + marshalled, err := json.Marshal(outMsg) |
608 | + c.Assert(err, IsNil) |
609 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":101}]}`) |
610 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
611 | + c.Assert(err, IsNil) |
612 | + err = exchg.Acked(sess, true) |
613 | + c.Assert(err, IsNil) |
614 | +} |
615 | + |
616 | +func (s *exchangesSuite) TestBroadcastExchangeChannelFilter(c *C) { |
617 | + sess := &testing.TestBrokerSession{ |
618 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
619 | + Model: "m1", |
620 | + ImageChannel: "img1", |
621 | + } |
622 | + exchg := &broker.BroadcastExchange{ |
623 | + ChanId: store.SystemInternalChannelId, |
624 | + TopLevel: 5, |
625 | + NotificationPayloads: []json.RawMessage{ |
626 | + json.RawMessage(`{"img1/m1":100}`), |
627 | + json.RawMessage(`{"img2/m2":200}`), |
628 | + json.RawMessage(`{"img1/m1":101}`), |
629 | + }, |
630 | + } |
631 | + exchg.Init() |
632 | + outMsg, inMsg, err := exchg.Prepare(sess) |
633 | + c.Assert(err, IsNil) |
634 | + // check |
635 | + marshalled, err := json.Marshal(outMsg) |
636 | + c.Assert(err, IsNil) |
637 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":5,"Payloads":[{"img1/m1":100},{"img1/m1":101}]}`) |
638 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
639 | + c.Assert(err, IsNil) |
640 | + err = exchg.Acked(sess, true) |
641 | + c.Assert(err, IsNil) |
642 | + c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5)) |
643 | } |
644 | |
645 | === modified file 'server/broker/exchg_impl_test.go' |
646 | --- server/broker/exchg_impl_test.go 2014-02-10 23:19:08 +0000 |
647 | +++ server/broker/exchg_impl_test.go 2014-04-04 09:57:13 +0000 |
648 | @@ -20,6 +20,8 @@ |
649 | "encoding/json" |
650 | |
651 | . "launchpad.net/gocheck" |
652 | + |
653 | + "launchpad.net/ubuntu-push/server/store" |
654 | ) |
655 | |
656 | type exchangesImplSuite struct{} |
657 | @@ -56,3 +58,31 @@ |
658 | res = filterByLevel(5, 10, nil) |
659 | c.Check(len(res), Equals, 0) |
660 | } |
661 | + |
662 | +func (s *exchangesImplSuite) TestChannelFilter(c *C) { |
663 | + payloads := []json.RawMessage{ |
664 | + json.RawMessage(`{"a/x": 3}`), |
665 | + json.RawMessage(`{"b/x": 4}`), |
666 | + json.RawMessage(`{"a/y": 5}`), |
667 | + json.RawMessage(`{"a/x": 6}`), |
668 | + } |
669 | + decoded := make([]map[string]interface{}, 4) |
670 | + for i, p := range payloads { |
671 | + err := json.Unmarshal(p, &decoded[i]) |
672 | + c.Assert(err, IsNil) |
673 | + } |
674 | + |
675 | + other := store.InternalChannelId("1") |
676 | + |
677 | + c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil) |
678 | + c.Check(channelFilter("", other, payloads[1:], decoded), DeepEquals, payloads[1:]) |
679 | + |
680 | + // use tag when channel is the sytem channel |
681 | + |
682 | + c.Check(channelFilter("c/z", store.SystemInternalChannelId, payloads, decoded), HasLen, 0) |
683 | + |
684 | + c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]}) |
685 | + |
686 | + c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]}) |
687 | + |
688 | +} |
689 | |
690 | === modified file 'server/broker/simple/simple.go' |
691 | --- server/broker/simple/simple.go 2014-02-21 16:04:44 +0000 |
692 | +++ server/broker/simple/simple.go 2014-04-04 09:57:13 +0000 |
693 | @@ -46,11 +46,13 @@ |
694 | |
695 | // simpleBrokerSession represents a session in the broker. |
696 | type simpleBrokerSession struct { |
697 | - registered bool |
698 | - deviceId string |
699 | - done chan bool |
700 | - exchanges chan broker.Exchange |
701 | - levels broker.LevelsMap |
702 | + registered bool |
703 | + deviceId string |
704 | + model string |
705 | + imageChannel string |
706 | + done chan bool |
707 | + exchanges chan broker.Exchange |
708 | + levels broker.LevelsMap |
709 | // for exchanges |
710 | exchgScratch broker.ExchangesScratchArea |
711 | } |
712 | @@ -75,6 +77,14 @@ |
713 | return sess.deviceId |
714 | } |
715 | |
716 | +func (sess *simpleBrokerSession) DeviceImageModel() string { |
717 | + return sess.model |
718 | +} |
719 | + |
720 | +func (sess *simpleBrokerSession) DeviceImageChannel() string { |
721 | + return sess.imageChannel |
722 | +} |
723 | + |
724 | func (sess *simpleBrokerSession) Levels() broker.LevelsMap { |
725 | return sess.levels |
726 | } |
727 | @@ -147,6 +157,7 @@ |
728 | TopLevel: topLevel, |
729 | NotificationPayloads: payloads, |
730 | } |
731 | + broadcastExchg.Init() |
732 | sess.exchanges <- broadcastExchg |
733 | } |
734 | } |
735 | @@ -157,6 +168,14 @@ |
736 | // pending notifications as well. |
737 | func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) { |
738 | // xxx sanity check DeviceId |
739 | + model, err := broker.GetInfoString(connect, "device", "?") |
740 | + if err != nil { |
741 | + return nil, err |
742 | + } |
743 | + imageChannel, err := broker.GetInfoString(connect, "channel", "?") |
744 | + if err != nil { |
745 | + return nil, err |
746 | + } |
747 | levels := map[store.InternalChannelId]int64{} |
748 | for hexId, v := range connect.Levels { |
749 | id, err := store.HexToInternalChannelId(hexId) |
750 | @@ -166,14 +185,16 @@ |
751 | levels[id] = v |
752 | } |
753 | sess := &simpleBrokerSession{ |
754 | - deviceId: connect.DeviceId, |
755 | - done: make(chan bool), |
756 | - exchanges: make(chan broker.Exchange, b.sessionQueueSize), |
757 | - levels: levels, |
758 | + deviceId: connect.DeviceId, |
759 | + model: model, |
760 | + imageChannel: imageChannel, |
761 | + done: make(chan bool), |
762 | + exchanges: make(chan broker.Exchange, b.sessionQueueSize), |
763 | + levels: levels, |
764 | } |
765 | b.sessionCh <- sess |
766 | <-sess.done |
767 | - err := b.feedPending(sess) |
768 | + err = b.feedPending(sess) |
769 | if err != nil { |
770 | return nil, err |
771 | } |
772 | @@ -219,6 +240,7 @@ |
773 | TopLevel: topLevel, |
774 | NotificationPayloads: payloads, |
775 | } |
776 | + broadcastExchg.Init() |
777 | for _, sess := range b.registry { |
778 | sess.exchanges <- broadcastExchg |
779 | } |
780 | |
781 | === modified file 'server/broker/simple/simple_test.go' |
782 | --- server/broker/simple/simple_test.go 2014-02-10 23:29:53 +0000 |
783 | +++ server/broker/simple/simple_test.go 2014-04-04 09:57:13 +0000 |
784 | @@ -48,6 +48,7 @@ |
785 | sto := store.NewInMemoryPendingStore() |
786 | muchLater := time.Now().Add(10 * time.Minute) |
787 | notification1 := json.RawMessage(`{"m": "M"}`) |
788 | + decoded1 := map[string]interface{}{"m": "M"} |
789 | sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
790 | b := NewSimpleBroker(sto, testBrokerConfig, nil) |
791 | sess := &simpleBrokerSession{ |
792 | @@ -60,6 +61,7 @@ |
793 | ChanId: store.SystemInternalChannelId, |
794 | TopLevel: 1, |
795 | NotificationPayloads: []json.RawMessage{notification1}, |
796 | + Decoded: []map[string]interface{}{decoded1}, |
797 | }) |
798 | } |
799 | |
800 | |
801 | === modified file 'server/broker/testing/impls.go' |
802 | --- server/broker/testing/impls.go 2014-01-23 20:13:22 +0000 |
803 | +++ server/broker/testing/impls.go 2014-04-04 09:57:13 +0000 |
804 | @@ -24,6 +24,8 @@ |
805 | // Test implementation of BrokerSession. |
806 | type TestBrokerSession struct { |
807 | DeviceId string |
808 | + Model string |
809 | + ImageChannel string |
810 | Exchanges chan broker.Exchange |
811 | LevelsMap broker.LevelsMap |
812 | exchgScratch broker.ExchangesScratchArea |
813 | @@ -33,6 +35,14 @@ |
814 | return tbs.DeviceId |
815 | } |
816 | |
817 | +func (tbs *TestBrokerSession) DeviceImageModel() string { |
818 | + return tbs.Model |
819 | +} |
820 | + |
821 | +func (tbs *TestBrokerSession) DeviceImageChannel() string { |
822 | + return tbs.ImageChannel |
823 | +} |
824 | + |
825 | func (tbs *TestBrokerSession) SessionChannel() <-chan broker.Exchange { |
826 | return tbs.Exchanges |
827 | } |
828 | |
829 | === modified file 'server/broker/testsuite/suite.go' |
830 | --- server/broker/testsuite/suite.go 2014-03-19 23:46:18 +0000 |
831 | +++ server/broker/testsuite/suite.go 2014-04-04 09:57:13 +0000 |
832 | @@ -81,10 +81,20 @@ |
833 | b := s.MakeBroker(sto, testBrokerConfig, nil) |
834 | b.Start() |
835 | defer b.Stop() |
836 | - sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"0": 5}}) |
837 | + sess, err := b.Register(&protocol.ConnectMsg{ |
838 | + Type: "connect", |
839 | + DeviceId: "dev-1", |
840 | + Levels: map[string]int64{"0": 5}, |
841 | + Info: map[string]interface{}{ |
842 | + "device": "model", |
843 | + "channel": "daily", |
844 | + }, |
845 | + }) |
846 | c.Assert(err, IsNil) |
847 | c.Assert(s.RevealSession(b, "dev-1"), Equals, sess) |
848 | c.Assert(sess.DeviceIdentifier(), Equals, "dev-1") |
849 | + c.Check(sess.DeviceImageModel(), Equals, "model") |
850 | + c.Check(sess.DeviceImageChannel(), Equals, "daily") |
851 | c.Assert(sess.ExchangeScratchArea(), Not(IsNil)) |
852 | c.Check(sess.Levels(), DeepEquals, broker.LevelsMap(map[store.InternalChannelId]int64{ |
853 | store.SystemInternalChannelId: 5, |
854 | @@ -105,6 +115,22 @@ |
855 | c.Check(err, FitsTypeOf, &broker.ErrAbort{}) |
856 | } |
857 | |
858 | +func (s *CommonBrokerSuite) TestRegistrationInfoErrors(c *C) { |
859 | + sto := store.NewInMemoryPendingStore() |
860 | + b := s.MakeBroker(sto, testBrokerConfig, nil) |
861 | + b.Start() |
862 | + defer b.Stop() |
863 | + info := map[string]interface{}{ |
864 | + "device": -1, |
865 | + } |
866 | + _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info}) |
867 | + c.Check(err, Equals, broker.ErrUnexpectedValue) |
868 | + info["device"] = "m" |
869 | + info["channel"] = -1 |
870 | + _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info}) |
871 | + c.Check(err, Equals, broker.ErrUnexpectedValue) |
872 | +} |
873 | + |
874 | func (s *CommonBrokerSuite) TestRegistrationFeedPending(c *C) { |
875 | sto := store.NewInMemoryPendingStore() |
876 | notification1 := json.RawMessage(`{"m": "M"}`) |
877 | @@ -149,6 +175,7 @@ |
878 | func (s *CommonBrokerSuite) TestBroadcast(c *C) { |
879 | sto := store.NewInMemoryPendingStore() |
880 | notification1 := json.RawMessage(`{"m": "M"}`) |
881 | + decoded1 := map[string]interface{}{"m": "M"} |
882 | b := s.MakeBroker(sto, testBrokerConfig, nil) |
883 | b.Start() |
884 | defer b.Stop() |
885 | @@ -168,6 +195,7 @@ |
886 | ChanId: store.SystemInternalChannelId, |
887 | TopLevel: 1, |
888 | NotificationPayloads: []json.RawMessage{notification1}, |
889 | + Decoded: []map[string]interface{}{decoded1}, |
890 | }) |
891 | } |
892 | select { |
893 | @@ -178,6 +206,7 @@ |
894 | ChanId: store.SystemInternalChannelId, |
895 | TopLevel: 1, |
896 | NotificationPayloads: []json.RawMessage{notification1}, |
897 | + Decoded: []map[string]interface{}{decoded1}, |
898 | }) |
899 | } |
900 | } |
WOo! :)