Merge lp:ubuntu-push/automatic into lp:ubuntu-push

Proposed by John Lenton
Status: Merged
Merged at revision: 94
Proposed branch: lp:ubuntu-push/automatic
Merge into: lp:ubuntu-push
Diff against target: 3651 lines (+1729/-310)
38 files modified
.bzrignore (+2/-0)
Makefile (+21/-6)
README (+11/-3)
bus/bus.go (+6/-1)
bus/connectivity/connectivity.go (+31/-8)
bus/connectivity/connectivity_test.go (+79/-19)
bus/endpoint.go (+18/-10)
bus/networkmanager/networkmanager.go (+71/-2)
bus/networkmanager/networkmanager_test.go (+124/-9)
bus/notifications/raw.go (+4/-8)
bus/systemimage/systemimage.go (+68/-0)
bus/systemimage/systemimage_test.go (+62/-0)
bus/testing/testing_endpoint.go (+40/-13)
bus/testing/testing_endpoint_test.go (+12/-10)
bus/urldispatcher/urldispatcher.go (+1/-1)
client/client.go (+99/-8)
client/client_test.go (+224/-49)
client/gethosts/gethost_test.go (+5/-7)
client/session/session.go (+179/-33)
client/session/session_test.go (+333/-45)
debian/config.json (+4/-1)
debian/ubuntu-push-client.conf (+3/-3)
sampleconfigs/dev.json (+3/-3)
server/acceptance/acceptanceclient.go (+6/-0)
server/acceptance/cmd/acceptanceclient.go (+7/-2)
server/acceptance/suites/broadcast.go (+48/-25)
server/acceptance/suites/helpers.go (+2/-2)
server/acceptance/suites/pingpong.go (+2/-2)
server/acceptance/suites/suite.go (+6/-4)
server/broker/broker.go (+20/-0)
server/broker/broker_test.go (+18/-0)
server/broker/exchanges.go (+31/-0)
server/broker/exchanges_test.go (+85/-25)
server/broker/exchg_impl_test.go (+30/-0)
server/broker/simple/simple.go (+32/-10)
server/broker/simple/simple_test.go (+2/-0)
server/broker/testing/impls.go (+10/-0)
server/broker/testsuite/suite.go (+30/-1)
To merge this branch: bzr merge lp:ubuntu-push/automatic
Reviewer Review Type Date Requested Status
Ubuntu Push Hackers Pending
Review via email: mp+214262@code.launchpad.net

Commit message

Merge automatic into trunk.

Description of the change

