Merge lp:~chipaca/ubuntu-push/fix-from-xnox into lp:ubuntu-push

Proposed by John Lenton
Status: Superseded
Proposed branch: lp:~chipaca/ubuntu-push/fix-from-xnox
Merge into: lp:ubuntu-push
Diff against target: 3651 lines (+1729/-310)
38 files modified
.bzrignore (+2/-0)
Makefile (+21/-6)
README (+11/-3)
bus/bus.go (+6/-1)
bus/connectivity/connectivity.go (+31/-8)
bus/connectivity/connectivity_test.go (+79/-19)
bus/endpoint.go (+18/-10)
bus/networkmanager/networkmanager.go (+71/-2)
bus/networkmanager/networkmanager_test.go (+124/-9)
bus/notifications/raw.go (+4/-8)
bus/systemimage/systemimage.go (+68/-0)
bus/systemimage/systemimage_test.go (+62/-0)
bus/testing/testing_endpoint.go (+40/-13)
bus/testing/testing_endpoint_test.go (+12/-10)
bus/urldispatcher/urldispatcher.go (+1/-1)
client/client.go (+99/-8)
client/client_test.go (+224/-49)
client/gethosts/gethost_test.go (+5/-7)
client/session/session.go (+179/-33)
client/session/session_test.go (+333/-45)
debian/config.json (+4/-1)
debian/ubuntu-push-client.conf (+3/-3)
sampleconfigs/dev.json (+3/-3)
server/acceptance/acceptanceclient.go (+6/-0)
server/acceptance/cmd/acceptanceclient.go (+7/-2)
server/acceptance/suites/broadcast.go (+48/-25)
server/acceptance/suites/helpers.go (+2/-2)
server/acceptance/suites/pingpong.go (+2/-2)
server/acceptance/suites/suite.go (+6/-4)
server/broker/broker.go (+20/-0)
server/broker/broker_test.go (+18/-0)
server/broker/exchanges.go (+31/-0)
server/broker/exchanges_test.go (+85/-25)
server/broker/exchg_impl_test.go (+30/-0)
server/broker/simple/simple.go (+32/-10)
server/broker/simple/simple_test.go (+2/-0)
server/broker/testing/impls.go (+10/-0)
server/broker/testsuite/suite.go (+30/-1)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/fix-from-xnox
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+214252@code.launchpad.net

Commit message

