Merge lp:~pedronis/ubuntu-push/use-info into lp:ubuntu-push/automatic

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 109
Merged at revision: 108
Proposed branch: lp:~pedronis/ubuntu-push/use-info
Merge into: lp:ubuntu-push/automatic
Diff against target: 900 lines (+325/-67)
14 files modified
server/acceptance/acceptanceclient.go (+6/-0)
server/acceptance/cmd/acceptanceclient.go (+7/-2)
server/acceptance/suites/broadcast.go (+48/-25)
server/acceptance/suites/pingpong.go (+2/-2)
server/acceptance/suites/suite.go (+4/-2)
server/broker/broker.go (+20/-0)
server/broker/broker_test.go (+18/-0)
server/broker/exchanges.go (+31/-0)
server/broker/exchanges_test.go (+85/-25)
server/broker/exchg_impl_test.go (+30/-0)
server/broker/simple/simple.go (+32/-10)
server/broker/simple/simple_test.go (+2/-0)
server/broker/testing/impls.go (+10/-0)
server/broker/testsuite/suite.go (+30/-1)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/use-info
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+214121@code.launchpad.net

Commit message

use image informations sent by the client to filter the system channel broadcasts

Description of the change

use image informations sent by the client to filter the system channel broadcasts,

assume there will be an entry in them with key "IMAGE-CHANNEL/DEVICE"

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

WOo! :)

review: Approve
Revision history for this message
Ubuntu One Auto Pilot (otto-pilot) wrote :
Download full text (7.1 KiB)

The attempt to merge lp:~pedronis/ubuntu-push/use-info into lp:ubuntu-push/automatic failed. Below is the output from the failed tests.

mkdir -p /mnt/tarmac/cache/ubuntu-push-automatic/go-ws/bin
mkdir -p /mnt/tarmac/cache/ubuntu-push-automatic/go-ws/pkg
go get -u launchpad.net/godeps
go get -d -u launchpad.net/gocheck launchpad.net/go-dbus/v1 launchpad.net/go-xdg/v0 code.google.com/p/gosqlite/sqlite3
/mnt/tarmac/cache/ubuntu-push-automatic/go-ws/bin/godeps -u dependencies.tsv
"/mnt/tarmac/cache/ubuntu-push-automatic/go-ws/src/launchpad.net/gocheck" now at <email address hidden>
go install launchpad.net/gocheck launchpad.net/go-dbus/v1 launchpad.net/go-xdg/v0 code.google.com/p/gosqlite/sqlite3
go test launchpad.net/ubuntu-push launchpad.net/ubuntu-push/bus launchpad.net/ubuntu-push/bus/connectivity launchpad.net/ubuntu-push/bus/networkmanager launchpad.net/ubuntu-push/bus/notifications launchpad.net/ubuntu-push/bus/systemimage launchpad.net/ubuntu-push/bus/testing launchpad.net/ubuntu-push/bus/urldispatcher launchpad.net/ubuntu-push/client launchpad.net/ubuntu-push/client/gethosts launchpad.net/ubuntu-push/client/session launchpad.net/ubuntu-push/client/session/levelmap launchpad.net/ubuntu-push/config launchpad.net/ubuntu-push/external/murmur3 launchpad.net/ubuntu-push/logger launchpad.net/ubuntu-push/protocol launchpad.net/ubuntu-push/server launchpad.net/ubuntu-push/server/api launchpad.net/ubuntu-push/server/broker launchpad.net/ubuntu-push/server/broker/simple launchpad.net/ubuntu-push/server/broker/testing launchpad.net/ubuntu-push/server/broker/testsuite launchpad.net/ubuntu-push/server/dev launchpad.net/ubuntu-push/server/listener launchpad.net/ubuntu-push/server/session launchpad.net/ubuntu-push/server/store launchpad.net/ubuntu-push/testing launchpad.net/ubuntu-push/testing/condition launchpad.net/ubuntu-push/util launchpad.net/ubuntu-push/whoopsie launchpad.net/ubuntu-push/whoopsie/identifier launchpad.net/ubuntu-push/whoopsie/identifier/testing
? launchpad.net/ubuntu-push [no test files]
ok launchpad.net/ubuntu-push/bus 0.009s
ok launchpad.net/ubuntu-push/bus/connectivity 1.153s
ok launchpad.net/ubuntu-push/bus/networkmanager 0.099s
ok launchpad.net/ubuntu-push/bus/notifications 0.016s
ok launchpad.net/ubuntu-push/bus/systemimage 0.006s
ok launchpad.net/ubuntu-push/bus/testing 0.017s
ok launchpad.net/ubuntu-push/bus/urldispatcher 0.007s
ok launchpad.net/ubuntu-push/client 0.073s
ok launchpad.net/ubuntu-push/client/gethosts 0.710s
ok launchpad.net/ubuntu-push/client/session 0.130s
ok launchpad.net/ubuntu-push/client/session/levelmap 0.095s
ok launchpad.net/ubuntu-push/config 0.014s
ok launchpad.net/ubuntu-push/external/murmur3 0.011s
ok launchpad.net/ubuntu-push/logger 0.007s
ok launchpad.net/ubuntu-push/protocol 0.010s
ok launchpad.net/ubuntu-push/server 0.043s
ok launchpad.net/ubuntu-push/server/api 0.016s
ok launchpad.net/ubuntu-push/server/broker 0.009s
ok launchpad.net/ubuntu-push/server/broker/simple 0.006s
? launchpad.net/ubuntu-push/server/broker/testing [no test files]
? launchpad.net/ubuntu-push/server/broker/testsuite [no test files]
? launc...