Merge automatic into trunk.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file '.bzrignore'
2--- .bzrignore 2014-02-07 19:36:38 +0000
3+++ .bzrignore 2014-04-04 14:39:41 +0000
4@@ -11,3 +11,5 @@
5 debian/*.ex
6 debian/*.EX
7 debian/*.substvars
8+ubuntu-push-client
9+push-server-dev
10
11=== modified file 'Makefile'
12--- Makefile 2014-03-12 13:23:26 +0000
13+++ Makefile 2014-04-04 14:39:41 +0000
14@@ -12,6 +12,8 @@
15 GODEPS += launchpad.net/go-xdg/v0
16 GODEPS += code.google.com/p/gosqlite/sqlite3
17
18+TOTEST = $(shell env GOPATH=$(GOPATH) go list $(PROJECT)/...|grep -v acceptance|grep -v http13client )
19+
20 bootstrap:
21 mkdir -p $(GOPATH)/bin
22 mkdir -p $(GOPATH)/pkg
23@@ -21,17 +23,29 @@
24 go install $(GODEPS)
25
26 check:
27- go test $(TESTFLAGS) $(PROJECT)/...
28+ go test $(TESTFLAGS) $(TOTEST)
29
30 check-race:
31- go test $(TESTFLAGS) -race $(PROJECT)/...
32+ go test $(TESTFLAGS) -race $(TOTEST)
33+
34+acceptance:
35+ cd server/acceptance; ./acceptance.sh
36+
37+build-client:
38+ go build ubuntu-push-client.go
39+
40+build-server-dev:
41+ go build -o push-server-dev launchpad.net/ubuntu-push/server/dev
42+
43+run-server-dev:
44+ go run server/dev/*.go sampleconfigs/dev.json
45
46 coverage-summary:
47- go test $(TESTFLAGS) -a -cover $(PROJECT)/...
48+ go test $(TESTFLAGS) -a -cover $(TOTEST)
49
50 coverage-html:
51 mkdir -p coverhtml
52- for pkg in $$(go list $(PROJECT)/...|grep -v acceptance ); do \
53+ for pkg in $(TOTEST); do \
54 relname="$${pkg#$(PROJECT)/}" ; \
55 mkdir -p coverhtml/$$(dirname $${relname}) ; \
56 go test $(TESTFLAGS) -a -coverprofile=coverhtml/$${relname}.out $$pkg ; \
57@@ -52,5 +66,6 @@
58 # requires graphviz installed
59 dot -Tsvg $< > $@
60
61-.PHONY: bootstrap check check-race format check-format coverage-summary \
62- coverage-html protocol-diagrams
63+.PHONY: bootstrap check check-race format check-format \
64+ acceptance build-client build server-dev run-server-dev \
65+ coverage-summary coverage-html protocol-diagrams
66
67=== modified file 'README'
68--- README 2014-02-21 16:17:28 +0000
69+++ README 2014-04-04 14:39:41 +0000
70@@ -15,8 +15,7 @@
71 make check
72
73 To produce coverage reports you need Go 1.2 (default on Trusty) and
74-the cover tool (the latter can be obtained atm with something like:
75-sudo GOPATH=<go-workspace> go get code.google.com/p/go.tools/cmd/cover ),
76+the cover tool (in the golang-go.tools package),
77 then run:
78
79 make coverage-summary
80@@ -31,4 +30,13 @@
81
82 To run the acceptance tests, run the following from the top-level directory:
83
84- ./acceptance.sh
85+ make acceptance
86+
87+There are build targets to build the client:
88+
89+ make build-client
90+
91+building ubuntu-push-client, and to run the development server:
92+
93+ make run-server-dev
94+
95
96=== modified file 'bus/bus.go'
97--- bus/bus.go 2014-02-06 09:57:49 +0000
98+++ bus/bus.go 2014-04-04 14:39:41 +0000
99@@ -57,11 +57,16 @@
100 }
101 }
102
103-// Connect() connects to the bus, and returns the bus endpoint (and/or error).
104+// Endpoint returns a bus endpoint.
105 func (bus concreteBus) Endpoint(addr Address, log logger.Logger) Endpoint {
106 return newEndpoint(bus, addr, log)
107 }
108
109+// Args helps build arguments for endpoint Call().
110+func Args(args ...interface{}) []interface{} {
111+ return args
112+}
113+
114 /*
115 private methods
116 */
117
118=== modified file 'bus/connectivity/connectivity.go'
119--- bus/connectivity/connectivity.go 2014-03-20 14:21:24 +0000
120+++ bus/connectivity/connectivity.go 2014-04-04 14:39:41 +0000
121@@ -47,6 +47,7 @@
122
123 type connectedState struct {
124 networkStateCh <-chan networkmanager.State
125+ networkConCh <-chan string
126 config ConnectivityConfig
127 log logger.Logger
128 endp bus.Endpoint
129@@ -62,7 +63,9 @@
130 // up the watch.
131 func (cs *connectedState) start() networkmanager.State {
132 var initial networkmanager.State
133- var ch <-chan networkmanager.State
134+ var stateCh <-chan networkmanager.State
135+ var primary string
136+ var conCh <-chan string
137 var err error
138 for {
139 ar := util.NewAutoRedialer(cs.endp)
140@@ -77,13 +80,24 @@
141 }
142
143 // set up the watch
144- ch, err = nm.WatchState()
145- if err != nil {
146- cs.log.Debugf("Failed to set up the watch: %s", err)
147- goto Continue
148- }
149-
150- cs.networkStateCh = ch
151+ stateCh, err = nm.WatchState()
152+ if err != nil {
153+ cs.log.Debugf("failed to set up the state watch: %s", err)
154+ goto Continue
155+ }
156+
157+ primary = nm.GetPrimaryConnection()
158+ cs.log.Debugf("primary connection starts as %#v", primary)
159+
160+ conCh, err = nm.WatchPrimaryConnection()
161+ if err != nil {
162+ cs.log.Debugf("failed to set up the connection watch: %s", err)
163+ goto Continue
164+ }
165+
166+ cs.networkStateCh = stateCh
167+ cs.networkConCh = conCh
168+
169 return initial
170
171 Continue:
172@@ -102,6 +116,15 @@
173 Loop:
174 for {
175 select {
176+ case <-cs.networkConCh:
177+ cs.webgetCh = nil
178+ cs.timer.Reset(stabilizingTimeout)
179+ log.Debugf("PrimaryConnection changed. Assuming disconnect.")
180+ if cs.lastSent == true {
181+ cs.lastSent = false
182+ break Loop
183+ }
184+
185 case v, ok := <-cs.networkStateCh:
186 if !ok {
187 // tear it all down and start over
188
189=== modified file 'bus/connectivity/connectivity_test.go'
190--- bus/connectivity/connectivity_test.go 2014-03-20 12:15:47 +0000
191+++ bus/connectivity/connectivity_test.go 2014-04-04 14:39:41 +0000
192@@ -17,6 +17,7 @@
193 package connectivity
194
195 import (
196+ "launchpad.net/go-dbus/v1"
197 . "launchpad.net/gocheck"
198 "launchpad.net/ubuntu-push/bus/networkmanager"
199 testingbus "launchpad.net/ubuntu-push/bus/testing"
200@@ -84,6 +85,17 @@
201 c.Check(cs.connAttempts, Equals, uint32(6))
202 }
203
204+// when some of the calls to NetworkManager fail for a bit, we're still OK
205+func (s *ConnSuite) TestStartRetriesCall2(c *C) {
206+ cond := condition.Chain(3, condition.Work(true), 1, condition.Work(false),
207+ 1, condition.Work(true))
208+
209+ endp := testingbus.NewTestingEndpoint(condition.Work(true), cond, uint32(networkmanager.Connecting))
210+ cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
211+
212+ c.Check(cs.start(), Equals, networkmanager.Connecting)
213+}
214+
215 // when... and bear with me... the bus works, and the first call to
216 // get network manager's state works, but then you can't establish the
217 // watch, we recover and try again.
218@@ -213,24 +225,72 @@
219 }{
220 {false, "first state is always false", 0},
221 {true, "then it should be true as per ConnectedGlobal above", 0},
222- {false, "then, false (upon receiving the next ConnectedGlobal)", 1},
223- {true, "then it should be true (webcheck passed)", 0},
224- {false, "then it should be false (Disconnected)", 1},
225- {false, "then it should be false again because it's restarted", 1},
226- }
227-
228- for i, expected := range expecteds {
229- for j := 0; j < expected.n; j++ {
230- watchTicker <- true
231- }
232- timer.Reset(dt)
233- select {
234- case v = <-out:
235- break
236- case <-timer.C:
237- c.Fatalf("Timed out before getting value (#%d: %s)", i+1, expected.s)
238- }
239-
240- c.Check(v, Equals, expected.p, Commentf(expected.s))
241+ {false, "then, false (upon receiving the next ConnectedGlobal)", 2},
242+ {true, "then it should be true (webcheck passed)", 0},
243+ {false, "then it should be false (Disconnected)", 2},
244+ {false, "then it should be false again because it's restarted", 2},
245+ }
246+
247+ for i, expected := range expecteds {
248+ for j := 0; j < expected.n; j++ {
249+ watchTicker <- true
250+ }
251+ timer.Reset(dt)
252+ select {
253+ case v = <-out:
254+ break
255+ case <-timer.C:
256+ c.Fatalf("Timed out before getting value (#%d: %s)", i+1, expected.s)
257+ }
258+ c.Assert(v, Equals, expected.p, Commentf(expected.s))
259+ }
260+}
261+
262+func (s *ConnSuite) TestRun4Active(c *C) {
263+ ts := httptest.NewServer(mkHandler(staticText))
264+ defer ts.Close()
265+
266+ cfg := ConnectivityConfig{
267+ ConnectivityCheckURL: ts.URL,
268+ ConnectivityCheckMD5: staticHash,
269+ RecheckTimeout: config.ConfigTimeDuration{time.Second},
270+ }
271+
272+ endp := testingbus.NewTestingEndpoint(condition.Work(true), condition.Work(true),
273+ uint32(networkmanager.ConnectedGlobal),
274+ map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("hello")}},
275+ )
276+
277+ watchTicker := make(chan bool)
278+ testingbus.SetWatchTicker(endp, watchTicker)
279+
280+ out := make(chan bool)
281+ dt := time.Second / 10
282+ timer := time.NewTimer(dt)
283+ go ConnectedState(endp, cfg, s.log, out)
284+ var v bool
285+ expecteds := []struct {
286+ p bool
287+ s string
288+ n int
289+ }{
290+ {false, "first state is always false", 0},
291+ {true, "then it should be true as per ConnectedGlobal above", 0},
292+ {false, "then, false (PrimaryConnection changed)", 2},
293+ {true, "then it should be true (webcheck passed)", 0},
294+ }
295+
296+ for i, expected := range expecteds {
297+ for j := 0; j < expected.n; j++ {
298+ watchTicker <- true
299+ }
300+ timer.Reset(dt)
301+ select {
302+ case v = <-out:
303+ break
304+ case <-timer.C:
305+ c.Fatalf("Timed out before getting value (#%d: %s)", i+1, expected.s)
306+ }
307+ c.Assert(v, Equals, expected.p, Commentf(expected.s))
308 }
309 }
310
311=== modified file 'bus/endpoint.go'
312--- bus/endpoint.go 2014-02-21 16:17:28 +0000
313+++ bus/endpoint.go 2014-04-04 14:39:41 +0000
314@@ -32,7 +32,7 @@
315 // bus.Endpoint represents the DBus connection itself.
316 type Endpoint interface {
317 WatchSignal(member string, f func(...interface{}), d func()) error
318- Call(member string, args ...interface{}) ([]interface{}, error)
319+ Call(member string, args []interface{}, rvs ...interface{}) error
320 GetProperty(property string) (interface{}, error)
321 Dial() error
322 Close()
323@@ -118,16 +118,20 @@
324 return nil
325 }
326
327-// Call() invokes the provided member method (on the name, path and interface
328-// provided when creating the endpoint). The return value is unpacked before
329-// being returned.
330-func (endp *endpoint) Call(member string, args ...interface{}) ([]interface{}, error) {
331+// Call() invokes the provided member method (on the name, path and
332+// interface provided when creating the endpoint). args can be built
333+// using bus.Args(...). The return value is unpacked into rvs before being
334+// returned.
335+func (endp *endpoint) Call(member string, args []interface{}, rvs ...interface{}) error {
336 msg, err := endp.proxy.Call(endp.addr.Interface, member, args...)
337 if err != nil {
338- return nil, err
339- }
340- rvs := endp.unpackOneMsg(msg, member)
341- return rvs, nil
342+ return err
343+ }
344+ err = msg.Args(rvs...)
345+ if err != nil {
346+ return err
347+ }
348+ return nil
349 }
350
351 // GetProperty uses the org.freedesktop.DBus.Properties interface's Get method
352@@ -175,7 +179,11 @@
353
354 // unpackOneMsg unpacks the value from the response msg
355 func (endp *endpoint) unpackOneMsg(msg *dbus.Message, member string) []interface{} {
356- return msg.AllArgs()
357+ var varmap map[string]dbus.Variant
358+ if err := msg.Args(&varmap); err != nil {
359+ return msg.AllArgs()
360+ }
361+ return []interface{}{varmap}
362 }
363
364 // unpackMessages unpacks the value from the watch
365
366=== modified file 'bus/networkmanager/networkmanager.go'
367--- bus/networkmanager/networkmanager.go 2014-01-21 13:21:19 +0000
368+++ bus/networkmanager/networkmanager.go 2014-04-04 14:39:41 +0000
369@@ -20,6 +20,8 @@
370 package networkmanager
371
372 import (
373+ "launchpad.net/go-dbus/v1"
374+
375 "launchpad.net/ubuntu-push/bus"
376 "launchpad.net/ubuntu-push/logger"
377 )
378@@ -41,6 +43,12 @@
379 // WatchState listens for changes to NetworkManager's state, and sends
380 // them out over the channel returned.
381 WatchState() (<-chan State, error)
382+ // GetPrimaryConnection fetches and returns NetworkManager's current
383+ // primary connection.
384+ GetPrimaryConnection() string
385+ // WatchPrimaryConnection listens for changes of NetworkManager's
386+ // Primary Connection, and sends it out over the channel returned.
387+ WatchPrimaryConnection() (<-chan string, error)
388 }
389
390 type networkManager struct {
391@@ -68,13 +76,28 @@
392 return Unknown
393 }
394
395- return State(s.(uint32))
396+ v, ok := s.(uint32)
397+ if !ok {
398+ nm.log.Errorf("Got weird state: %#v", s)
399+ return Unknown
400+ }
401+
402+ return State(v)
403 }
404
405 func (nm *networkManager) WatchState() (<-chan State, error) {
406 ch := make(chan State)
407 err := nm.bus.WatchSignal("StateChanged",
408- func(ns ...interface{}) { ch <- State(ns[0].(uint32)) },
409+ func(ns ...interface{}) {
410+ stint, ok := ns[0].(uint32)
411+ if !ok {
412+ nm.log.Errorf("got weird state: %#v", ns[0])
413+ return
414+ }
415+ st := State(stint)
416+ nm.log.Debugf("got state: %s", st)
417+ ch <- State(stint)
418+ },
419 func() { close(ch) })
420 if err != nil {
421 nm.log.Debugf("Failed to set up the watch: %s", err)
422@@ -83,3 +106,49 @@
423
424 return ch, nil
425 }
426+
427+func (nm *networkManager) GetPrimaryConnection() string {
428+ s, err := nm.bus.GetProperty("PrimaryConnection")
429+ if err != nil {
430+ nm.log.Errorf("Failed gettting current primary connection: %s", err)
431+ nm.log.Debugf("Defaulting primary connection to empty")
432+ return ""
433+ }
434+
435+ v, ok := s.(dbus.ObjectPath)
436+ if !ok {
437+ nm.log.Errorf("got weird PrimaryConnection: %#v", s)
438+ return ""
439+ }
440+
441+ return string(v)
442+}
443+
444+func (nm *networkManager) WatchPrimaryConnection() (<-chan string, error) {
445+ ch := make(chan string)
446+ err := nm.bus.WatchSignal("PropertiesChanged",
447+ func(ppsi ...interface{}) {
448+ pps, ok := ppsi[0].(map[string]dbus.Variant)
449+ if !ok {
450+ nm.log.Errorf("got weird PropertiesChanged: %#v", ppsi[0])
451+ return
452+ }
453+ v, ok := pps["PrimaryConnection"]
454+ if !ok {
455+ return
456+ }
457+ con, ok := v.Value.(dbus.ObjectPath)
458+ if !ok {
459+ nm.log.Errorf("got weird PrimaryConnection via PropertiesChanged: %#v", v)
460+ return
461+ }
462+ nm.log.Debugf("got primary connection: %s", con)
463+ ch <- string(con)
464+ }, func() { close(ch) })
465+ if err != nil {
466+ nm.log.Debugf("Failed to set up the watch: %s", err)
467+ return nil, err
468+ }
469+
470+ return ch, nil
471+}
472
473=== modified file 'bus/networkmanager/networkmanager_test.go'
474--- bus/networkmanager/networkmanager_test.go 2014-02-05 18:17:26 +0000
475+++ bus/networkmanager/networkmanager_test.go 2014-04-04 14:39:41 +0000
476@@ -17,12 +17,15 @@
477 package networkmanager
478
479 import (
480+ "testing"
481+
482+ "launchpad.net/go-dbus/v1"
483 . "launchpad.net/gocheck"
484+
485 testingbus "launchpad.net/ubuntu-push/bus/testing"
486 "launchpad.net/ubuntu-push/logger"
487 helpers "launchpad.net/ubuntu-push/testing"
488 "launchpad.net/ubuntu-push/testing/condition"
489- "testing"
490 )
491
492 // hook up gocheck
493@@ -71,7 +74,7 @@
494
495 // GetState returns the right state when dbus works but delivers rubbish values
496 func (s *NMSuite) TestGetStateRubbishValues(c *C) {
497- nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false), 42), s.log)
498+ nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(true), "Unknown"), s.log)
499 state := nm.GetState()
500 c.Check(state, Equals, Unknown)
501 }
502@@ -101,11 +104,123 @@
503 }
504
505 // WatchState calls close on its channel when the watch bails
506-func (s *NMSuite) TestWatchClosesOnWatchBail(c *C) {
507- tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))
508- nm := New(tc, s.log)
509- ch, err := nm.WatchState()
510- c.Check(err, IsNil)
511- _, ok := <-ch
512- c.Check(ok, Equals, false)
513+func (s *NMSuite) TestWatchStateClosesOnWatchBail(c *C) {
514+ tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))
515+ nm := New(tc, s.log)
516+ ch, err := nm.WatchState()
517+ c.Check(err, IsNil)
518+ _, ok := <-ch
519+ c.Check(ok, Equals, false)
520+}
521+
522+// WatchState survives rubbish values
523+func (s *NMSuite) TestWatchStateSurvivesRubbishValues(c *C) {
524+ tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a")
525+ nm := New(tc, s.log)
526+ ch, err := nm.WatchState()
527+ c.Check(err, IsNil)
528+ _, ok := <-ch
529+ c.Check(ok, Equals, false)
530+}
531+
532+// GetPrimaryConnection returns the right state when everything works
533+func (s *NMSuite) TestGetPrimaryConnection(c *C) {
534+ nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(true), dbus.ObjectPath("/a/1")), s.log)
535+ con := nm.GetPrimaryConnection()
536+ c.Check(con, Equals, "/a/1")
537+}
538+
539+// GetPrimaryConnection returns the right state when dbus fails
540+func (s *NMSuite) TestGetPrimaryConnectionFail(c *C) {
541+ nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log)
542+ con := nm.GetPrimaryConnection()
543+ c.Check(con, Equals, "")
544+}
545+
546+// GetPrimaryConnection returns the right state when dbus works but delivers rubbish values
547+func (s *NMSuite) TestGetPrimaryConnectionRubbishValues(c *C) {
548+ nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(true), "broken"), s.log)
549+ con := nm.GetPrimaryConnection()
550+ c.Check(con, Equals, "")
551+}
552+
553+// GetPrimaryConnection returns the right state when dbus works but delivers a rubbish structure
554+func (s *NMSuite) TestGetPrimaryConnectionRubbishStructure(c *C) {
555+ nm := New(testingbus.NewMultiValuedTestingEndpoint(nil, condition.Work(true), []interface{}{}), s.log)
556+ con := nm.GetPrimaryConnection()
557+ c.Check(con, Equals, "")
558+}
559+
560+func mkPriConMap(priCon string) map[string]dbus.Variant {
561+ m := make(map[string]dbus.Variant)
562+ m["PrimaryConnection"] = dbus.Variant{dbus.ObjectPath(priCon)}
563+ return m
564+}
565+
566+// WatchPrimaryConnection sends a stream of Connections over the channel
567+func (s *NMSuite) TestWatchPrimaryConnection(c *C) {
568+ tc := testingbus.NewTestingEndpoint(nil, condition.Work(true),
569+ mkPriConMap("/a/1"),
570+ mkPriConMap("/b/2"),
571+ mkPriConMap("/c/3"))
572+ nm := New(tc, s.log)
573+ ch, err := nm.WatchPrimaryConnection()
574+ c.Check(err, IsNil)
575+ l := []string{<-ch, <-ch, <-ch}
576+ c.Check(l, DeepEquals, []string{"/a/1", "/b/2", "/c/3"})
577+}
578+
579+// WatchPrimaryConnection returns an error if the dbus call fails
580+func (s *NMSuite) TestWatchPrimaryConnectionFails(c *C) {
581+ nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log)
582+ _, err := nm.WatchPrimaryConnection()
583+ c.Check(err, NotNil)
584+}
585+
586+// WatchPrimaryConnection calls close on its channel when the watch bails
587+func (s *NMSuite) TestWatchPrimaryConnectionClosesOnWatchBail(c *C) {
588+ tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))
589+ nm := New(tc, s.log)
590+ ch, err := nm.WatchPrimaryConnection()
591+ c.Check(err, IsNil)
592+ _, ok := <-ch
593+ c.Check(ok, Equals, false)
594+}
595+
596+// WatchPrimaryConnection survives rubbish values
597+func (s *NMSuite) TestWatchPrimaryConnectionSurvivesRubbishValues(c *C) {
598+ tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a")
599+ nm := New(tc, s.log)
600+ ch, err := nm.WatchPrimaryConnection()
601+ c.Assert(err, IsNil)
602+ _, ok := <-ch
603+ c.Check(ok, Equals, false)
604+}
605+
606+// WatchPrimaryConnection ignores non-PrimaryConnection PropertyChanged
607+func (s *NMSuite) TestWatchPrimaryConnectionIgnoresIrrelephant(c *C) {
608+ tc := testingbus.NewTestingEndpoint(nil, condition.Work(true),
609+ map[string]dbus.Variant{"foo": dbus.Variant{}},
610+ map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}},
611+ )
612+ nm := New(tc, s.log)
613+ ch, err := nm.WatchPrimaryConnection()
614+ c.Assert(err, IsNil)
615+ v, ok := <-ch
616+ c.Check(ok, Equals, true)
617+ c.Check(v, Equals, "42")
618+}
619+
620+// WatchPrimaryConnection ignores rubbish PrimaryConnections
621+func (s *NMSuite) TestWatchPrimaryConnectionIgnoresRubbishValues(c *C) {
622+ tc := testingbus.NewTestingEndpoint(nil, condition.Work(true),
623+ map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{-12}},
624+ map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}},
625+ )
626+ nm := New(tc, s.log)
627+ ch, err := nm.WatchPrimaryConnection()
628+ c.Assert(err, IsNil)
629+ v, ok := <-ch
630+ c.Check(ok, Equals, true)
631+ c.Check(v, Equals, "42")
632 }
633
634=== modified file 'bus/notifications/raw.go'
635--- bus/notifications/raw.go 2014-01-27 14:22:00 +0000
636+++ bus/notifications/raw.go 2014-04-04 14:39:41 +0000
637@@ -22,7 +22,6 @@
638 // this is the lower-level api
639
640 import (
641- "fmt"
642 "launchpad.net/go-dbus/v1"
643 "launchpad.net/ubuntu-push/bus"
644 "launchpad.net/ubuntu-push/logger"
645@@ -69,16 +68,13 @@
646 timeout int32) (uint32, error) {
647 // that's a long argument list! Take a breather.
648 //
649- rvs, err := raw.bus.Call("Notify", app_name, reuse_id, icon,
650- summary, body, actions, hints, timeout)
651+ var res uint32
652+ err := raw.bus.Call("Notify", bus.Args(app_name, reuse_id, icon,
653+ summary, body, actions, hints, timeout), &res)
654 if err != nil {
655 return 0, err
656 }
657- if len(rvs) != 1 {
658- return 0, fmt.Errorf("Wrong number of values in Notify response: %d",
659- len(rvs))
660- }
661- return rvs[0].(uint32), nil
662+ return res, nil
663 }
664
665 // WatchActions listens for ActionInvoked signals from the notification daemon
666
667=== added directory 'bus/systemimage'
668=== added file 'bus/systemimage/systemimage.go'
669--- bus/systemimage/systemimage.go 1970-01-01 00:00:00 +0000
670+++ bus/systemimage/systemimage.go 2014-04-04 14:39:41 +0000
671@@ -0,0 +1,68 @@
672+/*
673+ Copyright 2013-2014 Canonical Ltd.
674+
675+ This program is free software: you can redistribute it and/or modify it
676+ under the terms of the GNU General Public License version 3, as published
677+ by the Free Software Foundation.
678+
679+ This program is distributed in the hope that it will be useful, but
680+ WITHOUT ANY WARRANTY; without even the implied warranties of
681+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
682+ PURPOSE. See the GNU General Public License for more details.
683+
684+ You should have received a copy of the GNU General Public License along
685+ with this program. If not, see <http://www.gnu.org/licenses/>.
686+*/
687+
688+// Package systemimage is a minimal wrapper for the system-image dbus API.
689+package systemimage
690+
691+import (
692+ "launchpad.net/ubuntu-push/bus"
693+ "launchpad.net/ubuntu-push/logger"
694+)
695+
696+// system-image service lives on a well-known bus.Address
697+var BusAddress bus.Address = bus.Address{
698+ Interface: "com.canonical.SystemImage",
699+ Path: "/Service",
700+ Name: "com.canonical.SystemImage",
701+}
702+
703+// InfoResult holds the result of the system-image service Info method.
704+type InfoResult struct {
705+ BuildNumber int32
706+ Device string
707+ Channel string
708+ // xxx channel_target missing
709+ LastUpdate string
710+ VersionDetail map[string]string
711+}
712+
713+// A SystemImage exposes a subset of the system-image service.
714+type SystemImage interface {
715+ Info() (*InfoResult, error)
716+}
717+
718+type systemImage struct {
719+ endp bus.Endpoint
720+ log logger.Logger
721+}
722+
723+// New builds a new system-image service wrapper that uses the provided bus.Endpoint
724+func New(endp bus.Endpoint, log logger.Logger) SystemImage {
725+ return &systemImage{endp, log}
726+}
727+
728+var _ SystemImage = &systemImage{} // ensures it conforms
729+
730+func (si *systemImage) Info() (*InfoResult, error) {
731+ si.log.Debugf("Invoking Info")
732+ res := &InfoResult{}
733+ err := si.endp.Call("Info", bus.Args(), &res.BuildNumber, &res.Device, &res.Channel, &res.LastUpdate, &res.VersionDetail)
734+ if err != nil {
735+ si.log.Errorf("Info failed: %v", err)
736+ return nil, err
737+ }
738+ return res, err
739+}
740
741=== added file 'bus/systemimage/systemimage_test.go'
742--- bus/systemimage/systemimage_test.go 1970-01-01 00:00:00 +0000
743+++ bus/systemimage/systemimage_test.go 2014-04-04 14:39:41 +0000
744@@ -0,0 +1,62 @@
745+/*
746+ Copyright 2013-2014 Canonical Ltd.
747+
748+ This program is free software: you can redistribute it and/or modify it
749+ under the terms of the GNU General Public License version 3, as published
750+ by the Free Software Foundation.
751+
752+ This program is distributed in the hope that it will be useful, but
753+ WITHOUT ANY WARRANTY; without even the implied warranties of
754+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
755+ PURPOSE. See the GNU General Public License for more details.
756+
757+ You should have received a copy of the GNU General Public License along
758+ with this program. If not, see <http://www.gnu.org/licenses/>.
759+*/
760+
761+package systemimage
762+
763+import (
764+ "testing"
765+
766+ . "launchpad.net/gocheck"
767+
768+ testibus "launchpad.net/ubuntu-push/bus/testing"
769+ "launchpad.net/ubuntu-push/logger"
770+ helpers "launchpad.net/ubuntu-push/testing"
771+ "launchpad.net/ubuntu-push/testing/condition"
772+)
773+
774+// hook up gocheck
775+func TestSystemImage(t *testing.T) { TestingT(t) }
776+
777+type SISuite struct {
778+ log logger.Logger
779+}
780+
781+var _ = Suite(&SISuite{})
782+
783+func (s *SISuite) SetUpTest(c *C) {
784+ s.log = helpers.NewTestLogger(c, "debug")
785+}
786+
787+func (s *SISuite) TestWorks(c *C) {
788+ endp := testibus.NewMultiValuedTestingEndpoint(nil, condition.Work(true), []interface{}{int32(101), "mako", "daily", "Unknown", map[string]string{}})
789+ si := New(endp, s.log)
790+ res, err := si.Info()
791+ c.Assert(err, IsNil)
792+ c.Check(res, DeepEquals, &InfoResult{
793+ BuildNumber: 101,
794+ Device: "mako",
795+ Channel: "daily",
796+ LastUpdate: "Unknown",
797+ VersionDetail: map[string]string{},
798+ })
799+}
800+
801+func (s *SISuite) TestFailsIfCallFails(c *C) {
802+ endp := testibus.NewTestingEndpoint(nil, condition.Work(false))
803+ si := New(endp, s.log)
804+ _, err := si.Info()
805+ c.Check(err, NotNil)
806+}
807
808=== modified file 'bus/testing/testing_endpoint.go'
809--- bus/testing/testing_endpoint.go 2014-02-06 13:26:13 +0000
810+++ bus/testing/testing_endpoint.go 2014-04-04 14:39:41 +0000
811@@ -21,6 +21,9 @@
812 import (
813 "errors"
814 "fmt"
815+
816+ "launchpad.net/go-dbus/v1"
817+
818 "launchpad.net/ubuntu-push/bus"
819 "launchpad.net/ubuntu-push/testing/condition"
820 "sync"
821@@ -37,6 +40,7 @@
822 callCond condition.Interface
823 retvals [][]interface{}
824 watchTicker chan bool
825+ watchLck sync.RWMutex
826 callArgs []callArgs
827 callArgsLck sync.RWMutex
828 }
829@@ -62,7 +66,9 @@
830 // instead of the default timeout to wait while sending values over
831 // WatchSignal. Set it to nil again to restore default behaviour.
832 func SetWatchTicker(tc bus.Endpoint, watchTicker chan bool) {
833+ tc.(*testingEndpoint).watchLck.Lock()
834 tc.(*testingEndpoint).watchTicker = watchTicker
835+ tc.(*testingEndpoint).watchLck.Unlock()
836 }
837
838 // GetCallArgs returns a list of the arguments for each Call() invocation.
839@@ -79,8 +85,11 @@
840 go func() {
841 for _, v := range tc.retvals {
842 f(v...)
843- if tc.watchTicker != nil {
844- <-tc.watchTicker
845+ tc.watchLck.RLock()
846+ ticker := tc.watchTicker
847+ tc.watchLck.RUnlock()
848+ if ticker != nil {
849+ <-ticker
850 } else {
851 time.Sleep(10 * time.Millisecond)
852 }
853@@ -95,32 +104,50 @@
854
855 // See Endpoint's Call. This Call will check its condition to decide whether
856 // to return an error, or the first of its return values
857-func (tc *testingEndpoint) Call(member string, args ...interface{}) ([]interface{}, error) {
858+func (tc *testingEndpoint) Call(member string, args []interface{}, rvs ...interface{}) error {
859 tc.callArgsLck.Lock()
860 defer tc.callArgsLck.Unlock()
861
862 tc.callArgs = append(tc.callArgs, callArgs{member, args})
863 if tc.callCond.OK() {
864+ expected := len(rvs)
865+ var provided int
866 if len(tc.retvals) == 0 {
867- panic("No return values provided!")
868- }
869- return tc.retvals[0], nil
870+ if expected != 0 {
871+ panic("No return values provided!")
872+ }
873+ provided = 0
874+ } else {
875+ provided = len(tc.retvals[0])
876+ }
877+ if provided != expected {
878+ return errors.New("provided/expected return vals mismatch")
879+ }
880+ if provided != 0 {
881+ x := dbus.NewMethodCallMessage("", "", "", "")
882+ err := x.AppendArgs(tc.retvals[0]...)
883+ if err != nil {
884+ return err
885+ }
886+ err = x.Args(rvs...)
887+ if err != nil {
888+ return err
889+ }
890+ }
891+ return nil
892 } else {
893- return nil, errors.New("no way")
894+ return errors.New("no way")
895 }
896 }
897
898 // See Endpoint's GetProperty. This one is just another name for Call.
899 func (tc *testingEndpoint) GetProperty(property string) (interface{}, error) {
900- rvs, err := tc.Call(property)
901+ var res interface{}
902+ err := tc.Call(property, bus.Args(), &res)
903 if err != nil {
904 return nil, err
905 }
906- if len(rvs) != 1 {
907- return nil, errors.New("Wrong number of values given to testingEndpoint" +
908- " -- GetProperty only returns a single value for now!")
909- }
910- return rvs[0], err
911+ return res, err
912 }
913
914 // See Endpoint's Dial. This one will check its dialCondition to
915
916=== modified file 'bus/testing/testing_endpoint_test.go'
917--- bus/testing/testing_endpoint_test.go 2014-02-05 02:13:35 +0000
918+++ bus/testing/testing_endpoint_test.go 2014-04-04 14:39:41 +0000
919@@ -18,6 +18,7 @@
920
921 import (
922 . "launchpad.net/gocheck"
923+ "launchpad.net/ubuntu-push/bus"
924 "launchpad.net/ubuntu-push/testing/condition"
925 "testing"
926 "time"
927@@ -35,26 +36,26 @@
928 func (s *TestingEndpointSuite) TestCallReturnsFirstRetval(c *C) {
929 var m, n uint32 = 42, 17
930 endp := NewTestingEndpoint(nil, condition.Work(true), m, n)
931- vs, e := endp.Call("what")
932+ var r uint32
933+ e := endp.Call("what", bus.Args(), &r)
934 c.Check(e, IsNil)
935- c.Check(vs, HasLen, 1)
936- c.Check(vs[0], Equals, m)
937+ c.Check(r, Equals, m)
938 }
939
940 // Test the same Call() but with multi-valued endpoint
941 func (s *TestingEndpointSuite) TestMultiValuedCall(c *C) {
942 var m, n uint32 = 42, 17
943 endp := NewMultiValuedTestingEndpoint(nil, condition.Work(true), []interface{}{m}, []interface{}{n})
944- vs, e := endp.Call("what")
945+ var r uint32
946+ e := endp.Call("what", bus.Args(), &r)
947 c.Check(e, IsNil)
948- c.Check(vs, HasLen, 1)
949- c.Check(vs[0], Equals, m)
950+ c.Check(r, Equals, m)
951 }
952
953 // Test that Call() with a negative condition returns an error.
954 func (s *TestingEndpointSuite) TestCallFails(c *C) {
955 endp := NewTestingEndpoint(nil, condition.Work(false))
956- _, e := endp.Call("what")
957+ e := endp.Call("what", bus.Args())
958 c.Check(e, NotNil)
959 }
960
961@@ -62,13 +63,14 @@
962 // a helpful message.
963 func (s *TestingEndpointSuite) TestCallPanicsWithNiceMessage(c *C) {
964 endp := NewTestingEndpoint(nil, condition.Work(true))
965- c.Check(func() { endp.Call("") }, PanicMatches, "No return values provided.*")
966+ var x int32
967+ c.Check(func() { endp.Call("", bus.Args(), &x) }, PanicMatches, "No return values provided.*")
968 }
969
970 // Test that Call() updates callArgs
971 func (s *TestingEndpointSuite) TestCallArgs(c *C) {
972- endp := NewTestingEndpoint(nil, condition.Work(true), 0)
973- _, err := endp.Call("what", "is", "this", "thing")
974+ endp := NewTestingEndpoint(nil, condition.Work(true))
975+ err := endp.Call("what", bus.Args("is", "this", "thing"))
976 c.Assert(err, IsNil)
977 c.Check(GetCallArgs(endp), DeepEquals,
978 []callArgs{{"what", []interface{}{"is", "this", "thing"}}})
979
980=== modified file 'bus/urldispatcher/urldispatcher.go'
981--- bus/urldispatcher/urldispatcher.go 2014-01-23 00:54:51 +0000
982+++ bus/urldispatcher/urldispatcher.go 2014-04-04 14:39:41 +0000
983@@ -49,7 +49,7 @@
984
985 func (ud *urlDispatcher) DispatchURL(url string) error {
986 ud.log.Debugf("Dispatching %s", url)
987- _, err := ud.endp.Call("DispatchURL", url)
988+ err := ud.endp.Call("DispatchURL", bus.Args(url))
989 if err != nil {
990 ud.log.Errorf("Dispatch to %s failed with %s", url, err)
991 }
992
993=== modified file 'client/client.go'
994--- client/client.go 2014-03-26 16:26:36 +0000
995+++ client/client.go 2014-04-04 14:39:41 +0000
996@@ -20,13 +20,19 @@
997
998 import (
999 "encoding/pem"
1000+ "errors"
1001 "fmt"
1002 "io/ioutil"
1003+ "os"
1004+ "strings"
1005+
1006 "launchpad.net/go-dbus/v1"
1007+
1008 "launchpad.net/ubuntu-push/bus"
1009 "launchpad.net/ubuntu-push/bus/connectivity"
1010 "launchpad.net/ubuntu-push/bus/networkmanager"
1011 "launchpad.net/ubuntu-push/bus/notifications"
1012+ "launchpad.net/ubuntu-push/bus/systemimage"
1013 "launchpad.net/ubuntu-push/bus/urldispatcher"
1014 "launchpad.net/ubuntu-push/client/session"
1015 "launchpad.net/ubuntu-push/client/session/levelmap"
1016@@ -34,7 +40,6 @@
1017 "launchpad.net/ubuntu-push/logger"
1018 "launchpad.net/ubuntu-push/util"
1019 "launchpad.net/ubuntu-push/whoopsie/identifier"
1020- "os"
1021 )
1022
1023 // ClientConfig holds the client configuration
1024@@ -42,8 +47,13 @@
1025 connectivity.ConnectivityConfig // q.v.
1026 // A reasonably large timeout for receive/answer pairs
1027 ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`
1028- // The server to connect to
1029- Addr config.ConfigHostPort
1030+ // A timeout to use when trying to connect to the server
1031+ ConnectTimeout config.ConfigTimeDuration `json:"connect_timeout"`
1032+ // The server to connect to or url to query for hosts to connect to
1033+ Addr string
1034+ // Host list management
1035+ HostsCachingExpiryTime config.ConfigTimeDuration `json:"hosts_cache_expiry"` // potentially refresh host list after
1036+ ExpectAllRepairedTime config.ConfigTimeDuration `json:"expect_all_repaired"` // worth retrying all servers after
1037 // The PEM-encoded server certificate
1038 CertPEMFile string `json:"cert_pem_file"`
1039 // The logging level (one of "debug", "info", "error")
1040@@ -62,6 +72,8 @@
1041 notificationsEndp bus.Endpoint
1042 urlDispatcherEndp bus.Endpoint
1043 connectivityEndp bus.Endpoint
1044+ systemImageEndp bus.Endpoint
1045+ systemImageInfo *systemimage.InfoResult
1046 connCh chan bool
1047 hasConnectivity bool
1048 actionsCh <-chan notifications.RawActionReply
1049@@ -89,6 +101,12 @@
1050 if err != nil {
1051 return fmt.Errorf("reading config: %v", err)
1052 }
1053+ // ignore spaces
1054+ client.config.Addr = strings.Replace(client.config.Addr, " ", "", -1)
1055+ if client.config.Addr == "" {
1056+ return errors.New("no hosts specified")
1057+ }
1058+
1059 // later, we'll be specifying more logging options in the config file
1060 client.log = logger.NewSimpleLogger(os.Stderr, client.config.LogLevel)
1061
1062@@ -97,6 +115,7 @@
1063 client.notificationsEndp = bus.SessionBus.Endpoint(notifications.BusAddress, client.log)
1064 client.urlDispatcherEndp = bus.SessionBus.Endpoint(urldispatcher.BusAddress, client.log)
1065 client.connectivityEndp = bus.SystemBus.Endpoint(networkmanager.BusAddress, client.log)
1066+ client.systemImageEndp = bus.SystemBus.Endpoint(systemimage.BusAddress, client.log)
1067
1068 client.connCh = make(chan bool, 1)
1069 client.sessionConnectedCh = make(chan uint32, 1)
1070@@ -116,6 +135,18 @@
1071 return nil
1072 }
1073
1074+// deriveSessionConfig derives the session configuration from the client configuration bits.
1075+func (client *PushClient) deriveSessionConfig(info map[string]interface{}) session.ClientSessionConfig {
1076+ return session.ClientSessionConfig{
1077+ ConnectTimeout: client.config.ConnectTimeout.TimeDuration(),
1078+ ExchangeTimeout: client.config.ExchangeTimeout.TimeDuration(),
1079+ HostsCachingExpiryTime: client.config.HostsCachingExpiryTime.TimeDuration(),
1080+ ExpectAllRepairedTime: client.config.ExpectAllRepairedTime.TimeDuration(),
1081+ PEM: client.pem,
1082+ Info: info,
1083+ }
1084+}
1085+
1086 // getDeviceId gets the whoopsie identifier for the device
1087 func (client *PushClient) getDeviceId() error {
1088 err := client.idder.Generate()
1089@@ -133,8 +164,17 @@
1090 iniCh := make(chan uint32)
1091 go func() { iniCh <- util.NewAutoRedialer(client.notificationsEndp).Redial() }()
1092 go func() { iniCh <- util.NewAutoRedialer(client.urlDispatcherEndp).Redial() }()
1093- <-iniCh
1094- <-iniCh
1095+ go func() { iniCh <- util.NewAutoRedialer(client.systemImageEndp).Redial() }()
1096+ <-iniCh
1097+ <-iniCh
1098+ <-iniCh
1099+
1100+ sysimg := systemimage.New(client.systemImageEndp, client.log)
1101+ info, err := sysimg.Info()
1102+ if err != nil {
1103+ return err
1104+ }
1105+ client.systemImageInfo = info
1106
1107 actionsCh, err := notifications.Raw(client.notificationsEndp, client.log).WatchActions()
1108 client.actionsCh = actionsCh
1109@@ -143,8 +183,13 @@
1110
1111 // initSession creates the session object
1112 func (client *PushClient) initSession() error {
1113- sess, err := session.NewSession(string(client.config.Addr), client.pem,
1114- client.config.ExchangeTimeout.Duration, client.deviceId,
1115+ info := map[string]interface{}{
1116+ "device": client.systemImageInfo.Device,
1117+ "channel": client.systemImageInfo.Channel,
1118+ "build_number": client.systemImageInfo.BuildNumber,
1119+ }
1120+ sess, err := session.NewSession(client.config.Addr,
1121+ client.deriveSessionConfig(info), client.deviceId,
1122 client.levelMapFactory, client.log)
1123 if err != nil {
1124 return err
1125@@ -185,8 +230,54 @@
1126 }
1127 }
1128
1129+// filterNotification finds out if the notification is about an actual
1130+// upgrade for the device. It expects msg.Decoded entries to look
1131+// like:
1132+//
1133+// {
1134+// "IMAGE-CHANNEL/DEVICE-MODEL": [BUILD-NUMBER, CHANNEL-ALIAS]
1135+// ...
1136+// }
1137+func (client *PushClient) filterNotification(msg *session.Notification) bool {
1138+ n := len(msg.Decoded)
1139+ if n == 0 {
1140+ return false
1141+ }
1142+ // they are all for us, consider last
1143+ last := msg.Decoded[n-1]
1144+ tag := fmt.Sprintf("%s/%s", client.systemImageInfo.Channel, client.systemImageInfo.Device)
1145+ entry, ok := last[tag]
1146+ if !ok {
1147+ return false
1148+ }
1149+ pair, ok := entry.([]interface{})
1150+ if !ok {
1151+ return false
1152+ }
1153+ if len(pair) < 1 {
1154+ return false
1155+ }
1156+ buildNumber, ok := pair[0].(float64)
1157+ if !ok {
1158+ return false
1159+ }
1160+ curBuildNumber := float64(client.systemImageInfo.BuildNumber)
1161+ if buildNumber > curBuildNumber {
1162+ return true
1163+ }
1164+ // xxx we should really compare channel_target and alias here
1165+ // going backward by a margin, assume switch of target
1166+ if buildNumber < curBuildNumber && (curBuildNumber-buildNumber) > 10 {
1167+ return true
1168+ }
1169+ return false
1170+}
1171+
1172 // handleNotification deals with receiving a notification
1173 func (client *PushClient) handleNotification(msg *session.Notification) error {
1174+ if !client.filterNotification(msg) {
1175+ return nil
1176+ }
1177 action_id := "dummy_id"
1178 a := []string{action_id, "Go get it!"} // action value not visible on the phone
1179 h := map[string]*dbus.Variant{"x-canonical-switch-to-application": &dbus.Variant{true}}
1180@@ -260,7 +351,7 @@
1181 return client.doStart(
1182 client.configure,
1183 client.getDeviceId,
1184+ client.takeTheBus,
1185 client.initSession,
1186- client.takeTheBus,
1187 )
1188 }
1189
1190=== modified file 'client/client_test.go'
1191--- client/client_test.go 2014-03-26 16:26:36 +0000
1192+++ client/client_test.go 2014-04-04 14:39:41 +0000
1193@@ -17,13 +17,23 @@
1194 package client
1195
1196 import (
1197+ "encoding/json"
1198 "errors"
1199 "fmt"
1200 "io/ioutil"
1201+ "net/http"
1202+ "net/http/httptest"
1203+ "path/filepath"
1204+ "reflect"
1205+ "testing"
1206+ "time"
1207+
1208 . "launchpad.net/gocheck"
1209+
1210 "launchpad.net/ubuntu-push/bus"
1211 "launchpad.net/ubuntu-push/bus/networkmanager"
1212 "launchpad.net/ubuntu-push/bus/notifications"
1213+ "launchpad.net/ubuntu-push/bus/systemimage"
1214 testibus "launchpad.net/ubuntu-push/bus/testing"
1215 "launchpad.net/ubuntu-push/client/session"
1216 "launchpad.net/ubuntu-push/client/session/levelmap"
1217@@ -32,11 +42,6 @@
1218 "launchpad.net/ubuntu-push/util"
1219 "launchpad.net/ubuntu-push/whoopsie/identifier"
1220 idtesting "launchpad.net/ubuntu-push/whoopsie/identifier/testing"
1221- "net/http"
1222- "net/http/httptest"
1223- "path/filepath"
1224- "testing"
1225- "time"
1226 )
1227
1228 func TestClient(t *testing.T) { TestingT(t) }
1229@@ -83,22 +88,37 @@
1230 cs.timeouts = nil
1231 }
1232
1233+func (cs *clientSuite) writeTestConfig(overrides map[string]interface{}) {
1234+ pem_file := helpers.SourceRelative("../server/acceptance/ssl/testing.cert")
1235+ cfgMap := map[string]interface{}{
1236+ "connect_timeout": "7ms",
1237+ "exchange_timeout": "10ms",
1238+ "hosts_cache_expiry": "1h",
1239+ "expect_all_repaired": "30m",
1240+ "stabilizing_timeout": "0ms",
1241+ "connectivity_check_url": "",
1242+ "connectivity_check_md5": "",
1243+ "addr": ":0",
1244+ "cert_pem_file": pem_file,
1245+ "recheck_timeout": "3h",
1246+ "log_level": "debug",
1247+ }
1248+ for k, v := range overrides {
1249+ cfgMap[k] = v
1250+ }
1251+ cfgBlob, err := json.Marshal(cfgMap)
1252+ if err != nil {
1253+ panic(err)
1254+ }
1255+ ioutil.WriteFile(cs.configPath, cfgBlob, 0600)
1256+}
1257+
1258 func (cs *clientSuite) SetUpTest(c *C) {
1259 cs.log = helpers.NewTestLogger(c, "debug")
1260 dir := c.MkDir()
1261 cs.configPath = filepath.Join(dir, "config")
1262- cfg := fmt.Sprintf(`
1263-{
1264- "exchange_timeout": "10ms",
1265- "stabilizing_timeout": "0ms",
1266- "connectivity_check_url": "",
1267- "connectivity_check_md5": "",
1268- "addr": ":0",
1269- "cert_pem_file": %#v,
1270- "recheck_timeout": "3h",
1271- "log_level": "debug"
1272-}`, helpers.SourceRelative("../server/acceptance/config/testing.cert"))
1273- ioutil.WriteFile(cs.configPath, []byte(cfg), 0600)
1274+
1275+ cs.writeTestConfig(nil)
1276 }
1277
1278 type sqlientSuite struct{ clientSuite }
1279@@ -119,7 +139,7 @@
1280 err := cli.configure()
1281 c.Assert(err, IsNil)
1282 c.Assert(cli.config, NotNil)
1283- c.Check(cli.config.ExchangeTimeout.Duration, Equals, time.Duration(10*time.Millisecond))
1284+ c.Check(cli.config.ExchangeTimeout.TimeDuration(), Equals, time.Duration(10*time.Millisecond))
1285 }
1286
1287 func (cs *clientSuite) TestConfigureSetsUpLog(c *C) {
1288@@ -179,41 +199,74 @@
1289 }
1290
1291 func (cs *clientSuite) TestConfigureBailsOnBadPEMFilename(c *C) {
1292- ioutil.WriteFile(cs.configPath, []byte(`
1293-{
1294- "exchange_timeout": "10ms",
1295- "stabilizing_timeout": "0ms",
1296- "connectivity_check_url": "",
1297- "connectivity_check_md5": "",
1298- "addr": ":0",
1299- "cert_pem_file": "/a/b/c",
1300- "log_level": "debug",
1301- "recheck_timeout": "3h"
1302-}`), 0600)
1303-
1304+ cs.writeTestConfig(map[string]interface{}{
1305+ "cert_pem_file": "/a/b/c",
1306+ })
1307 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1308 err := cli.configure()
1309 c.Assert(err, ErrorMatches, "reading PEM file: .*")
1310 }
1311
1312 func (cs *clientSuite) TestConfigureBailsOnBadPEM(c *C) {
1313- ioutil.WriteFile(cs.configPath, []byte(`
1314-{
1315- "exchange_timeout": "10ms",
1316- "stabilizing_timeout": "0ms",
1317- "connectivity_check_url": "",
1318- "connectivity_check_md5": "",
1319- "addr": ":0",
1320- "cert_pem_file": "/etc/passwd",
1321- "log_level": "debug",
1322- "recheck_timeout": "3h"
1323-}`), 0600)
1324-
1325+ cs.writeTestConfig(map[string]interface{}{
1326+ "cert_pem_file": "/etc/passwd",
1327+ })
1328 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1329 err := cli.configure()
1330 c.Assert(err, ErrorMatches, "no PEM found.*")
1331 }
1332
1333+func (cs *clientSuite) TestConfigureBailsOnNoHosts(c *C) {
1334+ cs.writeTestConfig(map[string]interface{}{
1335+ "addr": " ",
1336+ })
1337+ cli := NewPushClient(cs.configPath, cs.leveldbPath)
1338+ err := cli.configure()
1339+ c.Assert(err, ErrorMatches, "no hosts specified")
1340+}
1341+
1342+func (cs *clientSuite) TestConfigureRemovesBlanksInAddr(c *C) {
1343+ cs.writeTestConfig(map[string]interface{}{
1344+ "addr": " foo: 443",
1345+ })
1346+ cli := NewPushClient(cs.configPath, cs.leveldbPath)
1347+ err := cli.configure()
1348+ c.Assert(err, IsNil)
1349+ c.Check(cli.config.Addr, Equals, "foo:443")
1350+}
1351+
1352+/*****************************************************************
1353+ deriveSessionConfig tests
1354+******************************************************************/
1355+
1356+func (cs *clientSuite) TestDeriveSessionConfig(c *C) {
1357+ info := map[string]interface{}{
1358+ "foo": 1,
1359+ }
1360+ cli := NewPushClient(cs.configPath, cs.leveldbPath)
1361+ err := cli.configure()
1362+ c.Assert(err, IsNil)
1363+ expected := session.ClientSessionConfig{
1364+ ConnectTimeout: 7 * time.Millisecond,
1365+ ExchangeTimeout: 10 * time.Millisecond,
1366+ HostsCachingExpiryTime: 1 * time.Hour,
1367+ ExpectAllRepairedTime: 30 * time.Minute,
1368+ PEM: cli.pem,
1369+ Info: info,
1370+ }
1371+ // sanity check that we are looking at all fields
1372+ vExpected := reflect.ValueOf(expected)
1373+ nf := vExpected.NumField()
1374+ for i := 0; i < nf; i++ {
1375+ fv := vExpected.Field(i)
1376+ // field isn't empty/zero
1377+ c.Assert(fv.Interface(), Not(DeepEquals), reflect.Zero(fv.Type()).Interface(), Commentf("forgot about: %s", vExpected.Type().Field(i).Name))
1378+ }
1379+ // finally compare
1380+ conf := cli.deriveSessionConfig(info)
1381+ c.Check(conf, DeepEquals, expected)
1382+}
1383+
1384 /*****************************************************************
1385 getDeviceId tests
1386 ******************************************************************/
1387@@ -254,6 +307,8 @@
1388 cEndp := testibus.NewTestingEndpoint(cCond, condition.Work(true),
1389 uint32(networkmanager.ConnectedGlobal),
1390 )
1391+ siCond := condition.Fail2Work(2)
1392+ siEndp := testibus.NewMultiValuedTestingEndpoint(siCond, condition.Work(true), []interface{}{int32(101), "mako", "daily", "Unknown", map[string]string{}})
1393 testibus.SetWatchTicker(cEndp, make(chan bool))
1394 // ok, create the thing
1395 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1396@@ -269,6 +324,7 @@
1397 cli.notificationsEndp = nEndp
1398 cli.urlDispatcherEndp = uEndp
1399 cli.connectivityEndp = cEndp
1400+ cli.systemImageEndp = siEndp
1401
1402 c.Assert(cli.takeTheBus(), IsNil)
1403 // the notifications and urldispatcher endpoints retried until connected
1404@@ -280,6 +336,8 @@
1405 c.Check(takeNextBool(cli.connCh), Equals, true)
1406 // the connectivity endpoint retried until connected
1407 c.Check(cCond.OK(), Equals, true)
1408+ // the systemimage endpoint retried until connected
1409+ c.Check(siCond.OK(), Equals, true)
1410 }
1411
1412 // takeTheBus can, in fact, fail
1413@@ -295,6 +353,7 @@
1414 cli.notificationsEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
1415 cli.urlDispatcherEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
1416 cli.connectivityEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
1417+ cli.systemImageEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
1418
1419 c.Check(cli.takeTheBus(), NotNil)
1420 c.Check(cli.actionsCh, IsNil)
1421@@ -307,7 +366,9 @@
1422 func (cs *clientSuite) TestHandleErr(c *C) {
1423 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1424 cli.log = cs.log
1425+ cli.systemImageInfo = siInfoRes
1426 c.Assert(cli.initSession(), IsNil)
1427+ cs.log.ResetCapture()
1428 cli.hasConnectivity = true
1429 cli.handleErr(errors.New("bananas"))
1430 c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n")
1431@@ -338,6 +399,7 @@
1432 func (cs *clientSuite) TestHandleConnStateD2C(c *C) {
1433 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1434 cli.log = cs.log
1435+ cli.systemImageInfo = siInfoRes
1436 c.Assert(cli.initSession(), IsNil)
1437
1438 c.Assert(cli.hasConnectivity, Equals, false)
1439@@ -363,7 +425,7 @@
1440 func (cs *clientSuite) TestHandleConnStateC2D(c *C) {
1441 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1442 cli.log = cs.log
1443- cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, levelmap.NewLevelMap, cs.log)
1444+ cli.session, _ = session.NewSession(cli.config.Addr, cli.deriveSessionConfig(nil), cli.deviceId, levelmap.NewLevelMap, cs.log)
1445 cli.session.Dial()
1446 cli.hasConnectivity = true
1447
1448@@ -376,7 +438,7 @@
1449 func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) {
1450 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1451 cli.log = cs.log
1452- cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, levelmap.NewLevelMap, cs.log)
1453+ cli.session, _ = session.NewSession(cli.config.Addr, cli.deriveSessionConfig(nil), cli.deviceId, levelmap.NewLevelMap, cs.log)
1454 cli.hasConnectivity = true
1455
1456 cli.handleConnState(false)
1457@@ -384,15 +446,110 @@
1458 }
1459
1460 /*****************************************************************
1461+ filterNotification tests
1462+******************************************************************/
1463+
1464+var siInfoRes = &systemimage.InfoResult{
1465+ Device: "mako",
1466+ Channel: "daily",
1467+ BuildNumber: 102,
1468+ LastUpdate: "Unknown",
1469+}
1470+
1471+func (cs *clientSuite) TestFilterNotification(c *C) {
1472+ cli := NewPushClient(cs.configPath, cs.leveldbPath)
1473+ cli.systemImageInfo = siInfoRes
1474+ // empty
1475+ msg := &session.Notification{}
1476+ c.Check(cli.filterNotification(msg), Equals, false)
1477+ // same build number
1478+ msg = &session.Notification{
1479+ Decoded: []map[string]interface{}{
1480+ map[string]interface{}{
1481+ "daily/mako": []interface{}{float64(102), "tubular"},
1482+ },
1483+ },
1484+ }
1485+ c.Check(cli.filterNotification(msg), Equals, false)
1486+ // higher build number and pick last
1487+ msg = &session.Notification{
1488+ Decoded: []map[string]interface{}{
1489+ map[string]interface{}{
1490+ "daily/mako": []interface{}{float64(102), "tubular"},
1491+ },
1492+ map[string]interface{}{
1493+ "daily/mako": []interface{}{float64(103), "tubular"},
1494+ },
1495+ },
1496+ }
1497+ c.Check(cli.filterNotification(msg), Equals, true)
1498+ // going backward by a margin, assume switch of alias
1499+ msg = &session.Notification{
1500+ Decoded: []map[string]interface{}{
1501+ map[string]interface{}{
1502+ "daily/mako": []interface{}{float64(102), "tubular"},
1503+ },
1504+ map[string]interface{}{
1505+ "daily/mako": []interface{}{float64(2), "urban"},
1506+ },
1507+ },
1508+ }
1509+ c.Check(cli.filterNotification(msg), Equals, true)
1510+}
1511+
1512+func (cs *clientSuite) TestFilterNotificationRobust(c *C) {
1513+ cli := NewPushClient(cs.configPath, cs.leveldbPath)
1514+ cli.systemImageInfo = siInfoRes
1515+ msg := &session.Notification{
1516+ Decoded: []map[string]interface{}{
1517+ map[string]interface{}{},
1518+ },
1519+ }
1520+ c.Check(cli.filterNotification(msg), Equals, false)
1521+ for _, broken := range []interface{}{
1522+ 5,
1523+ []interface{}{},
1524+ []interface{}{55},
1525+ } {
1526+ msg := &session.Notification{
1527+ Decoded: []map[string]interface{}{
1528+ map[string]interface{}{
1529+ "daily/mako": broken,
1530+ },
1531+ },
1532+ }
1533+ c.Check(cli.filterNotification(msg), Equals, false)
1534+ }
1535+}
1536+
1537+/*****************************************************************
1538 handleNotification tests
1539 ******************************************************************/
1540
1541+var (
1542+ positiveNotification = &session.Notification{
1543+ Decoded: []map[string]interface{}{
1544+ map[string]interface{}{
1545+ "daily/mako": []interface{}{float64(103), "tubular"},
1546+ },
1547+ },
1548+ }
1549+ negativeNotification = &session.Notification{
1550+ Decoded: []map[string]interface{}{
1551+ map[string]interface{}{
1552+ "daily/mako": []interface{}{float64(102), "tubular"},
1553+ },
1554+ },
1555+ }
1556+)
1557+
1558 func (cs *clientSuite) TestHandleNotification(c *C) {
1559 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1560+ cli.systemImageInfo = siInfoRes
1561 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1))
1562 cli.notificationsEndp = endp
1563 cli.log = cs.log
1564- c.Check(cli.handleNotification(nil), IsNil)
1565+ c.Check(cli.handleNotification(positiveNotification), IsNil)
1566 // check we sent the notification
1567 args := testibus.GetCallArgs(endp)
1568 c.Assert(args, HasLen, 1)
1569@@ -400,12 +557,26 @@
1570 c.Check(cs.log.Captured(), Matches, `.* got notification id \d+\s*`)
1571 }
1572
1573+func (cs *clientSuite) TestHandleNotificationNothingToDo(c *C) {
1574+ cli := NewPushClient(cs.configPath, cs.leveldbPath)
1575+ cli.systemImageInfo = siInfoRes
1576+ endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1))
1577+ cli.notificationsEndp = endp
1578+ cli.log = cs.log
1579+ c.Check(cli.handleNotification(negativeNotification), IsNil)
1580+ // check that no notification was sent
1581+ args := testibus.GetCallArgs(endp)
1582+ c.Assert(args, HasLen, 0)
1583+ c.Check(cs.log.Captured(), Matches, "")
1584+}
1585+
1586 func (cs *clientSuite) TestHandleNotificationFail(c *C) {
1587 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1588+ cli.systemImageInfo = siInfoRes
1589 cli.log = cs.log
1590 endp := testibus.NewTestingEndpoint(nil, condition.Work(false))
1591 cli.notificationsEndp = endp
1592- c.Check(cli.handleNotification(nil), NotNil)
1593+ c.Check(cli.handleNotification(positiveNotification), NotNil)
1594 }
1595
1596 /*****************************************************************
1597@@ -415,7 +586,7 @@
1598 func (cs *clientSuite) TestHandleClick(c *C) {
1599 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1600 cli.log = cs.log
1601- endp := testibus.NewTestingEndpoint(nil, condition.Work(true), nil)
1602+ endp := testibus.NewTestingEndpoint(nil, condition.Work(true))
1603 cli.urlDispatcherEndp = endp
1604 c.Check(cli.handleClick(), IsNil)
1605 // check we sent the notification
1606@@ -432,6 +603,7 @@
1607 func (cs *clientSuite) TestDoLoopConn(c *C) {
1608 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1609 cli.log = cs.log
1610+ cli.systemImageInfo = siInfoRes
1611 cli.connCh = make(chan bool, 1)
1612 cli.connCh <- true
1613 c.Assert(cli.initSession(), IsNil)
1614@@ -444,6 +616,7 @@
1615 func (cs *clientSuite) TestDoLoopClick(c *C) {
1616 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1617 cli.log = cs.log
1618+ cli.systemImageInfo = siInfoRes
1619 c.Assert(cli.initSession(), IsNil)
1620 aCh := make(chan notifications.RawActionReply, 1)
1621 aCh <- notifications.RawActionReply{}
1622@@ -457,6 +630,7 @@
1623 func (cs *clientSuite) TestDoLoopNotif(c *C) {
1624 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1625 cli.log = cs.log
1626+ cli.systemImageInfo = siInfoRes
1627 c.Assert(cli.initSession(), IsNil)
1628 cli.session.MsgCh = make(chan *session.Notification, 1)
1629 cli.session.MsgCh <- &session.Notification{}
1630@@ -469,6 +643,7 @@
1631 func (cs *clientSuite) TestDoLoopErr(c *C) {
1632 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1633 cli.log = cs.log
1634+ cli.systemImageInfo = siInfoRes
1635 c.Assert(cli.initSession(), IsNil)
1636 cli.session.ErrCh = make(chan error, 1)
1637 cli.session.ErrCh <- nil
1638@@ -521,7 +696,7 @@
1639 cli.urlDispatcherEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
1640 cli.connectivityEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(true),
1641 uint32(networkmanager.ConnectedGlobal))
1642-
1643+ cli.systemImageInfo = siInfoRes
1644 c.Assert(cli.initSession(), IsNil)
1645
1646 cli.session.MsgCh = make(chan *session.Notification)
1647@@ -558,7 +733,7 @@
1648 c.Check(cli.hasConnectivity, Equals, false)
1649
1650 // * session.MsgCh to the notifications handler
1651- cli.session.MsgCh <- &session.Notification{}
1652+ cli.session.MsgCh <- positiveNotification
1653 tick()
1654 nargs := testibus.GetCallArgs(cli.notificationsEndp)
1655 c.Check(nargs, HasLen, 1)
1656
1657=== modified file 'client/gethosts/gethost_test.go'
1658--- client/gethosts/gethost_test.go 2014-03-24 15:32:29 +0000
1659+++ client/gethosts/gethost_test.go 2014-04-04 14:39:41 +0000
1660@@ -61,18 +61,16 @@
1661 }
1662
1663 func (s *getHostsSuite) TestGetTimeout(c *C) {
1664- finish := make(chan bool, 1)
1665+ started := make(chan bool, 1)
1666 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1667- <-finish
1668+ started <- true
1669+ time.Sleep(700 * time.Millisecond)
1670 }))
1671 defer func() {
1672- time.Sleep(100 * time.Millisecond) // work around -race issue
1673+ <-started
1674 ts.Close()
1675 }()
1676- defer func() {
1677- finish <- true
1678- }()
1679- gh := New("foobar", ts.URL, 1*time.Second)
1680+ gh := New("foobar", ts.URL, 500*time.Millisecond)
1681 _, err := gh.Get()
1682 c.Check(err, ErrorMatches, ".*closed.*")
1683 }
1684
1685=== modified file 'client/session/session.go'
1686--- client/session/session.go 2014-03-26 16:26:36 +0000
1687+++ client/session/session.go 2014-04-04 14:39:41 +0000
1688@@ -21,22 +21,28 @@
1689 import (
1690 "crypto/tls"
1691 "crypto/x509"
1692+ "encoding/json"
1693 "errors"
1694 "fmt"
1695+ "math/rand"
1696+ "net"
1697+ "strings"
1698+ "sync"
1699+ "sync/atomic"
1700+ "time"
1701+
1702+ "launchpad.net/ubuntu-push/client/gethosts"
1703 "launchpad.net/ubuntu-push/client/session/levelmap"
1704 "launchpad.net/ubuntu-push/logger"
1705 "launchpad.net/ubuntu-push/protocol"
1706 "launchpad.net/ubuntu-push/util"
1707- "math/rand"
1708- "net"
1709- "sync/atomic"
1710- "time"
1711 )
1712
1713 var wireVersionBytes = []byte{protocol.ProtocolWireVersion}
1714
1715 type Notification struct {
1716 TopLevel int64
1717+ Decoded []map[string]interface{}
1718 }
1719
1720 type serverMsg struct {
1721@@ -45,6 +51,15 @@
1722 protocol.NotificationsMsg
1723 }
1724
1725+// parseServerAddrSpec recognizes whether spec is a HTTP URL to get
1726+// hosts from or a |-separated list of host:port pairs.
1727+func parseServerAddrSpec(spec string) (hostsEndpoint string, fallbackHosts []string) {
1728+ if strings.HasPrefix(spec, "http") {
1729+ return spec, nil
1730+ }
1731+ return "", strings.Split(spec, "|")
1732+}
1733+
1734 // ClientSessionState is a way to broadly track the progress of the session
1735 type ClientSessionState uint32
1736
1737@@ -56,15 +71,39 @@
1738 Running
1739 )
1740
1741-// ClienSession holds a client<->server session and its configuration.
1742+type hostGetter interface {
1743+ Get() ([]string, error)
1744+}
1745+
1746+// ClientSessionConfig groups the client session configuration.
1747+type ClientSessionConfig struct {
1748+ ConnectTimeout time.Duration
1749+ ExchangeTimeout time.Duration
1750+ HostsCachingExpiryTime time.Duration
1751+ ExpectAllRepairedTime time.Duration
1752+ PEM []byte
1753+ Info map[string]interface{}
1754+}
1755+
1756+// ClientSession holds a client<->server session and its configuration.
1757 type ClientSession struct {
1758 // configuration
1759- DeviceId string
1760- ServerAddr string
1761- ExchangeTimeout time.Duration
1762- Levels levelmap.LevelMap
1763- Protocolator func(net.Conn) protocol.Protocol
1764+ DeviceId string
1765+ ClientSessionConfig
1766+ Levels levelmap.LevelMap
1767+ Protocolator func(net.Conn) protocol.Protocol
1768+ // hosts
1769+ getHost hostGetter
1770+ fallbackHosts []string
1771+ deliveryHostsTimestamp time.Time
1772+ deliveryHosts []string
1773+ lastAttemptTimestamp time.Time
1774+ leftToTry int
1775+ tryHost int
1776+ // hook for testing
1777+ timeSince func(time.Time) time.Duration
1778 // connection
1779+ connLock sync.RWMutex
1780 Connection net.Conn
1781 Log logger.Logger
1782 TLS *tls.Config
1783@@ -77,7 +116,7 @@
1784 MsgCh chan *Notification
1785 }
1786
1787-func NewSession(serverAddr string, pem []byte, exchangeTimeout time.Duration,
1788+func NewSession(serverAddrSpec string, conf ClientSessionConfig,
1789 deviceId string, levelmapFactory func() (levelmap.LevelMap, error),
1790 log logger.Logger) (*ClientSession, error) {
1791 state := uint32(Disconnected)
1792@@ -85,19 +124,27 @@
1793 if err != nil {
1794 return nil, err
1795 }
1796+ var getHost hostGetter
1797+ log.Infof("using addr: %v", serverAddrSpec)
1798+ hostsEndpoint, fallbackHosts := parseServerAddrSpec(serverAddrSpec)
1799+ if hostsEndpoint != "" {
1800+ getHost = gethosts.New(deviceId, hostsEndpoint, conf.ExchangeTimeout)
1801+ }
1802 sess := &ClientSession{
1803- ExchangeTimeout: exchangeTimeout,
1804- ServerAddr: serverAddr,
1805- DeviceId: deviceId,
1806- Log: log,
1807- Protocolator: protocol.NewProtocol0,
1808- Levels: levels,
1809- TLS: &tls.Config{InsecureSkipVerify: true}, // XXX
1810- stateP: &state,
1811+ ClientSessionConfig: conf,
1812+ getHost: getHost,
1813+ fallbackHosts: fallbackHosts,
1814+ DeviceId: deviceId,
1815+ Log: log,
1816+ Protocolator: protocol.NewProtocol0,
1817+ Levels: levels,
1818+ TLS: &tls.Config{InsecureSkipVerify: true}, // XXX
1819+ stateP: &state,
1820+ timeSince: time.Since,
1821 }
1822- if pem != nil {
1823+ if sess.PEM != nil {
1824 cp := x509.NewCertPool()
1825- ok := cp.AppendCertsFromPEM(pem)
1826+ ok := cp.AppendCertsFromPEM(sess.PEM)
1827 if !ok {
1828 return nil, errors.New("could not parse certificate")
1829 }
1830@@ -114,15 +161,90 @@
1831 atomic.StoreUint32(sess.stateP, uint32(state))
1832 }
1833
1834+func (sess *ClientSession) setConnection(conn net.Conn) {
1835+ sess.connLock.Lock()
1836+ defer sess.connLock.Unlock()
1837+ sess.Connection = conn
1838+}
1839+
1840+func (sess *ClientSession) getConnection() net.Conn {
1841+ sess.connLock.RLock()
1842+ defer sess.connLock.RUnlock()
1843+ return sess.Connection
1844+}
1845+
1846+// getHosts sets deliveryHosts possibly querying a remote endpoint
1847+func (sess *ClientSession) getHosts() error {
1848+ if sess.getHost != nil {
1849+ if sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
1850+ return nil
1851+ }
1852+ hosts, err := sess.getHost.Get()
1853+ if err != nil {
1854+ sess.Log.Errorf("getHosts: %v", err)
1855+ sess.setState(Error)
1856+ return err
1857+ }
1858+ sess.deliveryHostsTimestamp = time.Now()
1859+ sess.deliveryHosts = hosts
1860+ } else {
1861+ sess.deliveryHosts = sess.fallbackHosts
1862+ }
1863+ return nil
1864+}
1865+
1866+// startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts
1867+
1868+func (sess *ClientSession) startConnectionAttempt() {
1869+ if sess.timeSince(sess.lastAttemptTimestamp) > sess.ExpectAllRepairedTime {
1870+ sess.tryHost = 0
1871+ }
1872+ sess.leftToTry = len(sess.deliveryHosts)
1873+ if sess.leftToTry == 0 {
1874+ panic("should have got hosts from config or remote at this point")
1875+ }
1876+ sess.lastAttemptTimestamp = time.Now()
1877+}
1878+
1879+func (sess *ClientSession) nextHostToTry() string {
1880+ if sess.leftToTry == 0 {
1881+ return ""
1882+ }
1883+ res := sess.deliveryHosts[sess.tryHost]
1884+ sess.tryHost = (sess.tryHost + 1) % len(sess.deliveryHosts)
1885+ sess.leftToTry--
1886+ return res
1887+}
1888+
1889+// we reached the Started state, we can retry with the same host if we
1890+// have to retry again
1891+func (sess *ClientSession) started() {
1892+ sess.tryHost--
1893+ if sess.tryHost == -1 {
1894+ sess.tryHost = len(sess.deliveryHosts) - 1
1895+ }
1896+ sess.setState(Started)
1897+}
1898+
1899 // connect to a server using the configuration in the ClientSession
1900 // and set up the connection.
1901 func (sess *ClientSession) connect() error {
1902- conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout)
1903- if err != nil {
1904- sess.setState(Error)
1905- return fmt.Errorf("connect: %s", err)
1906+ sess.startConnectionAttempt()
1907+ var err error
1908+ var conn net.Conn
1909+ for {
1910+ host := sess.nextHostToTry()
1911+ if host == "" {
1912+ sess.setState(Error)
1913+ return fmt.Errorf("connect: %s", err)
1914+ }
1915+ sess.Log.Debugf("trying to connect to: %v", host)
1916+ conn, err = net.DialTimeout("tcp", host, sess.ConnectTimeout)
1917+ if err == nil {
1918+ break
1919+ }
1920 }
1921- sess.Connection = tls.Client(conn, sess.TLS)
1922+ sess.setConnection(tls.Client(conn, sess.TLS))
1923 sess.setState(Connected)
1924 return nil
1925 }
1926@@ -145,6 +267,8 @@
1927 sess.doClose()
1928 }
1929 func (sess *ClientSession) doClose() {
1930+ sess.connLock.Lock()
1931+ defer sess.connLock.Unlock()
1932 if sess.Connection != nil {
1933 sess.Connection.Close()
1934 // we ignore Close errors, on purpose (the thinking being that
1935@@ -167,6 +291,23 @@
1936 return err
1937 }
1938
1939+func (sess *ClientSession) decodeBroadcast(bcast *serverMsg) *Notification {
1940+ decoded := make([]map[string]interface{}, 0)
1941+ for _, p := range bcast.Payloads {
1942+ var v map[string]interface{}
1943+ err := json.Unmarshal(p, &v)
1944+ if err != nil {
1945+ sess.Log.Debugf("expected map in broadcast: %v", err)
1946+ continue
1947+ }
1948+ decoded = append(decoded, v)
1949+ }
1950+ return &Notification{
1951+ TopLevel: bcast.TopLevel,
1952+ Decoded: decoded,
1953+ }
1954+}
1955+
1956 // handle "broadcast" messages
1957 func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
1958 err := sess.Levels.Set(bcast.ChanId, bcast.TopLevel)
1959@@ -189,7 +330,7 @@
1960 if bcast.ChanId == protocol.SystemChannelId {
1961 // the system channel id, the only one we care about for now
1962 sess.Log.Debugf("sending it over")
1963- sess.MsgCh <- &Notification{bcast.TopLevel}
1964+ sess.MsgCh <- sess.decodeBroadcast(bcast)
1965 sess.Log.Debugf("sent it over")
1966 } else {
1967 sess.Log.Debugf("what is this weird channel, %#v?", bcast.ChanId)
1968@@ -224,7 +365,7 @@
1969
1970 // Call this when you've connected and want to start looping.
1971 func (sess *ClientSession) start() error {
1972- conn := sess.Connection
1973+ conn := sess.getConnection()
1974 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
1975 if err != nil {
1976 sess.setState(Error)
1977@@ -253,6 +394,7 @@
1978 // xxx get the SSO Authorization string from the phone
1979 Authorization: "",
1980 Levels: levels,
1981+ Info: sess.Info,
1982 })
1983 if err != nil {
1984 sess.setState(Error)
1985@@ -278,16 +420,20 @@
1986 }
1987 sess.proto = proto
1988 sess.pingInterval = pingInterval
1989- sess.Log.Debugf("Connected %v.", conn.LocalAddr())
1990- sess.setState(Started)
1991+ sess.Log.Debugf("Connected %v.", conn.RemoteAddr())
1992+ sess.started() // deals with choosing which host to retry with as well
1993 return nil
1994 }
1995
1996 // run calls connect, and if it works it calls start, and if it works
1997 // it runs loop in a goroutine, and ships its return value over ErrCh.
1998-func (sess *ClientSession) run(closer func(), connecter, starter, looper func() error) error {
1999+func (sess *ClientSession) run(closer func(), hostGetter, connecter, starter, looper func() error) error {
2000 closer()
2001- err := connecter()
2002+ err := hostGetter()
2003+ if err != nil {
2004+ return err
2005+ }
2006+ err = connecter()
2007 if err == nil {
2008 err = starter()
2009 if err == nil {
2010@@ -317,7 +463,7 @@
2011 // keep on trying.
2012 panic("can't Dial() without a protocol constructor.")
2013 }
2014- return sess.run(sess.doClose, sess.connect, sess.start, sess.loop)
2015+ return sess.run(sess.doClose, sess.getHosts, sess.connect, sess.start, sess.loop)
2016 }
2017
2018 func init() {
2019
2020=== modified file 'client/session/session_test.go'
2021--- client/session/session_test.go 2014-03-27 13:26:10 +0000
2022+++ client/session/session_test.go 2014-04-04 14:39:41 +0000
2023@@ -23,16 +23,21 @@
2024 "fmt"
2025 "io"
2026 "io/ioutil"
2027+ "net"
2028+ "net/http"
2029+ "net/http/httptest"
2030+ "reflect"
2031+ "testing"
2032+ "time"
2033+
2034 . "launchpad.net/gocheck"
2035+
2036 "launchpad.net/ubuntu-push/client/session/levelmap"
2037+ //"launchpad.net/ubuntu-push/client/gethosts"
2038 "launchpad.net/ubuntu-push/logger"
2039 "launchpad.net/ubuntu-push/protocol"
2040 helpers "launchpad.net/ubuntu-push/testing"
2041 "launchpad.net/ubuntu-push/testing/condition"
2042- "net"
2043- "reflect"
2044- "testing"
2045- "time"
2046 )
2047
2048 func TestSession(t *testing.T) { TestingT(t) }
2049@@ -181,23 +186,51 @@
2050 }
2051
2052 /****************************************************************
2053+ parseServerAddrSpec() tests
2054+****************************************************************/
2055+
2056+func (cs *clientSessionSuite) TestParseServerAddrSpec(c *C) {
2057+ hEp, fallbackHosts := parseServerAddrSpec("http://foo/hosts")
2058+ c.Check(hEp, Equals, "http://foo/hosts")
2059+ c.Check(fallbackHosts, IsNil)
2060+
2061+ hEp, fallbackHosts = parseServerAddrSpec("foo:443")
2062+ c.Check(hEp, Equals, "")
2063+ c.Check(fallbackHosts, DeepEquals, []string{"foo:443"})
2064+
2065+ hEp, fallbackHosts = parseServerAddrSpec("foo:443|bar:443")
2066+ c.Check(hEp, Equals, "")
2067+ c.Check(fallbackHosts, DeepEquals, []string{"foo:443", "bar:443"})
2068+}
2069+
2070+/****************************************************************
2071 NewSession() tests
2072 ****************************************************************/
2073
2074+var dummyConf = ClientSessionConfig{}
2075+
2076 func (cs *clientSessionSuite) TestNewSessionPlainWorks(c *C) {
2077- sess, err := NewSession("", nil, 0, "", cs.lvls, cs.log)
2078+ sess, err := NewSession("foo:443", dummyConf, "", cs.lvls, cs.log)
2079 c.Check(sess, NotNil)
2080 c.Check(err, IsNil)
2081+ c.Check(sess.fallbackHosts, DeepEquals, []string{"foo:443"})
2082 // but no root CAs set
2083 c.Check(sess.TLS.RootCAs, IsNil)
2084 c.Check(sess.State(), Equals, Disconnected)
2085 }
2086
2087-var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert")
2088+func (cs *clientSessionSuite) TestNewSessionHostEndpointWorks(c *C) {
2089+ sess, err := NewSession("http://foo/hosts", dummyConf, "wah", cs.lvls, cs.log)
2090+ c.Assert(err, IsNil)
2091+ c.Check(sess.getHost, NotNil)
2092+}
2093+
2094+var certfile string = helpers.SourceRelative("../../server/acceptance/ssl/testing.cert")
2095 var pem, _ = ioutil.ReadFile(certfile)
2096
2097 func (cs *clientSessionSuite) TestNewSessionPEMWorks(c *C) {
2098- sess, err := NewSession("", pem, 0, "wah", cs.lvls, cs.log)
2099+ conf := ClientSessionConfig{PEM: pem}
2100+ sess, err := NewSession("", conf, "wah", cs.lvls, cs.log)
2101 c.Check(sess, NotNil)
2102 c.Assert(err, IsNil)
2103 c.Check(sess.TLS.RootCAs, NotNil)
2104@@ -205,25 +238,172 @@
2105
2106 func (cs *clientSessionSuite) TestNewSessionBadPEMFileContentFails(c *C) {
2107 badpem := []byte("This is not the PEM you're looking for.")
2108- sess, err := NewSession("", badpem, 0, "wah", cs.lvls, cs.log)
2109+ conf := ClientSessionConfig{PEM: badpem}
2110+ sess, err := NewSession("", conf, "wah", cs.lvls, cs.log)
2111 c.Check(sess, IsNil)
2112 c.Check(err, NotNil)
2113 }
2114
2115 func (cs *clientSessionSuite) TestNewSessionBadLevelMapFails(c *C) {
2116 ferr := func() (levelmap.LevelMap, error) { return nil, errors.New("Busted.") }
2117- sess, err := NewSession("", nil, 0, "wah", ferr, cs.log)
2118+ sess, err := NewSession("", dummyConf, "wah", ferr, cs.log)
2119 c.Check(sess, IsNil)
2120 c.Assert(err, NotNil)
2121 }
2122
2123 /****************************************************************
2124+ getHosts() tests
2125+****************************************************************/
2126+
2127+func (cs *clientSessionSuite) TestGetHostsFallback(c *C) {
2128+ fallback := []string{"foo:443", "bar:443"}
2129+ sess := &ClientSession{fallbackHosts: fallback}
2130+ err := sess.getHosts()
2131+ c.Assert(err, IsNil)
2132+ c.Check(sess.deliveryHosts, DeepEquals, fallback)
2133+}
2134+
2135+type testHostGetter struct {
2136+ hosts []string
2137+ err error
2138+}
2139+
2140+func (thg *testHostGetter) Get() ([]string, error) {
2141+ return thg.hosts, thg.err
2142+}
2143+
2144+func (cs *clientSessionSuite) TestGetHostsRemote(c *C) {
2145+ hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil}
2146+ sess := &ClientSession{getHost: hostGetter, timeSince: time.Since}
2147+ err := sess.getHosts()
2148+ c.Assert(err, IsNil)
2149+ c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"})
2150+}
2151+
2152+func (cs *clientSessionSuite) TestGetHostsRemoteError(c *C) {
2153+ sess, err := NewSession("", dummyConf, "", cs.lvls, cs.log)
2154+ c.Assert(err, IsNil)
2155+ hostsErr := errors.New("failed")
2156+ hostGetter := &testHostGetter{nil, hostsErr}
2157+ sess.getHost = hostGetter
2158+ err = sess.getHosts()
2159+ c.Assert(err, Equals, hostsErr)
2160+ c.Check(sess.deliveryHosts, IsNil)
2161+ c.Check(sess.State(), Equals, Error)
2162+}
2163+
2164+func (cs *clientSessionSuite) TestGetHostsRemoteCaching(c *C) {
2165+ hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil}
2166+ sess := &ClientSession{
2167+ getHost: hostGetter,
2168+ ClientSessionConfig: ClientSessionConfig{
2169+ HostsCachingExpiryTime: 2 * time.Hour,
2170+ },
2171+ timeSince: time.Since,
2172+ }
2173+ err := sess.getHosts()
2174+ c.Assert(err, IsNil)
2175+ hostGetter.hosts = []string{"baz:443"}
2176+ // cached
2177+ err = sess.getHosts()
2178+ c.Assert(err, IsNil)
2179+ c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"})
2180+ // expired
2181+ sess.timeSince = func(ts time.Time) time.Duration {
2182+ return 3 * time.Hour
2183+ }
2184+ err = sess.getHosts()
2185+ c.Assert(err, IsNil)
2186+ c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"})
2187+}
2188+
2189+/****************************************************************
2190+ startConnectionAttempt()/nextHostToTry()/started tests
2191+****************************************************************/
2192+
2193+func (cs *clientSessionSuite) TestStartConnectionAttempt(c *C) {
2194+ since := time.Since(time.Time{})
2195+ sess := &ClientSession{
2196+ ClientSessionConfig: ClientSessionConfig{
2197+ ExpectAllRepairedTime: 10 * time.Second,
2198+ },
2199+ timeSince: func(ts time.Time) time.Duration {
2200+ return since
2201+ },
2202+ deliveryHosts: []string{"foo:443", "bar:443"},
2203+ }
2204+ // start from first host
2205+ sess.startConnectionAttempt()
2206+ c.Check(sess.lastAttemptTimestamp, Not(Equals), 0)
2207+ c.Check(sess.tryHost, Equals, 0)
2208+ c.Check(sess.leftToTry, Equals, 2)
2209+ since = 1 * time.Second
2210+ sess.tryHost = 1
2211+ // just continue
2212+ sess.startConnectionAttempt()
2213+ c.Check(sess.tryHost, Equals, 1)
2214+ sess.tryHost = 2
2215+}
2216+
2217+func (cs *clientSessionSuite) TestStartConnectionAttemptNoHostsPanic(c *C) {
2218+ since := time.Since(time.Time{})
2219+ sess := &ClientSession{
2220+ ClientSessionConfig: ClientSessionConfig{
2221+ ExpectAllRepairedTime: 10 * time.Second,
2222+ },
2223+ timeSince: func(ts time.Time) time.Duration {
2224+ return since
2225+ },
2226+ }
2227+ c.Check(sess.startConnectionAttempt, PanicMatches, "should have got hosts from config or remote at this point")
2228+}
2229+
2230+func (cs *clientSessionSuite) TestNextHostToTry(c *C) {
2231+ sess := &ClientSession{
2232+ deliveryHosts: []string{"foo:443", "bar:443", "baz:443"},
2233+ tryHost: 0,
2234+ leftToTry: 3,
2235+ }
2236+ c.Check(sess.nextHostToTry(), Equals, "foo:443")
2237+ c.Check(sess.nextHostToTry(), Equals, "bar:443")
2238+ c.Check(sess.nextHostToTry(), Equals, "baz:443")
2239+ c.Check(sess.nextHostToTry(), Equals, "")
2240+ c.Check(sess.nextHostToTry(), Equals, "")
2241+ c.Check(sess.tryHost, Equals, 0)
2242+
2243+ sess.leftToTry = 3
2244+ sess.tryHost = 1
2245+ c.Check(sess.nextHostToTry(), Equals, "bar:443")
2246+ c.Check(sess.nextHostToTry(), Equals, "baz:443")
2247+ c.Check(sess.nextHostToTry(), Equals, "foo:443")
2248+ c.Check(sess.nextHostToTry(), Equals, "")
2249+ c.Check(sess.nextHostToTry(), Equals, "")
2250+ c.Check(sess.tryHost, Equals, 1)
2251+}
2252+
2253+func (cs *clientSessionSuite) TestStarted(c *C) {
2254+ sess, err := NewSession("", dummyConf, "", cs.lvls, cs.log)
2255+ c.Assert(err, IsNil)
2256+
2257+ sess.deliveryHosts = []string{"foo:443", "bar:443", "baz:443"}
2258+ sess.tryHost = 1
2259+
2260+ sess.started()
2261+ c.Check(sess.tryHost, Equals, 0)
2262+ c.Check(sess.State(), Equals, Started)
2263+
2264+ sess.started()
2265+ c.Check(sess.tryHost, Equals, 2)
2266+}
2267+
2268+/****************************************************************
2269 connect() tests
2270 ****************************************************************/
2271
2272 func (cs *clientSessionSuite) TestConnectFailsWithNoAddress(c *C) {
2273- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2274+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2275 c.Assert(err, IsNil)
2276+ sess.deliveryHosts = []string{"nowhere"}
2277 err = sess.connect()
2278 c.Check(err, ErrorMatches, ".*connect.*address.*")
2279 c.Check(sess.State(), Equals, Error)
2280@@ -233,20 +413,36 @@
2281 srv, err := net.Listen("tcp", "localhost:0")
2282 c.Assert(err, IsNil)
2283 defer srv.Close()
2284- sess, err := NewSession(srv.Addr().String(), nil, 0, "wah", cs.lvls, cs.log)
2285- c.Assert(err, IsNil)
2286- err = sess.connect()
2287- c.Check(err, IsNil)
2288- c.Check(sess.Connection, NotNil)
2289- c.Check(sess.State(), Equals, Connected)
2290+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2291+ c.Assert(err, IsNil)
2292+ sess.deliveryHosts = []string{srv.Addr().String()}
2293+ err = sess.connect()
2294+ c.Check(err, IsNil)
2295+ c.Check(sess.Connection, NotNil)
2296+ c.Check(sess.State(), Equals, Connected)
2297+}
2298+
2299+func (cs *clientSessionSuite) TestConnectSecondConnects(c *C) {
2300+ srv, err := net.Listen("tcp", "localhost:0")
2301+ c.Assert(err, IsNil)
2302+ defer srv.Close()
2303+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2304+ c.Assert(err, IsNil)
2305+ sess.deliveryHosts = []string{"nowhere", srv.Addr().String()}
2306+ err = sess.connect()
2307+ c.Check(err, IsNil)
2308+ c.Check(sess.Connection, NotNil)
2309+ c.Check(sess.State(), Equals, Connected)
2310+ c.Check(sess.tryHost, Equals, 0)
2311 }
2312
2313 func (cs *clientSessionSuite) TestConnectConnectFail(c *C) {
2314 srv, err := net.Listen("tcp", "localhost:0")
2315 c.Assert(err, IsNil)
2316- sess, err := NewSession(srv.Addr().String(), nil, 0, "wah", cs.lvls, cs.log)
2317+ sess, err := NewSession(srv.Addr().String(), dummyConf, "wah", cs.lvls, cs.log)
2318 srv.Close()
2319 c.Assert(err, IsNil)
2320+ sess.deliveryHosts = []string{srv.Addr().String()}
2321 err = sess.connect()
2322 c.Check(err, ErrorMatches, ".*connection refused")
2323 c.Check(sess.State(), Equals, Error)
2324@@ -257,7 +453,7 @@
2325 ****************************************************************/
2326
2327 func (cs *clientSessionSuite) TestClose(c *C) {
2328- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2329+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2330 c.Assert(err, IsNil)
2331 sess.Connection = &testConn{Name: "TestClose"}
2332 sess.Close()
2333@@ -266,7 +462,7 @@
2334 }
2335
2336 func (cs *clientSessionSuite) TestCloseTwice(c *C) {
2337- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2338+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2339 c.Assert(err, IsNil)
2340 sess.Connection = &testConn{Name: "TestCloseTwice"}
2341 sess.Close()
2342@@ -277,7 +473,7 @@
2343 }
2344
2345 func (cs *clientSessionSuite) TestCloseFails(c *C) {
2346- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2347+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2348 c.Assert(err, IsNil)
2349 sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)}
2350 sess.Close()
2351@@ -291,7 +487,7 @@
2352 func (d *derp) Stop() { d.stopped = true }
2353
2354 func (cs *clientSessionSuite) TestCloseStopsRetrier(c *C) {
2355- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2356+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2357 c.Assert(err, IsNil)
2358 ar := new(derp)
2359 sess.retrier = ar
2360@@ -308,7 +504,7 @@
2361
2362 func (cs *clientSessionSuite) TestAutoRedialWorks(c *C) {
2363 // checks that AutoRedial sets up a retrier and tries redialing it
2364- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2365+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2366 c.Assert(err, IsNil)
2367 ar := new(derp)
2368 sess.retrier = ar
2369@@ -319,7 +515,7 @@
2370
2371 func (cs *clientSessionSuite) TestAutoRedialStopsRetrier(c *C) {
2372 // checks that AutoRedial stops the previous retrier
2373- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2374+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2375 c.Assert(err, IsNil)
2376 ch := make(chan uint32)
2377 c.Check(sess.retrier, IsNil)
2378@@ -344,7 +540,10 @@
2379
2380 func (s *msgSuite) SetUpTest(c *C) {
2381 var err error
2382- s.sess, err = NewSession("", nil, time.Millisecond, "wah", levelmap.NewLevelMap, helpers.NewTestLogger(c, "debug"))
2383+ conf := ClientSessionConfig{
2384+ ExchangeTimeout: time.Millisecond,
2385+ }
2386+ s.sess, err = NewSession("", conf, "wah", levelmap.NewLevelMap, helpers.NewTestLogger(c, "debug"))
2387 c.Assert(err, IsNil)
2388 s.sess.Connection = &testConn{Name: "TestHandle*"}
2389 s.errCh = make(chan error, 1)
2390@@ -383,14 +582,28 @@
2391 AppId: "--ignored--",
2392 ChanId: "0",
2393 TopLevel: 2,
2394- Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
2395+ Payloads: []json.RawMessage{
2396+ json.RawMessage(`{"img1/m1":[101,"tubular"]}`),
2397+ json.RawMessage("false"), // shouldn't happen but robust
2398+ json.RawMessage(`{"img1/m1":[102,"tubular"]}`),
2399+ },
2400 }, protocol.NotificationsMsg{}}
2401 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
2402 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
2403 s.upCh <- nil // ack ok
2404 c.Check(<-s.errCh, Equals, nil)
2405 c.Assert(len(s.sess.MsgCh), Equals, 1)
2406- c.Check(<-s.sess.MsgCh, DeepEquals, &Notification{TopLevel: 2})
2407+ c.Check(<-s.sess.MsgCh, DeepEquals, &Notification{
2408+ TopLevel: 2,
2409+ Decoded: []map[string]interface{}{
2410+ map[string]interface{}{
2411+ "img1/m1": []interface{}{float64(101), "tubular"},
2412+ },
2413+ map[string]interface{}{
2414+ "img1/m1": []interface{}{float64(102), "tubular"},
2415+ },
2416+ },
2417+ })
2418 // and finally, the session keeps track of the levels
2419 levels, err := s.sess.Levels.GetAll()
2420 c.Check(err, IsNil)
2421@@ -519,7 +732,7 @@
2422 start() tests
2423 ****************************************************************/
2424 func (cs *clientSessionSuite) TestStartFailsIfSetDeadlineFails(c *C) {
2425- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2426+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2427 c.Assert(err, IsNil)
2428 sess.Connection = &testConn{Name: "TestStartFailsIfSetDeadlineFails",
2429 DeadlineCondition: condition.Work(false)} // setdeadline will fail
2430@@ -529,7 +742,7 @@
2431 }
2432
2433 func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) {
2434- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2435+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2436 c.Assert(err, IsNil)
2437 sess.Connection = &testConn{Name: "TestStartFailsIfWriteFails",
2438 WriteCondition: condition.Work(false)} // write will fail
2439@@ -539,7 +752,7 @@
2440 }
2441
2442 func (cs *clientSessionSuite) TestStartFailsIfGetLevelsFails(c *C) {
2443- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2444+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2445 c.Assert(err, IsNil)
2446 sess.Levels = &brokenLevelMap{}
2447 sess.Connection = &testConn{Name: "TestStartConnectMessageFails"}
2448@@ -559,7 +772,7 @@
2449 }
2450
2451 func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) {
2452- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2453+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2454 c.Assert(err, IsNil)
2455 sess.Connection = &testConn{Name: "TestStartConnectMessageFails"}
2456 errCh := make(chan error, 1)
2457@@ -585,7 +798,7 @@
2458 }
2459
2460 func (cs *clientSessionSuite) TestStartConnackReadError(c *C) {
2461- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2462+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2463 c.Assert(err, IsNil)
2464 sess.Connection = &testConn{Name: "TestStartConnackReadError"}
2465 errCh := make(chan error, 1)
2466@@ -609,7 +822,7 @@
2467 }
2468
2469 func (cs *clientSessionSuite) TestStartBadConnack(c *C) {
2470- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2471+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2472 c.Assert(err, IsNil)
2473 sess.Connection = &testConn{Name: "TestStartBadConnack"}
2474 errCh := make(chan error, 1)
2475@@ -633,7 +846,7 @@
2476 }
2477
2478 func (cs *clientSessionSuite) TestStartNotConnack(c *C) {
2479- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2480+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2481 c.Assert(err, IsNil)
2482 sess.Connection = &testConn{Name: "TestStartBadConnack"}
2483 errCh := make(chan error, 1)
2484@@ -657,7 +870,14 @@
2485 }
2486
2487 func (cs *clientSessionSuite) TestStartWorks(c *C) {
2488- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2489+ info := map[string]interface{}{
2490+ "foo": 1,
2491+ "bar": "baz",
2492+ }
2493+ conf := ClientSessionConfig{
2494+ Info: info,
2495+ }
2496+ sess, err := NewSession("", conf, "wah", cs.lvls, cs.log)
2497 c.Assert(err, IsNil)
2498 sess.Connection = &testConn{Name: "TestStartWorks"}
2499 errCh := make(chan error, 1)
2500@@ -671,8 +891,10 @@
2501 }()
2502
2503 c.Check(takeNext(downCh), Equals, "deadline 0")
2504- _, ok := takeNext(downCh).(protocol.ConnectMsg)
2505+ msg, ok := takeNext(downCh).(protocol.ConnectMsg)
2506 c.Check(ok, Equals, true)
2507+ c.Check(msg.DeviceId, Equals, "wah")
2508+ c.Check(msg.Info, DeepEquals, info)
2509 upCh <- nil // no error
2510 upCh <- protocol.ConnAckMsg{
2511 Type: "connack",
2512@@ -688,34 +910,49 @@
2513 run() tests
2514 ****************************************************************/
2515
2516-func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) {
2517- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2518+func (cs *clientSessionSuite) TestRunBailsIfHostGetterFails(c *C) {
2519+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2520 c.Assert(err, IsNil)
2521- failure := errors.New("TestRunBailsIfConnectFails")
2522+ failure := errors.New("TestRunBailsIfHostGetterFails")
2523 has_closed := false
2524 err = sess.run(
2525 func() { has_closed = true },
2526 func() error { return failure },
2527 nil,
2528+ nil,
2529 nil)
2530 c.Check(err, Equals, failure)
2531 c.Check(has_closed, Equals, true)
2532 }
2533
2534+func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) {
2535+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2536+ c.Assert(err, IsNil)
2537+ failure := errors.New("TestRunBailsIfConnectFails")
2538+ err = sess.run(
2539+ func() {},
2540+ func() error { return nil },
2541+ func() error { return failure },
2542+ nil,
2543+ nil)
2544+ c.Check(err, Equals, failure)
2545+}
2546+
2547 func (cs *clientSessionSuite) TestRunBailsIfStartFails(c *C) {
2548- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2549+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2550 c.Assert(err, IsNil)
2551 failure := errors.New("TestRunBailsIfStartFails")
2552 err = sess.run(
2553 func() {},
2554 func() error { return nil },
2555+ func() error { return nil },
2556 func() error { return failure },
2557 nil)
2558 c.Check(err, Equals, failure)
2559 }
2560
2561 func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) {
2562- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2563+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2564 c.Assert(err, IsNil)
2565 // just to make a point: until here we haven't set ErrCh & MsgCh (no
2566 // biggie if this stops being true)
2567@@ -727,6 +964,7 @@
2568 func() {},
2569 func() error { return nil },
2570 func() error { return nil },
2571+ func() error { return nil },
2572 func() error { sess.MsgCh <- notf; return <-failureCh })
2573 c.Check(err, Equals, nil)
2574 // if run doesn't error it sets up the channels
2575@@ -744,7 +982,7 @@
2576 ****************************************************************/
2577
2578 func (cs *clientSessionSuite) TestJitter(c *C) {
2579- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2580+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2581 c.Assert(err, IsNil)
2582 num_tries := 20 // should do the math
2583 spread := time.Second //
2584@@ -776,12 +1014,17 @@
2585
2586 func (cs *clientSessionSuite) TestDialPanics(c *C) {
2587 // one last unhappy test
2588- sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)
2589+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
2590 c.Assert(err, IsNil)
2591 sess.Protocolator = nil
2592 c.Check(sess.Dial, PanicMatches, ".*protocol constructor.")
2593 }
2594
2595+var (
2596+ dialTestTimeout = 100 * time.Millisecond
2597+ dialTestConf = ClientSessionConfig{ExchangeTimeout: dialTestTimeout}
2598+)
2599+
2600 func (cs *clientSessionSuite) TestDialWorks(c *C) {
2601 // happy path thoughts
2602 cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock)
2603@@ -791,10 +1034,22 @@
2604 SessionTicketsDisabled: true,
2605 }
2606
2607- timeout := 100 * time.Millisecond
2608 lst, err := tls.Listen("tcp", "localhost:0", tlsCfg)
2609 c.Assert(err, IsNil)
2610- sess, err := NewSession(lst.Addr().String(), nil, timeout, "wah", cs.lvls, cs.log)
2611+ // advertise
2612+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2613+ b, err := json.Marshal(map[string]interface{}{
2614+ "hosts": []string{"nowhere", lst.Addr().String()},
2615+ })
2616+ if err != nil {
2617+ panic(err)
2618+ }
2619+ w.Header().Set("Content-Type", "application/json")
2620+ w.Write(b)
2621+ }))
2622+ defer ts.Close()
2623+
2624+ sess, err := NewSession(ts.URL, dialTestConf, "wah", cs.lvls, cs.log)
2625 c.Assert(err, IsNil)
2626 tconn := &testConn{CloseCondition: condition.Fail2Work(10)}
2627 sess.Connection = tconn
2628@@ -819,10 +1074,13 @@
2629 c.Check(tconn.CloseCondition.String(), Matches, ".* 9 to go.")
2630
2631 // now, start: 1. protocol version
2632- v, err := protocol.ReadWireFormatVersion(srv, timeout)
2633+ v, err := protocol.ReadWireFormatVersion(srv, dialTestTimeout)
2634 c.Assert(err, IsNil)
2635 c.Assert(v, Equals, protocol.ProtocolWireVersion)
2636
2637+ // if something goes wrong session would try the first/other host
2638+ c.Check(sess.tryHost, Equals, 0)
2639+
2640 // 2. "connect" (but on the fake protcol above! woo)
2641
2642 c.Check(takeNext(downCh), Equals, "deadline 100ms")
2643@@ -843,6 +1101,9 @@
2644 c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})
2645 upCh <- nil
2646
2647+ // session would retry the same host
2648+ c.Check(sess.tryHost, Equals, 1)
2649+
2650 // and broadcasts...
2651 b := &protocol.BroadcastMsg{
2652 Type: "broadcast",
2653@@ -870,3 +1131,30 @@
2654 upCh <- failure
2655 c.Check(<-sess.ErrCh, Equals, failure)
2656 }
2657+
2658+func (cs *clientSessionSuite) TestDialWorksDirect(c *C) {
2659+ // happy path thoughts
2660+ cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock)
2661+ c.Assert(err, IsNil)
2662+ tlsCfg := &tls.Config{
2663+ Certificates: []tls.Certificate{cert},
2664+ SessionTicketsDisabled: true,
2665+ }
2666+
2667+ lst, err := tls.Listen("tcp", "localhost:0", tlsCfg)
2668+ c.Assert(err, IsNil)
2669+ sess, err := NewSession(lst.Addr().String(), dialTestConf, "wah", cs.lvls, cs.log)
2670+ c.Assert(err, IsNil)
2671+ defer sess.Close()
2672+
2673+ upCh := make(chan interface{}, 5)
2674+ downCh := make(chan interface{}, 5)
2675+ proto := &testProtocol{up: upCh, down: downCh}
2676+ sess.Protocolator = func(net.Conn) protocol.Protocol { return proto }
2677+
2678+ go sess.Dial()
2679+
2680+ _, err = lst.Accept()
2681+ c.Assert(err, IsNil)
2682+ // connect done
2683+}
2684
2685=== modified file 'debian/config.json'
2686--- debian/config.json 2014-03-20 12:17:40 +0000
2687+++ debian/config.json 2014-04-04 14:39:41 +0000
2688@@ -1,6 +1,9 @@
2689 {
2690+ "connect_timeout": "20s",
2691 "exchange_timeout": "30s",
2692- "addr": "push-delivery.ubuntu.com:443",
2693+ "hosts_cache_expiry": "12h",
2694+ "expect_all_repaired": "40m",
2695+ "addr": "https://push.ubuntu.com/delivery-hosts",
2696 "cert_pem_file": "",
2697 "stabilizing_timeout": "2s",
2698 "recheck_timeout": "10m",
2699
2700=== modified file 'debian/ubuntu-push-client.conf'
2701--- debian/ubuntu-push-client.conf 2014-02-07 11:31:54 +0000
2702+++ debian/ubuntu-push-client.conf 2014-04-04 14:39:41 +0000
2703@@ -1,6 +1,6 @@
2704-description "Starts the ubuntu push notifications client side daemon"
2705+description "ubuntu push notification client-side daemon"
2706
2707-start on dbus
2708-stop on runlevel [06]
2709+start on started dbus
2710+stop on stopped dbus
2711
2712 exec /usr/lib/ubuntu-push-client/ubuntu-push-client
2713
2714=== added directory 'sampleconfigs'
2715=== renamed file 'server/acceptance/config/config.json' => 'sampleconfigs/dev.json'
2716--- server/acceptance/config/config.json 2014-01-17 17:20:34 +0000
2717+++ sampleconfigs/dev.json 2014-04-04 14:39:41 +0000
2718@@ -4,9 +4,9 @@
2719 "broker_queue_size": 10000,
2720 "session_queue_size": 10,
2721 "addr": "127.0.0.1:9090",
2722- "key_pem_file": "testing.key",
2723- "cert_pem_file": "testing.cert",
2724- "http_addr": "127.0.0.1:8888",
2725+ "key_pem_file": "../server/acceptance/ssl/testing.key",
2726+ "cert_pem_file": "../server/acceptance/ssl/testing.cert",
2727+ "http_addr": "127.0.0.1:8080",
2728 "http_read_timeout": "5s",
2729 "http_write_timeout": "5s"
2730 }
2731
2732=== modified file 'server/acceptance/acceptanceclient.go'
2733--- server/acceptance/acceptanceclient.go 2014-02-21 16:17:28 +0000
2734+++ server/acceptance/acceptanceclient.go 2014-04-04 14:39:41 +0000
2735@@ -35,6 +35,8 @@
2736 type ClientSession struct {
2737 // configuration
2738 DeviceId string
2739+ Model string
2740+ ImageChannel string
2741 ServerAddr string
2742 ExchangeTimeout time.Duration
2743 CertPEMBlock []byte
2744@@ -86,6 +88,10 @@
2745 Type: "connect",
2746 DeviceId: sess.DeviceId,
2747 Levels: sess.Levels,
2748+ Info: map[string]interface{}{
2749+ "device": sess.Model,
2750+ "channel": sess.ImageChannel,
2751+ },
2752 })
2753 if err != nil {
2754 return err
2755
2756=== modified file 'server/acceptance/cmd/acceptanceclient.go'
2757--- server/acceptance/cmd/acceptanceclient.go 2014-02-21 16:17:28 +0000
2758+++ server/acceptance/cmd/acceptanceclient.go 2014-04-04 14:39:41 +0000
2759@@ -30,6 +30,8 @@
2760 var (
2761 insecureFlag = flag.Bool("insecure", false, "disable checking of server certificate and hostname")
2762 reportPingsFlag = flag.Bool("reportPings", true, "report each Ping from the server")
2763+ deviceModel = flag.String("model", "?", "device image model")
2764+ imageChannel = flag.String("imageChannel", "?", "image channel")
2765 )
2766
2767 type configuration struct {
2768@@ -64,9 +66,12 @@
2769 ServerAddr: cfg.Addr.HostPort(),
2770 DeviceId: flag.Arg(1),
2771 // flags
2772- ReportPings: *reportPingsFlag,
2773- Insecure: *insecureFlag,
2774+ Model: *deviceModel,
2775+ ImageChannel: *imageChannel,
2776+ ReportPings: *reportPingsFlag,
2777+ Insecure: *insecureFlag,
2778 }
2779+ log.Printf("with: %#v", session)
2780 session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName))
2781 if err != nil {
2782 log.Fatalf("reading CertPEMFile: %v", err)
2783
2784=== renamed directory 'server/acceptance/config' => 'server/acceptance/ssl'
2785=== modified file 'server/acceptance/suites/broadcast.go'
2786--- server/acceptance/suites/broadcast.go 2014-02-21 21:39:54 +0000
2787+++ server/acceptance/suites/broadcast.go 2014-04-04 14:39:41 +0000
2788@@ -41,11 +41,34 @@
2789 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2790 Channel: "system",
2791 ExpireOn: future,
2792- Data: json.RawMessage(`{"n": 42}`),
2793- })
2794- c.Assert(err, IsNil)
2795- c.Assert(got, Matches, ".*ok.*")
2796- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
2797+ Data: json.RawMessage(`{"img1/m1": 42}`),
2798+ })
2799+ c.Assert(err, IsNil)
2800+ c.Assert(got, Matches, ".*ok.*")
2801+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
2802+ stop()
2803+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2804+ c.Check(len(errCh), Equals, 0)
2805+}
2806+
2807+func (s *BroadcastAcceptanceSuite) TestBroadcastToConnectedChannelFilter(c *C) {
2808+ events, errCh, stop := s.StartClient(c, "DEVB", nil)
2809+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
2810+ Channel: "system",
2811+ ExpireOn: future,
2812+ Data: json.RawMessage(`{"img1/m2": 10}`),
2813+ })
2814+ c.Assert(err, IsNil)
2815+ got, err = s.PostRequest("/broadcast", &api.Broadcast{
2816+ Channel: "system",
2817+ ExpireOn: future,
2818+ Data: json.RawMessage(`{"img1/m1": 20}`),
2819+ })
2820+ c.Assert(err, IsNil)
2821+ c.Assert(got, Matches, ".*ok.*")
2822+ // xxx don't send this one
2823+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`)
2824+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`)
2825 stop()
2826 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2827 c.Check(len(errCh), Equals, 0)
2828@@ -56,14 +79,14 @@
2829 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2830 Channel: "system",
2831 ExpireOn: future,
2832- Data: json.RawMessage(`{"b": 1}`),
2833+ Data: json.RawMessage(`{"img1/m1": 1}`),
2834 })
2835 c.Assert(err, IsNil)
2836 c.Assert(got, Matches, ".*ok.*")
2837
2838 events, errCh, stop := s.StartClient(c, "DEVB", nil)
2839 // gettting pending on connect
2840- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
2841+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`)
2842 stop()
2843 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2844 c.Check(len(errCh), Equals, 0)
2845@@ -71,7 +94,7 @@
2846
2847 func (s *BroadcastAcceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) {
2848 // send bunch of broadcasts that will be pending
2849- payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
2850+ payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
2851 for i := 0; i < 32; i++ {
2852 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2853 Channel: "system",
2854@@ -84,7 +107,7 @@
2855
2856 events, errCh, stop := s.StartClient(c, "DEVC", nil)
2857 // gettting pending on connect
2858- c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)
2859+ c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"img1/m1":0,.*`)
2860 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)
2861 stop()
2862 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2863@@ -100,12 +123,12 @@
2864 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2865 Channel: "system",
2866 ExpireOn: future,
2867- Data: json.RawMessage(`{"n": 42}`),
2868+ Data: json.RawMessage(`{"img1/m1": 42}`),
2869 })
2870 c.Assert(err, IsNil)
2871 c.Assert(got, Matches, ".*ok.*")
2872- c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
2873- c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
2874+ c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
2875+ c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
2876 stop1()
2877 stop2()
2878 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2879@@ -119,11 +142,11 @@
2880 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2881 Channel: "system",
2882 ExpireOn: future,
2883- Data: json.RawMessage(`{"b": 1}`),
2884+ Data: json.RawMessage(`{"img1/m1": 1}`),
2885 })
2886 c.Assert(err, IsNil)
2887 c.Assert(got, Matches, ".*ok.*")
2888- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
2889+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`)
2890 stop()
2891 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2892 c.Check(len(errCh), Equals, 0)
2893@@ -131,7 +154,7 @@
2894 got, err = s.PostRequest("/broadcast", &api.Broadcast{
2895 Channel: "system",
2896 ExpireOn: future,
2897- Data: json.RawMessage(`{"b": 2}`),
2898+ Data: json.RawMessage(`{"img1/m1": 2}`),
2899 })
2900 c.Assert(err, IsNil)
2901 c.Assert(got, Matches, ".*ok.*")
2902@@ -139,7 +162,7 @@
2903 events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{
2904 protocol.SystemChannelId: 1,
2905 })
2906- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
2907+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`)
2908 stop()
2909 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2910 c.Check(len(errCh), Equals, 0)
2911@@ -150,14 +173,14 @@
2912 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2913 Channel: "system",
2914 ExpireOn: future,
2915- Data: json.RawMessage(`{"b": 1}`),
2916+ Data: json.RawMessage(`{"img1/m1": 1}`),
2917 })
2918 c.Assert(err, IsNil)
2919 c.Assert(got, Matches, ".*ok.*")
2920 got, err = s.PostRequest("/broadcast", &api.Broadcast{
2921 Channel: "system",
2922 ExpireOn: future,
2923- Data: json.RawMessage(`{"b": 2}`),
2924+ Data: json.RawMessage(`{"img1/m1": 2}`),
2925 })
2926 c.Assert(err, IsNil)
2927 c.Assert(got, Matches, ".*ok.*")
2928@@ -166,7 +189,7 @@
2929 protocol.SystemChannelId: 10,
2930 })
2931 // gettting last one pending on connect
2932- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
2933+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`)
2934 stop()
2935 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2936 c.Check(len(errCh), Equals, 0)
2937@@ -189,14 +212,14 @@
2938 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2939 Channel: "system",
2940 ExpireOn: future,
2941- Data: json.RawMessage(`{"b": 1}`),
2942+ Data: json.RawMessage(`{"img1/m1": 1}`),
2943 })
2944 c.Assert(err, IsNil)
2945 c.Assert(got, Matches, ".*ok.*")
2946 got, err = s.PostRequest("/broadcast", &api.Broadcast{
2947 Channel: "system",
2948 ExpireOn: future,
2949- Data: json.RawMessage(`{"b": 2}`),
2950+ Data: json.RawMessage(`{"img1/m1": 2}`),
2951 })
2952 c.Assert(err, IsNil)
2953 c.Assert(got, Matches, ".*ok.*")
2954@@ -205,7 +228,7 @@
2955 protocol.SystemChannelId: -10,
2956 })
2957 // gettting pending on connect
2958- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)
2959+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1},{"img1/m1":2}]`)
2960 stop()
2961 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2962 c.Check(len(errCh), Equals, 0)
2963@@ -216,14 +239,14 @@
2964 got, err := s.PostRequest("/broadcast", &api.Broadcast{
2965 Channel: "system",
2966 ExpireOn: future,
2967- Data: json.RawMessage(`{"b": 1}`),
2968+ Data: json.RawMessage(`{"img1/m1": 1}`),
2969 })
2970 c.Assert(err, IsNil)
2971 c.Assert(got, Matches, ".*ok.*")
2972 got, err = s.PostRequest("/broadcast", &api.Broadcast{
2973 Channel: "system",
2974 ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339),
2975- Data: json.RawMessage(`{"b": 2}`),
2976+ Data: json.RawMessage(`{"img1/m1": 2}`),
2977 })
2978 c.Assert(err, IsNil)
2979 c.Assert(got, Matches, ".*ok.*")
2980@@ -233,7 +256,7 @@
2981
2982 events, errCh, stop := s.StartClient(c, "DEVB", nil)
2983 // gettting pending on connect
2984- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`)
2985+ c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1}]`)
2986 stop()
2987 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
2988 c.Check(len(errCh), Equals, 0)
2989
2990=== modified file 'server/acceptance/suites/helpers.go'
2991--- server/acceptance/suites/helpers.go 2014-03-25 19:08:00 +0000
2992+++ server/acceptance/suites/helpers.go 2014-04-04 14:39:41 +0000
2993@@ -48,8 +48,8 @@
2994 "session_queue_size": 10,
2995 "broker_queue_size": 100,
2996 "addr": addr,
2997- "key_pem_file": helpers.SourceRelative("../config/testing.key"),
2998- "cert_pem_file": helpers.SourceRelative("../config/testing.cert"),
2999+ "key_pem_file": helpers.SourceRelative("../ssl/testing.key"),
3000+ "cert_pem_file": helpers.SourceRelative("../ssl/testing.cert"),
3001 })
3002 }
3003
3004
3005=== modified file 'server/acceptance/suites/pingpong.go'
3006--- server/acceptance/suites/pingpong.go 2014-02-21 20:29:16 +0000
3007+++ server/acceptance/suites/pingpong.go 2014-04-04 14:39:41 +0000
3008@@ -34,7 +34,7 @@
3009 func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {
3010 errCh := make(chan error, 1)
3011 events := make(chan string, 10)
3012- sess := testClientSession(s.ServerAddr, "DEVA", true)
3013+ sess := testClientSession(s.ServerAddr, "DEVA", "m1", "img1", true)
3014 err := sess.Dial()
3015 c.Assert(err, IsNil)
3016 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
3017@@ -68,7 +68,7 @@
3018 func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {
3019 errCh := make(chan error, 1)
3020 events := make(chan string, 10)
3021- sess := testClientSession(s.ServerAddr, "DEVB", true)
3022+ sess := testClientSession(s.ServerAddr, "DEVB", "m1", "img1", true)
3023 err := sess.Dial()
3024 c.Assert(err, IsNil)
3025 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
3026
3027=== modified file 'server/acceptance/suites/suite.go'
3028--- server/acceptance/suites/suite.go 2014-03-25 19:08:00 +0000
3029+++ server/acceptance/suites/suite.go 2014-04-04 14:39:41 +0000
3030@@ -46,7 +46,7 @@
3031 func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
3032 errCh := make(chan error, 1)
3033 cliEvents := make(chan string, 10)
3034- sess := testClientSession(h.ServerAddr, devId, false)
3035+ sess := testClientSession(h.ServerAddr, devId, "m1", "img1", false)
3036 sess.Levels = levels
3037 err := sess.Dial()
3038 c.Assert(err, IsNil)
3039@@ -127,16 +127,18 @@
3040 return string(body), err
3041 }
3042
3043-func testClientSession(addr string, deviceId string, reportPings bool) *acceptance.ClientSession {
3044- certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../config/testing.cert"))
3045+func testClientSession(addr string, deviceId, model, imageChannel string, reportPings bool) *acceptance.ClientSession {
3046+ certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../ssl/testing.cert"))
3047 if err != nil {
3048- panic(fmt.Sprintf("could not read config/testing.cert: %v", err))
3049+ panic(fmt.Sprintf("could not read ssl/testing.cert: %v", err))
3050 }
3051 return &acceptance.ClientSession{
3052 ExchangeTimeout: 100 * time.Millisecond,
3053 ServerAddr: addr,
3054 CertPEMBlock: certPEMBlock,
3055 DeviceId: deviceId,
3056+ Model: model,
3057+ ImageChannel: imageChannel,
3058 ReportPings: reportPings,
3059 }
3060 }
3061
3062=== modified file 'server/broker/broker.go'
3063--- server/broker/broker.go 2014-02-21 16:04:44 +0000
3064+++ server/broker/broker.go 2014-04-04 14:39:41 +0000
3065@@ -49,6 +49,19 @@
3066 // LevelsMap is the type for holding channel levels for session.
3067 type LevelsMap map[store.InternalChannelId]int64
3068
3069+// GetInfoString helps retrieving a string out of a protocol.ConnectMsg.Info
3070+func GetInfoString(msg *protocol.ConnectMsg, name, defaultVal string) (string, error) {
3071+ v, ok := msg.Info[name]
3072+ if !ok {
3073+ return defaultVal, nil
3074+ }
3075+ s, ok := v.(string)
3076+ if !ok {
3077+ return "", ErrUnexpectedValue
3078+ }
3079+ return s, nil
3080+}
3081+
3082 // BrokerSession holds broker session state.
3083 type BrokerSession interface {
3084 // SessionChannel returns the session control channel
3085@@ -56,6 +69,10 @@
3086 SessionChannel() <-chan Exchange
3087 // DeviceIdentifier returns the device id string.
3088 DeviceIdentifier() string
3089+ // DeviceImageModel returns the device model.
3090+ DeviceImageModel() string
3091+ // DeviceImageChannel returns the device system image channel.
3092+ DeviceImageChannel() string
3093 // Levels returns the current channel levels for the session
3094 Levels() LevelsMap
3095 // ExchangeScratchArea returns the scratch area for exchanges.
3096@@ -71,6 +88,9 @@
3097 return fmt.Sprintf("session aborted (%s)", ea.Reason)
3098 }
3099
3100+// ErrUnexpectedValue is the error for an unexpected value in a message.
3101+var ErrUnexpectedValue = &ErrAbort{"unexpected value in message"}
3102+
3103 // BrokerConfig gives access to the typical broker configuration.
3104 type BrokerConfig interface {
3105 // SessionQueueSize gives the session queue size.
3106
3107=== modified file 'server/broker/broker_test.go'
3108--- server/broker/broker_test.go 2014-02-10 23:19:08 +0000
3109+++ server/broker/broker_test.go 2014-04-04 14:39:41 +0000
3110@@ -20,6 +20,8 @@
3111 "fmt"
3112
3113 . "launchpad.net/gocheck"
3114+
3115+ "launchpad.net/ubuntu-push/protocol"
3116 )
3117
3118 type brokerSuite struct{}
3119@@ -30,3 +32,19 @@
3120 err := &ErrAbort{"expected FOO"}
3121 c.Check(fmt.Sprintf("%s", err), Equals, "session aborted (expected FOO)")
3122 }
3123+
3124+func (s *brokerSuite) TestGetInfoString(c *C) {
3125+ connectMsg := &protocol.ConnectMsg{}
3126+ v, err := GetInfoString(connectMsg, "foo", "?")
3127+ c.Check(err, IsNil)
3128+ c.Check(v, Equals, "?")
3129+
3130+ connectMsg.Info = map[string]interface{}{"foo": "yay"}
3131+ v, err = GetInfoString(connectMsg, "foo", "?")
3132+ c.Check(err, IsNil)
3133+ c.Check(v, Equals, "yay")
3134+
3135+ connectMsg.Info["foo"] = 33
3136+ v, err = GetInfoString(connectMsg, "foo", "?")
3137+ c.Check(err, Equals, ErrUnexpectedValue)
3138+}
3139
3140=== modified file 'server/broker/exchanges.go'
3141--- server/broker/exchanges.go 2014-02-26 16:04:57 +0000
3142+++ server/broker/exchanges.go 2014-04-04 14:39:41 +0000
3143@@ -18,6 +18,7 @@
3144
3145 import (
3146 "encoding/json"
3147+ "fmt"
3148
3149 "launchpad.net/ubuntu-push/protocol"
3150 "launchpad.net/ubuntu-push/server/store"
3151@@ -37,11 +38,24 @@
3152 ChanId store.InternalChannelId
3153 TopLevel int64
3154 NotificationPayloads []json.RawMessage
3155+ Decoded []map[string]interface{}
3156 }
3157
3158 // check interface already here
3159 var _ Exchange = &BroadcastExchange{}
3160
3161+// Init ensures the BroadcastExchange is fully initialized for the sessions.
3162+func (sbe *BroadcastExchange) Init() {
3163+ decoded := make([]map[string]interface{}, len(sbe.NotificationPayloads))
3164+ sbe.Decoded = decoded
3165+ for i, p := range sbe.NotificationPayloads {
3166+ err := json.Unmarshal(p, &decoded[i])
3167+ if err != nil {
3168+ decoded[i] = nil
3169+ }
3170+ }
3171+}
3172+
3173 func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
3174 c := int64(len(payloads))
3175 if c == 0 {
3176@@ -58,6 +72,20 @@
3177 }
3178 }
3179
3180+func channelFilter(tag string, chanId store.InternalChannelId, payloads []json.RawMessage, decoded []map[string]interface{}) []json.RawMessage {
3181+ if len(payloads) != 0 && chanId == store.SystemInternalChannelId {
3182+ decoded := decoded[len(decoded)-len(payloads):]
3183+ filtered := make([]json.RawMessage, 0)
3184+ for i, decoded1 := range decoded {
3185+ if _, ok := decoded1[tag]; ok {
3186+ filtered = append(filtered, payloads[i])
3187+ }
3188+ }
3189+ payloads = filtered
3190+ }
3191+ return payloads
3192+}
3193+
3194 // Prepare session for a BROADCAST.
3195 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
3196 scratchArea := sess.ExchangeScratchArea()
3197@@ -65,6 +93,9 @@
3198 scratchArea.broadcastMsg.Type = "broadcast"
3199 clientLevel := sess.Levels()[sbe.ChanId]
3200 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
3201+ tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())
3202+ payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded)
3203+
3204 // xxx need an AppId as well, later
3205 scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId)
3206 scratchArea.broadcastMsg.TopLevel = sbe.TopLevel
3207
3208=== modified file 'server/broker/exchanges_test.go'
3209--- server/broker/exchanges_test.go 2014-02-26 16:04:57 +0000
3210+++ server/broker/exchanges_test.go 2014-04-04 14:39:41 +0000
3211@@ -35,24 +35,45 @@
3212
3213 var _ = Suite(&exchangesSuite{})
3214
3215+func (s *exchangesSuite) TestBroadcastExchangeInit(c *C) {
3216+ exchg := &broker.BroadcastExchange{
3217+ ChanId: store.SystemInternalChannelId,
3218+ TopLevel: 3,
3219+ NotificationPayloads: []json.RawMessage{
3220+ json.RawMessage(`{"a":"x"}`),
3221+ json.RawMessage(`[]`),
3222+ json.RawMessage(`{"a":"y"}`),
3223+ },
3224+ }
3225+ exchg.Init()
3226+ c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{
3227+ map[string]interface{}{"a": "x"},
3228+ nil,
3229+ map[string]interface{}{"a": "y"},
3230+ })
3231+}
3232+
3233 func (s *exchangesSuite) TestBroadcastExchange(c *C) {
3234 sess := &testing.TestBrokerSession{
3235- LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
3236+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
3237+ Model: "m1",
3238+ ImageChannel: "img1",
3239 }
3240 exchg := &broker.BroadcastExchange{
3241 ChanId: store.SystemInternalChannelId,
3242 TopLevel: 3,
3243 NotificationPayloads: []json.RawMessage{
3244- json.RawMessage(`{"a":"x"}`),
3245- json.RawMessage(`{"a":"y"}`),
3246+ json.RawMessage(`{"img1/m1":100}`),
3247+ json.RawMessage(`{"img2/m2":200}`),
3248 },
3249 }
3250+ exchg.Init()
3251 outMsg, inMsg, err := exchg.Prepare(sess)
3252 c.Assert(err, IsNil)
3253 // check
3254 marshalled, err := json.Marshal(outMsg)
3255 c.Assert(err, IsNil)
3256- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
3257+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":100}]}`)
3258 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
3259 c.Assert(err, IsNil)
3260 err = exchg.Acked(sess, true)
3261@@ -62,9 +83,11 @@
3262
3263 func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
3264 sess := &testing.TestBrokerSession{
3265- LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
3266+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
3267+ Model: "m1",
3268+ ImageChannel: "img1",
3269 }
3270- payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
3271+ payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
3272 needsSplitting := make([]json.RawMessage, 32)
3273 for i := 0; i < 32; i++ {
3274 needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))
3275@@ -76,6 +99,7 @@
3276 TopLevel: topLevel,
3277 NotificationPayloads: needsSplitting,
3278 }
3279+ exchg.Init()
3280 outMsg, _, err := exchg.Prepare(sess)
3281 c.Assert(err, IsNil)
3282 parts := 0
3283@@ -91,10 +115,11 @@
3284 ChanId: store.SystemInternalChannelId,
3285 TopLevel: topLevel + 2,
3286 NotificationPayloads: []json.RawMessage{
3287- json.RawMessage(`{"a":"x"}`),
3288- json.RawMessage(`{"a":"y"}`),
3289+ json.RawMessage(`{"img1/m1":"x"}`),
3290+ json.RawMessage(`{"img1/m1":"y"}`),
3291 },
3292 }
3293+ exchg.Init()
3294 outMsg, _, err = exchg.Prepare(sess)
3295 c.Assert(err, IsNil)
3296 done := outMsg.Split() // shouldn't panic
3297@@ -103,21 +128,24 @@
3298
3299 func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
3300 sess := &testing.TestBrokerSession{
3301- LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
3302+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
3303+ Model: "m1",
3304+ ImageChannel: "img2",
3305 }
3306 exchg := &broker.BroadcastExchange{
3307 ChanId: store.SystemInternalChannelId,
3308 TopLevel: 3,
3309 NotificationPayloads: []json.RawMessage{
3310- json.RawMessage(`{"a":"y"}`),
3311+ json.RawMessage(`{"img2/m1":1}`),
3312 },
3313 }
3314+ exchg.Init()
3315 outMsg, inMsg, err := exchg.Prepare(sess)
3316 c.Assert(err, IsNil)
3317 // check
3318 marshalled, err := json.Marshal(outMsg)
3319 c.Assert(err, IsNil)
3320- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
3321+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img2/m1":1}]}`)
3322 err = json.Unmarshal([]byte(`{}`), inMsg)
3323 c.Assert(err, IsNil)
3324 err = exchg.Acked(sess, true)
3325@@ -130,23 +158,55 @@
3326 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{
3327 store.SystemInternalChannelId: 2,
3328 }),
3329+ Model: "m1",
3330+ ImageChannel: "img1",
3331 }
3332 exchg := &broker.BroadcastExchange{
3333 ChanId: store.SystemInternalChannelId,
3334 TopLevel: 3,
3335 NotificationPayloads: []json.RawMessage{
3336- json.RawMessage(`{"a":"x"}`),
3337- json.RawMessage(`{"a":"y"}`),
3338- },
3339- }
3340- outMsg, inMsg, err := exchg.Prepare(sess)
3341- c.Assert(err, IsNil)
3342- // check
3343- marshalled, err := json.Marshal(outMsg)
3344- c.Assert(err, IsNil)
3345- c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
3346- err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
3347- c.Assert(err, IsNil)
3348- err = exchg.Acked(sess, true)
3349- c.Assert(err, IsNil)
3350+ json.RawMessage(`{"img1/m1":100}`),
3351+ json.RawMessage(`{"img1/m1":101}`),
3352+ },
3353+ }
3354+ exchg.Init()
3355+ outMsg, inMsg, err := exchg.Prepare(sess)
3356+ c.Assert(err, IsNil)
3357+ // check
3358+ marshalled, err := json.Marshal(outMsg)
3359+ c.Assert(err, IsNil)
3360+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":101}]}`)
3361+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
3362+ c.Assert(err, IsNil)
3363+ err = exchg.Acked(sess, true)
3364+ c.Assert(err, IsNil)
3365+}
3366+
3367+func (s *exchangesSuite) TestBroadcastExchangeChannelFilter(c *C) {
3368+ sess := &testing.TestBrokerSession{
3369+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
3370+ Model: "m1",
3371+ ImageChannel: "img1",
3372+ }
3373+ exchg := &broker.BroadcastExchange{
3374+ ChanId: store.SystemInternalChannelId,
3375+ TopLevel: 5,
3376+ NotificationPayloads: []json.RawMessage{
3377+ json.RawMessage(`{"img1/m1":100}`),
3378+ json.RawMessage(`{"img2/m2":200}`),
3379+ json.RawMessage(`{"img1/m1":101}`),
3380+ },
3381+ }
3382+ exchg.Init()
3383+ outMsg, inMsg, err := exchg.Prepare(sess)
3384+ c.Assert(err, IsNil)
3385+ // check
3386+ marshalled, err := json.Marshal(outMsg)
3387+ c.Assert(err, IsNil)
3388+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":5,"Payloads":[{"img1/m1":100},{"img1/m1":101}]}`)
3389+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
3390+ c.Assert(err, IsNil)
3391+ err = exchg.Acked(sess, true)
3392+ c.Assert(err, IsNil)
3393+ c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))
3394 }
3395
3396=== modified file 'server/broker/exchg_impl_test.go'
3397--- server/broker/exchg_impl_test.go 2014-02-10 23:19:08 +0000
3398+++ server/broker/exchg_impl_test.go 2014-04-04 14:39:41 +0000
3399@@ -20,6 +20,8 @@
3400 "encoding/json"
3401
3402 . "launchpad.net/gocheck"
3403+
3404+ "launchpad.net/ubuntu-push/server/store"
3405 )
3406
3407 type exchangesImplSuite struct{}
3408@@ -56,3 +58,31 @@
3409 res = filterByLevel(5, 10, nil)
3410 c.Check(len(res), Equals, 0)
3411 }
3412+
3413+func (s *exchangesImplSuite) TestChannelFilter(c *C) {
3414+ payloads := []json.RawMessage{
3415+ json.RawMessage(`{"a/x": 3}`),
3416+ json.RawMessage(`{"b/x": 4}`),
3417+ json.RawMessage(`{"a/y": 5}`),
3418+ json.RawMessage(`{"a/x": 6}`),
3419+ }
3420+ decoded := make([]map[string]interface{}, 4)
3421+ for i, p := range payloads {
3422+ err := json.Unmarshal(p, &decoded[i])
3423+ c.Assert(err, IsNil)
3424+ }
3425+
3426+ other := store.InternalChannelId("1")
3427+
3428+ c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil)
3429+ c.Check(channelFilter("", other, payloads[1:], decoded), DeepEquals, payloads[1:])
3430+
3431+ // use tag when channel is the system channel
3432+
3433+ c.Check(channelFilter("c/z", store.SystemInternalChannelId, payloads, decoded), HasLen, 0)
3434+
3435+ c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]})
3436+
3437+ c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]})
3438+
3439+}
3440
3441=== modified file 'server/broker/simple/simple.go'
3442--- server/broker/simple/simple.go 2014-02-21 16:04:44 +0000
3443+++ server/broker/simple/simple.go 2014-04-04 14:39:41 +0000
3444@@ -46,11 +46,13 @@
3445
3446 // simpleBrokerSession represents a session in the broker.
3447 type simpleBrokerSession struct {
3448- registered bool
3449- deviceId string
3450- done chan bool
3451- exchanges chan broker.Exchange
3452- levels broker.LevelsMap
3453+ registered bool
3454+ deviceId string
3455+ model string
3456+ imageChannel string
3457+ done chan bool
3458+ exchanges chan broker.Exchange
3459+ levels broker.LevelsMap
3460 // for exchanges
3461 exchgScratch broker.ExchangesScratchArea
3462 }
3463@@ -75,6 +77,14 @@
3464 return sess.deviceId
3465 }
3466
3467+func (sess *simpleBrokerSession) DeviceImageModel() string {
3468+ return sess.model
3469+}
3470+
3471+func (sess *simpleBrokerSession) DeviceImageChannel() string {
3472+ return sess.imageChannel
3473+}
3474+
3475 func (sess *simpleBrokerSession) Levels() broker.LevelsMap {
3476 return sess.levels
3477 }
3478@@ -147,6 +157,7 @@
3479 TopLevel: topLevel,
3480 NotificationPayloads: payloads,
3481 }
3482+ broadcastExchg.Init()
3483 sess.exchanges <- broadcastExchg
3484 }
3485 }
3486@@ -157,6 +168,14 @@
3487 // pending notifications as well.
3488 func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {
3489 // xxx sanity check DeviceId
3490+ model, err := broker.GetInfoString(connect, "device", "?")
3491+ if err != nil {
3492+ return nil, err
3493+ }
3494+ imageChannel, err := broker.GetInfoString(connect, "channel", "?")
3495+ if err != nil {
3496+ return nil, err
3497+ }
3498 levels := map[store.InternalChannelId]int64{}
3499 for hexId, v := range connect.Levels {
3500 id, err := store.HexToInternalChannelId(hexId)
3501@@ -166,14 +185,16 @@
3502 levels[id] = v
3503 }
3504 sess := &simpleBrokerSession{
3505- deviceId: connect.DeviceId,
3506- done: make(chan bool),
3507- exchanges: make(chan broker.Exchange, b.sessionQueueSize),
3508- levels: levels,
3509+ deviceId: connect.DeviceId,
3510+ model: model,
3511+ imageChannel: imageChannel,
3512+ done: make(chan bool),
3513+ exchanges: make(chan broker.Exchange, b.sessionQueueSize),
3514+ levels: levels,
3515 }
3516 b.sessionCh <- sess
3517 <-sess.done
3518- err := b.feedPending(sess)
3519+ err = b.feedPending(sess)
3520 if err != nil {
3521 return nil, err
3522 }
3523@@ -219,6 +240,7 @@
3524 TopLevel: topLevel,
3525 NotificationPayloads: payloads,
3526 }
3527+ broadcastExchg.Init()
3528 for _, sess := range b.registry {
3529 sess.exchanges <- broadcastExchg
3530 }
3531
3532=== modified file 'server/broker/simple/simple_test.go'
3533--- server/broker/simple/simple_test.go 2014-02-10 23:29:53 +0000
3534+++ server/broker/simple/simple_test.go 2014-04-04 14:39:41 +0000
3535@@ -48,6 +48,7 @@
3536 sto := store.NewInMemoryPendingStore()
3537 muchLater := time.Now().Add(10 * time.Minute)
3538 notification1 := json.RawMessage(`{"m": "M"}`)
3539+ decoded1 := map[string]interface{}{"m": "M"}
3540 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
3541 b := NewSimpleBroker(sto, testBrokerConfig, nil)
3542 sess := &simpleBrokerSession{
3543@@ -60,6 +61,7 @@
3544 ChanId: store.SystemInternalChannelId,
3545 TopLevel: 1,
3546 NotificationPayloads: []json.RawMessage{notification1},
3547+ Decoded: []map[string]interface{}{decoded1},
3548 })
3549 }
3550
3551
3552=== modified file 'server/broker/testing/impls.go'
3553--- server/broker/testing/impls.go 2014-01-23 20:13:22 +0000
3554+++ server/broker/testing/impls.go 2014-04-04 14:39:41 +0000
3555@@ -24,6 +24,8 @@
3556 // Test implementation of BrokerSession.
3557 type TestBrokerSession struct {
3558 DeviceId string
3559+ Model string
3560+ ImageChannel string
3561 Exchanges chan broker.Exchange
3562 LevelsMap broker.LevelsMap
3563 exchgScratch broker.ExchangesScratchArea
3564@@ -33,6 +35,14 @@
3565 return tbs.DeviceId
3566 }
3567
3568+func (tbs *TestBrokerSession) DeviceImageModel() string {
3569+ return tbs.Model
3570+}
3571+
3572+func (tbs *TestBrokerSession) DeviceImageChannel() string {
3573+ return tbs.ImageChannel
3574+}
3575+
3576 func (tbs *TestBrokerSession) SessionChannel() <-chan broker.Exchange {
3577 return tbs.Exchanges
3578 }
3579
3580=== modified file 'server/broker/testsuite/suite.go'
3581--- server/broker/testsuite/suite.go 2014-03-19 23:46:18 +0000
3582+++ server/broker/testsuite/suite.go 2014-04-04 14:39:41 +0000
3583@@ -81,10 +81,20 @@
3584 b := s.MakeBroker(sto, testBrokerConfig, nil)
3585 b.Start()
3586 defer b.Stop()
3587- sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"0": 5}})
3588+ sess, err := b.Register(&protocol.ConnectMsg{
3589+ Type: "connect",
3590+ DeviceId: "dev-1",
3591+ Levels: map[string]int64{"0": 5},
3592+ Info: map[string]interface{}{
3593+ "device": "model",
3594+ "channel": "daily",
3595+ },
3596+ })
3597 c.Assert(err, IsNil)
3598 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess)
3599 c.Assert(sess.DeviceIdentifier(), Equals, "dev-1")
3600+ c.Check(sess.DeviceImageModel(), Equals, "model")
3601+ c.Check(sess.DeviceImageChannel(), Equals, "daily")
3602 c.Assert(sess.ExchangeScratchArea(), Not(IsNil))
3603 c.Check(sess.Levels(), DeepEquals, broker.LevelsMap(map[store.InternalChannelId]int64{
3604 store.SystemInternalChannelId: 5,
3605@@ -105,6 +115,22 @@
3606 c.Check(err, FitsTypeOf, &broker.ErrAbort{})
3607 }
3608
3609+func (s *CommonBrokerSuite) TestRegistrationInfoErrors(c *C) {
3610+ sto := store.NewInMemoryPendingStore()
3611+ b := s.MakeBroker(sto, testBrokerConfig, nil)
3612+ b.Start()
3613+ defer b.Stop()
3614+ info := map[string]interface{}{
3615+ "device": -1,
3616+ }
3617+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
3618+ c.Check(err, Equals, broker.ErrUnexpectedValue)
3619+ info["device"] = "m"
3620+ info["channel"] = -1
3621+ _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
3622+ c.Check(err, Equals, broker.ErrUnexpectedValue)
3623+}
3624+
3625 func (s *CommonBrokerSuite) TestRegistrationFeedPending(c *C) {
3626 sto := store.NewInMemoryPendingStore()
3627 notification1 := json.RawMessage(`{"m": "M"}`)
3628@@ -149,6 +175,7 @@
3629 func (s *CommonBrokerSuite) TestBroadcast(c *C) {
3630 sto := store.NewInMemoryPendingStore()
3631 notification1 := json.RawMessage(`{"m": "M"}`)
3632+ decoded1 := map[string]interface{}{"m": "M"}
3633 b := s.MakeBroker(sto, testBrokerConfig, nil)
3634 b.Start()
3635 defer b.Stop()
3636@@ -168,6 +195,7 @@
3637 ChanId: store.SystemInternalChannelId,
3638 TopLevel: 1,
3639 NotificationPayloads: []json.RawMessage{notification1},
3640+ Decoded: []map[string]interface{}{decoded1},
3641 })
3642 }
3643 select {
3644@@ -178,6 +206,7 @@
3645 ChanId: store.SystemInternalChannelId,
3646 TopLevel: 1,
3647 NotificationPayloads: []json.RawMessage{notification1},
3648+ Decoded: []map[string]interface{}{decoded1},
3649 })
3650 }
3651 }

Subscribers

People subscribed via source and target branches