Fix upstart session job to start/stop when session bus starts/stop,
rather than start when any dbus event is emitted. (LP: #1302516)

Description of the change

Fix upstart session job to start/stop when session bus starts/stop,
rather than start when any dbus event is emitted. (LP: #1302516)

To post a comment you must log in.
Revision history for this message
Samuele Pedroni (pedronis) :
review: Approve

Unmerged revisions

110. By Dimitri John Ledkov

Fix upstart session job to start/stop when session bus starts/stop, rather than start when any dbus event is emitted. (LP: #1302516)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file '.bzrignore'
--- .bzrignore 2014-02-07 19:36:38 +0000
+++ .bzrignore 2014-04-04 13:58:43 +0000
@@ -11,3 +11,5 @@
11debian/*.ex11debian/*.ex
12debian/*.EX12debian/*.EX
13debian/*.substvars13debian/*.substvars
14ubuntu-push-client
15push-server-dev
1416
=== modified file 'Makefile'
--- Makefile 2014-03-12 13:23:26 +0000
+++ Makefile 2014-04-04 13:58:43 +0000
@@ -12,6 +12,8 @@
12GODEPS += launchpad.net/go-xdg/v012GODEPS += launchpad.net/go-xdg/v0
13GODEPS += code.google.com/p/gosqlite/sqlite313GODEPS += code.google.com/p/gosqlite/sqlite3
1414
15TOTEST = $(shell env GOPATH=$(GOPATH) go list $(PROJECT)/...|grep -v acceptance|grep -v http13client )
16
15bootstrap:17bootstrap:
16 mkdir -p $(GOPATH)/bin18 mkdir -p $(GOPATH)/bin
17 mkdir -p $(GOPATH)/pkg19 mkdir -p $(GOPATH)/pkg
@@ -21,17 +23,29 @@
21 go install $(GODEPS)23 go install $(GODEPS)
2224
23check:25check:
24 go test $(TESTFLAGS) $(PROJECT)/...26 go test $(TESTFLAGS) $(TOTEST)
2527
26check-race:28check-race:
27 go test $(TESTFLAGS) -race $(PROJECT)/...29 go test $(TESTFLAGS) -race $(TOTEST)
30
31acceptance:
32 cd server/acceptance; ./acceptance.sh
33
34build-client:
35 go build ubuntu-push-client.go
36
37build-server-dev:
38 go build -o push-server-dev launchpad.net/ubuntu-push/server/dev
39
40run-server-dev:
41 go run server/dev/*.go sampleconfigs/dev.json
2842
29coverage-summary:43coverage-summary:
30 go test $(TESTFLAGS) -a -cover $(PROJECT)/...44 go test $(TESTFLAGS) -a -cover $(TOTEST)
3145
32coverage-html:46coverage-html:
33 mkdir -p coverhtml47 mkdir -p coverhtml
34 for pkg in $$(go list $(PROJECT)/...|grep -v acceptance ); do \48 for pkg in $(TOTEST); do \
35 relname="$${pkg#$(PROJECT)/}" ; \49 relname="$${pkg#$(PROJECT)/}" ; \
36 mkdir -p coverhtml/$$(dirname $${relname}) ; \50 mkdir -p coverhtml/$$(dirname $${relname}) ; \
37 go test $(TESTFLAGS) -a -coverprofile=coverhtml/$${relname}.out $$pkg ; \51 go test $(TESTFLAGS) -a -coverprofile=coverhtml/$${relname}.out $$pkg ; \
@@ -52,5 +66,6 @@
52 # requires graphviz installed66 # requires graphviz installed
53 dot -Tsvg $< > $@67 dot -Tsvg $< > $@
5468
55.PHONY: bootstrap check check-race format check-format coverage-summary \69.PHONY: bootstrap check check-race format check-format \
56 coverage-html protocol-diagrams70 acceptance build-client build server-dev run-server-dev \
71 coverage-summary coverage-html protocol-diagrams
5772
=== modified file 'README'
--- README 2014-02-21 16:17:28 +0000
+++ README 2014-04-04 13:58:43 +0000
@@ -15,8 +15,7 @@
15 make check15 make check
1616
17To produce coverage reports you need Go 1.2 (default on Trusty) and17To produce coverage reports you need Go 1.2 (default on Trusty) and
18the cover tool (the latter can be obtained atm with something like:18the cover tool (in the golang-go.tools package),
19sudo GOPATH=<go-workspace> go get code.google.com/p/go.tools/cmd/cover ),
20then run:19then run:
2120
22 make coverage-summary21 make coverage-summary
@@ -31,4 +30,13 @@
3130
32To run the acceptance tests, change to the acceptance subdir and run:31To run the acceptance tests, change to the acceptance subdir and run:
3332
34 ./acceptance.sh33 make acceptance
34
35There are build targets to build the client:
36
37 make build-client
38
39building ubuntu-push-client, and to run the development server:
40
41 make run-server-dev
42
3543
=== modified file 'bus/bus.go'
--- bus/bus.go 2014-02-06 09:57:49 +0000
+++ bus/bus.go 2014-04-04 13:58:43 +0000
@@ -57,11 +57,16 @@
57 }57 }
58}58}
5959
60// Connect() connects to the bus, and returns the bus endpoint (and/or error).60// Endpoint returns a bus endpoint.
61func (bus concreteBus) Endpoint(addr Address, log logger.Logger) Endpoint {61func (bus concreteBus) Endpoint(addr Address, log logger.Logger) Endpoint {
62 return newEndpoint(bus, addr, log)62 return newEndpoint(bus, addr, log)
63}63}
6464
65// Args helps build arguments for endpoint Call().
66func Args(args ...interface{}) []interface{} {
67 return args
68}
69
65/*70/*
66 private methods71 private methods
67*/72*/
6873
=== modified file 'bus/connectivity/connectivity.go'
--- bus/connectivity/connectivity.go 2014-03-20 14:21:24 +0000
+++ bus/connectivity/connectivity.go 2014-04-04 13:58:43 +0000
@@ -47,6 +47,7 @@
4747
48type connectedState struct {48type connectedState struct {
49 networkStateCh <-chan networkmanager.State49 networkStateCh <-chan networkmanager.State
50 networkConCh <-chan string
50 config ConnectivityConfig51 config ConnectivityConfig
51 log logger.Logger52 log logger.Logger
52 endp bus.Endpoint53 endp bus.Endpoint
@@ -62,7 +63,9 @@
62// up the watch.63// up the watch.
63func (cs *connectedState) start() networkmanager.State {64func (cs *connectedState) start() networkmanager.State {
64 var initial networkmanager.State65 var initial networkmanager.State
65 var ch <-chan networkmanager.State66 var stateCh <-chan networkmanager.State
67 var primary string
68 var conCh <-chan string
66 var err error69 var err error
67 for {70 for {
68 ar := util.NewAutoRedialer(cs.endp)71 ar := util.NewAutoRedialer(cs.endp)
@@ -77,13 +80,24 @@
77 }80 }
7881
79 // set up the watch82 // set up the watch
80 ch, err = nm.WatchState()83 stateCh, err = nm.WatchState()
81 if err != nil {84 if err != nil {
82 cs.log.Debugf("Failed to set up the watch: %s", err)85 cs.log.Debugf("failed to set up the state watch: %s", err)
83 goto Continue86 goto Continue
84 }87 }
8588
86 cs.networkStateCh = ch89 primary = nm.GetPrimaryConnection()
90 cs.log.Debugf("primary connection starts as %#v", primary)
91
92 conCh, err = nm.WatchPrimaryConnection()
93 if err != nil {
94 cs.log.Debugf("failed to set up the connection watch: %s", err)
95 goto Continue
96 }
97
98 cs.networkStateCh = stateCh
99 cs.networkConCh = conCh
100
87 return initial101 return initial
88102
89 Continue:103 Continue:
@@ -102,6 +116,15 @@
102Loop:116Loop:
103 for {117 for {
104 select {118 select {
119 case <-cs.networkConCh:
120 cs.webgetCh = nil
121 cs.timer.Reset(stabilizingTimeout)
122 log.Debugf("PrimaryConnection changed. Assuming disconnect.")
123 if cs.lastSent == true {
124 cs.lastSent = false
125 break Loop
126 }
127
105 case v, ok := <-cs.networkStateCh:128 case v, ok := <-cs.networkStateCh:
106 if !ok {129 if !ok {
107 // tear it all down and start over130 // tear it all down and start over
108131
=== modified file 'bus/connectivity/connectivity_test.go'
--- bus/connectivity/connectivity_test.go 2014-03-20 12:15:47 +0000
+++ bus/connectivity/connectivity_test.go 2014-04-04 13:58:43 +0000
@@ -17,6 +17,7 @@
17package connectivity17package connectivity
1818
19import (19import (
20 "launchpad.net/go-dbus/v1"
20 . "launchpad.net/gocheck"21 . "launchpad.net/gocheck"
21 "launchpad.net/ubuntu-push/bus/networkmanager"22 "launchpad.net/ubuntu-push/bus/networkmanager"
22 testingbus "launchpad.net/ubuntu-push/bus/testing"23 testingbus "launchpad.net/ubuntu-push/bus/testing"
@@ -84,6 +85,17 @@
84 c.Check(cs.connAttempts, Equals, uint32(6))85 c.Check(cs.connAttempts, Equals, uint32(6))
85}86}
8687
88// when some of the calls to NetworkManager fails for a bit, we're still OK
89func (s *ConnSuite) TestStartRetriesCall2(c *C) {
90 cond := condition.Chain(3, condition.Work(true), 1, condition.Work(false),
91 1, condition.Work(true))
92
93 endp := testingbus.NewTestingEndpoint(condition.Work(true), cond, uint32(networkmanager.Connecting))
94 cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
95
96 c.Check(cs.start(), Equals, networkmanager.Connecting)
97}
98
87// when... and bear with me... the bus works, and the first call to99// when... and bear with me... the bus works, and the first call to
88// get network manager's state works, but then you can't establish the100// get network manager's state works, but then you can't establish the
89// watch, we recover and try again.101// watch, we recover and try again.
@@ -213,24 +225,72 @@
213 }{225 }{
214 {false, "first state is always false", 0},226 {false, "first state is always false", 0},
215 {true, "then it should be true as per ConnectedGlobal above", 0},227 {true, "then it should be true as per ConnectedGlobal above", 0},
216 {false, "then, false (upon receiving the next ConnectedGlobal)", 1},228 {false, "then, false (upon receiving the next ConnectedGlobal)", 2},
217 {true, "then it should be true (webcheck passed)", 0},229 {true, "then it should be true (webcheck passed)", 0},
218 {false, "then it should be false (Disconnected)", 1},230 {false, "then it should be false (Disconnected)", 2},
219 {false, "then it should be false again because it's restarted", 1},231 {false, "then it should be false again because it's restarted", 2},
220 }232 }
221233
222 for i, expected := range expecteds {234 for i, expected := range expecteds {
223 for j := 0; j < expected.n; j++ {235 for j := 0; j < expected.n; j++ {
224 watchTicker <- true236 watchTicker <- true
225 }237 }
226 timer.Reset(dt)238 timer.Reset(dt)
227 select {239 select {
228 case v = <-out:240 case v = <-out:
229 break241 break
230 case <-timer.C:242 case <-timer.C:
231 c.Fatalf("Timed out before getting value (#%d: %s)", i+1, expected.s)243 c.Fatalf("Timed out before getting value (#%d: %s)", i+1, expected.s)
232 }244 }
233245 c.Assert(v, Equals, expected.p, Commentf(expected.s))
234 c.Check(v, Equals, expected.p, Commentf(expected.s))246 }
247}
248
249func (s *ConnSuite) TestRun4Active(c *C) {
250 ts := httptest.NewServer(mkHandler(staticText))
251 defer ts.Close()
252
253 cfg := ConnectivityConfig{
254 ConnectivityCheckURL: ts.URL,
255 ConnectivityCheckMD5: staticHash,
256 RecheckTimeout: config.ConfigTimeDuration{time.Second},
257 }
258
259 endp := testingbus.NewTestingEndpoint(condition.Work(true), condition.Work(true),
260 uint32(networkmanager.ConnectedGlobal),
261 map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("hello")}},
262 )
263
264 watchTicker := make(chan bool)
265 testingbus.SetWatchTicker(endp, watchTicker)
266
267 out := make(chan bool)
268 dt := time.Second / 10
269 timer := time.NewTimer(dt)
270 go ConnectedState(endp, cfg, s.log, out)
271 var v bool
272 expecteds := []struct {
273 p bool
274 s string
275 n int
276 }{
277 {false, "first state is always false", 0},
278 {true, "then it should be true as per ConnectedGlobal above", 0},
279 {false, "then, false (PrimaryConnection changed)", 2},
280 {true, "then it should be true (webcheck passed)", 0},
281 }
282
283 for i, expected := range expecteds {
284 for j := 0; j < expected.n; j++ {
285 watchTicker <- true
286 }
287 timer.Reset(dt)
288 select {
289 case v = <-out:
290 break
291 case <-timer.C:
292 c.Fatalf("Timed out before getting value (#%d: %s)", i+1, expected.s)
293 }
294 c.Assert(v, Equals, expected.p, Commentf(expected.s))
235 }295 }
236}296}
237297
=== modified file 'bus/endpoint.go'
--- bus/endpoint.go 2014-02-21 16:17:28 +0000
+++ bus/endpoint.go 2014-04-04 13:58:43 +0000
@@ -32,7 +32,7 @@
32// bus.Endpoint represents the DBus connection itself.32// bus.Endpoint represents the DBus connection itself.
33type Endpoint interface {33type Endpoint interface {
34 WatchSignal(member string, f func(...interface{}), d func()) error34 WatchSignal(member string, f func(...interface{}), d func()) error
35 Call(member string, args ...interface{}) ([]interface{}, error)35 Call(member string, args []interface{}, rvs ...interface{}) error
36 GetProperty(property string) (interface{}, error)36 GetProperty(property string) (interface{}, error)
37 Dial() error37 Dial() error
38 Close()38 Close()
@@ -118,16 +118,20 @@
118 return nil118 return nil
119}119}
120120
121// Call() invokes the provided member method (on the name, path and interface121// Call() invokes the provided member method (on the name, path and
122// provided when creating the endpoint). The return value is unpacked before122// interface provided when creating the endpoint). args can be built
123// being returned.123// using bus.Args(...). The return value is unpacked into rvs before being
124func (endp *endpoint) Call(member string, args ...interface{}) ([]interface{}, error) {124// returned.
125func (endp *endpoint) Call(member string, args []interface{}, rvs ...interface{}) error {
125 msg, err := endp.proxy.Call(endp.addr.Interface, member, args...)126 msg, err := endp.proxy.Call(endp.addr.Interface, member, args...)
126 if err != nil {127 if err != nil {
127 return nil, err128 return err
128 }129 }
129 rvs := endp.unpackOneMsg(msg, member)130 err = msg.Args(rvs...)
130 return rvs, nil131 if err != nil {
132 return err
133 }
134 return nil
131}135}
132136
133// GetProperty uses the org.freedesktop.DBus.Properties interface's Get method137// GetProperty uses the org.freedesktop.DBus.Properties interface's Get method
@@ -175,7 +179,11 @@
175179
176// unpackOneMsg unpacks the value from the response msg180// unpackOneMsg unpacks the value from the response msg
177func (endp *endpoint) unpackOneMsg(msg *dbus.Message, member string) []interface{} {181func (endp *endpoint) unpackOneMsg(msg *dbus.Message, member string) []interface{} {
178 return msg.AllArgs()182 var varmap map[string]dbus.Variant
183 if err := msg.Args(&varmap); err != nil {
184 return msg.AllArgs()
185 }
186 return []interface{}{varmap}
179}187}
180188
181// unpackMessages unpacks the value from the watch189// unpackMessages unpacks the value from the watch
182190
=== modified file 'bus/networkmanager/networkmanager.go'
--- bus/networkmanager/networkmanager.go 2014-01-21 13:21:19 +0000
+++ bus/networkmanager/networkmanager.go 2014-04-04 13:58:43 +0000
@@ -20,6 +20,8 @@
20package networkmanager20package networkmanager
2121
22import (22import (
23 "launchpad.net/go-dbus/v1"
24
23 "launchpad.net/ubuntu-push/bus"25 "launchpad.net/ubuntu-push/bus"
24 "launchpad.net/ubuntu-push/logger"26 "launchpad.net/ubuntu-push/logger"
25)27)
@@ -41,6 +43,12 @@
41 // WatchState listens for changes to NetworkManager's state, and sends43 // WatchState listens for changes to NetworkManager's state, and sends
42 // them out over the channel returned.44 // them out over the channel returned.
43 WatchState() (<-chan State, error)45 WatchState() (<-chan State, error)
46 // GetPrimaryConnection fetches and returns NetworkManager's current
47 // primary connection.
48 GetPrimaryConnection() string
49 // WatchPrimaryConnection listens for changes of NetworkManager's
50 // Primary Connection, and sends it out over the channel returned.
51 WatchPrimaryConnection() (<-chan string, error)
44}52}
4553
46type networkManager struct {54type networkManager struct {
@@ -68,13 +76,28 @@
68 return Unknown76 return Unknown
69 }77 }
7078
71 return State(s.(uint32))79 v, ok := s.(uint32)
80 if !ok {
81 nm.log.Errorf("Got weird state: %#v", s)
82 return Unknown
83 }
84
85 return State(v)
72}86}
7387
74func (nm *networkManager) WatchState() (<-chan State, error) {88func (nm *networkManager) WatchState() (<-chan State, error) {
75 ch := make(chan State)89 ch := make(chan State)
76 err := nm.bus.WatchSignal("StateChanged",90 err := nm.bus.WatchSignal("StateChanged",
77 func(ns ...interface{}) { ch <- State(ns[0].(uint32)) },91 func(ns ...interface{}) {
92 stint, ok := ns[0].(uint32)
93 if !ok {
94 nm.log.Errorf("got weird state: %#v", ns[0])
95 return
96 }
97 st := State(stint)
98 nm.log.Debugf("got state: %s", st)
99 ch <- State(stint)
100 },
78 func() { close(ch) })101 func() { close(ch) })
79 if err != nil {102 if err != nil {
80 nm.log.Debugf("Failed to set up the watch: %s", err)103 nm.log.Debugf("Failed to set up the watch: %s", err)
@@ -83,3 +106,49 @@
83106
84 return ch, nil107 return ch, nil
85}108}
109
110func (nm *networkManager) GetPrimaryConnection() string {
111 s, err := nm.bus.GetProperty("PrimaryConnection")
112 if err != nil {
113 nm.log.Errorf("Failed gettting current primary connection: %s", err)
114 nm.log.Debugf("Defaulting primary connection to empty")
115 return ""
116 }
117
118 v, ok := s.(dbus.ObjectPath)
119 if !ok {
120 nm.log.Errorf("got weird PrimaryConnection: %#v", s)
121 return ""
122 }
123
124 return string(v)
125}
126
127func (nm *networkManager) WatchPrimaryConnection() (<-chan string, error) {
128 ch := make(chan string)
129 err := nm.bus.WatchSignal("PropertiesChanged",
130 func(ppsi ...interface{}) {
131 pps, ok := ppsi[0].(map[string]dbus.Variant)
132 if !ok {
133 nm.log.Errorf("got weird PropertiesChanged: %#v", ppsi[0])
134 return
135 }
136 v, ok := pps["PrimaryConnection"]
137 if !ok {
138 return
139 }
140 con, ok := v.Value.(dbus.ObjectPath)
141 if !ok {
142 nm.log.Errorf("got weird PrimaryConnection via PropertiesChanged: %#v", v)
143 return
144 }
145 nm.log.Debugf("got primary connection: %s", con)
146 ch <- string(con)
147 }, func() { close(ch) })
148 if err != nil {
149 nm.log.Debugf("Failed to set up the watch: %s", err)
150 return nil, err
151 }
152
153 return ch, nil
154}
86155
=== modified file 'bus/networkmanager/networkmanager_test.go'
--- bus/networkmanager/networkmanager_test.go 2014-02-05 18:17:26 +0000
+++ bus/networkmanager/networkmanager_test.go 2014-04-04 13:58:43 +0000
@@ -17,12 +17,15 @@
17package networkmanager17package networkmanager
1818
19import (19import (
20 "testing"
21
22 "launchpad.net/go-dbus/v1"
20 . "launchpad.net/gocheck"23 . "launchpad.net/gocheck"
24
21 testingbus "launchpad.net/ubuntu-push/bus/testing"25 testingbus "launchpad.net/ubuntu-push/bus/testing"
22 "launchpad.net/ubuntu-push/logger"26 "launchpad.net/ubuntu-push/logger"
23 helpers "launchpad.net/ubuntu-push/testing"27 helpers "launchpad.net/ubuntu-push/testing"
24 "launchpad.net/ubuntu-push/testing/condition"28 "launchpad.net/ubuntu-push/testing/condition"
25 "testing"
26)29)
2730
28// hook up gocheck31// hook up gocheck
@@ -71,7 +74,7 @@
7174
72// GetState returns the right state when dbus works but delivers rubbish values75// GetState returns the right state when dbus works but delivers rubbish values
73func (s *NMSuite) TestGetStateRubbishValues(c *C) {76func (s *NMSuite) TestGetStateRubbishValues(c *C) {
74 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false), 42), s.log)77 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(true), "Unknown"), s.log)
75 state := nm.GetState()78 state := nm.GetState()
76 c.Check(state, Equals, Unknown)79 c.Check(state, Equals, Unknown)
77}80}
@@ -101,11 +104,123 @@
101}104}
102105
103// WatchState calls close on its channel when the watch bails106// WatchState calls close on its channel when the watch bails
104func (s *NMSuite) TestWatchClosesOnWatchBail(c *C) {107func (s *NMSuite) TestWatchStateClosesOnWatchBail(c *C) {
105 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))108 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))
106 nm := New(tc, s.log)109 nm := New(tc, s.log)
107 ch, err := nm.WatchState()110 ch, err := nm.WatchState()
108 c.Check(err, IsNil)111 c.Check(err, IsNil)
109 _, ok := <-ch112 _, ok := <-ch
110 c.Check(ok, Equals, false)113 c.Check(ok, Equals, false)
114}
115
116// WatchState survives rubbish values
117func (s *NMSuite) TestWatchStateSurvivesRubbishValues(c *C) {
118 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a")
119 nm := New(tc, s.log)
120 ch, err := nm.WatchState()
121 c.Check(err, IsNil)
122 _, ok := <-ch
123 c.Check(ok, Equals, false)
124}
125
126// GetPrimaryConnection returns the right state when everything works
127func (s *NMSuite) TestGetPrimaryConnection(c *C) {
128 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(true), dbus.ObjectPath("/a/1")), s.log)
129 con := nm.GetPrimaryConnection()
130 c.Check(con, Equals, "/a/1")
131}
132
133// GetPrimaryConnection returns the right state when dbus fails
134func (s *NMSuite) TestGetPrimaryConnectionFail(c *C) {
135 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log)
136 con := nm.GetPrimaryConnection()
137 c.Check(con, Equals, "")
138}
139
140// GetPrimaryConnection returns the right state when dbus works but delivers rubbish values
141func (s *NMSuite) TestGetPrimaryConnectionRubbishValues(c *C) {
142 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(true), "broken"), s.log)
143 con := nm.GetPrimaryConnection()
144 c.Check(con, Equals, "")
145}
146
147// GetPrimaryConnection returns the right state when dbus works but delivers a rubbish structure
148func (s *NMSuite) TestGetPrimaryConnectionRubbishStructure(c *C) {
149 nm := New(testingbus.NewMultiValuedTestingEndpoint(nil, condition.Work(true), []interface{}{}), s.log)
150 con := nm.GetPrimaryConnection()
151 c.Check(con, Equals, "")
152}
153
154func mkPriConMap(priCon string) map[string]dbus.Variant {
155 m := make(map[string]dbus.Variant)
156 m["PrimaryConnection"] = dbus.Variant{dbus.ObjectPath(priCon)}
157 return m
158}
159
160// WatchPrimaryConnection sends a stream of Connections over the channel
161func (s *NMSuite) TestWatchPrimaryConnection(c *C) {
162 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true),
163 mkPriConMap("/a/1"),
164 mkPriConMap("/b/2"),
165 mkPriConMap("/c/3"))
166 nm := New(tc, s.log)
167 ch, err := nm.WatchPrimaryConnection()
168 c.Check(err, IsNil)
169 l := []string{<-ch, <-ch, <-ch}
170 c.Check(l, DeepEquals, []string{"/a/1", "/b/2", "/c/3"})
171}
172
173// WatchPrimaryConnection returns on error if the dbus call fails
174func (s *NMSuite) TestWatchPrimaryConnectionFails(c *C) {
175 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log)
176 _, err := nm.WatchPrimaryConnection()
177 c.Check(err, NotNil)
178}
179
180// WatchPrimaryConnection calls close on its channel when the watch bails
181func (s *NMSuite) TestWatchPrimaryConnectionClosesOnWatchBail(c *C) {
182 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))
183 nm := New(tc, s.log)
184 ch, err := nm.WatchPrimaryConnection()
185 c.Check(err, IsNil)
186 _, ok := <-ch
187 c.Check(ok, Equals, false)
188}
189
190// WatchPrimaryConnection survives rubbish values
191func (s *NMSuite) TestWatchPrimaryConnectionSurvivesRubbishValues(c *C) {
192 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a")
193 nm := New(tc, s.log)
194 ch, err := nm.WatchPrimaryConnection()
195 c.Assert(err, IsNil)
196 _, ok := <-ch
197 c.Check(ok, Equals, false)
198}
199
200// WatchPrimaryConnection ignores non-PrimaryConnection PropertyChanged
201func (s *NMSuite) TestWatchPrimaryConnectionIgnoresIrrelephant(c *C) {
202 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true),
203 map[string]dbus.Variant{"foo": dbus.Variant{}},
204 map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}},
205 )
206 nm := New(tc, s.log)
207 ch, err := nm.WatchPrimaryConnection()
208 c.Assert(err, IsNil)
209 v, ok := <-ch
210 c.Check(ok, Equals, true)
211 c.Check(v, Equals, "42")
212}
213
214// WatchPrimaryConnection ignores rubbish PrimaryConnections
215func (s *NMSuite) TestWatchPrimaryConnectionIgnoresRubbishValues(c *C) {
216 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true),
217 map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{-12}},
218 map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}},
219 )
220 nm := New(tc, s.log)
221 ch, err := nm.WatchPrimaryConnection()
222 c.Assert(err, IsNil)
223 v, ok := <-ch
224 c.Check(ok, Equals, true)
225 c.Check(v, Equals, "42")
111}226}
112227
=== modified file 'bus/notifications/raw.go'
--- bus/notifications/raw.go 2014-01-27 14:22:00 +0000
+++ bus/notifications/raw.go 2014-04-04 13:58:43 +0000
@@ -22,7 +22,6 @@
22// this is the lower-level api22// this is the lower-level api
2323
24import (24import (
25 "fmt"
26 "launchpad.net/go-dbus/v1"25 "launchpad.net/go-dbus/v1"
27 "launchpad.net/ubuntu-push/bus"26 "launchpad.net/ubuntu-push/bus"
28 "launchpad.net/ubuntu-push/logger"27 "launchpad.net/ubuntu-push/logger"
@@ -69,16 +68,13 @@
69 timeout int32) (uint32, error) {68 timeout int32) (uint32, error) {
70 // that's a long argument list! Take a breather.69 // that's a long argument list! Take a breather.
71 //70 //
72 rvs, err := raw.bus.Call("Notify", app_name, reuse_id, icon,71 var res uint32
73 summary, body, actions, hints, timeout)72 err := raw.bus.Call("Notify", bus.Args(app_name, reuse_id, icon,
73 summary, body, actions, hints, timeout), &res)
74 if err != nil {74 if err != nil {
75 return 0, err75 return 0, err
76 }76 }
77 if len(rvs) != 1 {77 return res, nil
78 return 0, fmt.Errorf("Wrong number of values in Notify response: %d",
79 len(rvs))
80 }
81 return rvs[0].(uint32), nil
82}78}
8379
84// WatchActions listens for ActionInvoked signals from the notification daemon80// WatchActions listens for ActionInvoked signals from the notification daemon
8581
=== added directory 'bus/systemimage'
=== added file 'bus/systemimage/systemimage.go'
--- bus/systemimage/systemimage.go 1970-01-01 00:00:00 +0000
+++ bus/systemimage/systemimage.go 2014-04-04 13:58:43 +0000
@@ -0,0 +1,68 @@
1/*
2 Copyright 2013-2014 Canonical Ltd.
3
4 This program is free software: you can redistribute it and/or modify it
5 under the terms of the GNU General Public License version 3, as published
6 by the Free Software Foundation.
7
8 This program is distributed in the hope that it will be useful, but
9 WITHOUT ANY WARRANTY; without even the implied warranties of
10 MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along
14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/
16
17// Package systemimage is a mimimal wrapper for the system-image dbus API.
18package systemimage
19
20import (
21 "launchpad.net/ubuntu-push/bus"
22 "launchpad.net/ubuntu-push/logger"
23)
24
25// system-image service lives on a well-known bus.Address
26var BusAddress bus.Address = bus.Address{
27 Interface: "com.canonical.SystemImage",
28 Path: "/Service",
29 Name: "com.canonical.SystemImage",
30}
31
32// InfoResult holds the result of the system-image service Info method.
33type InfoResult struct {
34 BuildNumber int32
35 Device string
36 Channel string
37 // xxx channel_target missing
38 LastUpdate string
39 VersionDetail map[string]string
40}
41
42// A SystemImage exposes the a subset of system-image service.
43type SystemImage interface {
44 Info() (*InfoResult, error)
45}
46
47type systemImage struct {
48 endp bus.Endpoint
49 log logger.Logger
50}
51
52// New builds a new system-image service wrapper that uses the provided bus.Endpoint
53func New(endp bus.Endpoint, log logger.Logger) SystemImage {
54 return &systemImage{endp, log}
55}
56
57var _ SystemImage = &systemImage{} // ensures it conforms
58
59func (si *systemImage) Info() (*InfoResult, error) {
60 si.log.Debugf("Invoking Info")
61 res := &InfoResult{}
62 err := si.endp.Call("Info", bus.Args(), &res.BuildNumber, &res.Device, &res.Channel, &res.LastUpdate, &res.VersionDetail)
63 if err != nil {
64 si.log.Errorf("Info failed: %v", err)
65 return nil, err
66 }
67 return res, err
68}
069
=== added file 'bus/systemimage/systemimage_test.go'
--- bus/systemimage/systemimage_test.go 1970-01-01 00:00:00 +0000
+++ bus/systemimage/systemimage_test.go 2014-04-04 13:58:43 +0000
@@ -0,0 +1,62 @@
1/*
2 Copyright 2013-2014 Canonical Ltd.
3
4 This program is free software: you can redistribute it and/or modify it
5 under the terms of the GNU General Public License version 3, as published
6 by the Free Software Foundation.
7
8 This program is distributed in the hope that it will be useful, but
9 WITHOUT ANY WARRANTY; without even the implied warranties of
10 MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along
14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/
16
17package systemimage
18
19import (
20 "testing"
21
22 . "launchpad.net/gocheck"
23
24 testibus "launchpad.net/ubuntu-push/bus/testing"
25 "launchpad.net/ubuntu-push/logger"
26 helpers "launchpad.net/ubuntu-push/testing"
27 "launchpad.net/ubuntu-push/testing/condition"
28)
29
30// hook up gocheck
31func TestSystemImage(t *testing.T) { TestingT(t) }
32
33type SISuite struct {
34 log logger.Logger
35}
36
37var _ = Suite(&SISuite{})
38
39func (s *SISuite) SetUpTest(c *C) {
40 s.log = helpers.NewTestLogger(c, "debug")
41}
42
43func (s *SISuite) TestWorks(c *C) {
44 endp := testibus.NewMultiValuedTestingEndpoint(nil, condition.Work(true), []interface{}{int32(101), "mako", "daily", "Unknown", map[string]string{}})
45 si := New(endp, s.log)
46 res, err := si.Info()
47 c.Assert(err, IsNil)
48 c.Check(res, DeepEquals, &InfoResult{
49 BuildNumber: 101,
50 Device: "mako",
51 Channel: "daily",
52 LastUpdate: "Unknown",
53 VersionDetail: map[string]string{},
54 })
55}
56
57func (s *SISuite) TestFailsIfCallFails(c *C) {
58 endp := testibus.NewTestingEndpoint(nil, condition.Work(false))
59 si := New(endp, s.log)
60 _, err := si.Info()
61 c.Check(err, NotNil)
62}
063
=== modified file 'bus/testing/testing_endpoint.go'
--- bus/testing/testing_endpoint.go 2014-02-06 13:26:13 +0000
+++ bus/testing/testing_endpoint.go 2014-04-04 13:58:43 +0000
@@ -21,6 +21,9 @@
21import (21import (
22 "errors"22 "errors"
23 "fmt"23 "fmt"
24
25 "launchpad.net/go-dbus/v1"
26
24 "launchpad.net/ubuntu-push/bus"27 "launchpad.net/ubuntu-push/bus"
25 "launchpad.net/ubuntu-push/testing/condition"28 "launchpad.net/ubuntu-push/testing/condition"
26 "sync"29 "sync"
@@ -37,6 +40,7 @@
37 callCond condition.Interface40 callCond condition.Interface
38 retvals [][]interface{}41 retvals [][]interface{}
39 watchTicker chan bool42 watchTicker chan bool
43 watchLck sync.RWMutex
40 callArgs []callArgs44 callArgs []callArgs
41 callArgsLck sync.RWMutex45 callArgsLck sync.RWMutex
42}46}
@@ -62,7 +66,9 @@
62// instead of the default timeout to wait while sending values over66// instead of the default timeout to wait while sending values over
63// WatchSignal. Set it to nil again to restore default behaviour.67// WatchSignal. Set it to nil again to restore default behaviour.
64func SetWatchTicker(tc bus.Endpoint, watchTicker chan bool) {68func SetWatchTicker(tc bus.Endpoint, watchTicker chan bool) {
69 tc.(*testingEndpoint).watchLck.Lock()
65 tc.(*testingEndpoint).watchTicker = watchTicker70 tc.(*testingEndpoint).watchTicker = watchTicker
71 tc.(*testingEndpoint).watchLck.Unlock()
66}72}
6773
68// GetCallArgs returns a list of the arguments for each Call() invocation.74// GetCallArgs returns a list of the arguments for each Call() invocation.
@@ -79,8 +85,11 @@
79 go func() {85 go func() {
80 for _, v := range tc.retvals {86 for _, v := range tc.retvals {
81 f(v...)87 f(v...)
82 if tc.watchTicker != nil {88 tc.watchLck.RLock()
83 <-tc.watchTicker89 ticker := tc.watchTicker
90 tc.watchLck.RUnlock()
91 if ticker != nil {
92 <-ticker
84 } else {93 } else {
85 time.Sleep(10 * time.Millisecond)94 time.Sleep(10 * time.Millisecond)
86 }95 }
@@ -95,32 +104,50 @@
95104
96// See Endpoint's Call. This Call will check its condition to decide whether105// See Endpoint's Call. This Call will check its condition to decide whether
97// to return an error, or the first of its return values106// to return an error, or the first of its return values
98func (tc *testingEndpoint) Call(member string, args ...interface{}) ([]interface{}, error) {107func (tc *testingEndpoint) Call(member string, args []interface{}, rvs ...interface{}) error {
99 tc.callArgsLck.Lock()108 tc.callArgsLck.Lock()
100 defer tc.callArgsLck.Unlock()109 defer tc.callArgsLck.Unlock()
101110
102 tc.callArgs = append(tc.callArgs, callArgs{member, args})111 tc.callArgs = append(tc.callArgs, callArgs{member, args})
103 if tc.callCond.OK() {112 if tc.callCond.OK() {
113 expected := len(rvs)
114 var provided int
104 if len(tc.retvals) == 0 {115 if len(tc.retvals) == 0 {
105 panic("No return values provided!")116 if expected != 0 {
106 }117 panic("No return values provided!")
107 return tc.retvals[0], nil118 }
119 provided = 0
120 } else {
121 provided = len(tc.retvals[0])
122 }
123 if provided != expected {
124 return errors.New("provided/expected return vals mismatch")
125 }
126 if provided != 0 {
127 x := dbus.NewMethodCallMessage("", "", "", "")
128 err := x.AppendArgs(tc.retvals[0]...)
129 if err != nil {
130 return err
131 }
132 err = x.Args(rvs...)
133 if err != nil {
134 return err
135 }
136 }
137 return nil
108 } else {138 } else {
109 return nil, errors.New("no way")139 return errors.New("no way")
110 }140 }
111}141}
112142
113// See Endpoint's GetProperty. This one is just another name for Call.143// See Endpoint's GetProperty. This one is just another name for Call.
114func (tc *testingEndpoint) GetProperty(property string) (interface{}, error) {144func (tc *testingEndpoint) GetProperty(property string) (interface{}, error) {
115 rvs, err := tc.Call(property)145 var res interface{}
146 err := tc.Call(property, bus.Args(), &res)
116 if err != nil {147 if err != nil {
117 return nil, err148 return nil, err
118 }149 }
119 if len(rvs) != 1 {150 return res, err
120 return nil, errors.New("Wrong number of values given to testingEndpoint" +
121 " -- GetProperty only returns a single value for now!")
122 }
123 return rvs[0], err
124}151}
125152
126// See Endpoint's Dial. This one will check its dialCondition to153// See Endpoint's Dial. This one will check its dialCondition to
127154
=== modified file 'bus/testing/testing_endpoint_test.go'
--- bus/testing/testing_endpoint_test.go 2014-02-05 02:13:35 +0000
+++ bus/testing/testing_endpoint_test.go 2014-04-04 13:58:43 +0000
@@ -18,6 +18,7 @@
1818
19import (19import (
20 . "launchpad.net/gocheck"20 . "launchpad.net/gocheck"
21 "launchpad.net/ubuntu-push/bus"
21 "launchpad.net/ubuntu-push/testing/condition"22 "launchpad.net/ubuntu-push/testing/condition"
22 "testing"23 "testing"
23 "time"24 "time"
@@ -35,26 +36,26 @@
35func (s *TestingEndpointSuite) TestCallReturnsFirstRetval(c *C) {36func (s *TestingEndpointSuite) TestCallReturnsFirstRetval(c *C) {
36 var m, n uint32 = 42, 1737 var m, n uint32 = 42, 17
37 endp := NewTestingEndpoint(nil, condition.Work(true), m, n)38 endp := NewTestingEndpoint(nil, condition.Work(true), m, n)
38 vs, e := endp.Call("what")39 var r uint32
40 e := endp.Call("what", bus.Args(), &r)
39 c.Check(e, IsNil)41 c.Check(e, IsNil)
40 c.Check(vs, HasLen, 1)42 c.Check(r, Equals, m)
41 c.Check(vs[0], Equals, m)
42}43}
4344
44// Test the same Call() but with multi-valued endpoint45// Test the same Call() but with multi-valued endpoint
45func (s *TestingEndpointSuite) TestMultiValuedCall(c *C) {46func (s *TestingEndpointSuite) TestMultiValuedCall(c *C) {
46 var m, n uint32 = 42, 1747 var m, n uint32 = 42, 17
47 endp := NewMultiValuedTestingEndpoint(nil, condition.Work(true), []interface{}{m}, []interface{}{n})48 endp := NewMultiValuedTestingEndpoint(nil, condition.Work(true), []interface{}{m}, []interface{}{n})
48 vs, e := endp.Call("what")49 var r uint32
50 e := endp.Call("what", bus.Args(), &r)
49 c.Check(e, IsNil)51 c.Check(e, IsNil)
50 c.Check(vs, HasLen, 1)52 c.Check(r, Equals, m)
51 c.Check(vs[0], Equals, m)
52}53}
5354
54// Test that Call() with a negative condition returns an error.55// Test that Call() with a negative condition returns an error.
55func (s *TestingEndpointSuite) TestCallFails(c *C) {56func (s *TestingEndpointSuite) TestCallFails(c *C) {
56 endp := NewTestingEndpoint(nil, condition.Work(false))57 endp := NewTestingEndpoint(nil, condition.Work(false))
57 _, e := endp.Call("what")58 e := endp.Call("what", bus.Args())
58 c.Check(e, NotNil)59 c.Check(e, NotNil)
59}60}
6061
@@ -62,13 +63,14 @@
62// a helpful message.63// a helpful message.
63func (s *TestingEndpointSuite) TestCallPanicsWithNiceMessage(c *C) {64func (s *TestingEndpointSuite) TestCallPanicsWithNiceMessage(c *C) {
64 endp := NewTestingEndpoint(nil, condition.Work(true))65 endp := NewTestingEndpoint(nil, condition.Work(true))
65 c.Check(func() { endp.Call("") }, PanicMatches, "No return values provided.*")66 var x int32
67 c.Check(func() { endp.Call("", bus.Args(), &x) }, PanicMatches, "No return values provided.*")
66}68}
6769
68// Test that Call() updates callArgs70// Test that Call() updates callArgs
69func (s *TestingEndpointSuite) TestCallArgs(c *C) {71func (s *TestingEndpointSuite) TestCallArgs(c *C) {
70 endp := NewTestingEndpoint(nil, condition.Work(true), 0)72 endp := NewTestingEndpoint(nil, condition.Work(true))
71 _, err := endp.Call("what", "is", "this", "thing")73 err := endp.Call("what", bus.Args("is", "this", "thing"))
72 c.Assert(err, IsNil)74 c.Assert(err, IsNil)
73 c.Check(GetCallArgs(endp), DeepEquals,75 c.Check(GetCallArgs(endp), DeepEquals,
74 []callArgs{{"what", []interface{}{"is", "this", "thing"}}})76 []callArgs{{"what", []interface{}{"is", "this", "thing"}}})
7577
=== modified file 'bus/urldispatcher/urldispatcher.go'
--- bus/urldispatcher/urldispatcher.go 2014-01-23 00:54:51 +0000
+++ bus/urldispatcher/urldispatcher.go 2014-04-04 13:58:43 +0000
@@ -49,7 +49,7 @@
4949
50func (ud *urlDispatcher) DispatchURL(url string) error {50func (ud *urlDispatcher) DispatchURL(url string) error {
51 ud.log.Debugf("Dispatching %s", url)51 ud.log.Debugf("Dispatching %s", url)
52 _, err := ud.endp.Call("DispatchURL", url)52 err := ud.endp.Call("DispatchURL", bus.Args(url))
53 if err != nil {53 if err != nil {
54 ud.log.Errorf("Dispatch to %s failed with %s", url, err)54 ud.log.Errorf("Dispatch to %s failed with %s", url, err)
55 }55 }
5656
=== modified file 'client/client.go'
--- client/client.go 2014-03-26 16:26:36 +0000
+++ client/client.go 2014-04-04 13:58:43 +0000
@@ -20,13 +20,19 @@
2020
21import (21import (
22 "encoding/pem"22 "encoding/pem"
23 "errors"
23 "fmt"24 "fmt"
24 "io/ioutil"25 "io/ioutil"
26 "os"
27 "strings"
28
25 "launchpad.net/go-dbus/v1"29 "launchpad.net/go-dbus/v1"
30
26 "launchpad.net/ubuntu-push/bus"31 "launchpad.net/ubuntu-push/bus"
27 "launchpad.net/ubuntu-push/bus/connectivity"32 "launchpad.net/ubuntu-push/bus/connectivity"
28 "launchpad.net/ubuntu-push/bus/networkmanager"33 "launchpad.net/ubuntu-push/bus/networkmanager"
29 "launchpad.net/ubuntu-push/bus/notifications"34 "launchpad.net/ubuntu-push/bus/notifications"
35 "launchpad.net/ubuntu-push/bus/systemimage"
30 "launchpad.net/ubuntu-push/bus/urldispatcher"36 "launchpad.net/ubuntu-push/bus/urldispatcher"
31 "launchpad.net/ubuntu-push/client/session"37 "launchpad.net/ubuntu-push/client/session"
32 "launchpad.net/ubuntu-push/client/session/levelmap"38 "launchpad.net/ubuntu-push/client/session/levelmap"
@@ -34,7 +40,6 @@
34 "launchpad.net/ubuntu-push/logger"40 "launchpad.net/ubuntu-push/logger"
35 "launchpad.net/ubuntu-push/util"41 "launchpad.net/ubuntu-push/util"
36 "launchpad.net/ubuntu-push/whoopsie/identifier"42 "launchpad.net/ubuntu-push/whoopsie/identifier"
37 "os"
38)43)
3944
40// ClientConfig holds the client configuration45// ClientConfig holds the client configuration
@@ -42,8 +47,13 @@
42 connectivity.ConnectivityConfig // q.v.47 connectivity.ConnectivityConfig // q.v.
43 // A reasonably large timeout for receive/answer pairs48 // A reasonably large timeout for receive/answer pairs
44 ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`49 ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`
45 // The server to connect to50 // A timeout to use when trying to connect to the server
46 Addr config.ConfigHostPort51 ConnectTimeout config.ConfigTimeDuration `json:"connect_timeout"`
52 // The server to connect to or url to query for hosts to connect to
53 Addr string
54 // Host list management
55 HostsCachingExpiryTime config.ConfigTimeDuration `json:"hosts_cache_expiry"` // potentially refresh host list after
56 ExpectAllRepairedTime config.ConfigTimeDuration `json:"expect_all_repaired"` // worth retrying all servers after
47 // The PEM-encoded server certificate57 // The PEM-encoded server certificate
48 CertPEMFile string `json:"cert_pem_file"`58 CertPEMFile string `json:"cert_pem_file"`
49 // The logging level (one of "debug", "info", "error")59 // The logging level (one of "debug", "info", "error")
@@ -62,6 +72,8 @@
62 notificationsEndp bus.Endpoint72 notificationsEndp bus.Endpoint
63 urlDispatcherEndp bus.Endpoint73 urlDispatcherEndp bus.Endpoint
64 connectivityEndp bus.Endpoint74 connectivityEndp bus.Endpoint
75 systemImageEndp bus.Endpoint
76 systemImageInfo *systemimage.InfoResult
65 connCh chan bool77 connCh chan bool
66 hasConnectivity bool78 hasConnectivity bool
67 actionsCh <-chan notifications.RawActionReply79 actionsCh <-chan notifications.RawActionReply
@@ -89,6 +101,12 @@
89 if err != nil {101 if err != nil {
90 return fmt.Errorf("reading config: %v", err)102 return fmt.Errorf("reading config: %v", err)
91 }103 }
104 // ignore spaces
105 client.config.Addr = strings.Replace(client.config.Addr, " ", "", -1)
106 if client.config.Addr == "" {
107 return errors.New("no hosts specified")
108 }
109
92 // later, we'll be specifying more logging options in the config file110 // later, we'll be specifying more logging options in the config file
93 client.log = logger.NewSimpleLogger(os.Stderr, client.config.LogLevel)111 client.log = logger.NewSimpleLogger(os.Stderr, client.config.LogLevel)
94112
@@ -97,6 +115,7 @@
97 client.notificationsEndp = bus.SessionBus.Endpoint(notifications.BusAddress, client.log)115 client.notificationsEndp = bus.SessionBus.Endpoint(notifications.BusAddress, client.log)
98 client.urlDispatcherEndp = bus.SessionBus.Endpoint(urldispatcher.BusAddress, client.log)116 client.urlDispatcherEndp = bus.SessionBus.Endpoint(urldispatcher.BusAddress, client.log)
99 client.connectivityEndp = bus.SystemBus.Endpoint(networkmanager.BusAddress, client.log)117 client.connectivityEndp = bus.SystemBus.Endpoint(networkmanager.BusAddress, client.log)
118 client.systemImageEndp = bus.SystemBus.Endpoint(systemimage.BusAddress, client.log)
100119
101 client.connCh = make(chan bool, 1)120 client.connCh = make(chan bool, 1)
102 client.sessionConnectedCh = make(chan uint32, 1)121 client.sessionConnectedCh = make(chan uint32, 1)
@@ -116,6 +135,18 @@
116 return nil135 return nil
117}136}
118137
138// deriveSessionConfig dervies the session configuration from the client configuration bits.
139func (client *PushClient) deriveSessionConfig(info map[string]interface{}) session.ClientSessionConfig {
140 return session.ClientSessionConfig{
141 ConnectTimeout: client.config.ConnectTimeout.TimeDuration(),
142 ExchangeTimeout: client.config.ExchangeTimeout.TimeDuration(),
143 HostsCachingExpiryTime: client.config.HostsCachingExpiryTime.TimeDuration(),
144 ExpectAllRepairedTime: client.config.ExpectAllRepairedTime.TimeDuration(),
145 PEM: client.pem,
146 Info: info,
147 }
148}
149
119// getDeviceId gets the whoopsie identifier for the device150// getDeviceId gets the whoopsie identifier for the device
120func (client *PushClient) getDeviceId() error {151func (client *PushClient) getDeviceId() error {
121 err := client.idder.Generate()152 err := client.idder.Generate()
@@ -133,8 +164,17 @@
133 iniCh := make(chan uint32)164 iniCh := make(chan uint32)
134 go func() { iniCh <- util.NewAutoRedialer(client.notificationsEndp).Redial() }()165 go func() { iniCh <- util.NewAutoRedialer(client.notificationsEndp).Redial() }()
135 go func() { iniCh <- util.NewAutoRedialer(client.urlDispatcherEndp).Redial() }()166 go func() { iniCh <- util.NewAutoRedialer(client.urlDispatcherEndp).Redial() }()
136 <-iniCh167 go func() { iniCh <- util.NewAutoRedialer(client.systemImageEndp).Redial() }()
137 <-iniCh168 <-iniCh
169 <-iniCh
170 <-iniCh
171
172 sysimg := systemimage.New(client.systemImageEndp, client.log)
173 info, err := sysimg.Info()
174 if err != nil {
175 return err
176 }
177 client.systemImageInfo = info
138178
139 actionsCh, err := notifications.Raw(client.notificationsEndp, client.log).WatchActions()179 actionsCh, err := notifications.Raw(client.notificationsEndp, client.log).WatchActions()
140 client.actionsCh = actionsCh180 client.actionsCh = actionsCh
@@ -143,8 +183,13 @@
143183
144// initSession creates the session object184// initSession creates the session object
145func (client *PushClient) initSession() error {185func (client *PushClient) initSession() error {
146 sess, err := session.NewSession(string(client.config.Addr), client.pem,186 info := map[string]interface{}{
147 client.config.ExchangeTimeout.Duration, client.deviceId,187 "device": client.systemImageInfo.Device,
188 "channel": client.systemImageInfo.Channel,
189 "build_number": client.systemImageInfo.BuildNumber,
190 }
191 sess, err := session.NewSession(client.config.Addr,
192 client.deriveSessionConfig(info), client.deviceId,
148 client.levelMapFactory, client.log)193 client.levelMapFactory, client.log)
149 if err != nil {194 if err != nil {
150 return err195 return err
@@ -185,8 +230,54 @@
185 }230 }
186}231}
187232
233// filterNotification finds out if the notification is about an actual
234// upgrade for the device. It expects msg.Decoded entries to look
235// like:
236//
237// {
238// "IMAGE-CHANNEL/DEVICE-MODEL": [BUILD-NUMBER, CHANNEL-ALIAS]
239// ...
240// }
241func (client *PushClient) filterNotification(msg *session.Notification) bool {
242 n := len(msg.Decoded)
243 if n == 0 {
244 return false
245 }
246 // they are all for us, consider last
247 last := msg.Decoded[n-1]
248 tag := fmt.Sprintf("%s/%s", client.systemImageInfo.Channel, client.systemImageInfo.Device)
249 entry, ok := last[tag]
250 if !ok {
251 return false
252 }
253 pair, ok := entry.([]interface{})
254 if !ok {
255 return false
256 }
257 if len(pair) < 1 {
258 return false
259 }
260 buildNumber, ok := pair[0].(float64)
261 if !ok {
262 return false
263 }
264 curBuildNumber := float64(client.systemImageInfo.BuildNumber)
265 if buildNumber > curBuildNumber {
266 return true
267 }
268 // xxx we should really compare channel_target and alias here
269 // going backward by a margin, assume switch of target
270 if buildNumber < curBuildNumber && (curBuildNumber-buildNumber) > 10 {
271 return true
272 }
273 return false
274}
275
188// handleNotification deals with receiving a notification276// handleNotification deals with receiving a notification
189func (client *PushClient) handleNotification(msg *session.Notification) error {277func (client *PushClient) handleNotification(msg *session.Notification) error {
278 if !client.filterNotification(msg) {
279 return nil
280 }
190 action_id := "dummy_id"281 action_id := "dummy_id"
191 a := []string{action_id, "Go get it!"} // action value not visible on the phone282 a := []string{action_id, "Go get it!"} // action value not visible on the phone
192 h := map[string]*dbus.Variant{"x-canonical-switch-to-application": &dbus.Variant{true}}283 h := map[string]*dbus.Variant{"x-canonical-switch-to-application": &dbus.Variant{true}}
@@ -260,7 +351,7 @@
260 return client.doStart(351 return client.doStart(
261 client.configure,352 client.configure,
262 client.getDeviceId,353 client.getDeviceId,
354 client.takeTheBus,
263 client.initSession,355 client.initSession,
264 client.takeTheBus,
265 )356 )
266}357}
267358
=== modified file 'client/client_test.go'
--- client/client_test.go 2014-03-26 16:26:36 +0000
+++ client/client_test.go 2014-04-04 13:58:43 +0000
@@ -17,13 +17,23 @@
17package client17package client
1818
19import (19import (
20 "encoding/json"
20 "errors"21 "errors"
21 "fmt"22 "fmt"
22 "io/ioutil"23 "io/ioutil"
24 "net/http"
25 "net/http/httptest"
26 "path/filepath"
27 "reflect"
28 "testing"
29 "time"
30
23 . "launchpad.net/gocheck"31 . "launchpad.net/gocheck"
32
24 "launchpad.net/ubuntu-push/bus"33 "launchpad.net/ubuntu-push/bus"
25 "launchpad.net/ubuntu-push/bus/networkmanager"34 "launchpad.net/ubuntu-push/bus/networkmanager"
26 "launchpad.net/ubuntu-push/bus/notifications"35 "launchpad.net/ubuntu-push/bus/notifications"
36 "launchpad.net/ubuntu-push/bus/systemimage"
27 testibus "launchpad.net/ubuntu-push/bus/testing"37 testibus "launchpad.net/ubuntu-push/bus/testing"
28 "launchpad.net/ubuntu-push/client/session"38 "launchpad.net/ubuntu-push/client/session"
29 "launchpad.net/ubuntu-push/client/session/levelmap"39 "launchpad.net/ubuntu-push/client/session/levelmap"
@@ -32,11 +42,6 @@
32 "launchpad.net/ubuntu-push/util"42 "launchpad.net/ubuntu-push/util"
33 "launchpad.net/ubuntu-push/whoopsie/identifier"43 "launchpad.net/ubuntu-push/whoopsie/identifier"
34 idtesting "launchpad.net/ubuntu-push/whoopsie/identifier/testing"44 idtesting "launchpad.net/ubuntu-push/whoopsie/identifier/testing"
35 "net/http"
36 "net/http/httptest"
37 "path/filepath"
38 "testing"
39 "time"
40)45)
4146
42func TestClient(t *testing.T) { TestingT(t) }47func TestClient(t *testing.T) { TestingT(t) }
@@ -83,22 +88,37 @@
83 cs.timeouts = nil88 cs.timeouts = nil
84}89}
8590
91func (cs *clientSuite) writeTestConfig(overrides map[string]interface{}) {
92 pem_file := helpers.SourceRelative("../server/acceptance/ssl/testing.cert")
93 cfgMap := map[string]interface{}{
94 "connect_timeout": "7ms",
95 "exchange_timeout": "10ms",
96 "hosts_cache_expiry": "1h",
97 "expect_all_repaired": "30m",
98 "stabilizing_timeout": "0ms",
99 "connectivity_check_url": "",
100 "connectivity_check_md5": "",
101 "addr": ":0",
102 "cert_pem_file": pem_file,
103 "recheck_timeout": "3h",
104 "log_level": "debug",
105 }
106 for k, v := range overrides {
107 cfgMap[k] = v
108 }
109 cfgBlob, err := json.Marshal(cfgMap)
110 if err != nil {
111 panic(err)
112 }
113 ioutil.WriteFile(cs.configPath, cfgBlob, 0600)
114}
115
86func (cs *clientSuite) SetUpTest(c *C) {116func (cs *clientSuite) SetUpTest(c *C) {
87 cs.log = helpers.NewTestLogger(c, "debug")117 cs.log = helpers.NewTestLogger(c, "debug")
88 dir := c.MkDir()118 dir := c.MkDir()
89 cs.configPath = filepath.Join(dir, "config")119 cs.configPath = filepath.Join(dir, "config")
90 cfg := fmt.Sprintf(`120
91{121 cs.writeTestConfig(nil)
92 "exchange_timeout": "10ms",
93 "stabilizing_timeout": "0ms",
94 "connectivity_check_url": "",
95 "connectivity_check_md5": "",
96 "addr": ":0",
97 "cert_pem_file": %#v,
98 "recheck_timeout": "3h",
99 "log_level": "debug"
100}`, helpers.SourceRelative("../server/acceptance/config/testing.cert"))
101 ioutil.WriteFile(cs.configPath, []byte(cfg), 0600)
102}122}
103123
104type sqlientSuite struct{ clientSuite }124type sqlientSuite struct{ clientSuite }
@@ -119,7 +139,7 @@
119 err := cli.configure()139 err := cli.configure()
120 c.Assert(err, IsNil)140 c.Assert(err, IsNil)
121 c.Assert(cli.config, NotNil)141 c.Assert(cli.config, NotNil)
122 c.Check(cli.config.ExchangeTimeout.Duration, Equals, time.Duration(10*time.Millisecond))142 c.Check(cli.config.ExchangeTimeout.TimeDuration(), Equals, time.Duration(10*time.Millisecond))
123}143}
124144
125func (cs *clientSuite) TestConfigureSetsUpLog(c *C) {145func (cs *clientSuite) TestConfigureSetsUpLog(c *C) {
@@ -179,41 +199,74 @@
179}199}
180200
181func (cs *clientSuite) TestConfigureBailsOnBadPEMFilename(c *C) {201func (cs *clientSuite) TestConfigureBailsOnBadPEMFilename(c *C) {
182 ioutil.WriteFile(cs.configPath, []byte(`202 cs.writeTestConfig(map[string]interface{}{
183{203 "cert_pem_file": "/a/b/c",
184 "exchange_timeout": "10ms",204 })
185 "stabilizing_timeout": "0ms",
186 "connectivity_check_url": "",
187 "connectivity_check_md5": "",
188 "addr": ":0",
189 "cert_pem_file": "/a/b/c",
190 "log_level": "debug",
191 "recheck_timeout": "3h"
192}`), 0600)
193
194 cli := NewPushClient(cs.configPath, cs.leveldbPath)205 cli := NewPushClient(cs.configPath, cs.leveldbPath)
195 err := cli.configure()206 err := cli.configure()
196 c.Assert(err, ErrorMatches, "reading PEM file: .*")207 c.Assert(err, ErrorMatches, "reading PEM file: .*")
197}208}
198209
199func (cs *clientSuite) TestConfigureBailsOnBadPEM(c *C) {210func (cs *clientSuite) TestConfigureBailsOnBadPEM(c *C) {
200 ioutil.WriteFile(cs.configPath, []byte(`211 cs.writeTestConfig(map[string]interface{}{
201{212 "cert_pem_file": "/etc/passwd",
202 "exchange_timeout": "10ms",213 })
203 "stabilizing_timeout": "0ms",
204 "connectivity_check_url": "",
205 "connectivity_check_md5": "",
206 "addr": ":0",
207 "cert_pem_file": "/etc/passwd",
208 "log_level": "debug",
209 "recheck_timeout": "3h"
210}`), 0600)
211
212 cli := NewPushClient(cs.configPath, cs.leveldbPath)214 cli := NewPushClient(cs.configPath, cs.leveldbPath)
213 err := cli.configure()215 err := cli.configure()
214 c.Assert(err, ErrorMatches, "no PEM found.*")216 c.Assert(err, ErrorMatches, "no PEM found.*")
215}217}
216218
219func (cs *clientSuite) TestConfigureBailsOnNoHosts(c *C) {
220 cs.writeTestConfig(map[string]interface{}{
221 "addr": " ",
222 })
223 cli := NewPushClient(cs.configPath, cs.leveldbPath)
224 err := cli.configure()
225 c.Assert(err, ErrorMatches, "no hosts specified")
226}
227
228func (cs *clientSuite) TestConfigureRemovesBlanksInAddr(c *C) {
229 cs.writeTestConfig(map[string]interface{}{
230 "addr": " foo: 443",
231 })
232 cli := NewPushClient(cs.configPath, cs.leveldbPath)
233 err := cli.configure()
234 c.Assert(err, IsNil)
235 c.Check(cli.config.Addr, Equals, "foo:443")
236}
237
238/*****************************************************************
239 deriveSessionConfig tests
240******************************************************************/
241
242func (cs *clientSuite) TestDeriveSessionConfig(c *C) {
243 info := map[string]interface{}{
244 "foo": 1,
245 }
246 cli := NewPushClient(cs.configPath, cs.leveldbPath)
247 err := cli.configure()
248 c.Assert(err, IsNil)
249 expected := session.ClientSessionConfig{
250 ConnectTimeout: 7 * time.Millisecond,
251 ExchangeTimeout: 10 * time.Millisecond,
252 HostsCachingExpiryTime: 1 * time.Hour,
253 ExpectAllRepairedTime: 30 * time.Minute,
254 PEM: cli.pem,
255 Info: info,
256 }
257 // sanity check that we are looking at all fields
258 vExpected := reflect.ValueOf(expected)
259 nf := vExpected.NumField()
260 for i := 0; i < nf; i++ {
261 fv := vExpected.Field(i)
262 // field isn't empty/zero
263 c.Assert(fv.Interface(), Not(DeepEquals), reflect.Zero(fv.Type()).Interface(), Commentf("forgot about: %s", vExpected.Type().Field(i).Name))
264 }
265 // finally compare
266 conf := cli.deriveSessionConfig(info)
267 c.Check(conf, DeepEquals, expected)
268}
269
217/*****************************************************************270/*****************************************************************
218 getDeviceId tests271 getDeviceId tests
219******************************************************************/272******************************************************************/
@@ -254,6 +307,8 @@
254 cEndp := testibus.NewTestingEndpoint(cCond, condition.Work(true),307 cEndp := testibus.NewTestingEndpoint(cCond, condition.Work(true),
255 uint32(networkmanager.ConnectedGlobal),308 uint32(networkmanager.ConnectedGlobal),
256 )309 )
310 siCond := condition.Fail2Work(2)
311 siEndp := testibus.NewMultiValuedTestingEndpoint(siCond, condition.Work(true), []interface{}{int32(101), "mako", "daily", "Unknown", map[string]string{}})
257 testibus.SetWatchTicker(cEndp, make(chan bool))312 testibus.SetWatchTicker(cEndp, make(chan bool))
258 // ok, create the thing313 // ok, create the thing
259 cli := NewPushClient(cs.configPath, cs.leveldbPath)314 cli := NewPushClient(cs.configPath, cs.leveldbPath)
@@ -269,6 +324,7 @@
269 cli.notificationsEndp = nEndp324 cli.notificationsEndp = nEndp
270 cli.urlDispatcherEndp = uEndp325 cli.urlDispatcherEndp = uEndp
271 cli.connectivityEndp = cEndp326 cli.connectivityEndp = cEndp
327 cli.systemImageEndp = siEndp
272328
273 c.Assert(cli.takeTheBus(), IsNil)329 c.Assert(cli.takeTheBus(), IsNil)
274 // the notifications and urldispatcher endpoints retried until connected330 // the notifications and urldispatcher endpoints retried until connected
@@ -280,6 +336,8 @@
280 c.Check(takeNextBool(cli.connCh), Equals, true)336 c.Check(takeNextBool(cli.connCh), Equals, true)
281 // the connectivity endpoint retried until connected337 // the connectivity endpoint retried until connected
282 c.Check(cCond.OK(), Equals, true)338 c.Check(cCond.OK(), Equals, true)
339 // the systemimage endpoint retried until connected
340 c.Check(siCond.OK(), Equals, true)
283}341}
284342
285// takeTheBus can, in fact, fail343// takeTheBus can, in fact, fail
@@ -295,6 +353,7 @@
295 cli.notificationsEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))353 cli.notificationsEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
296 cli.urlDispatcherEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))354 cli.urlDispatcherEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
297 cli.connectivityEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))355 cli.connectivityEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
356 cli.systemImageEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
298357
299 c.Check(cli.takeTheBus(), NotNil)358 c.Check(cli.takeTheBus(), NotNil)
300 c.Check(cli.actionsCh, IsNil)359 c.Check(cli.actionsCh, IsNil)
@@ -307,7 +366,9 @@
307func (cs *clientSuite) TestHandleErr(c *C) {366func (cs *clientSuite) TestHandleErr(c *C) {
308 cli := NewPushClient(cs.configPath, cs.leveldbPath)367 cli := NewPushClient(cs.configPath, cs.leveldbPath)
309 cli.log = cs.log368 cli.log = cs.log
369 cli.systemImageInfo = siInfoRes
310 c.Assert(cli.initSession(), IsNil)370 c.Assert(cli.initSession(), IsNil)
371 cs.log.ResetCapture()
311 cli.hasConnectivity = true372 cli.hasConnectivity = true
312 cli.handleErr(errors.New("bananas"))373 cli.handleErr(errors.New("bananas"))
313 c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n")374 c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n")
@@ -338,6 +399,7 @@
338func (cs *clientSuite) TestHandleConnStateD2C(c *C) {399func (cs *clientSuite) TestHandleConnStateD2C(c *C) {
339 cli := NewPushClient(cs.configPath, cs.leveldbPath)400 cli := NewPushClient(cs.configPath, cs.leveldbPath)
340 cli.log = cs.log401 cli.log = cs.log
402 cli.systemImageInfo = siInfoRes
341 c.Assert(cli.initSession(), IsNil)403 c.Assert(cli.initSession(), IsNil)
342404
343 c.Assert(cli.hasConnectivity, Equals, false)405 c.Assert(cli.hasConnectivity, Equals, false)
@@ -363,7 +425,7 @@
363func (cs *clientSuite) TestHandleConnStateC2D(c *C) {425func (cs *clientSuite) TestHandleConnStateC2D(c *C) {
364 cli := NewPushClient(cs.configPath, cs.leveldbPath)426 cli := NewPushClient(cs.configPath, cs.leveldbPath)
365 cli.log = cs.log427 cli.log = cs.log
366 cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, levelmap.NewLevelMap, cs.log)428 cli.session, _ = session.NewSession(cli.config.Addr, cli.deriveSessionConfig(nil), cli.deviceId, levelmap.NewLevelMap, cs.log)
367 cli.session.Dial()429 cli.session.Dial()
368 cli.hasConnectivity = true430 cli.hasConnectivity = true
369431
@@ -376,7 +438,7 @@
376func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) {438func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) {
377 cli := NewPushClient(cs.configPath, cs.leveldbPath)439 cli := NewPushClient(cs.configPath, cs.leveldbPath)
378 cli.log = cs.log440 cli.log = cs.log
379 cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, levelmap.NewLevelMap, cs.log)441 cli.session, _ = session.NewSession(cli.config.Addr, cli.deriveSessionConfig(nil), cli.deviceId, levelmap.NewLevelMap, cs.log)
380 cli.hasConnectivity = true442 cli.hasConnectivity = true
381443
382 cli.handleConnState(false)444 cli.handleConnState(false)
@@ -384,15 +446,110 @@
384}446}
385447
386/*****************************************************************448/*****************************************************************
449 filterNotification tests
450******************************************************************/
451
452var siInfoRes = &systemimage.InfoResult{
453 Device: "mako",
454 Channel: "daily",
455 BuildNumber: 102,
456 LastUpdate: "Unknown",
457}
458
459func (cs *clientSuite) TestFilterNotification(c *C) {
460 cli := NewPushClient(cs.configPath, cs.leveldbPath)
461 cli.systemImageInfo = siInfoRes
462 // empty
463 msg := &session.Notification{}
464 c.Check(cli.filterNotification(msg), Equals, false)
465 // same build number
466 msg = &session.Notification{
467 Decoded: []map[string]interface{}{
468 map[string]interface{}{
469 "daily/mako": []interface{}{float64(102), "tubular"},
470 },
471 },
472 }
473 c.Check(cli.filterNotification(msg), Equals, false)
474 // higher build number and pick last
475 msg = &session.Notification{
476 Decoded: []map[string]interface{}{
477 map[string]interface{}{
478 "daily/mako": []interface{}{float64(102), "tubular"},
479 },
480 map[string]interface{}{
481 "daily/mako": []interface{}{float64(103), "tubular"},
482 },
483 },
484 }
485 c.Check(cli.filterNotification(msg), Equals, true)
486 // going backward by a margin, assume switch of alias
487 msg = &session.Notification{
488 Decoded: []map[string]interface{}{
489 map[string]interface{}{
490 "daily/mako": []interface{}{float64(102), "tubular"},
491 },
492 map[string]interface{}{
493 "daily/mako": []interface{}{float64(2), "urban"},
494 },
495 },
496 }
497 c.Check(cli.filterNotification(msg), Equals, true)
498}
499
500func (cs *clientSuite) TestFilterNotificationRobust(c *C) {
501 cli := NewPushClient(cs.configPath, cs.leveldbPath)
502 cli.systemImageInfo = siInfoRes
503 msg := &session.Notification{
504 Decoded: []map[string]interface{}{
505 map[string]interface{}{},
506 },
507 }
508 c.Check(cli.filterNotification(msg), Equals, false)
509 for _, broken := range []interface{}{
510 5,
511 []interface{}{},
512 []interface{}{55},
513 } {
514 msg := &session.Notification{
515 Decoded: []map[string]interface{}{
516 map[string]interface{}{
517 "daily/mako": broken,
518 },
519 },
520 }
521 c.Check(cli.filterNotification(msg), Equals, false)
522 }
523}
524
525/*****************************************************************
387 handleNotification tests526 handleNotification tests
388******************************************************************/527******************************************************************/
389528
529var (
530 positiveNotification = &session.Notification{
531 Decoded: []map[string]interface{}{
532 map[string]interface{}{
533 "daily/mako": []interface{}{float64(103), "tubular"},
534 },
535 },
536 }
537 negativeNotification = &session.Notification{
538 Decoded: []map[string]interface{}{
539 map[string]interface{}{
540 "daily/mako": []interface{}{float64(102), "tubular"},
541 },
542 },
543 }
544)
545
390func (cs *clientSuite) TestHandleNotification(c *C) {546func (cs *clientSuite) TestHandleNotification(c *C) {
391 cli := NewPushClient(cs.configPath, cs.leveldbPath)547 cli := NewPushClient(cs.configPath, cs.leveldbPath)
548 cli.systemImageInfo = siInfoRes
392 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1))549 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1))
393 cli.notificationsEndp = endp550 cli.notificationsEndp = endp
394 cli.log = cs.log551 cli.log = cs.log
395 c.Check(cli.handleNotification(nil), IsNil)552 c.Check(cli.handleNotification(positiveNotification), IsNil)
396 // check we sent the notification553 // check we sent the notification
397 args := testibus.GetCallArgs(endp)554 args := testibus.GetCallArgs(endp)
398 c.Assert(args, HasLen, 1)555 c.Assert(args, HasLen, 1)
@@ -400,12 +557,26 @@
400 c.Check(cs.log.Captured(), Matches, `.* got notification id \d+\s*`)557 c.Check(cs.log.Captured(), Matches, `.* got notification id \d+\s*`)
401}558}
402559
560func (cs *clientSuite) TestHandleNotificationNothingToDo(c *C) {
561 cli := NewPushClient(cs.configPath, cs.leveldbPath)
562 cli.systemImageInfo = siInfoRes
563 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), uint32(1))
564 cli.notificationsEndp = endp
565 cli.log = cs.log
566 c.Check(cli.handleNotification(negativeNotification), IsNil)
567 // check we sent the notification
568 args := testibus.GetCallArgs(endp)
569 c.Assert(args, HasLen, 0)
570 c.Check(cs.log.Captured(), Matches, "")
571}
572
403func (cs *clientSuite) TestHandleNotificationFail(c *C) {573func (cs *clientSuite) TestHandleNotificationFail(c *C) {
404 cli := NewPushClient(cs.configPath, cs.leveldbPath)574 cli := NewPushClient(cs.configPath, cs.leveldbPath)
575 cli.systemImageInfo = siInfoRes
405 cli.log = cs.log576 cli.log = cs.log
406 endp := testibus.NewTestingEndpoint(nil, condition.Work(false))577 endp := testibus.NewTestingEndpoint(nil, condition.Work(false))
407 cli.notificationsEndp = endp578 cli.notificationsEndp = endp
408 c.Check(cli.handleNotification(nil), NotNil)579 c.Check(cli.handleNotification(positiveNotification), NotNil)
409}580}
410581
411/*****************************************************************582/*****************************************************************
@@ -415,7 +586,7 @@
415func (cs *clientSuite) TestHandleClick(c *C) {586func (cs *clientSuite) TestHandleClick(c *C) {
416 cli := NewPushClient(cs.configPath, cs.leveldbPath)587 cli := NewPushClient(cs.configPath, cs.leveldbPath)
417 cli.log = cs.log588 cli.log = cs.log
418 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), nil)589 endp := testibus.NewTestingEndpoint(nil, condition.Work(true))
419 cli.urlDispatcherEndp = endp590 cli.urlDispatcherEndp = endp
420 c.Check(cli.handleClick(), IsNil)591 c.Check(cli.handleClick(), IsNil)
421 // check we sent the notification592 // check we sent the notification
@@ -432,6 +603,7 @@
432func (cs *clientSuite) TestDoLoopConn(c *C) {603func (cs *clientSuite) TestDoLoopConn(c *C) {
433 cli := NewPushClient(cs.configPath, cs.leveldbPath)604 cli := NewPushClient(cs.configPath, cs.leveldbPath)
434 cli.log = cs.log605 cli.log = cs.log
606 cli.systemImageInfo = siInfoRes
435 cli.connCh = make(chan bool, 1)607 cli.connCh = make(chan bool, 1)
436 cli.connCh <- true608 cli.connCh <- true
437 c.Assert(cli.initSession(), IsNil)609 c.Assert(cli.initSession(), IsNil)
@@ -444,6 +616,7 @@
444func (cs *clientSuite) TestDoLoopClick(c *C) {616func (cs *clientSuite) TestDoLoopClick(c *C) {
445 cli := NewPushClient(cs.configPath, cs.leveldbPath)617 cli := NewPushClient(cs.configPath, cs.leveldbPath)
446 cli.log = cs.log618 cli.log = cs.log
619 cli.systemImageInfo = siInfoRes
447 c.Assert(cli.initSession(), IsNil)620 c.Assert(cli.initSession(), IsNil)
448 aCh := make(chan notifications.RawActionReply, 1)621 aCh := make(chan notifications.RawActionReply, 1)
449 aCh <- notifications.RawActionReply{}622 aCh <- notifications.RawActionReply{}
@@ -457,6 +630,7 @@
457func (cs *clientSuite) TestDoLoopNotif(c *C) {630func (cs *clientSuite) TestDoLoopNotif(c *C) {
458 cli := NewPushClient(cs.configPath, cs.leveldbPath)631 cli := NewPushClient(cs.configPath, cs.leveldbPath)
459 cli.log = cs.log632 cli.log = cs.log
633 cli.systemImageInfo = siInfoRes
460 c.Assert(cli.initSession(), IsNil)634 c.Assert(cli.initSession(), IsNil)
461 cli.session.MsgCh = make(chan *session.Notification, 1)635 cli.session.MsgCh = make(chan *session.Notification, 1)
462 cli.session.MsgCh <- &session.Notification{}636 cli.session.MsgCh <- &session.Notification{}
@@ -469,6 +643,7 @@
469func (cs *clientSuite) TestDoLoopErr(c *C) {643func (cs *clientSuite) TestDoLoopErr(c *C) {
470 cli := NewPushClient(cs.configPath, cs.leveldbPath)644 cli := NewPushClient(cs.configPath, cs.leveldbPath)
471 cli.log = cs.log645 cli.log = cs.log
646 cli.systemImageInfo = siInfoRes
472 c.Assert(cli.initSession(), IsNil)647 c.Assert(cli.initSession(), IsNil)
473 cli.session.ErrCh = make(chan error, 1)648 cli.session.ErrCh = make(chan error, 1)
474 cli.session.ErrCh <- nil649 cli.session.ErrCh <- nil
@@ -521,7 +696,7 @@
521 cli.urlDispatcherEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))696 cli.urlDispatcherEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(false))
522 cli.connectivityEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(true),697 cli.connectivityEndp = testibus.NewTestingEndpoint(condition.Work(true), condition.Work(true),
523 uint32(networkmanager.ConnectedGlobal))698 uint32(networkmanager.ConnectedGlobal))
524699 cli.systemImageInfo = siInfoRes
525 c.Assert(cli.initSession(), IsNil)700 c.Assert(cli.initSession(), IsNil)
526701
527 cli.session.MsgCh = make(chan *session.Notification)702 cli.session.MsgCh = make(chan *session.Notification)
@@ -558,7 +733,7 @@
558 c.Check(cli.hasConnectivity, Equals, false)733 c.Check(cli.hasConnectivity, Equals, false)
559734
560 // * session.MsgCh to the notifications handler735 // * session.MsgCh to the notifications handler
561 cli.session.MsgCh <- &session.Notification{}736 cli.session.MsgCh <- positiveNotification
562 tick()737 tick()
563 nargs := testibus.GetCallArgs(cli.notificationsEndp)738 nargs := testibus.GetCallArgs(cli.notificationsEndp)
564 c.Check(nargs, HasLen, 1)739 c.Check(nargs, HasLen, 1)
565740
=== modified file 'client/gethosts/gethost_test.go'
--- client/gethosts/gethost_test.go 2014-03-24 15:32:29 +0000
+++ client/gethosts/gethost_test.go 2014-04-04 13:58:43 +0000
@@ -61,18 +61,16 @@
61}61}
6262
63func (s *getHostsSuite) TestGetTimeout(c *C) {63func (s *getHostsSuite) TestGetTimeout(c *C) {
64 finish := make(chan bool, 1)64 started := make(chan bool, 1)
65 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {65 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
66 <-finish66 started <- true
67 time.Sleep(700 * time.Millisecond)
67 }))68 }))
68 defer func() {69 defer func() {
69 time.Sleep(100 * time.Millisecond) // work around -race issue70 <-started
70 ts.Close()71 ts.Close()
71 }()72 }()
72 defer func() {73 gh := New("foobar", ts.URL, 500*time.Millisecond)
73 finish <- true
74 }()
75 gh := New("foobar", ts.URL, 1*time.Second)
76 _, err := gh.Get()74 _, err := gh.Get()
77 c.Check(err, ErrorMatches, ".*closed.*")75 c.Check(err, ErrorMatches, ".*closed.*")
78}76}
7977
=== modified file 'client/session/session.go'
--- client/session/session.go 2014-03-26 16:26:36 +0000
+++ client/session/session.go 2014-04-04 13:58:43 +0000
@@ -21,22 +21,28 @@
21import (21import (
22 "crypto/tls"22 "crypto/tls"
23 "crypto/x509"23 "crypto/x509"
24 "encoding/json"
24 "errors"25 "errors"
25 "fmt"26 "fmt"
27 "math/rand"
28 "net"
29 "strings"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "launchpad.net/ubuntu-push/client/gethosts"
26 "launchpad.net/ubuntu-push/client/session/levelmap"35 "launchpad.net/ubuntu-push/client/session/levelmap"
27 "launchpad.net/ubuntu-push/logger"36 "launchpad.net/ubuntu-push/logger"
28 "launchpad.net/ubuntu-push/protocol"37 "launchpad.net/ubuntu-push/protocol"
29 "launchpad.net/ubuntu-push/util"38 "launchpad.net/ubuntu-push/util"
30 "math/rand"
31 "net"
32 "sync/atomic"
33 "time"
34)39)
3540
36var wireVersionBytes = []byte{protocol.ProtocolWireVersion}41var wireVersionBytes = []byte{protocol.ProtocolWireVersion}
3742
38type Notification struct {43type Notification struct {
39 TopLevel int6444 TopLevel int64
45 Decoded []map[string]interface{}
40}46}
4147
42type serverMsg struct {48type serverMsg struct {
@@ -45,6 +51,15 @@
45 protocol.NotificationsMsg51 protocol.NotificationsMsg
46}52}
4753
54// parseServerAddrSpec recognizes whether spec is a HTTP URL to get
55// hosts from or a |-separated list of host:port pairs.
56func parseServerAddrSpec(spec string) (hostsEndpoint string, fallbackHosts []string) {
57 if strings.HasPrefix(spec, "http") {
58 return spec, nil
59 }
60 return "", strings.Split(spec, "|")
61}
62
48// ClientSessionState is a way to broadly track the progress of the session63// ClientSessionState is a way to broadly track the progress of the session
49type ClientSessionState uint3264type ClientSessionState uint32
5065
@@ -56,15 +71,39 @@
56 Running71 Running
57)72)
5873
59// ClienSession holds a client<->server session and its configuration.74type hostGetter interface {
75 Get() ([]string, error)
76}
77
78// ClientSessionConfig groups the client session configuration.
79type ClientSessionConfig struct {
80 ConnectTimeout time.Duration
81 ExchangeTimeout time.Duration
82 HostsCachingExpiryTime time.Duration
83 ExpectAllRepairedTime time.Duration
84 PEM []byte
85 Info map[string]interface{}
86}
87
88// ClientSession holds a client<->server session and its configuration.
60type ClientSession struct {89type ClientSession struct {
61 // configuration90 // configuration
62 DeviceId string91 DeviceId string
63 ServerAddr string92 ClientSessionConfig
64 ExchangeTimeout time.Duration93 Levels levelmap.LevelMap
65 Levels levelmap.LevelMap94 Protocolator func(net.Conn) protocol.Protocol
66 Protocolator func(net.Conn) protocol.Protocol95 // hosts
96 getHost hostGetter
97 fallbackHosts []string
98 deliveryHostsTimestamp time.Time
99 deliveryHosts []string
100 lastAttemptTimestamp time.Time
101 leftToTry int
102 tryHost int
103 // hook for testing
104 timeSince func(time.Time) time.Duration
67 // connection105 // connection
106 connLock sync.RWMutex
68 Connection net.Conn107 Connection net.Conn
69 Log logger.Logger108 Log logger.Logger
70 TLS *tls.Config109 TLS *tls.Config
@@ -77,7 +116,7 @@
77 MsgCh chan *Notification116 MsgCh chan *Notification
78}117}
79118
80func NewSession(serverAddr string, pem []byte, exchangeTimeout time.Duration,119func NewSession(serverAddrSpec string, conf ClientSessionConfig,
81 deviceId string, levelmapFactory func() (levelmap.LevelMap, error),120 deviceId string, levelmapFactory func() (levelmap.LevelMap, error),
82 log logger.Logger) (*ClientSession, error) {121 log logger.Logger) (*ClientSession, error) {
83 state := uint32(Disconnected)122 state := uint32(Disconnected)
@@ -85,19 +124,27 @@
85 if err != nil {124 if err != nil {
86 return nil, err125 return nil, err
87 }126 }
127 var getHost hostGetter
128 log.Infof("using addr: %v", serverAddrSpec)
129 hostsEndpoint, fallbackHosts := parseServerAddrSpec(serverAddrSpec)
130 if hostsEndpoint != "" {
131 getHost = gethosts.New(deviceId, hostsEndpoint, conf.ExchangeTimeout)
132 }
88 sess := &ClientSession{133 sess := &ClientSession{
89 ExchangeTimeout: exchangeTimeout,134 ClientSessionConfig: conf,
90 ServerAddr: serverAddr,135 getHost: getHost,
91 DeviceId: deviceId,136 fallbackHosts: fallbackHosts,
92 Log: log,137 DeviceId: deviceId,
93 Protocolator: protocol.NewProtocol0,138 Log: log,
94 Levels: levels,139 Protocolator: protocol.NewProtocol0,
95 TLS: &tls.Config{InsecureSkipVerify: true}, // XXX140 Levels: levels,
96 stateP: &state,141 TLS: &tls.Config{InsecureSkipVerify: true}, // XXX
142 stateP: &state,
143 timeSince: time.Since,
97 }144 }
98 if pem != nil {145 if sess.PEM != nil {
99 cp := x509.NewCertPool()146 cp := x509.NewCertPool()
100 ok := cp.AppendCertsFromPEM(pem)147 ok := cp.AppendCertsFromPEM(sess.PEM)
101 if !ok {148 if !ok {
102 return nil, errors.New("could not parse certificate")149 return nil, errors.New("could not parse certificate")
103 }150 }
@@ -114,15 +161,90 @@
114 atomic.StoreUint32(sess.stateP, uint32(state))161 atomic.StoreUint32(sess.stateP, uint32(state))
115}162}
116163
164func (sess *ClientSession) setConnection(conn net.Conn) {
165 sess.connLock.Lock()
166 defer sess.connLock.Unlock()
167 sess.Connection = conn
168}
169
170func (sess *ClientSession) getConnection() net.Conn {
171 sess.connLock.RLock()
172 defer sess.connLock.RUnlock()
173 return sess.Connection
174}
175
176// getHosts sets deliveryHosts possibly querying a remote endpoint
177func (sess *ClientSession) getHosts() error {
178 if sess.getHost != nil {
179 if sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
180 return nil
181 }
182 hosts, err := sess.getHost.Get()
183 if err != nil {
184 sess.Log.Errorf("getHosts: %v", err)
185 sess.setState(Error)
186 return err
187 }
188 sess.deliveryHostsTimestamp = time.Now()
189 sess.deliveryHosts = hosts
190 } else {
191 sess.deliveryHosts = sess.fallbackHosts
192 }
193 return nil
194}
195
196// startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts
197
198func (sess *ClientSession) startConnectionAttempt() {
199 if sess.timeSince(sess.lastAttemptTimestamp) > sess.ExpectAllRepairedTime {
200 sess.tryHost = 0
201 }
202 sess.leftToTry = len(sess.deliveryHosts)
203 if sess.leftToTry == 0 {
204 panic("should have got hosts from config or remote at this point")
205 }
206 sess.lastAttemptTimestamp = time.Now()
207}
208
209func (sess *ClientSession) nextHostToTry() string {
210 if sess.leftToTry == 0 {
211 return ""
212 }
213 res := sess.deliveryHosts[sess.tryHost]
214 sess.tryHost = (sess.tryHost + 1) % len(sess.deliveryHosts)
215 sess.leftToTry--
216 return res
217}
218
219// we reached the Started state, we can retry with the same host if we
220// have to retry again
221func (sess *ClientSession) started() {
222 sess.tryHost--
223 if sess.tryHost == -1 {
224 sess.tryHost = len(sess.deliveryHosts) - 1
225 }
226 sess.setState(Started)
227}
228
117// connect to a server using the configuration in the ClientSession229// connect to a server using the configuration in the ClientSession
118// and set up the connection.230// and set up the connection.
119func (sess *ClientSession) connect() error {231func (sess *ClientSession) connect() error {
120 conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout)232 sess.startConnectionAttempt()
121 if err != nil {233 var err error
122 sess.setState(Error)234 var conn net.Conn
123 return fmt.Errorf("connect: %s", err)235 for {
236 host := sess.nextHostToTry()
237 if host == "" {
238 sess.setState(Error)
239 return fmt.Errorf("connect: %s", err)
240 }
241 sess.Log.Debugf("trying to connect to: %v", host)
242 conn, err = net.DialTimeout("tcp", host, sess.ConnectTimeout)
243 if err == nil {
244 break
245 }
124 }246 }
125 sess.Connection = tls.Client(conn, sess.TLS)247 sess.setConnection(tls.Client(conn, sess.TLS))
126 sess.setState(Connected)248 sess.setState(Connected)
127 return nil249 return nil
128}250}
@@ -145,6 +267,8 @@
145 sess.doClose()267 sess.doClose()
146}268}
147func (sess *ClientSession) doClose() {269func (sess *ClientSession) doClose() {
270 sess.connLock.Lock()
271 defer sess.connLock.Unlock()
148 if sess.Connection != nil {272 if sess.Connection != nil {
149 sess.Connection.Close()273 sess.Connection.Close()
150 // we ignore Close errors, on purpose (the thinking being that274 // we ignore Close errors, on purpose (the thinking being that
@@ -167,6 +291,23 @@
167 return err291 return err
168}292}
169293
294func (sess *ClientSession) decodeBroadcast(bcast *serverMsg) *Notification {
295 decoded := make([]map[string]interface{}, 0)
296 for _, p := range bcast.Payloads {
297 var v map[string]interface{}
298 err := json.Unmarshal(p, &v)
299 if err != nil {
300 sess.Log.Debugf("expected map in broadcast: %v", err)
301 continue
302 }
303 decoded = append(decoded, v)
304 }
305 return &Notification{
306 TopLevel: bcast.TopLevel,
307 Decoded: decoded,
308 }
309}
310
170// handle "broadcast" messages311// handle "broadcast" messages
171func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {312func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
172 err := sess.Levels.Set(bcast.ChanId, bcast.TopLevel)313 err := sess.Levels.Set(bcast.ChanId, bcast.TopLevel)
@@ -189,7 +330,7 @@
189 if bcast.ChanId == protocol.SystemChannelId {330 if bcast.ChanId == protocol.SystemChannelId {
190 // the system channel id, the only one we care about for now331 // the system channel id, the only one we care about for now
191 sess.Log.Debugf("sending it over")332 sess.Log.Debugf("sending it over")
192 sess.MsgCh <- &Notification{bcast.TopLevel}333 sess.MsgCh <- sess.decodeBroadcast(bcast)
193 sess.Log.Debugf("sent it over")334 sess.Log.Debugf("sent it over")
194 } else {335 } else {
195 sess.Log.Debugf("what is this weird channel, %#v?", bcast.ChanId)336 sess.Log.Debugf("what is this weird channel, %#v?", bcast.ChanId)
@@ -224,7 +365,7 @@
224365
225// Call this when you've connected and want to start looping.366// Call this when you've connected and want to start looping.
226func (sess *ClientSession) start() error {367func (sess *ClientSession) start() error {
227 conn := sess.Connection368 conn := sess.getConnection()
228 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))369 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
229 if err != nil {370 if err != nil {
230 sess.setState(Error)371 sess.setState(Error)
@@ -253,6 +394,7 @@
253 // xxx get the SSO Authorization string from the phone394 // xxx get the SSO Authorization string from the phone
254 Authorization: "",395 Authorization: "",
255 Levels: levels,396 Levels: levels,
397 Info: sess.Info,
256 })398 })
257 if err != nil {399 if err != nil {
258 sess.setState(Error)400 sess.setState(Error)
@@ -278,16 +420,20 @@
278 }420 }
279 sess.proto = proto421 sess.proto = proto
280 sess.pingInterval = pingInterval422 sess.pingInterval = pingInterval
281 sess.Log.Debugf("Connected %v.", conn.LocalAddr())423 sess.Log.Debugf("Connected %v.", conn.RemoteAddr())
282 sess.setState(Started)424 sess.started() // deals with choosing which host to retry with as well
283 return nil425 return nil
284}426}
285427
286// run calls connect, and if it works it calls start, and if it works428// run calls connect, and if it works it calls start, and if it works
287// it runs loop in a goroutine, and ships its return value over ErrCh.429// it runs loop in a goroutine, and ships its return value over ErrCh.
288func (sess *ClientSession) run(closer func(), connecter, starter, looper func() error) error {430func (sess *ClientSession) run(closer func(), hostGetter, connecter, starter, looper func() error) error {
289 closer()431 closer()
290 err := connecter()432 err := hostGetter()
433 if err != nil {
434 return err
435 }
436 err = connecter()
291 if err == nil {437 if err == nil {
292 err = starter()438 err = starter()
293 if err == nil {439 if err == nil {
@@ -317,7 +463,7 @@
317 // keep on trying.463 // keep on trying.
318 panic("can't Dial() without a protocol constructor.")464 panic("can't Dial() without a protocol constructor.")
319 }465 }
320 return sess.run(sess.doClose, sess.connect, sess.start, sess.loop)466 return sess.run(sess.doClose, sess.getHosts, sess.connect, sess.start, sess.loop)
321}467}
322468
323func init() {469func init() {
324470
=== modified file 'client/session/session_test.go'
--- client/session/session_test.go 2014-03-27 13:26:10 +0000
+++ client/session/session_test.go 2014-04-04 13:58:43 +0000
@@ -23,16 +23,21 @@
23 "fmt"23 "fmt"
24 "io"24 "io"
25 "io/ioutil"25 "io/ioutil"
26 "net"
27 "net/http"
28 "net/http/httptest"
29 "reflect"
30 "testing"
31 "time"
32
26 . "launchpad.net/gocheck"33 . "launchpad.net/gocheck"
34
27 "launchpad.net/ubuntu-push/client/session/levelmap"35 "launchpad.net/ubuntu-push/client/session/levelmap"
36 //"launchpad.net/ubuntu-push/client/gethosts"
28 "launchpad.net/ubuntu-push/logger"37 "launchpad.net/ubuntu-push/logger"
29 "launchpad.net/ubuntu-push/protocol"38 "launchpad.net/ubuntu-push/protocol"
30 helpers "launchpad.net/ubuntu-push/testing"39 helpers "launchpad.net/ubuntu-push/testing"
31 "launchpad.net/ubuntu-push/testing/condition"40 "launchpad.net/ubuntu-push/testing/condition"
32 "net"
33 "reflect"
34 "testing"
35 "time"
36)41)
3742
38func TestSession(t *testing.T) { TestingT(t) }43func TestSession(t *testing.T) { TestingT(t) }
@@ -181,23 +186,51 @@
181}186}
182187
183/****************************************************************188/****************************************************************
189 parseServerAddrSpec() tests
190****************************************************************/
191
192func (cs *clientSessionSuite) TestParseServerAddrSpec(c *C) {
193 hEp, fallbackHosts := parseServerAddrSpec("http://foo/hosts")
194 c.Check(hEp, Equals, "http://foo/hosts")
195 c.Check(fallbackHosts, IsNil)
196
197 hEp, fallbackHosts = parseServerAddrSpec("foo:443")
198 c.Check(hEp, Equals, "")
199 c.Check(fallbackHosts, DeepEquals, []string{"foo:443"})
200
201 hEp, fallbackHosts = parseServerAddrSpec("foo:443|bar:443")
202 c.Check(hEp, Equals, "")
203 c.Check(fallbackHosts, DeepEquals, []string{"foo:443", "bar:443"})
204}
205
206/****************************************************************
184 NewSession() tests207 NewSession() tests
185****************************************************************/208****************************************************************/
186209
210var dummyConf = ClientSessionConfig{}
211
187func (cs *clientSessionSuite) TestNewSessionPlainWorks(c *C) {212func (cs *clientSessionSuite) TestNewSessionPlainWorks(c *C) {
188 sess, err := NewSession("", nil, 0, "", cs.lvls, cs.log)213 sess, err := NewSession("foo:443", dummyConf, "", cs.lvls, cs.log)
189 c.Check(sess, NotNil)214 c.Check(sess, NotNil)
190 c.Check(err, IsNil)215 c.Check(err, IsNil)
216 c.Check(sess.fallbackHosts, DeepEquals, []string{"foo:443"})
191 // but no root CAs set217 // but no root CAs set
192 c.Check(sess.TLS.RootCAs, IsNil)218 c.Check(sess.TLS.RootCAs, IsNil)
193 c.Check(sess.State(), Equals, Disconnected)219 c.Check(sess.State(), Equals, Disconnected)
194}220}
195221
196var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert")222func (cs *clientSessionSuite) TestNewSessionHostEndpointWorks(c *C) {
223 sess, err := NewSession("http://foo/hosts", dummyConf, "wah", cs.lvls, cs.log)
224 c.Assert(err, IsNil)
225 c.Check(sess.getHost, NotNil)
226}
227
228var certfile string = helpers.SourceRelative("../../server/acceptance/ssl/testing.cert")
197var pem, _ = ioutil.ReadFile(certfile)229var pem, _ = ioutil.ReadFile(certfile)
198230
199func (cs *clientSessionSuite) TestNewSessionPEMWorks(c *C) {231func (cs *clientSessionSuite) TestNewSessionPEMWorks(c *C) {
200 sess, err := NewSession("", pem, 0, "wah", cs.lvls, cs.log)232 conf := ClientSessionConfig{PEM: pem}
233 sess, err := NewSession("", conf, "wah", cs.lvls, cs.log)
201 c.Check(sess, NotNil)234 c.Check(sess, NotNil)
202 c.Assert(err, IsNil)235 c.Assert(err, IsNil)
203 c.Check(sess.TLS.RootCAs, NotNil)236 c.Check(sess.TLS.RootCAs, NotNil)
@@ -205,25 +238,172 @@
205238
206func (cs *clientSessionSuite) TestNewSessionBadPEMFileContentFails(c *C) {239func (cs *clientSessionSuite) TestNewSessionBadPEMFileContentFails(c *C) {
207 badpem := []byte("This is not the PEM you're looking for.")240 badpem := []byte("This is not the PEM you're looking for.")
208 sess, err := NewSession("", badpem, 0, "wah", cs.lvls, cs.log)241 conf := ClientSessionConfig{PEM: badpem}
242 sess, err := NewSession("", conf, "wah", cs.lvls, cs.log)
209 c.Check(sess, IsNil)243 c.Check(sess, IsNil)
210 c.Check(err, NotNil)244 c.Check(err, NotNil)
211}245}
212246
213func (cs *clientSessionSuite) TestNewSessionBadLevelMapFails(c *C) {247func (cs *clientSessionSuite) TestNewSessionBadLevelMapFails(c *C) {
214 ferr := func() (levelmap.LevelMap, error) { return nil, errors.New("Busted.") }248 ferr := func() (levelmap.LevelMap, error) { return nil, errors.New("Busted.") }
215 sess, err := NewSession("", nil, 0, "wah", ferr, cs.log)249 sess, err := NewSession("", dummyConf, "wah", ferr, cs.log)
216 c.Check(sess, IsNil)250 c.Check(sess, IsNil)
217 c.Assert(err, NotNil)251 c.Assert(err, NotNil)
218}252}
219253
220/****************************************************************254/****************************************************************
255 getHosts() tests
256****************************************************************/
257
258func (cs *clientSessionSuite) TestGetHostsFallback(c *C) {
259 fallback := []string{"foo:443", "bar:443"}
260 sess := &ClientSession{fallbackHosts: fallback}
261 err := sess.getHosts()
262 c.Assert(err, IsNil)
263 c.Check(sess.deliveryHosts, DeepEquals, fallback)
264}
265
266type testHostGetter struct {
267 hosts []string
268 err error
269}
270
271func (thg *testHostGetter) Get() ([]string, error) {
272 return thg.hosts, thg.err
273}
274
275func (cs *clientSessionSuite) TestGetHostsRemote(c *C) {
276 hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil}
277 sess := &ClientSession{getHost: hostGetter, timeSince: time.Since}
278 err := sess.getHosts()
279 c.Assert(err, IsNil)
280 c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"})
281}
282
283func (cs *clientSessionSuite) TestGetHostsRemoteError(c *C) {
284 sess, err := NewSession("", dummyConf, "", cs.lvls, cs.log)
285 c.Assert(err, IsNil)
286 hostsErr := errors.New("failed")
287 hostGetter := &testHostGetter{nil, hostsErr}
288 sess.getHost = hostGetter
289 err = sess.getHosts()
290 c.Assert(err, Equals, hostsErr)
291 c.Check(sess.deliveryHosts, IsNil)
292 c.Check(sess.State(), Equals, Error)
293}
294
295func (cs *clientSessionSuite) TestGetHostsRemoteCaching(c *C) {
296 hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil}
297 sess := &ClientSession{
298 getHost: hostGetter,
299 ClientSessionConfig: ClientSessionConfig{
300 HostsCachingExpiryTime: 2 * time.Hour,
301 },
302 timeSince: time.Since,
303 }
304 err := sess.getHosts()
305 c.Assert(err, IsNil)
306 hostGetter.hosts = []string{"baz:443"}
307 // cached
308 err = sess.getHosts()
309 c.Assert(err, IsNil)
310 c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"})
311 // expired
312 sess.timeSince = func(ts time.Time) time.Duration {
313 return 3 * time.Hour
314 }
315 err = sess.getHosts()
316 c.Assert(err, IsNil)
317 c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"})
318}
319
320/****************************************************************
321 startConnectionAttempt()/nextHostToTry()/started tests
322****************************************************************/
323
324func (cs *clientSessionSuite) TestStartConnectionAttempt(c *C) {
325 since := time.Since(time.Time{})
326 sess := &ClientSession{
327 ClientSessionConfig: ClientSessionConfig{
328 ExpectAllRepairedTime: 10 * time.Second,
329 },
330 timeSince: func(ts time.Time) time.Duration {
331 return since
332 },
333 deliveryHosts: []string{"foo:443", "bar:443"},
334 }
335 // start from first host
336 sess.startConnectionAttempt()
337 c.Check(sess.lastAttemptTimestamp, Not(Equals), 0)
338 c.Check(sess.tryHost, Equals, 0)
339 c.Check(sess.leftToTry, Equals, 2)
340 since = 1 * time.Second
341 sess.tryHost = 1
342 // just continue
343 sess.startConnectionAttempt()
344 c.Check(sess.tryHost, Equals, 1)
345 sess.tryHost = 2
346}
347
348func (cs *clientSessionSuite) TestStartConnectionAttemptNoHostsPanic(c *C) {
349 since := time.Since(time.Time{})
350 sess := &ClientSession{
351 ClientSessionConfig: ClientSessionConfig{
352 ExpectAllRepairedTime: 10 * time.Second,
353 },
354 timeSince: func(ts time.Time) time.Duration {
355 return since
356 },
357 }
358 c.Check(sess.startConnectionAttempt, PanicMatches, "should have got hosts from config or remote at this point")
359}
360
361func (cs *clientSessionSuite) TestNextHostToTry(c *C) {
362 sess := &ClientSession{
363 deliveryHosts: []string{"foo:443", "bar:443", "baz:443"},
364 tryHost: 0,
365 leftToTry: 3,
366 }
367 c.Check(sess.nextHostToTry(), Equals, "foo:443")
368 c.Check(sess.nextHostToTry(), Equals, "bar:443")
369 c.Check(sess.nextHostToTry(), Equals, "baz:443")
370 c.Check(sess.nextHostToTry(), Equals, "")
371 c.Check(sess.nextHostToTry(), Equals, "")
372 c.Check(sess.tryHost, Equals, 0)
373
374 sess.leftToTry = 3
375 sess.tryHost = 1
376 c.Check(sess.nextHostToTry(), Equals, "bar:443")
377 c.Check(sess.nextHostToTry(), Equals, "baz:443")
378 c.Check(sess.nextHostToTry(), Equals, "foo:443")
379 c.Check(sess.nextHostToTry(), Equals, "")
380 c.Check(sess.nextHostToTry(), Equals, "")
381 c.Check(sess.tryHost, Equals, 1)
382}
383
384func (cs *clientSessionSuite) TestStarted(c *C) {
385 sess, err := NewSession("", dummyConf, "", cs.lvls, cs.log)
386 c.Assert(err, IsNil)
387
388 sess.deliveryHosts = []string{"foo:443", "bar:443", "baz:443"}
389 sess.tryHost = 1
390
391 sess.started()
392 c.Check(sess.tryHost, Equals, 0)
393 c.Check(sess.State(), Equals, Started)
394
395 sess.started()
396 c.Check(sess.tryHost, Equals, 2)
397}
398
399/****************************************************************
221 connect() tests400 connect() tests
222****************************************************************/401****************************************************************/
223402
224func (cs *clientSessionSuite) TestConnectFailsWithNoAddress(c *C) {403func (cs *clientSessionSuite) TestConnectFailsWithNoAddress(c *C) {
225 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)404 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
226 c.Assert(err, IsNil)405 c.Assert(err, IsNil)
406 sess.deliveryHosts = []string{"nowhere"}
227 err = sess.connect()407 err = sess.connect()
228 c.Check(err, ErrorMatches, ".*connect.*address.*")408 c.Check(err, ErrorMatches, ".*connect.*address.*")
229 c.Check(sess.State(), Equals, Error)409 c.Check(sess.State(), Equals, Error)
@@ -233,20 +413,36 @@
233 srv, err := net.Listen("tcp", "localhost:0")413 srv, err := net.Listen("tcp", "localhost:0")
234 c.Assert(err, IsNil)414 c.Assert(err, IsNil)
235 defer srv.Close()415 defer srv.Close()
236 sess, err := NewSession(srv.Addr().String(), nil, 0, "wah", cs.lvls, cs.log)416 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
237 c.Assert(err, IsNil)417 c.Assert(err, IsNil)
238 err = sess.connect()418 sess.deliveryHosts = []string{srv.Addr().String()}
239 c.Check(err, IsNil)419 err = sess.connect()
240 c.Check(sess.Connection, NotNil)420 c.Check(err, IsNil)
241 c.Check(sess.State(), Equals, Connected)421 c.Check(sess.Connection, NotNil)
422 c.Check(sess.State(), Equals, Connected)
423}
424
425func (cs *clientSessionSuite) TestConnectSecondConnects(c *C) {
426 srv, err := net.Listen("tcp", "localhost:0")
427 c.Assert(err, IsNil)
428 defer srv.Close()
429 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
430 c.Assert(err, IsNil)
431 sess.deliveryHosts = []string{"nowhere", srv.Addr().String()}
432 err = sess.connect()
433 c.Check(err, IsNil)
434 c.Check(sess.Connection, NotNil)
435 c.Check(sess.State(), Equals, Connected)
436 c.Check(sess.tryHost, Equals, 0)
242}437}
243438
244func (cs *clientSessionSuite) TestConnectConnectFail(c *C) {439func (cs *clientSessionSuite) TestConnectConnectFail(c *C) {
245 srv, err := net.Listen("tcp", "localhost:0")440 srv, err := net.Listen("tcp", "localhost:0")
246 c.Assert(err, IsNil)441 c.Assert(err, IsNil)
247 sess, err := NewSession(srv.Addr().String(), nil, 0, "wah", cs.lvls, cs.log)442 sess, err := NewSession(srv.Addr().String(), dummyConf, "wah", cs.lvls, cs.log)
248 srv.Close()443 srv.Close()
249 c.Assert(err, IsNil)444 c.Assert(err, IsNil)
445 sess.deliveryHosts = []string{srv.Addr().String()}
250 err = sess.connect()446 err = sess.connect()
251 c.Check(err, ErrorMatches, ".*connection refused")447 c.Check(err, ErrorMatches, ".*connection refused")
252 c.Check(sess.State(), Equals, Error)448 c.Check(sess.State(), Equals, Error)
@@ -257,7 +453,7 @@
257****************************************************************/453****************************************************************/
258454
259func (cs *clientSessionSuite) TestClose(c *C) {455func (cs *clientSessionSuite) TestClose(c *C) {
260 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)456 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
261 c.Assert(err, IsNil)457 c.Assert(err, IsNil)
262 sess.Connection = &testConn{Name: "TestClose"}458 sess.Connection = &testConn{Name: "TestClose"}
263 sess.Close()459 sess.Close()
@@ -266,7 +462,7 @@
266}462}
267463
268func (cs *clientSessionSuite) TestCloseTwice(c *C) {464func (cs *clientSessionSuite) TestCloseTwice(c *C) {
269 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)465 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
270 c.Assert(err, IsNil)466 c.Assert(err, IsNil)
271 sess.Connection = &testConn{Name: "TestCloseTwice"}467 sess.Connection = &testConn{Name: "TestCloseTwice"}
272 sess.Close()468 sess.Close()
@@ -277,7 +473,7 @@
277}473}
278474
279func (cs *clientSessionSuite) TestCloseFails(c *C) {475func (cs *clientSessionSuite) TestCloseFails(c *C) {
280 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)476 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
281 c.Assert(err, IsNil)477 c.Assert(err, IsNil)
282 sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)}478 sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)}
283 sess.Close()479 sess.Close()
@@ -291,7 +487,7 @@
291func (d *derp) Stop() { d.stopped = true }487func (d *derp) Stop() { d.stopped = true }
292488
293func (cs *clientSessionSuite) TestCloseStopsRetrier(c *C) {489func (cs *clientSessionSuite) TestCloseStopsRetrier(c *C) {
294 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)490 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
295 c.Assert(err, IsNil)491 c.Assert(err, IsNil)
296 ar := new(derp)492 ar := new(derp)
297 sess.retrier = ar493 sess.retrier = ar
@@ -308,7 +504,7 @@
308504
309func (cs *clientSessionSuite) TestAutoRedialWorks(c *C) {505func (cs *clientSessionSuite) TestAutoRedialWorks(c *C) {
310 // checks that AutoRedial sets up a retrier and tries redialing it506 // checks that AutoRedial sets up a retrier and tries redialing it
311 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)507 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
312 c.Assert(err, IsNil)508 c.Assert(err, IsNil)
313 ar := new(derp)509 ar := new(derp)
314 sess.retrier = ar510 sess.retrier = ar
@@ -319,7 +515,7 @@
319515
320func (cs *clientSessionSuite) TestAutoRedialStopsRetrier(c *C) {516func (cs *clientSessionSuite) TestAutoRedialStopsRetrier(c *C) {
321 // checks that AutoRedial stops the previous retrier517 // checks that AutoRedial stops the previous retrier
322 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)518 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
323 c.Assert(err, IsNil)519 c.Assert(err, IsNil)
324 ch := make(chan uint32)520 ch := make(chan uint32)
325 c.Check(sess.retrier, IsNil)521 c.Check(sess.retrier, IsNil)
@@ -344,7 +540,10 @@
344540
345func (s *msgSuite) SetUpTest(c *C) {541func (s *msgSuite) SetUpTest(c *C) {
346 var err error542 var err error
347 s.sess, err = NewSession("", nil, time.Millisecond, "wah", levelmap.NewLevelMap, helpers.NewTestLogger(c, "debug"))543 conf := ClientSessionConfig{
544 ExchangeTimeout: time.Millisecond,
545 }
546 s.sess, err = NewSession("", conf, "wah", levelmap.NewLevelMap, helpers.NewTestLogger(c, "debug"))
348 c.Assert(err, IsNil)547 c.Assert(err, IsNil)
349 s.sess.Connection = &testConn{Name: "TestHandle*"}548 s.sess.Connection = &testConn{Name: "TestHandle*"}
350 s.errCh = make(chan error, 1)549 s.errCh = make(chan error, 1)
@@ -383,14 +582,28 @@
383 AppId: "--ignored--",582 AppId: "--ignored--",
384 ChanId: "0",583 ChanId: "0",
385 TopLevel: 2,584 TopLevel: 2,
386 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},585 Payloads: []json.RawMessage{
586 json.RawMessage(`{"img1/m1":[101,"tubular"]}`),
587 json.RawMessage("false"), // shouldn't happen but robust
588 json.RawMessage(`{"img1/m1":[102,"tubular"]}`),
589 },
387 }, protocol.NotificationsMsg{}}590 }, protocol.NotificationsMsg{}}
388 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()591 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
389 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})592 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
390 s.upCh <- nil // ack ok593 s.upCh <- nil // ack ok
391 c.Check(<-s.errCh, Equals, nil)594 c.Check(<-s.errCh, Equals, nil)
392 c.Assert(len(s.sess.MsgCh), Equals, 1)595 c.Assert(len(s.sess.MsgCh), Equals, 1)
393 c.Check(<-s.sess.MsgCh, DeepEquals, &Notification{TopLevel: 2})596 c.Check(<-s.sess.MsgCh, DeepEquals, &Notification{
597 TopLevel: 2,
598 Decoded: []map[string]interface{}{
599 map[string]interface{}{
600 "img1/m1": []interface{}{float64(101), "tubular"},
601 },
602 map[string]interface{}{
603 "img1/m1": []interface{}{float64(102), "tubular"},
604 },
605 },
606 })
394 // and finally, the session keeps track of the levels607 // and finally, the session keeps track of the levels
395 levels, err := s.sess.Levels.GetAll()608 levels, err := s.sess.Levels.GetAll()
396 c.Check(err, IsNil)609 c.Check(err, IsNil)
@@ -519,7 +732,7 @@
519 start() tests732 start() tests
520****************************************************************/733****************************************************************/
521func (cs *clientSessionSuite) TestStartFailsIfSetDeadlineFails(c *C) {734func (cs *clientSessionSuite) TestStartFailsIfSetDeadlineFails(c *C) {
522 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)735 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
523 c.Assert(err, IsNil)736 c.Assert(err, IsNil)
524 sess.Connection = &testConn{Name: "TestStartFailsIfSetDeadlineFails",737 sess.Connection = &testConn{Name: "TestStartFailsIfSetDeadlineFails",
525 DeadlineCondition: condition.Work(false)} // setdeadline will fail738 DeadlineCondition: condition.Work(false)} // setdeadline will fail
@@ -529,7 +742,7 @@
529}742}
530743
531func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) {744func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) {
532 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)745 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
533 c.Assert(err, IsNil)746 c.Assert(err, IsNil)
534 sess.Connection = &testConn{Name: "TestStartFailsIfWriteFails",747 sess.Connection = &testConn{Name: "TestStartFailsIfWriteFails",
535 WriteCondition: condition.Work(false)} // write will fail748 WriteCondition: condition.Work(false)} // write will fail
@@ -539,7 +752,7 @@
539}752}
540753
541func (cs *clientSessionSuite) TestStartFailsIfGetLevelsFails(c *C) {754func (cs *clientSessionSuite) TestStartFailsIfGetLevelsFails(c *C) {
542 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)755 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
543 c.Assert(err, IsNil)756 c.Assert(err, IsNil)
544 sess.Levels = &brokenLevelMap{}757 sess.Levels = &brokenLevelMap{}
545 sess.Connection = &testConn{Name: "TestStartConnectMessageFails"}758 sess.Connection = &testConn{Name: "TestStartConnectMessageFails"}
@@ -559,7 +772,7 @@
559}772}
560773
561func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) {774func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) {
562 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)775 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
563 c.Assert(err, IsNil)776 c.Assert(err, IsNil)
564 sess.Connection = &testConn{Name: "TestStartConnectMessageFails"}777 sess.Connection = &testConn{Name: "TestStartConnectMessageFails"}
565 errCh := make(chan error, 1)778 errCh := make(chan error, 1)
@@ -585,7 +798,7 @@
585}798}
586799
587func (cs *clientSessionSuite) TestStartConnackReadError(c *C) {800func (cs *clientSessionSuite) TestStartConnackReadError(c *C) {
588 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)801 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
589 c.Assert(err, IsNil)802 c.Assert(err, IsNil)
590 sess.Connection = &testConn{Name: "TestStartConnackReadError"}803 sess.Connection = &testConn{Name: "TestStartConnackReadError"}
591 errCh := make(chan error, 1)804 errCh := make(chan error, 1)
@@ -609,7 +822,7 @@
609}822}
610823
611func (cs *clientSessionSuite) TestStartBadConnack(c *C) {824func (cs *clientSessionSuite) TestStartBadConnack(c *C) {
612 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)825 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
613 c.Assert(err, IsNil)826 c.Assert(err, IsNil)
614 sess.Connection = &testConn{Name: "TestStartBadConnack"}827 sess.Connection = &testConn{Name: "TestStartBadConnack"}
615 errCh := make(chan error, 1)828 errCh := make(chan error, 1)
@@ -633,7 +846,7 @@
633}846}
634847
635func (cs *clientSessionSuite) TestStartNotConnack(c *C) {848func (cs *clientSessionSuite) TestStartNotConnack(c *C) {
636 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)849 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
637 c.Assert(err, IsNil)850 c.Assert(err, IsNil)
638 sess.Connection = &testConn{Name: "TestStartBadConnack"}851 sess.Connection = &testConn{Name: "TestStartBadConnack"}
639 errCh := make(chan error, 1)852 errCh := make(chan error, 1)
@@ -657,7 +870,14 @@
657}870}
658871
659func (cs *clientSessionSuite) TestStartWorks(c *C) {872func (cs *clientSessionSuite) TestStartWorks(c *C) {
660 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)873 info := map[string]interface{}{
874 "foo": 1,
875 "bar": "baz",
876 }
877 conf := ClientSessionConfig{
878 Info: info,
879 }
880 sess, err := NewSession("", conf, "wah", cs.lvls, cs.log)
661 c.Assert(err, IsNil)881 c.Assert(err, IsNil)
662 sess.Connection = &testConn{Name: "TestStartWorks"}882 sess.Connection = &testConn{Name: "TestStartWorks"}
663 errCh := make(chan error, 1)883 errCh := make(chan error, 1)
@@ -671,8 +891,10 @@
671 }()891 }()
672892
673 c.Check(takeNext(downCh), Equals, "deadline 0")893 c.Check(takeNext(downCh), Equals, "deadline 0")
674 _, ok := takeNext(downCh).(protocol.ConnectMsg)894 msg, ok := takeNext(downCh).(protocol.ConnectMsg)
675 c.Check(ok, Equals, true)895 c.Check(ok, Equals, true)
896 c.Check(msg.DeviceId, Equals, "wah")
897 c.Check(msg.Info, DeepEquals, info)
676 upCh <- nil // no error898 upCh <- nil // no error
677 upCh <- protocol.ConnAckMsg{899 upCh <- protocol.ConnAckMsg{
678 Type: "connack",900 Type: "connack",
@@ -688,34 +910,49 @@
688 run() tests910 run() tests
689****************************************************************/911****************************************************************/
690912
691func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) {913func (cs *clientSessionSuite) TestRunBailsIfHostGetterFails(c *C) {
692 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)914 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
693 c.Assert(err, IsNil)915 c.Assert(err, IsNil)
694 failure := errors.New("TestRunBailsIfConnectFails")916 failure := errors.New("TestRunBailsIfHostGetterFails")
695 has_closed := false917 has_closed := false
696 err = sess.run(918 err = sess.run(
697 func() { has_closed = true },919 func() { has_closed = true },
698 func() error { return failure },920 func() error { return failure },
699 nil,921 nil,
922 nil,
700 nil)923 nil)
701 c.Check(err, Equals, failure)924 c.Check(err, Equals, failure)
702 c.Check(has_closed, Equals, true)925 c.Check(has_closed, Equals, true)
703}926}
704927
928func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) {
929 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
930 c.Assert(err, IsNil)
931 failure := errors.New("TestRunBailsIfConnectFails")
932 err = sess.run(
933 func() {},
934 func() error { return nil },
935 func() error { return failure },
936 nil,
937 nil)
938 c.Check(err, Equals, failure)
939}
940
705func (cs *clientSessionSuite) TestRunBailsIfStartFails(c *C) {941func (cs *clientSessionSuite) TestRunBailsIfStartFails(c *C) {
706 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)942 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
707 c.Assert(err, IsNil)943 c.Assert(err, IsNil)
708 failure := errors.New("TestRunBailsIfStartFails")944 failure := errors.New("TestRunBailsIfStartFails")
709 err = sess.run(945 err = sess.run(
710 func() {},946 func() {},
711 func() error { return nil },947 func() error { return nil },
948 func() error { return nil },
712 func() error { return failure },949 func() error { return failure },
713 nil)950 nil)
714 c.Check(err, Equals, failure)951 c.Check(err, Equals, failure)
715}952}
716953
717func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) {954func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) {
718 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)955 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
719 c.Assert(err, IsNil)956 c.Assert(err, IsNil)
720 // just to make a point: until here we haven't set ErrCh & MsgCh (no957 // just to make a point: until here we haven't set ErrCh & MsgCh (no
721 // biggie if this stops being true)958 // biggie if this stops being true)
@@ -727,6 +964,7 @@
727 func() {},964 func() {},
728 func() error { return nil },965 func() error { return nil },
729 func() error { return nil },966 func() error { return nil },
967 func() error { return nil },
730 func() error { sess.MsgCh <- notf; return <-failureCh })968 func() error { sess.MsgCh <- notf; return <-failureCh })
731 c.Check(err, Equals, nil)969 c.Check(err, Equals, nil)
732 // if run doesn't error it sets up the channels970 // if run doesn't error it sets up the channels
@@ -744,7 +982,7 @@
744****************************************************************/982****************************************************************/
745983
746func (cs *clientSessionSuite) TestJitter(c *C) {984func (cs *clientSessionSuite) TestJitter(c *C) {
747 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)985 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
748 c.Assert(err, IsNil)986 c.Assert(err, IsNil)
749 num_tries := 20 // should do the math987 num_tries := 20 // should do the math
750 spread := time.Second //988 spread := time.Second //
@@ -776,12 +1014,17 @@
7761014
777func (cs *clientSessionSuite) TestDialPanics(c *C) {1015func (cs *clientSessionSuite) TestDialPanics(c *C) {
778 // one last unhappy test1016 // one last unhappy test
779 sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log)1017 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
780 c.Assert(err, IsNil)1018 c.Assert(err, IsNil)
781 sess.Protocolator = nil1019 sess.Protocolator = nil
782 c.Check(sess.Dial, PanicMatches, ".*protocol constructor.")1020 c.Check(sess.Dial, PanicMatches, ".*protocol constructor.")
783}1021}
7841022
1023var (
1024 dialTestTimeout = 100 * time.Millisecond
1025 dialTestConf = ClientSessionConfig{ExchangeTimeout: dialTestTimeout}
1026)
1027
785func (cs *clientSessionSuite) TestDialWorks(c *C) {1028func (cs *clientSessionSuite) TestDialWorks(c *C) {
786 // happy path thoughts1029 // happy path thoughts
787 cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock)1030 cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock)
@@ -791,10 +1034,22 @@
791 SessionTicketsDisabled: true,1034 SessionTicketsDisabled: true,
792 }1035 }
7931036
794 timeout := 100 * time.Millisecond
795 lst, err := tls.Listen("tcp", "localhost:0", tlsCfg)1037 lst, err := tls.Listen("tcp", "localhost:0", tlsCfg)
796 c.Assert(err, IsNil)1038 c.Assert(err, IsNil)
797 sess, err := NewSession(lst.Addr().String(), nil, timeout, "wah", cs.lvls, cs.log)1039 // advertise
1040 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1041 b, err := json.Marshal(map[string]interface{}{
1042 "hosts": []string{"nowhere", lst.Addr().String()},
1043 })
1044 if err != nil {
1045 panic(err)
1046 }
1047 w.Header().Set("Content-Type", "application/json")
1048 w.Write(b)
1049 }))
1050 defer ts.Close()
1051
1052 sess, err := NewSession(ts.URL, dialTestConf, "wah", cs.lvls, cs.log)
798 c.Assert(err, IsNil)1053 c.Assert(err, IsNil)
799 tconn := &testConn{CloseCondition: condition.Fail2Work(10)}1054 tconn := &testConn{CloseCondition: condition.Fail2Work(10)}
800 sess.Connection = tconn1055 sess.Connection = tconn
@@ -819,10 +1074,13 @@
819 c.Check(tconn.CloseCondition.String(), Matches, ".* 9 to go.")1074 c.Check(tconn.CloseCondition.String(), Matches, ".* 9 to go.")
8201075
821 // now, start: 1. protocol version1076 // now, start: 1. protocol version
822 v, err := protocol.ReadWireFormatVersion(srv, timeout)1077 v, err := protocol.ReadWireFormatVersion(srv, dialTestTimeout)
823 c.Assert(err, IsNil)1078 c.Assert(err, IsNil)
824 c.Assert(v, Equals, protocol.ProtocolWireVersion)1079 c.Assert(v, Equals, protocol.ProtocolWireVersion)
8251080
1081 // if something goes wrong session would try the first/other host
1082 c.Check(sess.tryHost, Equals, 0)
1083
826 // 2. "connect" (but on the fake protcol above! woo)1084 // 2. "connect" (but on the fake protcol above! woo)
8271085
828 c.Check(takeNext(downCh), Equals, "deadline 100ms")1086 c.Check(takeNext(downCh), Equals, "deadline 100ms")
@@ -843,6 +1101,9 @@
843 c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})1101 c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})
844 upCh <- nil1102 upCh <- nil
8451103
1104 // session would retry the same host
1105 c.Check(sess.tryHost, Equals, 1)
1106
846 // and broadcasts...1107 // and broadcasts...
847 b := &protocol.BroadcastMsg{1108 b := &protocol.BroadcastMsg{
848 Type: "broadcast",1109 Type: "broadcast",
@@ -870,3 +1131,30 @@
870 upCh <- failure1131 upCh <- failure
871 c.Check(<-sess.ErrCh, Equals, failure)1132 c.Check(<-sess.ErrCh, Equals, failure)
872}1133}
1134
1135func (cs *clientSessionSuite) TestDialWorksDirect(c *C) {
1136 // happy path thoughts
1137 cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock)
1138 c.Assert(err, IsNil)
1139 tlsCfg := &tls.Config{
1140 Certificates: []tls.Certificate{cert},
1141 SessionTicketsDisabled: true,
1142 }
1143
1144 lst, err := tls.Listen("tcp", "localhost:0", tlsCfg)
1145 c.Assert(err, IsNil)
1146 sess, err := NewSession(lst.Addr().String(), dialTestConf, "wah", cs.lvls, cs.log)
1147 c.Assert(err, IsNil)
1148 defer sess.Close()
1149
1150 upCh := make(chan interface{}, 5)
1151 downCh := make(chan interface{}, 5)
1152 proto := &testProtocol{up: upCh, down: downCh}
1153 sess.Protocolator = func(net.Conn) protocol.Protocol { return proto }
1154
1155 go sess.Dial()
1156
1157 _, err = lst.Accept()
1158 c.Assert(err, IsNil)
1159 // connect done
1160}
8731161
=== modified file 'debian/config.json'
--- debian/config.json 2014-03-20 12:17:40 +0000
+++ debian/config.json 2014-04-04 13:58:43 +0000
@@ -1,6 +1,9 @@
1{1{
2 "connect_timeout": "20s",
2 "exchange_timeout": "30s",3 "exchange_timeout": "30s",
3 "addr": "push-delivery.ubuntu.com:443",4 "hosts_cache_expiry": "12h",
5 "expect_all_repaired": "40m",
6 "addr": "https://push.ubuntu.com/delivery-hosts",
4 "cert_pem_file": "",7 "cert_pem_file": "",
5 "stabilizing_timeout": "2s",8 "stabilizing_timeout": "2s",
6 "recheck_timeout": "10m",9 "recheck_timeout": "10m",
710
=== modified file 'debian/ubuntu-push-client.conf'
--- debian/ubuntu-push-client.conf 2014-02-07 11:31:54 +0000
+++ debian/ubuntu-push-client.conf 2014-04-04 13:58:43 +0000
@@ -1,6 +1,6 @@
1description "Starts the ubuntu push notifications client side daemon"1description "ubuntu push notification client-side daemon"
22
3start on dbus3start on started dbus
4stop on runlevel [06]4stop on stopped dbus
55
6exec /usr/lib/ubuntu-push-client/ubuntu-push-client6exec /usr/lib/ubuntu-push-client/ubuntu-push-client
77
=== added directory 'sampleconfigs'
=== renamed file 'server/acceptance/config/config.json' => 'sampleconfigs/dev.json'
--- server/acceptance/config/config.json 2014-01-17 17:20:34 +0000
+++ sampleconfigs/dev.json 2014-04-04 13:58:43 +0000
@@ -4,9 +4,9 @@
4 "broker_queue_size": 10000,4 "broker_queue_size": 10000,
5 "session_queue_size": 10,5 "session_queue_size": 10,
6 "addr": "127.0.0.1:9090",6 "addr": "127.0.0.1:9090",
7 "key_pem_file": "testing.key",7 "key_pem_file": "../server/acceptance/ssl/testing.key",
8 "cert_pem_file": "testing.cert",8 "cert_pem_file": "../server/acceptance/ssl/testing.cert",
9 "http_addr": "127.0.0.1:8888",9 "http_addr": "127.0.0.1:8080",
10 "http_read_timeout": "5s",10 "http_read_timeout": "5s",
11 "http_write_timeout": "5s"11 "http_write_timeout": "5s"
12}12}
1313
=== modified file 'server/acceptance/acceptanceclient.go'
--- server/acceptance/acceptanceclient.go 2014-02-21 16:17:28 +0000
+++ server/acceptance/acceptanceclient.go 2014-04-04 13:58:43 +0000
@@ -35,6 +35,8 @@
35type ClientSession struct {35type ClientSession struct {
36 // configuration36 // configuration
37 DeviceId string37 DeviceId string
38 Model string
39 ImageChannel string
38 ServerAddr string40 ServerAddr string
39 ExchangeTimeout time.Duration41 ExchangeTimeout time.Duration
40 CertPEMBlock []byte42 CertPEMBlock []byte
@@ -86,6 +88,10 @@
86 Type: "connect",88 Type: "connect",
87 DeviceId: sess.DeviceId,89 DeviceId: sess.DeviceId,
88 Levels: sess.Levels,90 Levels: sess.Levels,
91 Info: map[string]interface{}{
92 "device": sess.Model,
93 "channel": sess.ImageChannel,
94 },
89 })95 })
90 if err != nil {96 if err != nil {
91 return err97 return err
9298
=== modified file 'server/acceptance/cmd/acceptanceclient.go'
--- server/acceptance/cmd/acceptanceclient.go 2014-02-21 16:17:28 +0000
+++ server/acceptance/cmd/acceptanceclient.go 2014-04-04 13:58:43 +0000
@@ -30,6 +30,8 @@
30var (30var (
31 insecureFlag = flag.Bool("insecure", false, "disable checking of server certificate and hostname")31 insecureFlag = flag.Bool("insecure", false, "disable checking of server certificate and hostname")
32 reportPingsFlag = flag.Bool("reportPings", true, "report each Ping from the server")32 reportPingsFlag = flag.Bool("reportPings", true, "report each Ping from the server")
33 deviceModel = flag.String("model", "?", "device image model")
34 imageChannel = flag.String("imageChannel", "?", "image channel")
33)35)
3436
35type configuration struct {37type configuration struct {
@@ -64,9 +66,12 @@
64 ServerAddr: cfg.Addr.HostPort(),66 ServerAddr: cfg.Addr.HostPort(),
65 DeviceId: flag.Arg(1),67 DeviceId: flag.Arg(1),
66 // flags68 // flags
67 ReportPings: *reportPingsFlag,69 Model: *deviceModel,
68 Insecure: *insecureFlag,70 ImageChannel: *imageChannel,
71 ReportPings: *reportPingsFlag,
72 Insecure: *insecureFlag,
69 }73 }
74 log.Printf("with: %#v", session)
70 session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName))75 session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName))
71 if err != nil {76 if err != nil {
72 log.Fatalf("reading CertPEMFile: %v", err)77 log.Fatalf("reading CertPEMFile: %v", err)
7378
=== renamed directory 'server/acceptance/config' => 'server/acceptance/ssl'
=== modified file 'server/acceptance/suites/broadcast.go'
--- server/acceptance/suites/broadcast.go 2014-02-21 21:39:54 +0000
+++ server/acceptance/suites/broadcast.go 2014-04-04 13:58:43 +0000
@@ -41,11 +41,34 @@
41 got, err := s.PostRequest("/broadcast", &api.Broadcast{41 got, err := s.PostRequest("/broadcast", &api.Broadcast{
42 Channel: "system",42 Channel: "system",
43 ExpireOn: future,43 ExpireOn: future,
44 Data: json.RawMessage(`{"n": 42}`),44 Data: json.RawMessage(`{"img1/m1": 42}`),
45 })45 })
46 c.Assert(err, IsNil)46 c.Assert(err, IsNil)
47 c.Assert(got, Matches, ".*ok.*")47 c.Assert(got, Matches, ".*ok.*")
48 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)48 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
49 stop()
50 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
51 c.Check(len(errCh), Equals, 0)
52}
53
54func (s *BroadcastAcceptanceSuite) TestBroadcastToConnectedChannelFilter(c *C) {
55 events, errCh, stop := s.StartClient(c, "DEVB", nil)
56 got, err := s.PostRequest("/broadcast", &api.Broadcast{
57 Channel: "system",
58 ExpireOn: future,
59 Data: json.RawMessage(`{"img1/m2": 10}`),
60 })
61 c.Assert(err, IsNil)
62 got, err = s.PostRequest("/broadcast", &api.Broadcast{
63 Channel: "system",
64 ExpireOn: future,
65 Data: json.RawMessage(`{"img1/m1": 20}`),
66 })
67 c.Assert(err, IsNil)
68 c.Assert(got, Matches, ".*ok.*")
69 // xxx don't send this one
70 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`)
71 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`)
49 stop()72 stop()
50 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)73 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
51 c.Check(len(errCh), Equals, 0)74 c.Check(len(errCh), Equals, 0)
@@ -56,14 +79,14 @@
56 got, err := s.PostRequest("/broadcast", &api.Broadcast{79 got, err := s.PostRequest("/broadcast", &api.Broadcast{
57 Channel: "system",80 Channel: "system",
58 ExpireOn: future,81 ExpireOn: future,
59 Data: json.RawMessage(`{"b": 1}`),82 Data: json.RawMessage(`{"img1/m1": 1}`),
60 })83 })
61 c.Assert(err, IsNil)84 c.Assert(err, IsNil)
62 c.Assert(got, Matches, ".*ok.*")85 c.Assert(got, Matches, ".*ok.*")
6386
64 events, errCh, stop := s.StartClient(c, "DEVB", nil)87 events, errCh, stop := s.StartClient(c, "DEVB", nil)
65 // gettting pending on connect88 // gettting pending on connect
66 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)89 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`)
67 stop()90 stop()
68 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)91 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
69 c.Check(len(errCh), Equals, 0)92 c.Check(len(errCh), Equals, 0)
@@ -71,7 +94,7 @@
7194
72func (s *BroadcastAcceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) {95func (s *BroadcastAcceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) {
73 // send bunch of broadcasts that will be pending96 // send bunch of broadcasts that will be pending
74 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))97 payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
75 for i := 0; i < 32; i++ {98 for i := 0; i < 32; i++ {
76 got, err := s.PostRequest("/broadcast", &api.Broadcast{99 got, err := s.PostRequest("/broadcast", &api.Broadcast{
77 Channel: "system",100 Channel: "system",
@@ -84,7 +107,7 @@
84107
85 events, errCh, stop := s.StartClient(c, "DEVC", nil)108 events, errCh, stop := s.StartClient(c, "DEVC", nil)
86 // gettting pending on connect109 // gettting pending on connect
87 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)110 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"img1/m1":0,.*`)
88 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)111 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)
89 stop()112 stop()
90 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)113 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
@@ -100,12 +123,12 @@
100 got, err := s.PostRequest("/broadcast", &api.Broadcast{123 got, err := s.PostRequest("/broadcast", &api.Broadcast{
101 Channel: "system",124 Channel: "system",
102 ExpireOn: future,125 ExpireOn: future,
103 Data: json.RawMessage(`{"n": 42}`),126 Data: json.RawMessage(`{"img1/m1": 42}`),
104 })127 })
105 c.Assert(err, IsNil)128 c.Assert(err, IsNil)
106 c.Assert(got, Matches, ".*ok.*")129 c.Assert(got, Matches, ".*ok.*")
107 c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)130 c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
108 c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)131 c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":42}]`)
109 stop1()132 stop1()
110 stop2()133 stop2()
111 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)134 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
@@ -119,11 +142,11 @@
119 got, err := s.PostRequest("/broadcast", &api.Broadcast{142 got, err := s.PostRequest("/broadcast", &api.Broadcast{
120 Channel: "system",143 Channel: "system",
121 ExpireOn: future,144 ExpireOn: future,
122 Data: json.RawMessage(`{"b": 1}`),145 Data: json.RawMessage(`{"img1/m1": 1}`),
123 })146 })
124 c.Assert(err, IsNil)147 c.Assert(err, IsNil)
125 c.Assert(got, Matches, ".*ok.*")148 c.Assert(got, Matches, ".*ok.*")
126 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)149 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"img1/m1":1}]`)
127 stop()150 stop()
128 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)151 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
129 c.Check(len(errCh), Equals, 0)152 c.Check(len(errCh), Equals, 0)
@@ -131,7 +154,7 @@
131 got, err = s.PostRequest("/broadcast", &api.Broadcast{154 got, err = s.PostRequest("/broadcast", &api.Broadcast{
132 Channel: "system",155 Channel: "system",
133 ExpireOn: future,156 ExpireOn: future,
134 Data: json.RawMessage(`{"b": 2}`),157 Data: json.RawMessage(`{"img1/m1": 2}`),
135 })158 })
136 c.Assert(err, IsNil)159 c.Assert(err, IsNil)
137 c.Assert(got, Matches, ".*ok.*")160 c.Assert(got, Matches, ".*ok.*")
@@ -139,7 +162,7 @@
139 events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{162 events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{
140 protocol.SystemChannelId: 1,163 protocol.SystemChannelId: 1,
141 })164 })
142 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)165 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`)
143 stop()166 stop()
144 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)167 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
145 c.Check(len(errCh), Equals, 0)168 c.Check(len(errCh), Equals, 0)
@@ -150,14 +173,14 @@
150 got, err := s.PostRequest("/broadcast", &api.Broadcast{173 got, err := s.PostRequest("/broadcast", &api.Broadcast{
151 Channel: "system",174 Channel: "system",
152 ExpireOn: future,175 ExpireOn: future,
153 Data: json.RawMessage(`{"b": 1}`),176 Data: json.RawMessage(`{"img1/m1": 1}`),
154 })177 })
155 c.Assert(err, IsNil)178 c.Assert(err, IsNil)
156 c.Assert(got, Matches, ".*ok.*")179 c.Assert(got, Matches, ".*ok.*")
157 got, err = s.PostRequest("/broadcast", &api.Broadcast{180 got, err = s.PostRequest("/broadcast", &api.Broadcast{
158 Channel: "system",181 Channel: "system",
159 ExpireOn: future,182 ExpireOn: future,
160 Data: json.RawMessage(`{"b": 2}`),183 Data: json.RawMessage(`{"img1/m1": 2}`),
161 })184 })
162 c.Assert(err, IsNil)185 c.Assert(err, IsNil)
163 c.Assert(got, Matches, ".*ok.*")186 c.Assert(got, Matches, ".*ok.*")
@@ -166,7 +189,7 @@
166 protocol.SystemChannelId: 10,189 protocol.SystemChannelId: 10,
167 })190 })
168 // gettting last one pending on connect191 // gettting last one pending on connect
169 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)192 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":2}]`)
170 stop()193 stop()
171 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)194 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
172 c.Check(len(errCh), Equals, 0)195 c.Check(len(errCh), Equals, 0)
@@ -189,14 +212,14 @@
189 got, err := s.PostRequest("/broadcast", &api.Broadcast{212 got, err := s.PostRequest("/broadcast", &api.Broadcast{
190 Channel: "system",213 Channel: "system",
191 ExpireOn: future,214 ExpireOn: future,
192 Data: json.RawMessage(`{"b": 1}`),215 Data: json.RawMessage(`{"img1/m1": 1}`),
193 })216 })
194 c.Assert(err, IsNil)217 c.Assert(err, IsNil)
195 c.Assert(got, Matches, ".*ok.*")218 c.Assert(got, Matches, ".*ok.*")
196 got, err = s.PostRequest("/broadcast", &api.Broadcast{219 got, err = s.PostRequest("/broadcast", &api.Broadcast{
197 Channel: "system",220 Channel: "system",
198 ExpireOn: future,221 ExpireOn: future,
199 Data: json.RawMessage(`{"b": 2}`),222 Data: json.RawMessage(`{"img1/m1": 2}`),
200 })223 })
201 c.Assert(err, IsNil)224 c.Assert(err, IsNil)
202 c.Assert(got, Matches, ".*ok.*")225 c.Assert(got, Matches, ".*ok.*")
@@ -205,7 +228,7 @@
205 protocol.SystemChannelId: -10,228 protocol.SystemChannelId: -10,
206 })229 })
207 // gettting pending on connect230 // gettting pending on connect
208 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)231 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1},{"img1/m1":2}]`)
209 stop()232 stop()
210 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)233 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
211 c.Check(len(errCh), Equals, 0)234 c.Check(len(errCh), Equals, 0)
@@ -216,14 +239,14 @@
216 got, err := s.PostRequest("/broadcast", &api.Broadcast{239 got, err := s.PostRequest("/broadcast", &api.Broadcast{
217 Channel: "system",240 Channel: "system",
218 ExpireOn: future,241 ExpireOn: future,
219 Data: json.RawMessage(`{"b": 1}`),242 Data: json.RawMessage(`{"img1/m1": 1}`),
220 })243 })
221 c.Assert(err, IsNil)244 c.Assert(err, IsNil)
222 c.Assert(got, Matches, ".*ok.*")245 c.Assert(got, Matches, ".*ok.*")
223 got, err = s.PostRequest("/broadcast", &api.Broadcast{246 got, err = s.PostRequest("/broadcast", &api.Broadcast{
224 Channel: "system",247 Channel: "system",
225 ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339),248 ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339),
226 Data: json.RawMessage(`{"b": 2}`),249 Data: json.RawMessage(`{"img1/m1": 2}`),
227 })250 })
228 c.Assert(err, IsNil)251 c.Assert(err, IsNil)
229 c.Assert(got, Matches, ".*ok.*")252 c.Assert(got, Matches, ".*ok.*")
@@ -233,7 +256,7 @@
233256
234 events, errCh, stop := s.StartClient(c, "DEVB", nil)257 events, errCh, stop := s.StartClient(c, "DEVB", nil)
235 // gettting pending on connect258 // gettting pending on connect
236 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`)259 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":1}]`)
237 stop()260 stop()
238 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)261 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
239 c.Check(len(errCh), Equals, 0)262 c.Check(len(errCh), Equals, 0)
240263
=== modified file 'server/acceptance/suites/helpers.go'
--- server/acceptance/suites/helpers.go 2014-03-25 19:08:00 +0000
+++ server/acceptance/suites/helpers.go 2014-04-04 13:58:43 +0000
@@ -48,8 +48,8 @@
48 "session_queue_size": 10,48 "session_queue_size": 10,
49 "broker_queue_size": 100,49 "broker_queue_size": 100,
50 "addr": addr,50 "addr": addr,
51 "key_pem_file": helpers.SourceRelative("../config/testing.key"),51 "key_pem_file": helpers.SourceRelative("../ssl/testing.key"),
52 "cert_pem_file": helpers.SourceRelative("../config/testing.cert"),52 "cert_pem_file": helpers.SourceRelative("../ssl/testing.cert"),
53 })53 })
54}54}
5555
5656
=== modified file 'server/acceptance/suites/pingpong.go'
--- server/acceptance/suites/pingpong.go 2014-02-21 20:29:16 +0000
+++ server/acceptance/suites/pingpong.go 2014-04-04 13:58:43 +0000
@@ -34,7 +34,7 @@
34func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {34func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {
35 errCh := make(chan error, 1)35 errCh := make(chan error, 1)
36 events := make(chan string, 10)36 events := make(chan string, 10)
37 sess := testClientSession(s.ServerAddr, "DEVA", true)37 sess := testClientSession(s.ServerAddr, "DEVA", "m1", "img1", true)
38 err := sess.Dial()38 err := sess.Dial()
39 c.Assert(err, IsNil)39 c.Assert(err, IsNil)
40 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {40 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
@@ -68,7 +68,7 @@
68func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {68func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {
69 errCh := make(chan error, 1)69 errCh := make(chan error, 1)
70 events := make(chan string, 10)70 events := make(chan string, 10)
71 sess := testClientSession(s.ServerAddr, "DEVB", true)71 sess := testClientSession(s.ServerAddr, "DEVB", "m1", "img1", true)
72 err := sess.Dial()72 err := sess.Dial()
73 c.Assert(err, IsNil)73 c.Assert(err, IsNil)
74 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {74 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
7575
=== modified file 'server/acceptance/suites/suite.go'
--- server/acceptance/suites/suite.go 2014-03-25 19:08:00 +0000
+++ server/acceptance/suites/suite.go 2014-04-04 13:58:43 +0000
@@ -46,7 +46,7 @@
46func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {46func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
47 errCh := make(chan error, 1)47 errCh := make(chan error, 1)
48 cliEvents := make(chan string, 10)48 cliEvents := make(chan string, 10)
49 sess := testClientSession(h.ServerAddr, devId, false)49 sess := testClientSession(h.ServerAddr, devId, "m1", "img1", false)
50 sess.Levels = levels50 sess.Levels = levels
51 err := sess.Dial()51 err := sess.Dial()
52 c.Assert(err, IsNil)52 c.Assert(err, IsNil)
@@ -127,16 +127,18 @@
127 return string(body), err127 return string(body), err
128}128}
129129
130func testClientSession(addr string, deviceId string, reportPings bool) *acceptance.ClientSession {130func testClientSession(addr string, deviceId, model, imageChannel string, reportPings bool) *acceptance.ClientSession {
131 certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../config/testing.cert"))131 certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../ssl/testing.cert"))
132 if err != nil {132 if err != nil {
133 panic(fmt.Sprintf("could not read config/testing.cert: %v", err))133 panic(fmt.Sprintf("could not read ssl/testing.cert: %v", err))
134 }134 }
135 return &acceptance.ClientSession{135 return &acceptance.ClientSession{
136 ExchangeTimeout: 100 * time.Millisecond,136 ExchangeTimeout: 100 * time.Millisecond,
137 ServerAddr: addr,137 ServerAddr: addr,
138 CertPEMBlock: certPEMBlock,138 CertPEMBlock: certPEMBlock,
139 DeviceId: deviceId,139 DeviceId: deviceId,
140 Model: model,
141 ImageChannel: imageChannel,
140 ReportPings: reportPings,142 ReportPings: reportPings,
141 }143 }
142}144}
143145
=== modified file 'server/broker/broker.go'
--- server/broker/broker.go 2014-02-21 16:04:44 +0000
+++ server/broker/broker.go 2014-04-04 13:58:43 +0000
@@ -49,6 +49,19 @@
49// LevelsMap is the type for holding channel levels for session.49// LevelsMap is the type for holding channel levels for session.
50type LevelsMap map[store.InternalChannelId]int6450type LevelsMap map[store.InternalChannelId]int64
5151
52// GetInfoString helps retrivieng a string out of a protocol.ConnectMsg.Info
53func GetInfoString(msg *protocol.ConnectMsg, name, defaultVal string) (string, error) {
54 v, ok := msg.Info[name]
55 if !ok {
56 return defaultVal, nil
57 }
58 s, ok := v.(string)
59 if !ok {
60 return "", ErrUnexpectedValue
61 }
62 return s, nil
63}
64
52// BrokerSession holds broker session state.65// BrokerSession holds broker session state.
53type BrokerSession interface {66type BrokerSession interface {
54 // SessionChannel returns the session control channel67 // SessionChannel returns the session control channel
@@ -56,6 +69,10 @@
56 SessionChannel() <-chan Exchange69 SessionChannel() <-chan Exchange
57 // DeviceIdentifier returns the device id string.70 // DeviceIdentifier returns the device id string.
58 DeviceIdentifier() string71 DeviceIdentifier() string
72 // DeviceImageModel returns the device model.
73 DeviceImageModel() string
74 // DeviceImageChannel returns the device system image channel.
75 DeviceImageChannel() string
59 // Levels returns the current channel levels for the session76 // Levels returns the current channel levels for the session
60 Levels() LevelsMap77 Levels() LevelsMap
61 // ExchangeScratchArea returns the scratch area for exchanges.78 // ExchangeScratchArea returns the scratch area for exchanges.
@@ -71,6 +88,9 @@
71 return fmt.Sprintf("session aborted (%s)", ea.Reason)88 return fmt.Sprintf("session aborted (%s)", ea.Reason)
72}89}
7390
91// Unexpect value in message
92var ErrUnexpectedValue = &ErrAbort{"unexpected value in message"}
93
74// BrokerConfig gives access to the typical broker configuration.94// BrokerConfig gives access to the typical broker configuration.
75type BrokerConfig interface {95type BrokerConfig interface {
76 // SessionQueueSize gives the session queue size.96 // SessionQueueSize gives the session queue size.
7797
=== modified file 'server/broker/broker_test.go'
--- server/broker/broker_test.go 2014-02-10 23:19:08 +0000
+++ server/broker/broker_test.go 2014-04-04 13:58:43 +0000
@@ -20,6 +20,8 @@
20 "fmt"20 "fmt"
2121
22 . "launchpad.net/gocheck"22 . "launchpad.net/gocheck"
23
24 "launchpad.net/ubuntu-push/protocol"
23)25)
2426
25type brokerSuite struct{}27type brokerSuite struct{}
@@ -30,3 +32,19 @@
30 err := &ErrAbort{"expected FOO"}32 err := &ErrAbort{"expected FOO"}
31 c.Check(fmt.Sprintf("%s", err), Equals, "session aborted (expected FOO)")33 c.Check(fmt.Sprintf("%s", err), Equals, "session aborted (expected FOO)")
32}34}
35
36func (s *brokerSuite) TestGetInfoString(c *C) {
37 connectMsg := &protocol.ConnectMsg{}
38 v, err := GetInfoString(connectMsg, "foo", "?")
39 c.Check(err, IsNil)
40 c.Check(v, Equals, "?")
41
42 connectMsg.Info = map[string]interface{}{"foo": "yay"}
43 v, err = GetInfoString(connectMsg, "foo", "?")
44 c.Check(err, IsNil)
45 c.Check(v, Equals, "yay")
46
47 connectMsg.Info["foo"] = 33
48 v, err = GetInfoString(connectMsg, "foo", "?")
49 c.Check(err, Equals, ErrUnexpectedValue)
50}
3351
=== modified file 'server/broker/exchanges.go'
--- server/broker/exchanges.go 2014-02-26 16:04:57 +0000
+++ server/broker/exchanges.go 2014-04-04 13:58:43 +0000
@@ -18,6 +18,7 @@
1818
19import (19import (
20 "encoding/json"20 "encoding/json"
21 "fmt"
2122
22 "launchpad.net/ubuntu-push/protocol"23 "launchpad.net/ubuntu-push/protocol"
23 "launchpad.net/ubuntu-push/server/store"24 "launchpad.net/ubuntu-push/server/store"
@@ -37,11 +38,24 @@
37 ChanId store.InternalChannelId38 ChanId store.InternalChannelId
38 TopLevel int6439 TopLevel int64
39 NotificationPayloads []json.RawMessage40 NotificationPayloads []json.RawMessage
41 Decoded []map[string]interface{}
40}42}
4143
42// check interface already here44// check interface already here
43var _ Exchange = &BroadcastExchange{}45var _ Exchange = &BroadcastExchange{}
4446
47// Init ensures the BroadcastExchange is fully initialized for the sessions.
48func (sbe *BroadcastExchange) Init() {
49 decoded := make([]map[string]interface{}, len(sbe.NotificationPayloads))
50 sbe.Decoded = decoded
51 for i, p := range sbe.NotificationPayloads {
52 err := json.Unmarshal(p, &decoded[i])
53 if err != nil {
54 decoded[i] = nil
55 }
56 }
57}
58
45func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {59func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
46 c := int64(len(payloads))60 c := int64(len(payloads))
47 if c == 0 {61 if c == 0 {
@@ -58,6 +72,20 @@
58 }72 }
59}73}
6074
75func channelFilter(tag string, chanId store.InternalChannelId, payloads []json.RawMessage, decoded []map[string]interface{}) []json.RawMessage {
76 if len(payloads) != 0 && chanId == store.SystemInternalChannelId {
77 decoded := decoded[len(decoded)-len(payloads):]
78 filtered := make([]json.RawMessage, 0)
79 for i, decoded1 := range decoded {
80 if _, ok := decoded1[tag]; ok {
81 filtered = append(filtered, payloads[i])
82 }
83 }
84 payloads = filtered
85 }
86 return payloads
87}
88
61// Prepare session for a BROADCAST.89// Prepare session for a BROADCAST.
62func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {90func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
63 scratchArea := sess.ExchangeScratchArea()91 scratchArea := sess.ExchangeScratchArea()
@@ -65,6 +93,9 @@
65 scratchArea.broadcastMsg.Type = "broadcast"93 scratchArea.broadcastMsg.Type = "broadcast"
66 clientLevel := sess.Levels()[sbe.ChanId]94 clientLevel := sess.Levels()[sbe.ChanId]
67 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)95 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
96 tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())
97 payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded)
98
68 // xxx need an AppId as well, later99 // xxx need an AppId as well, later
69 scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId)100 scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId)
70 scratchArea.broadcastMsg.TopLevel = sbe.TopLevel101 scratchArea.broadcastMsg.TopLevel = sbe.TopLevel
71102
=== modified file 'server/broker/exchanges_test.go'
--- server/broker/exchanges_test.go 2014-02-26 16:04:57 +0000
+++ server/broker/exchanges_test.go 2014-04-04 13:58:43 +0000
@@ -35,24 +35,45 @@
3535
36var _ = Suite(&exchangesSuite{})36var _ = Suite(&exchangesSuite{})
3737
38func (s *exchangesSuite) TestBroadcastExchangeInit(c *C) {
39 exchg := &broker.BroadcastExchange{
40 ChanId: store.SystemInternalChannelId,
41 TopLevel: 3,
42 NotificationPayloads: []json.RawMessage{
43 json.RawMessage(`{"a":"x"}`),
44 json.RawMessage(`[]`),
45 json.RawMessage(`{"a":"y"}`),
46 },
47 }
48 exchg.Init()
49 c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{
50 map[string]interface{}{"a": "x"},
51 nil,
52 map[string]interface{}{"a": "y"},
53 })
54}
55
38func (s *exchangesSuite) TestBroadcastExchange(c *C) {56func (s *exchangesSuite) TestBroadcastExchange(c *C) {
39 sess := &testing.TestBrokerSession{57 sess := &testing.TestBrokerSession{
40 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),58 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
59 Model: "m1",
60 ImageChannel: "img1",
41 }61 }
42 exchg := &broker.BroadcastExchange{62 exchg := &broker.BroadcastExchange{
43 ChanId: store.SystemInternalChannelId,63 ChanId: store.SystemInternalChannelId,
44 TopLevel: 3,64 TopLevel: 3,
45 NotificationPayloads: []json.RawMessage{65 NotificationPayloads: []json.RawMessage{
46 json.RawMessage(`{"a":"x"}`),66 json.RawMessage(`{"img1/m1":100}`),
47 json.RawMessage(`{"a":"y"}`),67 json.RawMessage(`{"img2/m2":200}`),
48 },68 },
49 }69 }
70 exchg.Init()
50 outMsg, inMsg, err := exchg.Prepare(sess)71 outMsg, inMsg, err := exchg.Prepare(sess)
51 c.Assert(err, IsNil)72 c.Assert(err, IsNil)
52 // check73 // check
53 marshalled, err := json.Marshal(outMsg)74 marshalled, err := json.Marshal(outMsg)
54 c.Assert(err, IsNil)75 c.Assert(err, IsNil)
55 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)76 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":100}]}`)
56 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)77 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
57 c.Assert(err, IsNil)78 c.Assert(err, IsNil)
58 err = exchg.Acked(sess, true)79 err = exchg.Acked(sess, true)
@@ -62,9 +83,11 @@
6283
63func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {84func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
64 sess := &testing.TestBrokerSession{85 sess := &testing.TestBrokerSession{
65 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),86 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
87 Model: "m1",
88 ImageChannel: "img1",
66 }89 }
67 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))90 payloadFmt := fmt.Sprintf(`{"img1/m1":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
68 needsSplitting := make([]json.RawMessage, 32)91 needsSplitting := make([]json.RawMessage, 32)
69 for i := 0; i < 32; i++ {92 for i := 0; i < 32; i++ {
70 needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))93 needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))
@@ -76,6 +99,7 @@
76 TopLevel: topLevel,99 TopLevel: topLevel,
77 NotificationPayloads: needsSplitting,100 NotificationPayloads: needsSplitting,
78 }101 }
102 exchg.Init()
79 outMsg, _, err := exchg.Prepare(sess)103 outMsg, _, err := exchg.Prepare(sess)
80 c.Assert(err, IsNil)104 c.Assert(err, IsNil)
81 parts := 0105 parts := 0
@@ -91,10 +115,11 @@
91 ChanId: store.SystemInternalChannelId,115 ChanId: store.SystemInternalChannelId,
92 TopLevel: topLevel + 2,116 TopLevel: topLevel + 2,
93 NotificationPayloads: []json.RawMessage{117 NotificationPayloads: []json.RawMessage{
94 json.RawMessage(`{"a":"x"}`),118 json.RawMessage(`{"img1/m1":"x"}`),
95 json.RawMessage(`{"a":"y"}`),119 json.RawMessage(`{"img1/m1":"y"}`),
96 },120 },
97 }121 }
122 exchg.Init()
98 outMsg, _, err = exchg.Prepare(sess)123 outMsg, _, err = exchg.Prepare(sess)
99 c.Assert(err, IsNil)124 c.Assert(err, IsNil)
100 done := outMsg.Split() // shouldn't panic125 done := outMsg.Split() // shouldn't panic
@@ -103,21 +128,24 @@
103128
104func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {129func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
105 sess := &testing.TestBrokerSession{130 sess := &testing.TestBrokerSession{
106 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),131 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
132 Model: "m1",
133 ImageChannel: "img2",
107 }134 }
108 exchg := &broker.BroadcastExchange{135 exchg := &broker.BroadcastExchange{
109 ChanId: store.SystemInternalChannelId,136 ChanId: store.SystemInternalChannelId,
110 TopLevel: 3,137 TopLevel: 3,
111 NotificationPayloads: []json.RawMessage{138 NotificationPayloads: []json.RawMessage{
112 json.RawMessage(`{"a":"y"}`),139 json.RawMessage(`{"img2/m1":1}`),
113 },140 },
114 }141 }
142 exchg.Init()
115 outMsg, inMsg, err := exchg.Prepare(sess)143 outMsg, inMsg, err := exchg.Prepare(sess)
116 c.Assert(err, IsNil)144 c.Assert(err, IsNil)
117 // check145 // check
118 marshalled, err := json.Marshal(outMsg)146 marshalled, err := json.Marshal(outMsg)
119 c.Assert(err, IsNil)147 c.Assert(err, IsNil)
120 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)148 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img2/m1":1}]}`)
121 err = json.Unmarshal([]byte(`{}`), inMsg)149 err = json.Unmarshal([]byte(`{}`), inMsg)
122 c.Assert(err, IsNil)150 c.Assert(err, IsNil)
123 err = exchg.Acked(sess, true)151 err = exchg.Acked(sess, true)
@@ -130,23 +158,55 @@
130 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{158 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{
131 store.SystemInternalChannelId: 2,159 store.SystemInternalChannelId: 2,
132 }),160 }),
161 Model: "m1",
162 ImageChannel: "img1",
133 }163 }
134 exchg := &broker.BroadcastExchange{164 exchg := &broker.BroadcastExchange{
135 ChanId: store.SystemInternalChannelId,165 ChanId: store.SystemInternalChannelId,
136 TopLevel: 3,166 TopLevel: 3,
137 NotificationPayloads: []json.RawMessage{167 NotificationPayloads: []json.RawMessage{
138 json.RawMessage(`{"a":"x"}`),168 json.RawMessage(`{"img1/m1":100}`),
139 json.RawMessage(`{"a":"y"}`),169 json.RawMessage(`{"img1/m1":101}`),
140 },170 },
141 }171 }
142 outMsg, inMsg, err := exchg.Prepare(sess)172 exchg.Init()
143 c.Assert(err, IsNil)173 outMsg, inMsg, err := exchg.Prepare(sess)
144 // check174 c.Assert(err, IsNil)
145 marshalled, err := json.Marshal(outMsg)175 // check
146 c.Assert(err, IsNil)176 marshalled, err := json.Marshal(outMsg)
147 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)177 c.Assert(err, IsNil)
148 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)178 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"img1/m1":101}]}`)
149 c.Assert(err, IsNil)179 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
150 err = exchg.Acked(sess, true)180 c.Assert(err, IsNil)
151 c.Assert(err, IsNil)181 err = exchg.Acked(sess, true)
182 c.Assert(err, IsNil)
183}
184
185func (s *exchangesSuite) TestBroadcastExchangeChannelFilter(c *C) {
186 sess := &testing.TestBrokerSession{
187 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
188 Model: "m1",
189 ImageChannel: "img1",
190 }
191 exchg := &broker.BroadcastExchange{
192 ChanId: store.SystemInternalChannelId,
193 TopLevel: 5,
194 NotificationPayloads: []json.RawMessage{
195 json.RawMessage(`{"img1/m1":100}`),
196 json.RawMessage(`{"img2/m2":200}`),
197 json.RawMessage(`{"img1/m1":101}`),
198 },
199 }
200 exchg.Init()
201 outMsg, inMsg, err := exchg.Prepare(sess)
202 c.Assert(err, IsNil)
203 // check
204 marshalled, err := json.Marshal(outMsg)
205 c.Assert(err, IsNil)
206 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":5,"Payloads":[{"img1/m1":100},{"img1/m1":101}]}`)
207 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
208 c.Assert(err, IsNil)
209 err = exchg.Acked(sess, true)
210 c.Assert(err, IsNil)
211 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))
152}212}
153213
=== modified file 'server/broker/exchg_impl_test.go'
--- server/broker/exchg_impl_test.go 2014-02-10 23:19:08 +0000
+++ server/broker/exchg_impl_test.go 2014-04-04 13:58:43 +0000
@@ -20,6 +20,8 @@
20 "encoding/json"20 "encoding/json"
2121
22 . "launchpad.net/gocheck"22 . "launchpad.net/gocheck"
23
24 "launchpad.net/ubuntu-push/server/store"
23)25)
2426
25type exchangesImplSuite struct{}27type exchangesImplSuite struct{}
@@ -56,3 +58,31 @@
56 res = filterByLevel(5, 10, nil)58 res = filterByLevel(5, 10, nil)
57 c.Check(len(res), Equals, 0)59 c.Check(len(res), Equals, 0)
58}60}
61
62func (s *exchangesImplSuite) TestChannelFilter(c *C) {
63 payloads := []json.RawMessage{
64 json.RawMessage(`{"a/x": 3}`),
65 json.RawMessage(`{"b/x": 4}`),
66 json.RawMessage(`{"a/y": 5}`),
67 json.RawMessage(`{"a/x": 6}`),
68 }
69 decoded := make([]map[string]interface{}, 4)
70 for i, p := range payloads {
71 err := json.Unmarshal(p, &decoded[i])
72 c.Assert(err, IsNil)
73 }
74
75 other := store.InternalChannelId("1")
76
77 c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil)
78 c.Check(channelFilter("", other, payloads[1:], decoded), DeepEquals, payloads[1:])
79
80 // use tag when channel is the sytem channel
81
82 c.Check(channelFilter("c/z", store.SystemInternalChannelId, payloads, decoded), HasLen, 0)
83
84 c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]})
85
86 c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]})
87
88}
5989
=== modified file 'server/broker/simple/simple.go'
--- server/broker/simple/simple.go 2014-02-21 16:04:44 +0000
+++ server/broker/simple/simple.go 2014-04-04 13:58:43 +0000
@@ -46,11 +46,13 @@
4646
47// simpleBrokerSession represents a session in the broker.47// simpleBrokerSession represents a session in the broker.
48type simpleBrokerSession struct {48type simpleBrokerSession struct {
49 registered bool49 registered bool
50 deviceId string50 deviceId string
51 done chan bool51 model string
52 exchanges chan broker.Exchange52 imageChannel string
53 levels broker.LevelsMap53 done chan bool
54 exchanges chan broker.Exchange
55 levels broker.LevelsMap
54 // for exchanges56 // for exchanges
55 exchgScratch broker.ExchangesScratchArea57 exchgScratch broker.ExchangesScratchArea
56}58}
@@ -75,6 +77,14 @@
75 return sess.deviceId77 return sess.deviceId
76}78}
7779
80func (sess *simpleBrokerSession) DeviceImageModel() string {
81 return sess.model
82}
83
84func (sess *simpleBrokerSession) DeviceImageChannel() string {
85 return sess.imageChannel
86}
87
78func (sess *simpleBrokerSession) Levels() broker.LevelsMap {88func (sess *simpleBrokerSession) Levels() broker.LevelsMap {
79 return sess.levels89 return sess.levels
80}90}
@@ -147,6 +157,7 @@
147 TopLevel: topLevel,157 TopLevel: topLevel,
148 NotificationPayloads: payloads,158 NotificationPayloads: payloads,
149 }159 }
160 broadcastExchg.Init()
150 sess.exchanges <- broadcastExchg161 sess.exchanges <- broadcastExchg
151 }162 }
152 }163 }
@@ -157,6 +168,14 @@
157// pending notifications as well.168// pending notifications as well.
158func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {169func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {
159 // xxx sanity check DeviceId170 // xxx sanity check DeviceId
171 model, err := broker.GetInfoString(connect, "device", "?")
172 if err != nil {
173 return nil, err
174 }
175 imageChannel, err := broker.GetInfoString(connect, "channel", "?")
176 if err != nil {
177 return nil, err
178 }
160 levels := map[store.InternalChannelId]int64{}179 levels := map[store.InternalChannelId]int64{}
161 for hexId, v := range connect.Levels {180 for hexId, v := range connect.Levels {
162 id, err := store.HexToInternalChannelId(hexId)181 id, err := store.HexToInternalChannelId(hexId)
@@ -166,14 +185,16 @@
166 levels[id] = v185 levels[id] = v
167 }186 }
168 sess := &simpleBrokerSession{187 sess := &simpleBrokerSession{
169 deviceId: connect.DeviceId,188 deviceId: connect.DeviceId,
170 done: make(chan bool),189 model: model,
171 exchanges: make(chan broker.Exchange, b.sessionQueueSize),190 imageChannel: imageChannel,
172 levels: levels,191 done: make(chan bool),
192 exchanges: make(chan broker.Exchange, b.sessionQueueSize),
193 levels: levels,
173 }194 }
174 b.sessionCh <- sess195 b.sessionCh <- sess
175 <-sess.done196 <-sess.done
176 err := b.feedPending(sess)197 err = b.feedPending(sess)
177 if err != nil {198 if err != nil {
178 return nil, err199 return nil, err
179 }200 }
@@ -219,6 +240,7 @@
219 TopLevel: topLevel,240 TopLevel: topLevel,
220 NotificationPayloads: payloads,241 NotificationPayloads: payloads,
221 }242 }
243 broadcastExchg.Init()
222 for _, sess := range b.registry {244 for _, sess := range b.registry {
223 sess.exchanges <- broadcastExchg245 sess.exchanges <- broadcastExchg
224 }246 }
225247
=== modified file 'server/broker/simple/simple_test.go'
--- server/broker/simple/simple_test.go 2014-02-10 23:29:53 +0000
+++ server/broker/simple/simple_test.go 2014-04-04 13:58:43 +0000
@@ -48,6 +48,7 @@
48 sto := store.NewInMemoryPendingStore()48 sto := store.NewInMemoryPendingStore()
49 muchLater := time.Now().Add(10 * time.Minute)49 muchLater := time.Now().Add(10 * time.Minute)
50 notification1 := json.RawMessage(`{"m": "M"}`)50 notification1 := json.RawMessage(`{"m": "M"}`)
51 decoded1 := map[string]interface{}{"m": "M"}
51 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)52 sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
52 b := NewSimpleBroker(sto, testBrokerConfig, nil)53 b := NewSimpleBroker(sto, testBrokerConfig, nil)
53 sess := &simpleBrokerSession{54 sess := &simpleBrokerSession{
@@ -60,6 +61,7 @@
60 ChanId: store.SystemInternalChannelId,61 ChanId: store.SystemInternalChannelId,
61 TopLevel: 1,62 TopLevel: 1,
62 NotificationPayloads: []json.RawMessage{notification1},63 NotificationPayloads: []json.RawMessage{notification1},
64 Decoded: []map[string]interface{}{decoded1},
63 })65 })
64}66}
6567
6668
=== modified file 'server/broker/testing/impls.go'
--- server/broker/testing/impls.go 2014-01-23 20:13:22 +0000
+++ server/broker/testing/impls.go 2014-04-04 13:58:43 +0000
@@ -24,6 +24,8 @@
24// Test implementation of BrokerSession.24// Test implementation of BrokerSession.
25type TestBrokerSession struct {25type TestBrokerSession struct {
26 DeviceId string26 DeviceId string
27 Model string
28 ImageChannel string
27 Exchanges chan broker.Exchange29 Exchanges chan broker.Exchange
28 LevelsMap broker.LevelsMap30 LevelsMap broker.LevelsMap
29 exchgScratch broker.ExchangesScratchArea31 exchgScratch broker.ExchangesScratchArea
@@ -33,6 +35,14 @@
33 return tbs.DeviceId35 return tbs.DeviceId
34}36}
3537
38func (tbs *TestBrokerSession) DeviceImageModel() string {
39 return tbs.Model
40}
41
42func (tbs *TestBrokerSession) DeviceImageChannel() string {
43 return tbs.ImageChannel
44}
45
36func (tbs *TestBrokerSession) SessionChannel() <-chan broker.Exchange {46func (tbs *TestBrokerSession) SessionChannel() <-chan broker.Exchange {
37 return tbs.Exchanges47 return tbs.Exchanges
38}48}
3949
=== modified file 'server/broker/testsuite/suite.go'
--- server/broker/testsuite/suite.go 2014-03-19 23:46:18 +0000
+++ server/broker/testsuite/suite.go 2014-04-04 13:58:43 +0000
@@ -81,10 +81,20 @@
81 b := s.MakeBroker(sto, testBrokerConfig, nil)81 b := s.MakeBroker(sto, testBrokerConfig, nil)
82 b.Start()82 b.Start()
83 defer b.Stop()83 defer b.Stop()
84 sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"0": 5}})84 sess, err := b.Register(&protocol.ConnectMsg{
85 Type: "connect",
86 DeviceId: "dev-1",
87 Levels: map[string]int64{"0": 5},
88 Info: map[string]interface{}{
89 "device": "model",
90 "channel": "daily",
91 },
92 })
85 c.Assert(err, IsNil)93 c.Assert(err, IsNil)
86 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess)94 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess)
87 c.Assert(sess.DeviceIdentifier(), Equals, "dev-1")95 c.Assert(sess.DeviceIdentifier(), Equals, "dev-1")
96 c.Check(sess.DeviceImageModel(), Equals, "model")
97 c.Check(sess.DeviceImageChannel(), Equals, "daily")
88 c.Assert(sess.ExchangeScratchArea(), Not(IsNil))98 c.Assert(sess.ExchangeScratchArea(), Not(IsNil))
89 c.Check(sess.Levels(), DeepEquals, broker.LevelsMap(map[store.InternalChannelId]int64{99 c.Check(sess.Levels(), DeepEquals, broker.LevelsMap(map[store.InternalChannelId]int64{
90 store.SystemInternalChannelId: 5,100 store.SystemInternalChannelId: 5,
@@ -105,6 +115,22 @@
105 c.Check(err, FitsTypeOf, &broker.ErrAbort{})115 c.Check(err, FitsTypeOf, &broker.ErrAbort{})
106}116}
107117
118func (s *CommonBrokerSuite) TestRegistrationInfoErrors(c *C) {
119 sto := store.NewInMemoryPendingStore()
120 b := s.MakeBroker(sto, testBrokerConfig, nil)
121 b.Start()
122 defer b.Stop()
123 info := map[string]interface{}{
124 "device": -1,
125 }
126 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
127 c.Check(err, Equals, broker.ErrUnexpectedValue)
128 info["device"] = "m"
129 info["channel"] = -1
130 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
131 c.Check(err, Equals, broker.ErrUnexpectedValue)
132}
133
108func (s *CommonBrokerSuite) TestRegistrationFeedPending(c *C) {134func (s *CommonBrokerSuite) TestRegistrationFeedPending(c *C) {
109 sto := store.NewInMemoryPendingStore()135 sto := store.NewInMemoryPendingStore()
110 notification1 := json.RawMessage(`{"m": "M"}`)136 notification1 := json.RawMessage(`{"m": "M"}`)
@@ -149,6 +175,7 @@
149func (s *CommonBrokerSuite) TestBroadcast(c *C) {175func (s *CommonBrokerSuite) TestBroadcast(c *C) {
150 sto := store.NewInMemoryPendingStore()176 sto := store.NewInMemoryPendingStore()
151 notification1 := json.RawMessage(`{"m": "M"}`)177 notification1 := json.RawMessage(`{"m": "M"}`)
178 decoded1 := map[string]interface{}{"m": "M"}
152 b := s.MakeBroker(sto, testBrokerConfig, nil)179 b := s.MakeBroker(sto, testBrokerConfig, nil)
153 b.Start()180 b.Start()
154 defer b.Stop()181 defer b.Stop()
@@ -168,6 +195,7 @@
168 ChanId: store.SystemInternalChannelId,195 ChanId: store.SystemInternalChannelId,
169 TopLevel: 1,196 TopLevel: 1,
170 NotificationPayloads: []json.RawMessage{notification1},197 NotificationPayloads: []json.RawMessage{notification1},
198 Decoded: []map[string]interface{}{decoded1},
171 })199 })
172 }200 }
173 select {201 select {
@@ -178,6 +206,7 @@
178 ChanId: store.SystemInternalChannelId,206 ChanId: store.SystemInternalChannelId,
179 TopLevel: 1,207 TopLevel: 1,
180 NotificationPayloads: []json.RawMessage{notification1},208 NotificationPayloads: []json.RawMessage{notification1},
209 Decoded: []map[string]interface{}{decoded1},
181 })210 })
182 }211 }
183}212}

Subscribers

People subscribed via source and target branches