Read more...

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/acceptance/acceptanceclient.go'
2--- server/acceptance/acceptanceclient.go 2014-02-21 16:17:28 +0000
3+++ server/acceptance/acceptanceclient.go 2014-04-04 09:57:13 +0000
4@@ -35,6 +35,8 @@
5 type ClientSession struct {
6 // configuration
7 DeviceId string
8+ Model string
9+ ImageChannel string
10 ServerAddr string
11 ExchangeTimeout time.Duration
12 CertPEMBlock []byte
13@@ -86,6 +88,10 @@
14 Type: "connect",
15 DeviceId: sess.DeviceId,
16 Levels: sess.Levels,
17+ Info: map[string]interface{}{
18+ "device": sess.Model,
19+ "channel": sess.ImageChannel,
20+ },
21 })
22 if err != nil {
23 return err
24
25=== modified file 'server/acceptance/cmd/acceptanceclient.go'
26--- server/acceptance/cmd/acceptanceclient.go 2014-02-21 16:17:28 +0000
27+++ server/acceptance/cmd/acceptanceclient.go 2014-04-04 09:57:13 +0000
28@@ -30,6 +30,8 @@
29 var (
30 insecureFlag = flag.Bool("insecure", false, "disable checking of server certificate and hostname")
31 reportPingsFlag = flag.Bool("reportPings", true, "report each Ping from the server")
32+ deviceModel = flag.String("model", "?", "device image model")
33+ imageChannel = flag.String("imageChannel", "?", "image channel")
34 )
35
36 type configuration struct {
37@@ -64,9 +66,12 @@
38 ServerAddr: cfg.Addr.HostPort(),
39 DeviceId: flag.Arg(1),
40 // flags
41- ReportPings: *reportPingsFlag,
42- Insecure: *insecureFlag,
43+ Model: *deviceModel,
44+ ImageChannel: *imageChannel,
45+ ReportPings: *reportPingsFlag,
46+ Insecure: *insecureFlag,
47 }
48+ log.Printf("with: %#v", session)
49 session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName))
50 if err != nil {
51 log.Fatalf("reading CertPEMFile: %v", err)
52
53=== modified file 'server/acceptance/suites/broadcast.go'
54--- server/acceptance/suites/broadcast.go 2014-02-21 21:39:54 +0000
55+++ server/acceptance/suites/broadcast.go 2014-04-04 09:57:13 +0000
56@@ -41,11 +41,34 @@
57 got, err := s.PostRequest("/broadcast", &api.Broadcast{
58 Channel: "system",
59 ExpireOn: future,
60- Data: json.RawMessage(`{"n": 42}`),
61- })
62- c.Assert(err, IsNil)
63- c.Assert(got, Matches, ".*ok.*")
64- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
65+ Data: json.RawMessage(`{"img1/m1": 42}`),
66+ })
67+ c.Assert(err, IsNil)
68+ c.Assert(got, Matches, ".*ok.*")
69+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
70+ stop()
71+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
72+ c.Check(len(errCh), Equals, 0)
73+}
74+
75+func (s *BroadcastAcceptanceSuite) TestBroadcastToConnectedChannelFilter(c *C) {
76+ events, errCh, stop := s.StartClient(c, "DEVB", nil)
77+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
78+ Channel: "system",
79+ ExpireOn: future,
80+ Data: json.RawMessage(`{"img1/m2": 10}`),
81+ })
82+ c.Assert(err, IsNil)
83+ got, err = s.PostRequest("/broadcast", &api.Broadcast{
84+ Channel: "system",
85+ ExpireOn: future,
86+ Data: json.RawMessage(`{"img1/m1": 20}`),
87+ })
88+ c.Assert(err, IsNil)
89+ c.Assert(got, Matches, ".*ok.*")
90+ // xxx don't send this one
91+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`)
92+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`)
93 stop()
94 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
95 c.Check(len(errCh), Equals, 0)
96@@ -56,14 +79,14 @@
97 got, err := s.PostRequest("/broadcast", &api.Broadcast{
98 Channel: "system",
99 ExpireOn: future,
100- Data: json.RawMessage(`{"b": 1}`),
101+ Data: json.RawMessage(`{"img1/m1": 1}`),
102 })
103 c.Assert(err, IsNil)
104 c.Assert(got, Matches, ".*ok.*")
105
106 events, errCh, stop := s.StartClient(c, "DEVB", nil)
107 // gettting pending on connect
108- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
109+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`)
110 stop()
111 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
112 c.Check(len(errCh), Equals, 0)
113@@ -71,7 +94,7 @@
114
115 func (s *BroadcastAcceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) {
116 // send bunch of broadcasts that will be pending
117- payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
118+ payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
119 for i := 0; i < 32; i++ {
120 got, err := s.PostRequest("/broadcast", &api.Broadcast{
121 Channel: "system",
122@@ -84,7 +107,7 @@
123
124 events, errCh, stop := s.StartClient(c, "DEVC", nil)
125 // gettting pending on connect
126- c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)
127+ c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"img1/m1":0,.*`)
128 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)
129 stop()
130 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
131@@ -100,12 +123,12 @@
132 got, err := s.PostRequest("/broadcast", &api.Broadcast{
133 Channel: "system",
134 ExpireOn: future,
135- Data: json.RawMessage(`{"n": 42}`),
136+ Data: json.RawMessage(`{"img1/m1": 42}`),
137 })
138 c.Assert(err, IsNil)
139 c.Assert(got, Matches, ".*ok.*")
140- c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
141- c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
142+ c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
143+ c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
144 stop1()
145 stop2()
146 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
147@@ -119,11 +142,11 @@
148 got, err := s.PostRequest("/broadcast", &api.Broadcast{
149 Channel: "system",
150 ExpireOn: future,
151- Data: json.RawMessage(`{"b": 1}`),
152+ Data: json.RawMessage(`{"img1/m1": 1}`),
153 })
154 c.Assert(err, IsNil)
155 c.Assert(got, Matches, ".*ok.*")
156- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
157+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`)
158 stop()
159 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
160 c.Check(len(errCh), Equals, 0)
161@@ -131,7 +154,7 @@
162 got, err = s.PostRequest("/broadcast", &api.Broadcast{
163 Channel: "system",
164 ExpireOn: future,
165- Data: json.RawMessage(`{"b": 2}`),
166+ Data: json.RawMessage(`{"img1/m1": 2}`),
167 })
168 c.Assert(err, IsNil)
169 c.Assert(got, Matches, ".*ok.*")
170@@ -139,7 +162,7 @@
171 events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{
172 protocol.SystemChannelId: 1,
173 })
174- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
175+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`)
176 stop()
177 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
178 c.Check(len(errCh), Equals, 0)
179@@ -150,14 +173,14 @@
180 got, err := s.PostRequest("/broadcast", &api.Broadcast{
181 Channel: "system",
182 ExpireOn: future,
183- Data: json.RawMessage(`{"b": 1}`),
184+ Data: json.RawMessage(`{"img1/m1": 1}`),
185 })
186 c.Assert(err, IsNil)
187 c.Assert(got, Matches, ".*ok.*")
188 got, err = s.PostRequest("/broadcast", &api.Broadcast{
189 Channel: "system",
190 ExpireOn: future,
191- Data: json.RawMessage(`{"b": 2}`),
192+ Data: json.RawMessage(`{"img1/m1": 2}`),
193 })
194 c.Assert(err, IsNil)
195 c.Assert(got, Matches, ".*ok.*")
196@@ -166,7 +189,7 @@
197 protocol.SystemChannelId: 10,
198 })
199 // gettting last one pending on connect
200- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
201+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`)
202 stop()
203 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
204 c.Check(len(errCh), Equals, 0)
205@@ -189,14 +212,14 @@
206 got, err := s.PostRequest("/broadcast", &api.Broadcast{
207 Channel: "system",
208 ExpireOn: future,
209- Data: json.RawMessage(`{"b": 1}`),
210+ Data: json.RawMessage(`{"img1/m1": 1}`),
211 })
212 c.Assert(err, IsNil)
213 c.Assert(got, Matches, ".*ok.*")
214 got, err = s.PostRequest("/broadcast", &api.Broadcast{
215 Channel: "system",
216 ExpireOn: future,
217- Data: json.RawMessage(`{"b": 2}`),
218+ Data: json.RawMessage(`{"img1/m1": 2}`),
219 })
220 c.Assert(err, IsNil)
221 c.Assert(got, Matches, ".*ok.*")
222@@ -205,7 +228,7 @@
223 protocol.SystemChannelId: -10,
224 })
225 // gettting pending on connect
226- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)
227+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1},{"img1/m1":2}]`)
228 stop()
229 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
230 c.Check(len(errCh), Equals, 0)
231@@ -216,14 +239,14 @@
232 got, err := s.PostRequest("/broadcast", &api.Broadcast{
233 Channel: "system",
234 ExpireOn: future,
235- Data: json.RawMessage(`{"b": 1}`),
236+ Data: json.RawMessage(`{"img1/m1": 1}`),
237 })
238 c.Assert(err, IsNil)
239 c.Assert(got, Matches, ".*ok.*")
240 got, err = s.PostRequest("/broadcast", &api.Broadcast{
241 Channel: "system",
242 ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339),
243- Data: json.RawMessage(`{"b": 2}`),
244+ Data: json.RawMessage(`{"img1/m1": 2}`),
245 })
246 c.Assert(err, IsNil)
247 c.Assert(got, Matches, ".*ok.*")
248@@ -233,7 +256,7 @@
249
250 events, errCh, stop := s.StartClient(c, "DEVB", nil)
251 // gettting pending on connect
252- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`)
253+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1}]`)
254 stop()
255 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
256 c.Check(len(errCh), Equals, 0)
257
258=== modified file 'server/acceptance/suites/pingpong.go'
259--- server/acceptance/suites/pingpong.go 2014-02-21 20:29:16 +0000
260+++ server/acceptance/suites/pingpong.go 2014-04-04 09:57:13 +0000
261@@ -34,7 +34,7 @@
262 func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {
263 errCh := make(chan error, 1)
264 events := make(chan string, 10)
265- sess := testClientSession(s.ServerAddr, "DEVA", true)
266+ sess := testClientSession(s.ServerAddr, "DEVA", "m1", "img1", true)
267 err := sess.Dial()
268 c.Assert(err, IsNil)
269 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
270@@ -68,7 +68,7 @@
271 func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {
272 errCh := make(chan error, 1)
273 events := make(chan string, 10)
274- sess := testClientSession(s.ServerAddr, "DEVB", true)
275+ sess := testClientSession(s.ServerAddr, "DEVB", "m1", "img1", true)
276 err := sess.Dial()
277 c.Assert(err, IsNil)
278 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
279
280=== modified file 'server/acceptance/suites/suite.go'
281--- server/acceptance/suites/suite.go 2014-03-31 16:43:15 +0000
282+++ server/acceptance/suites/suite.go 2014-04-04 09:57:13 +0000
283@@ -46,7 +46,7 @@
284 func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
285 errCh := make(chan error, 1)
286 cliEvents := make(chan string, 10)
287- sess := testClientSession(h.ServerAddr, devId, false)
288+ sess := testClientSession(h.ServerAddr, devId, "m1", "img1", false)
289 sess.Levels = levels
290 err := sess.Dial()
291 c.Assert(err, IsNil)
292@@ -127,7 +127,7 @@
293 return string(body), err
294 }
295
296-func testClientSession(addr string, deviceId string, reportPings bool) *acceptance.ClientSession {
297+func testClientSession(addr string, deviceId, model, imageChannel string, reportPings bool) *acceptance.ClientSession {
298 certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../ssl/testing.cert"))
299 if err != nil {
300 panic(fmt.Sprintf("could not read ssl/testing.cert: %v", err))
301@@ -137,6 +137,8 @@
302 ServerAddr: addr,
303 CertPEMBlock: certPEMBlock,
304 DeviceId: deviceId,
305+ Model: model,
306+ ImageChannel: imageChannel,
307 ReportPings: reportPings,
308 }
309 }
310
311=== modified file 'server/broker/broker.go'
312--- server/broker/broker.go 2014-02-21 16:04:44 +0000
313+++ server/broker/broker.go 2014-04-04 09:57:13 +0000
314@@ -49,6 +49,19 @@
315 // LevelsMap is the type for holding channel levels for session.
316 type LevelsMap map[store.InternalChannelId]int64
317
318+// GetInfoString helps retrivieng a string out of a protocol.ConnectMsg.Info
319+func GetInfoString(msg *protocol.ConnectMsg, name, defaultVal string) (string, error) {
320+ v, ok := msg.Info[name]
321+ if !ok {
322+ return defaultVal, nil
323+ }
324+ s, ok := v.(string)
325+ if !ok {
326+ return "", ErrUnexpectedValue
327+ }
328+ return s, nil
329+}
330+
331 // BrokerSession holds broker session state.
332 type BrokerSession interface {
333 // SessionChannel returns the session control channel
334@@ -56,6 +69,10 @@
335 SessionChannel() <-chan Exchange
336 // DeviceIdentifier returns the device id string.
337 DeviceIdentifier() string
338+ // DeviceImageModel returns the device model.
339+ DeviceImageModel() string
340+ // DeviceImageChannel returns the device system image channel.
341+ DeviceImageChannel() string
342 // Levels returns the current channel levels for the session
343 Levels() LevelsMap
344 // ExchangeScratchArea returns the scratch area for exchanges.
345@@ -71,6 +88,9 @@
346 return fmt.Sprintf("session aborted (%s)", ea.Reason)
347 }
348
349+// Unexpect value in message
350+var ErrUnexpectedValue = &ErrAbort{"unexpected value in message"}
351+
352 // BrokerConfig gives access to the typical broker configuration.
353 type BrokerConfig interface {
354 // SessionQueueSize gives the session queue size.
355
356=== modified file 'server/broker/broker_test.go'
357--- server/broker/broker_test.go 2014-02-10 23:19:08 +0000
358+++ server/broker/broker_test.go 2014-04-04 09:57:13 +0000
359@@ -20,6 +20,8 @@
360 "fmt"
361
362 . "launchpad.net/gocheck"
363+
364+ "launchpad.net/ubuntu-push/protocol"
365 )
366
367 type brokerSuite struct{}
368@@ -30,3 +32,19 @@
369 err := &ErrAbort{"expected FOO"}
370 c.Check(fmt.Sprintf("%s", err), Equals, "session aborted (expected FOO)")
371 }
372+
373+func (s *brokerSuite) TestGetInfoString(c *C) {
374+ connectMsg := &protocol.ConnectMsg{}
375+ v, err := GetInfoString(connectMsg, "foo", "?")
376+ c.Check(err, IsNil)
377+ c.Check(v, Equals, "?")
378+
379+ connectMsg.Info = map[string]interface{}{"foo": "yay"}
380+ v, err = GetInfoString(connectMsg, "foo", "?")
381+ c.Check(err, IsNil)
382+ c.Check(v, Equals, "yay")
383+
384+ connectMsg.Info["foo"] = 33
385+ v, err = GetInfoString(connectMsg, "foo", "?")
386+ c.Check(err, Equals, ErrUnexpectedValue)
387+}
388
389=== modified file 'server/broker/exchanges.go'
390--- server/broker/exchanges.go 2014-02-26 16:04:57 +0000
391+++ server/broker/exchanges.go 2014-04-04 09:57:13 +0000
392@@ -18,6 +18,7 @@
393
394 import (
395 "encoding/json"
396+ "fmt"
397
398 "launchpad.net/ubuntu-push/protocol"
399 "launchpad.net/ubuntu-push/server/store"
400@@ -37,11 +38,24 @@
401 ChanId store.InternalChannelId
402 TopLevel int64
403 NotificationPayloads []json.RawMessage
404+ Decoded []map[string]interface{}
405 }
406
407 // check interface already here
408 var _ Exchange = &BroadcastExchange{}
409
410+// Init ensures the BroadcastExchange is fully initialized for the sessions.
411+func (sbe *BroadcastExchange) Init() {
412+ decoded := make([]map[string]interface{}, len(sbe.NotificationPayloads))
413+ sbe.Decoded = decoded
414+ for i, p := range sbe.NotificationPayloads {
415+ err := json.Unmarshal(p, &decoded[i])
416+ if err != nil {
417+ decoded[i] = nil
418+ }
419+ }
420+}
421+
422 func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
423 c := int64(len(payloads))
424 if c == 0 {
425@@ -58,6 +72,20 @@
426 }
427 }
428
429+func channelFilter(tag string, chanId store.InternalChannelId, payloads []json.RawMessage, decoded []map[string]interface{}) []json.RawMessage {
430+ if len(payloads) != 0 && chanId == store.SystemInternalChannelId {
431+ decoded := decoded[len(decoded)-len(payloads):]
432+ filtered := make([]json.RawMessage, 0)
433+ for i, decoded1 := range decoded {
434+ if _, ok := decoded1[tag]; ok {
435+ filtered = append(filtered, payloads[i])
436+ }
437+ }
438+ payloads = filtered
439+ }
440+ return payloads
441+}
442+
443 // Prepare session for a BROADCAST.
444 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
445 scratchArea := sess.ExchangeScratchArea()
446@@ -65,6 +93,9 @@
447 scratchArea.broadcastMsg.Type = "broadcast"
448 clientLevel := sess.Levels()[sbe.ChanId]
449 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
450+ tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())
451+ payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded)
452+
453 // xxx need an AppId as well, later
454 scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId)
455 scratchArea.broadcastMsg.TopLevel = sbe.TopLevel
456
457=== modified file 'server/broker/exchanges_test.go'
458--- server/broker/exchanges_test.go 2014-02-26 16:04:57 +0000
459+++ server/broker/exchanges_test.go 2014-04-04 09:57:13 +0000
460@@ -35,24 +35,45 @@
461
462 var _ = Suite(&exchangesSuite{})
463
464+func (s *exchangesSuite) TestBroadcastExchangeInit(c *C) {
465+ exchg := &broker.BroadcastExchange{
466+ ChanId: store.SystemInternalChannelId,
467+ TopLevel: 3,
468+ NotificationPayloads: []json.RawMessage{
469+ json.RawMessage(`{"a":"x"}`),
470+ json.RawMessage(`[]`),
471+ json.RawMessage(`{"a":"y"}`),
472+ },
473+ }
474+ exchg.Init()
475+ c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{
476+ map[string]interface{}{"a": "x"},
477+ nil,
478+ map[string]interface{}{"a": "y"},
479+ })
480+}
481+
482 func (s *exchangesSuite) TestBroadcastExchange(c *C) {
483 sess := &testing.TestBrokerSession{
484- LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
485+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
486+ Model: "m1",
487+ ImageChannel: "img1",
488 }
489 exchg := &broker.BroadcastExchange{
490 ChanId: store.SystemInternalChannelId,
491 TopLevel: 3,
492 NotificationPayloads: []json.RawMessage{
493- json.RawMessage(`{"a":"x"}`),
494- json.RawMessage(`{"a":"y"}`),
495+ json.RawMessage(`{"img1/m1":100}`),
496+ json.RawMessage(`{"img2/m2":200}`),
497 },
498 }
499+ exchg.Init()
500 outMsg, inMsg, err := exchg.Prepare(sess)
501 c.Assert(err, IsNil)
502 // check
503 marshalled, err := json.Marshal(outMsg)
504 c.Assert(err, IsNil)
505- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
506+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":100}]}`)
507 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
508 c.Assert(err, IsNil)
509 err = exchg.Acked(sess, true)
510@@ -62,9 +83,11 @@
511
512 func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
513 sess := &testing.TestBrokerSession{
514- LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
515+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
516+ Model: "m1",
517+ ImageChannel: "img1",
518 }
519- payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
520+ payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
521 needsSplitting := make([]json.RawMessage, 32)
522 for i := 0; i < 32; i++ {
523 needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))
524@@ -76,6 +99,7 @@
525 TopLevel: topLevel,
526 NotificationPayloads: needsSplitting,
527 }
528+ exchg.Init()
529 outMsg, _, err := exchg.Prepare(sess)
530 c.Assert(err, IsNil)
531 parts := 0
532@@ -91,10 +115,11 @@
533 ChanId: store.SystemInternalChannelId,
534 TopLevel: topLevel + 2,
535 NotificationPayloads: []json.RawMessage{
536- json.RawMessage(`{"a":"x"}`),
537- json.RawMessage(`{"a":"y"}`),
538+ json.RawMessage(`{"img1/m1":"x"}`),
539+ json.RawMessage(`{"img1/m1":"y"}`),
540 },
541 }
542+ exchg.Init()
543 outMsg, _, err = exchg.Prepare(sess)
544 c.Assert(err, IsNil)
545 done := outMsg.Split() // shouldn't panic
546@@ -103,21 +128,24 @@
547
548 func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
549 sess := &testing.TestBrokerSession{
550- LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
551+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
552+ Model: "m1",
553+ ImageChannel: "img2",
554 }
555 exchg := &broker.BroadcastExchange{
556 ChanId: store.SystemInternalChannelId,
557 TopLevel: 3,
558 NotificationPayloads: []json.RawMessage{
559- json.RawMessage(`{"a":"y"}`),
560+ json.RawMessage(`{"img2/m1":1}`),
561 },
562 }
563+ exchg.Init()
564 outMsg, inMsg, err := exchg.Prepare(sess)
565 c.Assert(err, IsNil)
566 // check
567 marshalled, err := json.Marshal(outMsg)
568 c.Assert(err, IsNil)
569- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
570+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img2/m1":1}]}`)
571 err = json.Unmarshal([]byte(`{}`), inMsg)
572 c.Assert(err, IsNil)
573 err = exchg.Acked(sess, true)
574@@ -130,23 +158,55 @@
575 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{
576 store.SystemInternalChannelId: 2,
577 }),
578+ Model: "m1",
579+ ImageChannel: "img1",
580 }
581 exchg := &broker.BroadcastExchange{
582 ChanId: store.SystemInternalChannelId,
583 TopLevel: 3,
584 NotificationPayloads: []json.RawMessage{
585- json.RawMessage(`{"a":"x"}`),
586- json.RawMessage(`{"a":"y"}`),
587- },
588- }
589- outMsg, inMsg, err := exchg.Prepare(sess)
590- c.Assert(err, IsNil)
591- // check
592- marshalled, err := json.Marshal(outMsg)
593- c.Assert(err, IsNil)
594- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
595- err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
596- c.Assert(err, IsNil)
597- err = exchg.Acked(sess, true)
598- c.Assert(err, IsNil)
599+ json.RawMessage(`{"img1/m1":100}`),
600+ json.RawMessage(`{"img1/m1":101}`),
601+ },
602+ }
603+ exchg.Init()
604+ outMsg, inMsg, err := exchg.Prepare(sess)
605+ c.Assert(err, IsNil)
606+ // check
607+ marshalled, err := json.Marshal(outMsg)
608+ c.Assert(err, IsNil)
609+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":101}]}`)
610+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
611+ c.Assert(err, IsNil)
612+ err = exchg.Acked(sess, true)
613+ c.Assert(err, IsNil)
614+}
615+
616+func (s *exchangesSuite) TestBroadcastExchangeChannelFilter(c *C) {
617+ sess := &testing.TestBrokerSession{
618+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
619+ Model: "m1",
620+ ImageChannel: "img1",
621+ }
622+ exchg := &broker.BroadcastExchange{
623+ ChanId: store.SystemInternalChannelId,
624+ TopLevel: 5,
625+ NotificationPayloads: []json.RawMessage{
626+ json.RawMessage(`{"img1/m1":100}`),
627+ json.RawMessage(`{"img2/m2":200}`),
628+ json.RawMessage(`{"img1/m1":101}`),
629+ },
630+ }
631+ exchg.Init()
632+ outMsg, inMsg, err := exchg.Prepare(sess)
633+ c.Assert(err, IsNil)
634+ // check
635+ marshalled, err := json.Marshal(outMsg)
636+ c.Assert(err, IsNil)
637+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":5,"Payloads":[{"img1/m1":100},{"img1/m1":101}]}`)
638+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
639+ c.Assert(err, IsNil)
640+ err = exchg.Acked(sess, true)
641+ c.Assert(err, IsNil)
642+ c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))
643 }
644
645=== modified file 'server/broker/exchg_impl_test.go'
646--- server/broker/exchg_impl_test.go 2014-02-10 23:19:08 +0000
647+++ server/broker/exchg_impl_test.go 2014-04-04 09:57:13 +0000
648@@ -20,6 +20,8 @@
649 "encoding/json"
650
651 . "launchpad.net/gocheck"
652+
653+ "launchpad.net/ubuntu-push/server/store"
654 )
655
656 type exchangesImplSuite struct{}
657@@ -56,3 +58,31 @@
658 res = filterByLevel(5, 10, nil)
659 c.Check(len(res), Equals, 0)
660 }
661+
662+func (s *exchangesImplSuite) TestChannelFilter(c *C) {
663+ payloads := []json.RawMessage{
664+ json.RawMessage(`{"a/x": 3}`),
665+ json.RawMessage(`{"b/x": 4}`),
666+ json.RawMessage(`{"a/y": 5}`),
667+ json.RawMessage(`{"a/x": 6}`),
668+ }
669+ decoded := make([]map[string]interface{}, 4)
670+ for i, p := range payloads {
671+ err := json.Unmarshal(p, &decoded[i])
672+ c.Assert(err, IsNil)
673+ }
674+
675+ other := store.InternalChannelId("1")
676+
677+ c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil)
678+ c.Check(channelFilter("", other, payloads[1:], decoded), DeepEquals, payloads[1:])
679+
680+ // use tag when channel is the sytem channel
681+
682+ c.Check(channelFilter("c/z", store.SystemInternalChannelId, payloads, decoded), HasLen, 0)
683+
684+ c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]})
685+
686+ c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]})
687+
688+}
689
690=== modified file 'server/broker/simple/simple.go'
691--- server/broker/simple/simple.go 2014-02-21 16:04:44 +0000
692+++ server/broker/simple/simple.go 2014-04-04 09:57:13 +0000
693@@ -46,11 +46,13 @@
694
695 // simpleBrokerSession represents a session in the broker.
696 type simpleBrokerSession struct {
697- registered bool
698- deviceId string
699- done chan bool
700- exchanges chan broker.Exchange
701- levels broker.LevelsMap
702+ registered bool
703+ deviceId string
704+ model string
705+ imageChannel string
706+ done chan bool
707+ exchanges chan broker.Exchange
708+ levels broker.LevelsMap
709 // for exchanges
710 exchgScratch broker.ExchangesScratchArea
711 }
712@@ -75,6 +77,14 @@
713 return sess.deviceId
714 }
715
716+func (sess *simpleBrokerSession) DeviceImageModel() string {
717+ return sess.model
718+}
719+
720+func (sess *simpleBrokerSession) DeviceImageChannel() string {
721+ return sess.imageChannel
722+}
723+
724 func (sess *simpleBrokerSession) Levels() broker.LevelsMap {
725 return sess.levels
726 }
727@@ -147,6 +157,7 @@
728 TopLevel: topLevel,
729 NotificationPayloads: payloads,
730 }
731+ broadcastExchg.Init()
732 sess.exchanges <- broadcastExchg
733 }
734 }
735@@ -157,6 +168,14 @@
736 // pending notifications as well.
737 func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {
738 // xxx sanity check DeviceId
739+ model, err := broker.GetInfoString(connect, "device", "?")
740+ if err != nil {
741+ return nil, err
742+ }
743+ imageChannel, err := broker.GetInfoString(connect, "channel", "?")
744+ if err != nil {
745+ return nil, err
746+ }
747 levels := map[store.InternalChannelId]int64{}
748 for hexId, v := range connect.Levels {
749 id, err := store.HexToInternalChannelId(hexId)
750@@ -166,14 +185,16 @@
751 levels[id] = v
752 }
753 sess := &simpleBrokerSession{
754- deviceId: connect.DeviceId,
755- done: make(chan bool),
756- exchanges: make(chan broker.Exchange, b.sessionQueueSize),
757- levels: levels,
758+ deviceId: connect.DeviceId,
759+ model: model,
760+ imageChannel: imageChannel,
761+ done: make(chan bool),
762+ exchanges: make(chan broker.Exchange, b.sessionQueueSize),
763+ levels: levels,
764 }
765 b.sessionCh <- sess
766 <-sess.done
767- err := b.feedPending(sess)
768+ err = b.feedPending(sess)
769 if err != nil {
770 return nil, err
771 }
772@@ -219,6 +240,7 @@
773 TopLevel: topLevel,
774 NotificationPayloads: payloads,
775 }
776+ broadcastExchg.Init()
777 for _, sess := range b.registry {
778 sess.exchanges <- broadcastExchg
779 }
780
781=== modified file 'server/broker/simple/simple_test.go'
782--- server/broker/simple/simple_test.go 2014-02-10 23:29:53 +0000
783+++ server/broker/simple/simple_test.go 2014-04-04 09:57:13 +0000
784@@ -48,6 +48,7 @@
785 sto := store.NewInMemoryPendingStore()
786 muchLater := time.Now().Add(10 * time.Minute)
787 notification1 := json.RawMessage(`{"m": "M"}`)
788+ decoded1 := map[string]interface{}{"m": "M"}
789 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
790 b := NewSimpleBroker(sto, testBrokerConfig, nil)
791 sess := &simpleBrokerSession{
792@@ -60,6 +61,7 @@
793 ChanId: store.SystemInternalChannelId,
794 TopLevel: 1,
795 NotificationPayloads: []json.RawMessage{notification1},
796+ Decoded: []map[string]interface{}{decoded1},
797 })
798 }
799
800
801=== modified file 'server/broker/testing/impls.go'
802--- server/broker/testing/impls.go 2014-01-23 20:13:22 +0000
803+++ server/broker/testing/impls.go 2014-04-04 09:57:13 +0000
804@@ -24,6 +24,8 @@
805 // Test implementation of BrokerSession.
806 type TestBrokerSession struct {
807 DeviceId string
808+ Model string
809+ ImageChannel string
810 Exchanges chan broker.Exchange
811 LevelsMap broker.LevelsMap
812 exchgScratch broker.ExchangesScratchArea
813@@ -33,6 +35,14 @@
814 return tbs.DeviceId
815 }
816
817+func (tbs *TestBrokerSession) DeviceImageModel() string {
818+ return tbs.Model
819+}
820+
821+func (tbs *TestBrokerSession) DeviceImageChannel() string {
822+ return tbs.ImageChannel
823+}
824+
825 func (tbs *TestBrokerSession) SessionChannel() <-chan broker.Exchange {
826 return tbs.Exchanges
827 }
828
829=== modified file 'server/broker/testsuite/suite.go'
830--- server/broker/testsuite/suite.go 2014-03-19 23:46:18 +0000
831+++ server/broker/testsuite/suite.go 2014-04-04 09:57:13 +0000
832@@ -81,10 +81,20 @@
833 b := s.MakeBroker(sto, testBrokerConfig, nil)
834 b.Start()
835 defer b.Stop()
836- sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"0": 5}})
837+ sess, err := b.Register(&protocol.ConnectMsg{
838+ Type: "connect",
839+ DeviceId: "dev-1",
840+ Levels: map[string]int64{"0": 5},
841+ Info: map[string]interface{}{
842+ "device": "model",
843+ "channel": "daily",
844+ },
845+ })
846 c.Assert(err, IsNil)
847 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess)
848 c.Assert(sess.DeviceIdentifier(), Equals, "dev-1")
849+ c.Check(sess.DeviceImageModel(), Equals, "model")
850+ c.Check(sess.DeviceImageChannel(), Equals, "daily")
851 c.Assert(sess.ExchangeScratchArea(), Not(IsNil))
852 c.Check(sess.Levels(), DeepEquals, broker.LevelsMap(map[store.InternalChannelId]int64{
853 store.SystemInternalChannelId: 5,
854@@ -105,6 +115,22 @@
855 c.Check(err, FitsTypeOf, &broker.ErrAbort{})
856 }
857
858+func (s *CommonBrokerSuite) TestRegistrationInfoErrors(c *C) {
859+ sto := store.NewInMemoryPendingStore()
860+ b := s.MakeBroker(sto, testBrokerConfig, nil)
861+ b.Start()
862+ defer b.Stop()
863+ info := map[string]interface{}{
864+ "device": -1,
865+ }
866+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
867+ c.Check(err, Equals, broker.ErrUnexpectedValue)
868+ info["device"] = "m"
869+ info["channel"] = -1
870+ _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
871+ c.Check(err, Equals, broker.ErrUnexpectedValue)
872+}
873+
874 func (s *CommonBrokerSuite) TestRegistrationFeedPending(c *C) {
875 sto := store.NewInMemoryPendingStore()
876 notification1 := json.RawMessage(`{"m": "M"}`)
877@@ -149,6 +175,7 @@
878 func (s *CommonBrokerSuite) TestBroadcast(c *C) {
879 sto := store.NewInMemoryPendingStore()
880 notification1 := json.RawMessage(`{"m": "M"}`)
881+ decoded1 := map[string]interface{}{"m": "M"}
882 b := s.MakeBroker(sto, testBrokerConfig, nil)
883 b.Start()
884 defer b.Stop()
885@@ -168,6 +195,7 @@
886 ChanId: store.SystemInternalChannelId,
887 TopLevel: 1,
888 NotificationPayloads: []json.RawMessage{notification1},
889+ Decoded: []map[string]interface{}{decoded1},
890 })
891 }
892 select {
893@@ -178,6 +206,7 @@
894 ChanId: store.SystemInternalChannelId,
895 TopLevel: 1,
896 NotificationPayloads: []json.RawMessage{notification1},
897+ Decoded: []map[string]interface{}{decoded1},
898 })
899 }
900 }

Subscribers

People subscribed via source and target branches