Merge lp:~chipaca/ubuntu-push/endpoint-names into lp:ubuntu-push

Proposed by John Lenton
Status: Superseded
Proposed branch: lp:~chipaca/ubuntu-push/endpoint-names
Merge into: lp:ubuntu-push
Diff against target: 4098 lines (+1990/-393)
50 files modified
Makefile (+3/-0)
README (+8/-2)
bus/connectivity/connectivity.go (+8/-7)
bus/connectivity/connectivity_test.go (+78/-3)
bus/endpoint.go (+15/-1)
bus/testing/testing_endpoint.go (+12/-1)
bus/testing/testing_endpoint_test.go (+15/-0)
client/client_test.go (+5/-2)
client/session/session.go (+40/-12)
client/session/session_test.go (+63/-5)
debian/changelog (+6/-0)
debian/control (+2/-0)
debian/rules (+5/-1)
debian/ubuntu-push-client.install (+1/-0)
dependencies.tsv (+1/-0)
protocol/messages.go (+56/-0)
protocol/messages_test.go (+18/-2)
protocol/state-diag-client.gv (+4/-1)
protocol/state-diag-client.svg (+77/-49)
protocol/state-diag-session.gv (+10/-0)
protocol/state-diag-session.svg (+132/-74)
server/acceptance/acceptance_test.go (+3/-0)
server/acceptance/acceptanceclient.go (+22/-1)
server/acceptance/suites/broadcast.go (+7/-6)
server/acceptance/suites/suite.go (+12/-0)
server/acceptance/suites/unicast.go (+96/-0)
server/api/handlers.go (+103/-21)
server/api/handlers_test.go (+214/-12)
server/broker/broker.go (+7/-1)
server/broker/exchanges.go (+73/-41)
server/broker/exchanges_test.go (+127/-26)
server/broker/exchg_impl_test.go (+19/-17)
server/broker/simple/simple.go (+56/-11)
server/broker/simple/simple_test.go (+5/-4)
server/broker/simple/suite_test.go (+3/-0)
server/broker/testing/impls.go (+13/-0)
server/broker/testsuite/suite.go (+121/-34)
server/session/session.go (+23/-11)
server/session/session_test.go (+43/-14)
server/session/tracker.go (+12/-5)
server/session/tracker_test.go (+4/-4)
server/store/inmemory.go (+52/-18)
server/store/inmemory_test.go (+72/-3)
server/store/store.go (+74/-3)
server/store/store_test.go (+46/-1)
signing-helper/CMakeLists.txt (+39/-0)
signing-helper/signing-helper.cpp (+97/-0)
signing-helper/signing.h (+76/-0)
testing/helpers.go (+11/-0)
ubuntu-push-client.go (+1/-0)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/endpoint-names
Reviewer Review Type Date Requested Status
Ubuntu Push Hackers Pending
Review via email: mp+219202@code.launchpad.net

This proposal has been superseded by a proposal from 2014-05-12.

Commit message

Added endpoint.GrabName

Description of the change

Added endpoint.GrabName

To post a comment you must log in.

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'Makefile'
--- Makefile 2014-03-31 17:58:54 +0000
+++ Makefile 2014-05-12 14:11:46 +0000
@@ -11,10 +11,12 @@
11GODEPS += launchpad.net/go-dbus/v111GODEPS += launchpad.net/go-dbus/v1
12GODEPS += launchpad.net/go-xdg/v012GODEPS += launchpad.net/go-xdg/v0
13GODEPS += code.google.com/p/gosqlite/sqlite313GODEPS += code.google.com/p/gosqlite/sqlite3
14GODEPS += launchpad.net/~ubuntu-push-hackers/ubuntu-push/go-uuid/uuid
1415
15TOTEST = $(shell env GOPATH=$(GOPATH) go list $(PROJECT)/...|grep -v acceptance|grep -v http13client )16TOTEST = $(shell env GOPATH=$(GOPATH) go list $(PROJECT)/...|grep -v acceptance|grep -v http13client )
1617
17bootstrap:18bootstrap:
19 $(RM) -r $(GOPATH)/pkg
18 mkdir -p $(GOPATH)/bin20 mkdir -p $(GOPATH)/bin
19 mkdir -p $(GOPATH)/pkg21 mkdir -p $(GOPATH)/pkg
20 go get -u launchpad.net/godeps22 go get -u launchpad.net/godeps
@@ -33,6 +35,7 @@
3335
34build-client:36build-client:
35 go build ubuntu-push-client.go37 go build ubuntu-push-client.go
38 (cd signing-helper && cmake . && make)
3639
37build-server-dev:40build-server-dev:
38 go build -o push-server-dev launchpad.net/ubuntu-push/server/dev41 go build -o push-server-dev launchpad.net/ubuntu-push/server/dev
3942
=== modified file 'README'
--- README 2014-03-31 17:58:54 +0000
+++ README 2014-05-12 14:11:46 +0000
@@ -6,11 +6,17 @@
6The code expects to be checked out as launchpad.net/ubuntu-push in a Go6The code expects to be checked out as launchpad.net/ubuntu-push in a Go
7workspace, see "go help gopath".7workspace, see "go help gopath".
88
9To setup Go dependencies, install libsqlite3-dev and run:9To setup Go dependencies, install the following dependencies:
10
11 build-essential
12 libsqlite3-dev
13
14and run:
1015
11 make bootstrap16 make bootstrap
1217
13To run tests, install libgcrypt11-dev and libwhoopsie-dev and run:18To run tests, install libglib2.0-dev, libgcrypt11-dev, libwhoopsie-dev,
19and run:
1420
15 make check21 make check
1622
1723
=== modified file 'bus/connectivity/connectivity.go'
--- bus/connectivity/connectivity.go 2014-04-04 11:08:28 +0000
+++ bus/connectivity/connectivity.go 2014-05-12 14:11:46 +0000
@@ -72,19 +72,20 @@
72 cs.connAttempts += ar.Redial()72 cs.connAttempts += ar.Redial()
73 nm := networkmanager.New(cs.endp, cs.log)73 nm := networkmanager.New(cs.endp, cs.log)
7474
75 // set up the watch
76 stateCh, err = nm.WatchState()
77 if err != nil {
78 cs.log.Debugf("failed to set up the state watch: %s", err)
79 goto Continue
80 }
81
75 // Get the current state.82 // Get the current state.
76 initial = nm.GetState()83 initial = nm.GetState()
77 if initial == networkmanager.Unknown {84 if initial == networkmanager.Unknown {
78 cs.log.Debugf("Failed to get state.")85 cs.log.Debugf("Failed to get state.")
79 goto Continue86 goto Continue
80 }87 }
8188 cs.log.Debugf("got initial state of %s", initial)
82 // set up the watch
83 stateCh, err = nm.WatchState()
84 if err != nil {
85 cs.log.Debugf("failed to set up the state watch: %s", err)
86 goto Continue
87 }
8889
89 primary = nm.GetPrimaryConnection()90 primary = nm.GetPrimaryConnection()
90 cs.log.Debugf("primary connection starts as %#v", primary)91 cs.log.Debugf("primary connection starts as %#v", primary)
9192
=== modified file 'bus/connectivity/connectivity_test.go'
--- bus/connectivity/connectivity_test.go 2014-04-04 12:01:42 +0000
+++ bus/connectivity/connectivity_test.go 2014-05-12 14:11:46 +0000
@@ -17,8 +17,15 @@
17package connectivity17package connectivity
1818
19import (19import (
20 "net/http/httptest"
21 "sync"
22 "testing"
23 "time"
24
20 "launchpad.net/go-dbus/v1"25 "launchpad.net/go-dbus/v1"
21 . "launchpad.net/gocheck"26 . "launchpad.net/gocheck"
27
28 "launchpad.net/ubuntu-push/bus"
22 "launchpad.net/ubuntu-push/bus/networkmanager"29 "launchpad.net/ubuntu-push/bus/networkmanager"
23 testingbus "launchpad.net/ubuntu-push/bus/testing"30 testingbus "launchpad.net/ubuntu-push/bus/testing"
24 "launchpad.net/ubuntu-push/config"31 "launchpad.net/ubuntu-push/config"
@@ -26,9 +33,6 @@
26 helpers "launchpad.net/ubuntu-push/testing"33 helpers "launchpad.net/ubuntu-push/testing"
27 "launchpad.net/ubuntu-push/testing/condition"34 "launchpad.net/ubuntu-push/testing/condition"
28 "launchpad.net/ubuntu-push/util"35 "launchpad.net/ubuntu-push/util"
29 "net/http/httptest"
30 "testing"
31 "time"
32)36)
3337
34// hook up gocheck38// hook up gocheck
@@ -115,6 +119,77 @@
115 c.Check(<-cs.networkStateCh, Equals, networkmanager.ConnectedGlobal)119 c.Check(<-cs.networkStateCh, Equals, networkmanager.ConnectedGlobal)
116}120}
117121
122// a racyEndpoint is an endpoint that behaves differently depending on
123// how much time passes between getting the state and setting up the
124// watch
125type racyEndpoint struct {
126 stateGot bool
127 maxTime time.Time
128 delta time.Duration
129 lock sync.RWMutex
130}
131
132func (rep *racyEndpoint) GetProperty(prop string) (interface{}, error) {
133 switch prop {
134 case "state":
135 rep.lock.Lock()
136 defer rep.lock.Unlock()
137 rep.stateGot = true
138 rep.maxTime = time.Now().Add(rep.delta)
139 return uint32(networkmanager.Connecting), nil
140 case "PrimaryConnection":
141 return dbus.ObjectPath("/something"), nil
142 default:
143 return nil, nil
144 }
145}
146
147func (rep *racyEndpoint) WatchSignal(member string, f func(...interface{}), d func()) error {
148 if member == "StateChanged" {
149 // we count never having gotten the state as happening "after" now.
150 rep.lock.RLock()
151 defer rep.lock.RUnlock()
152 ok := !rep.stateGot || time.Now().Before(rep.maxTime)
153 go func() {
154 if ok {
155 f(uint32(networkmanager.ConnectedGlobal))
156 }
157 d()
158 }()
159 }
160 return nil
161}
162
163func (*racyEndpoint) Close() {}
164func (*racyEndpoint) Dial() error { return nil }
165func (*racyEndpoint) String() string { return "racyEndpoint" }
166func (*racyEndpoint) Call(string, []interface{}, ...interface{}) error { return nil }
167func (*racyEndpoint) GrabName(bool) <-chan error { return nil }
168
169var _ bus.Endpoint = (*racyEndpoint)(nil)
170
171// takeNext takes a value from given channel with a 1s timeout
172func takeNext(ch <-chan networkmanager.State) networkmanager.State {
173 select {
174 case <-time.After(time.Second):
175 panic("channel stuck: too long waiting")
176 case v := <-ch:
177 return v
178 }
179}
180
181// test that if the nm state goes from connecting to connected very
182// shortly after calling GetState, we don't lose the event.
183func (s *ConnSuite) TestStartAvoidsRace(c *C) {
184 for delta := time.Second; delta > 1; delta /= 2 {
185 rep := &racyEndpoint{delta: delta}
186 cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: rep}
187 f := Commentf("when delta=%s", delta)
188 c.Assert(cs.start(), Equals, networkmanager.Connecting, f)
189 c.Assert(takeNext(cs.networkStateCh), Equals, networkmanager.ConnectedGlobal, f)
190 }
191}
192
118/*193/*
119 tests for connectedStateStep()194 tests for connectedStateStep()
120*/195*/
121196
=== modified file 'bus/endpoint.go'
--- bus/endpoint.go 2014-04-02 08:23:15 +0000
+++ bus/endpoint.go 2014-05-12 14:11:46 +0000
@@ -31,6 +31,7 @@
3131
32// bus.Endpoint represents the DBus connection itself.32// bus.Endpoint represents the DBus connection itself.
33type Endpoint interface {33type Endpoint interface {
34 GrabName(allowReplacement bool) <-chan error
34 WatchSignal(member string, f func(...interface{}), d func()) error35 WatchSignal(member string, f func(...interface{}), d func()) error
35 Call(member string, args []interface{}, rvs ...interface{}) error36 Call(member string, args []interface{}, rvs ...interface{}) error
36 GetProperty(property string) (interface{}, error)37 GetProperty(property string) (interface{}, error)
@@ -53,7 +54,7 @@
53}54}
5455
55// ensure endpoint implements Endpoint56// ensure endpoint implements Endpoint
56var _ Endpoint = &endpoint{}57var _ Endpoint = (*endpoint)(nil)
5758
58/*59/*
59 public methods60 public methods
@@ -173,6 +174,19 @@
173 return fmt.Sprintf("<Connection to %s %#v>", endp.bus, endp.addr)174 return fmt.Sprintf("<Connection to %s %#v>", endp.bus, endp.addr)
174}175}
175176
177// GrabName(...) takes over the name on the bus, reporting errors over the
178// returned channel.
179//
180// While the first result will be nil on success, successive results would
181// typically indicate another process trying to take over the name.
182func (endp *endpoint) GrabName(allowReplacement bool) <-chan error {
183 flags := dbus.NameFlagAllowReplacement | dbus.NameFlagReplaceExisting
184 if !allowReplacement {
185 flags = 0
186 }
187 return endp.bus.RequestName(endp.addr.Name, flags).C
188}
189
176/*190/*
177 private methods191 private methods
178*/192*/
179193
=== modified file 'bus/testing/testing_endpoint.go'
--- bus/testing/testing_endpoint.go 2014-04-04 11:08:28 +0000
+++ bus/testing/testing_endpoint.go 2014-05-12 14:11:46 +0000
@@ -169,5 +169,16 @@
169// see Endpoint's Close. This one does nothing.169// see Endpoint's Close. This one does nothing.
170func (tc *testingEndpoint) Close() {}170func (tc *testingEndpoint) Close() {}
171171
172func (tc *testingEndpoint) GrabName(allowReplacement bool) <-chan error {
173 tc.callArgsLck.Lock()
174 defer tc.callArgsLck.Unlock()
175
176 args := callArgs{Member: "::GrabName"}
177 args.Args = append(args.Args, allowReplacement)
178 tc.callArgs = append(tc.callArgs, args)
179
180 return nil
181}
182
172// ensure testingEndpoint implements bus.Endpoint183// ensure testingEndpoint implements bus.Endpoint
173var _ bus.Endpoint = &testingEndpoint{}184var _ bus.Endpoint = (*testingEndpoint)(nil)
174185
=== modified file 'bus/testing/testing_endpoint_test.go'
--- bus/testing/testing_endpoint_test.go 2014-04-02 08:23:15 +0000
+++ bus/testing/testing_endpoint_test.go 2014-05-12 14:11:46 +0000
@@ -173,3 +173,18 @@
173 endp := NewTestingEndpoint(condition.Fail2Work(2), nil, "hello there")173 endp := NewTestingEndpoint(condition.Fail2Work(2), nil, "hello there")
174 c.Check(endp.String(), Matches, ".*Still Broken.*hello there.*")174 c.Check(endp.String(), Matches, ".*Still Broken.*hello there.*")
175}175}
176
177// Test that GrabName updates callArgs
178func (s *TestingEndpointSuite) TestGrabNameUpdatesCallArgs(c *C) {
179 endp := NewTestingEndpoint(nil, condition.Work(true))
180 endp.GrabName(false)
181 endp.GrabName(true)
182 c.Check(GetCallArgs(endp), DeepEquals, []callArgs{
183 {
184 Member: "::GrabName",
185 Args: []interface{}{false},
186 }, {
187 Member: "::GrabName",
188 Args: []interface{}{true},
189 }})
190}
176191
=== modified file 'client/client_test.go'
--- client/client_test.go 2014-04-18 16:31:04 +0000
+++ client/client_test.go 2014-05-12 14:11:46 +0000
@@ -265,8 +265,9 @@
265 ExchangeTimeout: 10 * time.Millisecond,265 ExchangeTimeout: 10 * time.Millisecond,
266 HostsCachingExpiryTime: 1 * time.Hour,266 HostsCachingExpiryTime: 1 * time.Hour,
267 ExpectAllRepairedTime: 30 * time.Minute,267 ExpectAllRepairedTime: 30 * time.Minute,
268 PEM: cli.pem,268 PEM: cli.pem,
269 Info: info,269 Info: info,
270 AuthHelper: []string{},
270 }271 }
271 // sanity check that we are looking at all fields272 // sanity check that we are looking at all fields
272 vExpected := reflect.ValueOf(expected)273 vExpected := reflect.ValueOf(expected)
@@ -276,6 +277,8 @@
276 // field isn't empty/zero277 // field isn't empty/zero
277 c.Assert(fv.Interface(), Not(DeepEquals), reflect.Zero(fv.Type()).Interface(), Commentf("forgot about: %s", vExpected.Type().Field(i).Name))278 c.Assert(fv.Interface(), Not(DeepEquals), reflect.Zero(fv.Type()).Interface(), Commentf("forgot about: %s", vExpected.Type().Field(i).Name))
278 }279 }
280 // but AuthHelper really should be nil for now
281 expected.AuthHelper = nil
279 // finally compare282 // finally compare
280 conf := cli.deriveSessionConfig(info)283 conf := cli.deriveSessionConfig(info)
281 c.Check(conf, DeepEquals, expected)284 c.Check(conf, DeepEquals, expected)
282285
=== modified file 'client/session/session.go'
--- client/session/session.go 2014-04-18 16:37:31 +0000
+++ client/session/session.go 2014-05-12 14:11:46 +0000
@@ -26,6 +26,7 @@
26 "fmt"26 "fmt"
27 "math/rand"27 "math/rand"
28 "net"28 "net"
29 "os/exec"
29 "strings"30 "strings"
30 "sync"31 "sync"
31 "sync/atomic"32 "sync/atomic"
@@ -38,7 +39,9 @@
38 "launchpad.net/ubuntu-push/util"39 "launchpad.net/ubuntu-push/util"
39)40)
4041
41var wireVersionBytes = []byte{protocol.ProtocolWireVersion}42var (
43 wireVersionBytes = []byte{protocol.ProtocolWireVersion}
44)
4245
43type Notification struct {46type Notification struct {
44 TopLevel int6447 TopLevel int64
@@ -84,6 +87,7 @@
84 ExpectAllRepairedTime time.Duration87 ExpectAllRepairedTime time.Duration
85 PEM []byte88 PEM []byte
86 Info map[string]interface{}89 Info map[string]interface{}
90 AuthHelper []string
87}91}
8892
89// ClientSession holds a client<->server session and its configuration.93// ClientSession holds a client<->server session and its configuration.
@@ -115,6 +119,8 @@
115 stateP *uint32119 stateP *uint32
116 ErrCh chan error120 ErrCh chan error
117 MsgCh chan *Notification121 MsgCh chan *Notification
122 // authorization
123 auth string
118 // autoredial knobs124 // autoredial knobs
119 shouldDelayP *uint32125 shouldDelayP *uint32
120 lastAutoRedial time.Time126 lastAutoRedial time.Time
@@ -234,6 +240,27 @@
234 return nil240 return nil
235}241}
236242
243// addAuthorization gets the authorization blob to send to the server
244// and adds it to the session.
245func (sess *ClientSession) addAuthorization() error {
246 sess.Log.Debugf("adding authorization")
247 // using a helper, for now at least
248 if len(sess.AuthHelper) == 0 {
249 // do nothing if helper is unset or empty
250 return nil
251 }
252
253 auth, err := exec.Command(sess.AuthHelper[0], sess.AuthHelper[1:]...).Output()
254 if err != nil {
255 // For now we just log the error, as we don't want to block unauthorized users
256 sess.Log.Errorf("unable to get the authorization token from the account: %v", err)
257 } else {
258 sess.auth = strings.TrimSpace(string(auth))
259 }
260
261 return nil
262}
263
237func (sess *ClientSession) resetHosts() {264func (sess *ClientSession) resetHosts() {
238 sess.deliveryHosts = nil265 sess.deliveryHosts = nil
239}266}
@@ -457,10 +484,9 @@
457 return err484 return err
458 }485 }
459 err = proto.WriteMessage(protocol.ConnectMsg{486 err = proto.WriteMessage(protocol.ConnectMsg{
460 Type: "connect",487 Type: "connect",
461 DeviceId: sess.DeviceId,488 DeviceId: sess.DeviceId,
462 // xxx get the SSO Authorization string from the phone489 Authorization: sess.auth,
463 Authorization: "",
464 Levels: levels,490 Levels: levels,
465 Info: sess.Info,491 Info: sess.Info,
466 })492 })
@@ -495,13 +521,15 @@
495521
496// run calls connect, and if it works it calls start, and if it works522// run calls connect, and if it works it calls start, and if it works
497// it runs loop in a goroutine, and ships its return value over ErrCh.523// it runs loop in a goroutine, and ships its return value over ErrCh.
498func (sess *ClientSession) run(closer func(), hostGetter, connecter, starter, looper func() error) error {524func (sess *ClientSession) run(closer func(), authChecker, hostGetter, connecter, starter, looper func() error) error {
499 closer()525 closer()
500 err := hostGetter()526 if err := authChecker(); err != nil {
501 if err != nil {527 return err
502 return err528 }
503 }529 if err := hostGetter(); err != nil {
504 err = connecter()530 return err
531 }
532 err := connecter()
505 if err == nil {533 if err == nil {
506 err = starter()534 err = starter()
507 if err == nil {535 if err == nil {
@@ -531,7 +559,7 @@
531 // keep on trying.559 // keep on trying.
532 panic("can't Dial() without a protocol constructor.")560 panic("can't Dial() without a protocol constructor.")
533 }561 }
534 return sess.run(sess.doClose, sess.getHosts, sess.connect, sess.start, sess.loop)562 return sess.run(sess.doClose, sess.addAuthorization, sess.getHosts, sess.connect, sess.start, sess.loop)
535}563}
536564
537func init() {565func init() {
538566
=== modified file 'client/session/session_test.go'
--- client/session/session_test.go 2014-04-18 16:37:31 +0000
+++ client/session/session_test.go 2014-05-12 14:11:46 +0000
@@ -34,7 +34,6 @@
3434
35 "launchpad.net/ubuntu-push/client/gethosts"35 "launchpad.net/ubuntu-push/client/gethosts"
36 "launchpad.net/ubuntu-push/client/session/levelmap"36 "launchpad.net/ubuntu-push/client/session/levelmap"
37 "launchpad.net/ubuntu-push/logger"
38 "launchpad.net/ubuntu-push/protocol"37 "launchpad.net/ubuntu-push/protocol"
39 helpers "launchpad.net/ubuntu-push/testing"38 helpers "launchpad.net/ubuntu-push/testing"
40 "launchpad.net/ubuntu-push/testing/condition"39 "launchpad.net/ubuntu-push/testing/condition"
@@ -166,7 +165,7 @@
166/////165/////
167166
168type clientSessionSuite struct {167type clientSessionSuite struct {
169 log logger.Logger168 log *helpers.TestLogger
170 lvls func() (levelmap.LevelMap, error)169 lvls func() (levelmap.LevelMap, error)
171}170}
172171
@@ -347,6 +346,43 @@
347}346}
348347
349/****************************************************************348/****************************************************************
349 addAuthorization() tests
350****************************************************************/
351
352func (cs *clientSessionSuite) TestAddAuthorizationAddsAuthorization(c *C) {
353 sess := &ClientSession{Log: cs.log}
354 sess.AuthHelper = []string{"echo", "some auth"}
355 c.Assert(sess.auth, Equals, "")
356 err := sess.addAuthorization()
357 c.Assert(err, IsNil)
358 c.Check(sess.auth, Equals, "some auth")
359}
360
361func (cs *clientSessionSuite) TestAddAuthorizationIgnoresErrors(c *C) {
362 sess := &ClientSession{Log: cs.log}
363 sess.AuthHelper = []string{"sh", "-c", "echo hello; false"}
364
365 c.Assert(sess.auth, Equals, "")
366 err := sess.addAuthorization()
367 c.Assert(err, IsNil)
368 c.Check(sess.auth, Equals, "")
369}
370
371func (cs *clientSessionSuite) TestAddAuthorizationSkipsIfUnsetOrNil(c *C) {
372 sess := &ClientSession{Log: cs.log}
373 sess.AuthHelper = nil
374 c.Assert(sess.auth, Equals, "")
375 err := sess.addAuthorization()
376 c.Assert(err, IsNil)
377 c.Check(sess.auth, Equals, "")
378
379 sess.AuthHelper = []string{}
380 err = sess.addAuthorization()
381 c.Assert(err, IsNil)
382 c.Check(sess.auth, Equals, "")
383}
384
385/****************************************************************
350 startConnectionAttempt()/nextHostToTry()/started tests386 startConnectionAttempt()/nextHostToTry()/started tests
351****************************************************************/387****************************************************************/
352388
@@ -931,9 +967,10 @@
931967
932 c.Check(takeNext(downCh), Equals, "deadline 0")968 c.Check(takeNext(downCh), Equals, "deadline 0")
933 c.Check(takeNext(downCh), DeepEquals, protocol.ConnectMsg{969 c.Check(takeNext(downCh), DeepEquals, protocol.ConnectMsg{
934 Type: "connect",970 Type: "connect",
935 DeviceId: sess.DeviceId,971 DeviceId: sess.DeviceId,
936 Levels: map[string]int64{},972 Levels: map[string]int64{},
973 Authorization: "",
937 })974 })
938 upCh <- errors.New("Overflow error in /dev/null")975 upCh <- errors.New("Overflow error in /dev/null")
939 err = <-errCh976 err = <-errCh
@@ -1038,6 +1075,7 @@
1038 msg, ok := takeNext(downCh).(protocol.ConnectMsg)1075 msg, ok := takeNext(downCh).(protocol.ConnectMsg)
1039 c.Check(ok, Equals, true)1076 c.Check(ok, Equals, true)
1040 c.Check(msg.DeviceId, Equals, "wah")1077 c.Check(msg.DeviceId, Equals, "wah")
1078 c.Check(msg.Authorization, Equals, "")
1041 c.Check(msg.Info, DeepEquals, info)1079 c.Check(msg.Info, DeepEquals, info)
1042 upCh <- nil // no error1080 upCh <- nil // no error
1043 upCh <- protocol.ConnAckMsg{1081 upCh <- protocol.ConnAckMsg{
@@ -1054,6 +1092,22 @@
1054 run() tests1092 run() tests
1055****************************************************************/1093****************************************************************/
10561094
1095func (cs *clientSessionSuite) TestRunBailsIfAuthCheckFails(c *C) {
1096 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
1097 c.Assert(err, IsNil)
1098 failure := errors.New("TestRunBailsIfAuthCheckFails")
1099 has_closed := false
1100 err = sess.run(
1101 func() { has_closed = true },
1102 func() error { return failure },
1103 nil,
1104 nil,
1105 nil,
1106 nil)
1107 c.Check(err, Equals, failure)
1108 c.Check(has_closed, Equals, true)
1109}
1110
1057func (cs *clientSessionSuite) TestRunBailsIfHostGetterFails(c *C) {1111func (cs *clientSessionSuite) TestRunBailsIfHostGetterFails(c *C) {
1058 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)1112 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
1059 c.Assert(err, IsNil)1113 c.Assert(err, IsNil)
@@ -1061,6 +1115,7 @@
1061 has_closed := false1115 has_closed := false
1062 err = sess.run(1116 err = sess.run(
1063 func() { has_closed = true },1117 func() { has_closed = true },
1118 func() error { return nil },
1064 func() error { return failure },1119 func() error { return failure },
1065 nil,1120 nil,
1066 nil,1121 nil,
@@ -1076,6 +1131,7 @@
1076 err = sess.run(1131 err = sess.run(
1077 func() {},1132 func() {},
1078 func() error { return nil },1133 func() error { return nil },
1134 func() error { return nil },
1079 func() error { return failure },1135 func() error { return failure },
1080 nil,1136 nil,
1081 nil)1137 nil)
@@ -1090,6 +1146,7 @@
1090 func() {},1146 func() {},
1091 func() error { return nil },1147 func() error { return nil },
1092 func() error { return nil },1148 func() error { return nil },
1149 func() error { return nil },
1093 func() error { return failure },1150 func() error { return failure },
1094 nil)1151 nil)
1095 c.Check(err, Equals, failure)1152 c.Check(err, Equals, failure)
@@ -1109,6 +1166,7 @@
1109 func() error { return nil },1166 func() error { return nil },
1110 func() error { return nil },1167 func() error { return nil },
1111 func() error { return nil },1168 func() error { return nil },
1169 func() error { return nil },
1112 func() error { sess.MsgCh <- notf; return <-failureCh })1170 func() error { sess.MsgCh <- notf; return <-failureCh })
1113 c.Check(err, Equals, nil)1171 c.Check(err, Equals, nil)
1114 // if run doesn't error it sets up the channels1172 // if run doesn't error it sets up the channels
11151173
=== modified file 'debian/changelog'
--- debian/changelog 2014-04-23 11:54:00 +0000
+++ debian/changelog 2014-05-12 14:11:46 +0000
@@ -1,3 +1,9 @@
1ubuntu-push (0.21-0.ubuntu1) UNRELEASED; urgency=medium
2
3 * New upstream release: first auth bits, and Qt dependency.
4
5 -- John Lenton <john.lenton@canonical.com> Tue, 15 Apr 2014 14:04:35 +0100
6
1ubuntu-push (0.2.1+14.04.20140423.1-0ubuntu1) trusty; urgency=high7ubuntu-push (0.2.1+14.04.20140423.1-0ubuntu1) trusty; urgency=high
28
3 [ Samuele Pedroni ]9 [ Samuele Pedroni ]
410
=== modified file 'debian/control'
--- debian/control 2014-03-25 16:26:20 +0000
+++ debian/control 2014-05-12 14:11:46 +0000
@@ -14,6 +14,8 @@
14 libgcrypt11-dev,14 libgcrypt11-dev,
15 libglib2.0-dev (>= 2.31.6),15 libglib2.0-dev (>= 2.31.6),
16 libwhoopsie-dev,16 libwhoopsie-dev,
17 libubuntuoneauth-2.0-dev,
18 cmake,
17Standards-Version: 3.9.519Standards-Version: 3.9.5
18Homepage: http://launchpad.net/ubuntu-push20Homepage: http://launchpad.net/ubuntu-push
19Vcs-Bzr: lp:ubuntu-push21Vcs-Bzr: lp:ubuntu-push
2022
=== modified file 'debian/rules'
--- debian/rules 2014-03-24 12:22:55 +0000
+++ debian/rules 2014-05-12 14:11:46 +0000
@@ -2,9 +2,13 @@
2# -*- makefile -*-2# -*- makefile -*-
33
4export DH_GOPKG := launchpad.net/ubuntu-push4export DH_GOPKG := launchpad.net/ubuntu-push
5export DEB_BUILD_OPTIONS := nostrip
6export UBUNTU_PUSH_TEST_RESOURCES_ROOT := $(CURDIR)5export UBUNTU_PUSH_TEST_RESOURCES_ROOT := $(CURDIR)
76
7override_dh_auto_build:
8 cd $$( find ./ -type d -regex '\./[^/]*/src/launchpad.net' -printf "%h\n" | head -n1)
9 dh_auto_build --buildsystem=golang
10 (cd signing-helper && cmake . && make)
11
8override_dh_install:12override_dh_install:
9 dh_install -Xusr/bin/cmd -Xusr/bin/dev --fail-missing13 dh_install -Xusr/bin/cmd -Xusr/bin/dev --fail-missing
1014
1115
=== modified file 'debian/ubuntu-push-client.install'
--- debian/ubuntu-push-client.install 2014-03-26 16:27:19 +0000
+++ debian/ubuntu-push-client.install 2014-05-12 14:11:46 +0000
@@ -1,4 +1,5 @@
1#!/usr/bin/dh-exec1#!/usr/bin/dh-exec
2debian/config.json /etc/xdg/ubuntu-push-client2debian/config.json /etc/xdg/ubuntu-push-client
3debian/ubuntu-push-client.conf /usr/share/upstart/sessions3debian/ubuntu-push-client.conf /usr/share/upstart/sessions
4signing-helper/signing-helper /usr/lib/ubuntu-push-client
4usr/bin/ubuntu-push => /usr/lib/ubuntu-push-client/ubuntu-push-client5usr/bin/ubuntu-push => /usr/lib/ubuntu-push-client/ubuntu-push-client
56
=== modified file 'dependencies.tsv'
--- dependencies.tsv 2014-03-12 13:23:26 +0000
+++ dependencies.tsv 2014-05-12 14:11:46 +0000
@@ -2,3 +2,4 @@
2launchpad.net/go-dbus/v1 bzr james@jamesh.id.au-20140206110213-pbzcr6ucaz3rqmnw 1252launchpad.net/go-dbus/v1 bzr james@jamesh.id.au-20140206110213-pbzcr6ucaz3rqmnw 125
3launchpad.net/go-xdg/v0 bzr john.lenton@canonical.com-20140208094800-gubd5md7cro3mtxa 103launchpad.net/go-xdg/v0 bzr john.lenton@canonical.com-20140208094800-gubd5md7cro3mtxa 10
4launchpad.net/gocheck bzr gustavo@niemeyer.net-20140127131816-zshobk1qqme626xw 864launchpad.net/gocheck bzr gustavo@niemeyer.net-20140127131816-zshobk1qqme626xw 86
5launchpad.net/~ubuntu-push-hackers/ubuntu-push/go-uuid bzr samuele.pedroni@canonical.com-20140130122455-pm9h8etl4owp90lg 1
56
=== modified file 'protocol/messages.go'
--- protocol/messages.go 2014-04-04 13:54:45 +0000
+++ protocol/messages.go 2014-05-12 14:11:46 +0000
@@ -54,6 +54,14 @@
54 Split() (done bool)54 Split() (done bool)
55}55}
5656
57// OnewayMsg are messages that are not to be followed by a response,
58// after sending them the session either aborts or continues.
59type OnewayMsg interface {
60 SplittableMsg
61 // continue session after the message?
62 OnewayContinue() bool
63}
64
57// CONNBROKEN message, server side is breaking the connection for reason.65// CONNBROKEN message, server side is breaking the connection for reason.
58type ConnBrokenMsg struct {66type ConnBrokenMsg struct {
59 Type string `json:"T"`67 Type string `json:"T"`
@@ -65,11 +73,35 @@
65 return true73 return true
66}74}
6775
76func (m *ConnBrokenMsg) OnewayContinue() bool {
77 return false
78}
79
68// CONNBROKEN reasons80// CONNBROKEN reasons
69const (81const (
70 BrokenHostMismatch = "host-mismatch"82 BrokenHostMismatch = "host-mismatch"
71)83)
7284
85// CONNWARN message, server side is warning about partial functionality
86// because reason.
87type ConnWarnMsg struct {
88 Type string `json:"T"`
89 // reason
90 Reason string
91}
92
93func (m *ConnWarnMsg) Split() bool {
94 return true
95}
96func (m *ConnWarnMsg) OnewayContinue() bool {
97 return true
98}
99
100// CONNWARN reasons
101const (
102 WarnUnauthorized = "unauthorized"
103)
104
73// PING/PONG messages105// PING/PONG messages
74type PingPongMsg struct {106type PingPongMsg struct {
75 Type string `json:"T"`107 Type string `json:"T"`
@@ -122,6 +154,17 @@
122 Notifications []Notification154 Notifications []Notification
123}155}
124156
157// Reset resets the splitting state if the message storage is to be
158// reused.
159func (m *NotificationsMsg) Reset() {
160 // xxx
161}
162
163func (m *NotificationsMsg) Split() bool {
164 // xxx
165 return true
166}
167
125// A single unicast notification168// A single unicast notification
126type Notification struct {169type Notification struct {
127 AppId string `json:"A"`170 AppId string `json:"A"`
@@ -130,6 +173,19 @@
130 Payload json.RawMessage `json:"P"`173 Payload json.RawMessage `json:"P"`
131}174}
132175
176// ExtractPayloads gets only the payloads out of a slice of notifications.
177func ExtractPayloads(notifications []Notification) []json.RawMessage {
178 n := len(notifications)
179 if n == 0 {
180 return nil
181 }
182 payloads := make([]json.RawMessage, n)
183 for i := 0; i < n; i++ {
184 payloads[i] = notifications[i].Payload
185 }
186 return payloads
187}
188
133// ACKnowledgement message189// ACKnowledgement message
134type AckMsg struct {190type AckMsg struct {
135 Type string `json:"T"`191 Type string `json:"T"`
136192
=== modified file 'protocol/messages_test.go'
--- protocol/messages_test.go 2014-04-04 13:19:10 +0000
+++ protocol/messages_test.go 2014-05-12 14:11:46 +0000
@@ -104,6 +104,22 @@
104 c.Check(b.splitting, Equals, 0)104 c.Check(b.splitting, Equals, 0)
105}105}
106106
107func (s *messagesSuite) TestSplitConnBrokenMsg(c *C) {107func (s *messagesSuite) TestConnBrokenMsg(c *C) {
108 c.Check((&ConnBrokenMsg{}).Split(), Equals, true)108 m := &ConnBrokenMsg{}
109 c.Check(m.Split(), Equals, true)
110 c.Check(m.OnewayContinue(), Equals, false)
111}
112
113func (s *messagesSuite) TestConnWarnMsg(c *C) {
114 m := &ConnWarnMsg{}
115 c.Check(m.Split(), Equals, true)
116 c.Check(m.OnewayContinue(), Equals, true)
117}
118
119func (s *messagesSuite) TestExtractPayloads(c *C) {
120 c.Check(ExtractPayloads(nil), IsNil)
121 p1 := json.RawMessage(`{"a":1}`)
122 p2 := json.RawMessage(`{"b":2}`)
123 ns := []Notification{Notification{Payload: p1}, Notification{Payload: p2}}
124 c.Check(ExtractPayloads(ns), DeepEquals, []json.RawMessage{p1, p2})
109}125}
110126
=== modified file 'protocol/state-diag-client.gv'
--- protocol/state-diag-client.gv 2014-01-16 20:07:13 +0000
+++ protocol/state-diag-client.gv 2014-05-12 14:11:46 +0000
@@ -2,7 +2,7 @@
2 label = "State diagram for client";2 label = "State diagram for client";
3 size="12,6";3 size="12,6";
4 rankdir=LR;4 rankdir=LR;
5 node [shape = doublecircle]; pingTimeout;5 node [shape = doublecircle]; pingTimeout; connBroken;
6 node [shape = circle];6 node [shape = circle];
7 start1 -> start2 [ label = "Write wire version" ];7 start1 -> start2 [ label = "Write wire version" ];
8 start2 -> start3 [ label = "Write CONNECT" ];8 start2 -> start3 [ label = "Write CONNECT" ];
@@ -13,4 +13,7 @@
13 broadcast -> loop [label = "Write ACK"];13 broadcast -> loop [label = "Write ACK"];
14 loop -> pingTimeout [14 loop -> pingTimeout [
15 label = "Elapsed ping interval + exchange interval"];15 label = "Elapsed ping interval + exchange interval"];
16 loop -> connBroken [label = "Read CONNBROKEN"];
17 loop -> warn [label = "Read CONNWARN"];
18 warn -> loop;
16}19}
1720
=== modified file 'protocol/state-diag-client.svg'
--- protocol/state-diag-client.svg 2014-01-16 19:37:57 +0000
+++ protocol/state-diag-client.svg 2014-05-12 14:11:46 +0000
@@ -4,95 +4,123 @@
4<!-- Generated by graphviz version 2.26.3 (20100126.1600)4<!-- Generated by graphviz version 2.26.3 (20100126.1600)
5 -->5 -->
6<!-- Title: state_diagram_client Pages: 1 -->6<!-- Title: state_diagram_client Pages: 1 -->
7<svg width="864pt" height="279pt"7<svg width="822pt" height="432pt"
8 viewBox="0.00 0.00 864.00 278.89" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">8 viewBox="0.00 0.00 822.36 432.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
9<g id="graph1" class="graph" transform="scale(0.683544 0.683544) rotate(0) translate(4 404)">9<g id="graph1" class="graph" transform="scale(0.650602 0.650602) rotate(0) translate(4 660)">
10<title>state_diagram_client</title>10<title>state_diagram_client</title>
11<polygon fill="white" stroke="white" points="-4,5 -4,-404 1261,-404 1261,5 -4,5"/>11<polygon fill="white" stroke="white" points="-4,5 -4,-660 1261,-660 1261,5 -4,5"/>
12<text text-anchor="middle" x="628" y="-9.4" font-family="Times Roman,serif" font-size="14.00">State diagram for client</text>12<text text-anchor="middle" x="628" y="-9.4" font-family="Times Roman,serif" font-size="14.00">State diagram for client</text>
13<!-- pingTimeout -->13<!-- pingTimeout -->
14<g id="node1" class="node"><title>pingTimeout</title>14<g id="node1" class="node"><title>pingTimeout</title>
15<ellipse fill="none" stroke="black" cx="1180" cy="-324" rx="72.1249" ry="72.1249"/>15<ellipse fill="none" stroke="black" cx="1180" cy="-580" rx="72.1249" ry="72.1249"/>
16<ellipse fill="none" stroke="black" cx="1180" cy="-324" rx="76.1249" ry="76.1249"/>16<ellipse fill="none" stroke="black" cx="1180" cy="-580" rx="76.1249" ry="76.1249"/>
17<text text-anchor="middle" x="1180" y="-320.4" font-family="Times Roman,serif" font-size="14.00">pingTimeout</text>17<text text-anchor="middle" x="1180" y="-576.4" font-family="Times Roman,serif" font-size="14.00">pingTimeout</text>
18</g>
19<!-- connBroken -->
20<g id="node2" class="node"><title>connBroken</title>
21<ellipse fill="none" stroke="black" cx="1180" cy="-413" rx="68.8251" ry="69.2965"/>
22<ellipse fill="none" stroke="black" cx="1180" cy="-413" rx="72.7978" ry="73.2965"/>
23<text text-anchor="middle" x="1180" y="-409.4" font-family="Times Roman,serif" font-size="14.00">connBroken</text>
18</g>24</g>
19<!-- start1 -->25<!-- start1 -->
20<g id="node2" class="node"><title>start1</title>26<g id="node3" class="node"><title>start1</title>
21<ellipse fill="none" stroke="black" cx="42" cy="-166" rx="41.2167" ry="41.7193"/>27<ellipse fill="none" stroke="black" cx="42" cy="-231" rx="41.2167" ry="41.7193"/>
22<text text-anchor="middle" x="42" y="-162.4" font-family="Times Roman,serif" font-size="14.00">start1</text>28<text text-anchor="middle" x="42" y="-227.4" font-family="Times Roman,serif" font-size="14.00">start1</text>
23</g>29</g>
24<!-- start2 -->30<!-- start2 -->
25<g id="node4" class="node"><title>start2</title>31<g id="node5" class="node"><title>start2</title>
26<ellipse fill="none" stroke="black" cx="292" cy="-166" rx="41.2167" ry="41.7193"/>32<ellipse fill="none" stroke="black" cx="292" cy="-231" rx="41.2167" ry="41.7193"/>
27<text text-anchor="middle" x="292" y="-162.4" font-family="Times Roman,serif" font-size="14.00">start2</text>33<text text-anchor="middle" x="292" y="-227.4" font-family="Times Roman,serif" font-size="14.00">start2</text>
28</g>34</g>
29<!-- start1&#45;&gt;start2 -->35<!-- start1&#45;&gt;start2 -->
30<g id="edge2" class="edge"><title>start1&#45;&gt;start2</title>36<g id="edge2" class="edge"><title>start1&#45;&gt;start2</title>
31<path fill="none" stroke="black" d="M83.5631,-166C126.547,-166 193.757,-166 240.181,-166"/>37<path fill="none" stroke="black" d="M83.5631,-231C126.547,-231 193.757,-231 240.181,-231"/>
32<polygon fill="black" stroke="black" points="240.338,-169.5 250.338,-166 240.338,-162.5 240.338,-169.5"/>38<polygon fill="black" stroke="black" points="240.338,-234.5 250.338,-231 240.338,-227.5 240.338,-234.5"/>
33<text text-anchor="middle" x="167" y="-171.4" font-family="Times Roman,serif" font-size="14.00">Write wire version</text>39<text text-anchor="middle" x="167" y="-236.4" font-family="Times Roman,serif" font-size="14.00">Write wire version</text>
34</g>40</g>
35<!-- start3 -->41<!-- start3 -->
36<g id="node6" class="node"><title>start3</title>42<g id="node7" class="node"><title>start3</title>
37<ellipse fill="none" stroke="black" cx="526" cy="-166" rx="41.2167" ry="41.7193"/>43<ellipse fill="none" stroke="black" cx="526" cy="-231" rx="41.2167" ry="41.7193"/>
38<text text-anchor="middle" x="526" y="-162.4" font-family="Times Roman,serif" font-size="14.00">start3</text>44<text text-anchor="middle" x="526" y="-227.4" font-family="Times Roman,serif" font-size="14.00">start3</text>
39</g>45</g>
40<!-- start2&#45;&gt;start3 -->46<!-- start2&#45;&gt;start3 -->
41<g id="edge4" class="edge"><title>start2&#45;&gt;start3</title>47<g id="edge4" class="edge"><title>start2&#45;&gt;start3</title>
42<path fill="none" stroke="black" d="M333.565,-166C372.875,-166 431.992,-166 474.321,-166"/>48<path fill="none" stroke="black" d="M333.565,-231C372.875,-231 431.992,-231 474.321,-231"/>
43<polygon fill="black" stroke="black" points="474.429,-169.5 484.429,-166 474.429,-162.5 474.429,-169.5"/>49<polygon fill="black" stroke="black" points="474.429,-234.5 484.429,-231 474.429,-227.5 474.429,-234.5"/>
44<text text-anchor="middle" x="409" y="-171.4" font-family="Times Roman,serif" font-size="14.00">Write CONNECT</text>50<text text-anchor="middle" x="409" y="-236.4" font-family="Times Roman,serif" font-size="14.00">Write CONNECT</text>
45</g>51</g>
46<!-- loop -->52<!-- loop -->
47<g id="node8" class="node"><title>loop</title>53<g id="node9" class="node"><title>loop</title>
48<ellipse fill="none" stroke="black" cx="746" cy="-166" rx="31.8198" ry="31.8198"/>54<ellipse fill="none" stroke="black" cx="746" cy="-231" rx="31.8198" ry="31.8198"/>
49<text text-anchor="middle" x="746" y="-162.4" font-family="Times Roman,serif" font-size="14.00">loop</text>55<text text-anchor="middle" x="746" y="-227.4" font-family="Times Roman,serif" font-size="14.00">loop</text>
50</g>56</g>
51<!-- start3&#45;&gt;loop -->57<!-- start3&#45;&gt;loop -->
52<g id="edge6" class="edge"><title>start3&#45;&gt;loop</title>58<g id="edge6" class="edge"><title>start3&#45;&gt;loop</title>
53<path fill="none" stroke="black" d="M567.639,-166C606.633,-166 664.616,-166 703.793,-166"/>59<path fill="none" stroke="black" d="M567.639,-231C606.633,-231 664.616,-231 703.793,-231"/>
54<polygon fill="black" stroke="black" points="703.818,-169.5 713.818,-166 703.818,-162.5 703.818,-169.5"/>60<polygon fill="black" stroke="black" points="703.818,-234.5 713.818,-231 703.818,-227.5 703.818,-234.5"/>
55<text text-anchor="middle" x="641" y="-171.4" font-family="Times Roman,serif" font-size="14.00">Read CONNACK</text>61<text text-anchor="middle" x="641" y="-236.4" font-family="Times Roman,serif" font-size="14.00">Read CONNACK</text>
56</g>62</g>
57<!-- loop&#45;&gt;pingTimeout -->63<!-- loop&#45;&gt;pingTimeout -->
58<g id="edge16" class="edge"><title>loop&#45;&gt;pingTimeout</title>64<g id="edge16" class="edge"><title>loop&#45;&gt;pingTimeout</title>
59<path fill="none" stroke="black" d="M763.666,-192.937C772.211,-204.042 783.361,-216.128 796,-224 888.06,-281.339 1012.12,-305.973 1094,-316.443"/>65<path fill="none" stroke="black" d="M750.211,-262.971C757.458,-313.528 773.689,-408.79 796,-434 872.806,-520.784 1006.81,-556.22 1094.46,-570.528"/>
60<polygon fill="black" stroke="black" points="1093.67,-319.928 1104.02,-317.68 1094.53,-312.981 1093.67,-319.928"/>66<polygon fill="black" stroke="black" points="1093.96,-573.992 1104.39,-572.09 1095.05,-567.078 1093.96,-573.992"/>
61<text text-anchor="middle" x="941" y="-319.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval + exchange interval</text>67<text text-anchor="middle" x="941" y="-572.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval + exchange interval</text>
68</g>
69<!-- loop&#45;&gt;connBroken -->
70<g id="edge18" class="edge"><title>loop&#45;&gt;connBroken</title>
71<path fill="none" stroke="black" d="M755.1,-261.824C762.755,-282.438 775.756,-308.526 796,-324 883.382,-390.791 1012.39,-408.797 1096.33,-412.948"/>
72<polygon fill="black" stroke="black" points="1096.19,-416.445 1106.33,-413.388 1096.5,-409.452 1096.19,-416.445"/>
73<text text-anchor="middle" x="941" y="-417.4" font-family="Times Roman,serif" font-size="14.00">Read CONNBROKEN</text>
62</g>74</g>
63<!-- pong -->75<!-- pong -->
64<g id="node10" class="node"><title>pong</title>76<g id="node11" class="node"><title>pong</title>
65<ellipse fill="none" stroke="black" cx="1180" cy="-195" rx="34.8574" ry="35.3553"/>77<ellipse fill="none" stroke="black" cx="1180" cy="-287" rx="34.8574" ry="35.3553"/>
66<text text-anchor="middle" x="1180" y="-191.4" font-family="Times Roman,serif" font-size="14.00">pong</text>78<text text-anchor="middle" x="1180" y="-283.4" font-family="Times Roman,serif" font-size="14.00">pong</text>
67</g>79</g>
68<!-- loop&#45;&gt;pong -->80<!-- loop&#45;&gt;pong -->
69<g id="edge8" class="edge"><title>loop&#45;&gt;pong</title>81<g id="edge8" class="edge"><title>loop&#45;&gt;pong</title>
70<path fill="none" stroke="black" d="M775.392,-179.044C782.046,-181.465 789.167,-183.653 796,-185 916.362,-208.722 1062.02,-203.515 1134.48,-198.706"/>82<path fill="none" stroke="black" d="M768.467,-253.959C776.476,-260.698 786.005,-267.259 796,-271 911.696,-314.31 1060.9,-303.343 1134.62,-293.955"/>
71<polygon fill="black" stroke="black" points="1134.89,-202.186 1144.62,-198.003 1134.4,-195.203 1134.89,-202.186"/>83<polygon fill="black" stroke="black" points="1135.49,-297.371 1144.94,-292.588 1134.57,-290.432 1135.49,-297.371"/>
72<text text-anchor="middle" x="941" y="-207.4" font-family="Times Roman,serif" font-size="14.00">Read PING</text>84<text text-anchor="middle" x="941" y="-307.4" font-family="Times Roman,serif" font-size="14.00">Read PING</text>
73</g>85</g>
74<!-- broadcast -->86<!-- broadcast -->
75<g id="node12" class="node"><title>broadcast</title>87<g id="node13" class="node"><title>broadcast</title>
76<ellipse fill="none" stroke="black" cx="1180" cy="-84" rx="58.1882" ry="58.6899"/>88<ellipse fill="none" stroke="black" cx="1180" cy="-176" rx="58.1882" ry="58.6899"/>
77<text text-anchor="middle" x="1180" y="-80.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>89<text text-anchor="middle" x="1180" y="-172.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>
78</g>90</g>
79<!-- loop&#45;&gt;broadcast -->91<!-- loop&#45;&gt;broadcast -->
80<g id="edge10" class="edge"><title>loop&#45;&gt;broadcast</title>92<g id="edge10" class="edge"><title>loop&#45;&gt;broadcast</title>
81<path fill="none" stroke="black" d="M770.52,-145.1C778.217,-139.607 787.053,-134.301 796,-131 917.482,-86.1746 957.924,-122.075 1086,-103 1094.61,-101.717 1103.63,-100.165 1112.53,-98.5074"/>93<path fill="none" stroke="black" d="M775.45,-218.228C782.1,-215.791 789.205,-213.528 796,-212 922.145,-183.64 957.464,-202.973 1086,-189 1094.36,-188.091 1103.12,-187.028 1111.79,-185.909"/>
82<polygon fill="black" stroke="black" points="1113.34,-101.917 1122.5,-96.5998 1112.02,-95.0419 1113.34,-101.917"/>94<polygon fill="black" stroke="black" points="1112.44,-189.353 1121.9,-184.574 1111.53,-182.413 1112.44,-189.353"/>
83<text text-anchor="middle" x="941" y="-136.4" font-family="Times Roman,serif" font-size="14.00">Read BROADCAST</text>95<text text-anchor="middle" x="941" y="-217.4" font-family="Times Roman,serif" font-size="14.00">Read BROADCAST</text>
96</g>
97<!-- warn -->
98<g id="node19" class="node"><title>warn</title>
99<ellipse fill="none" stroke="black" cx="1180" cy="-63" rx="36.7696" ry="36.7696"/>
100<text text-anchor="middle" x="1180" y="-59.4" font-family="Times Roman,serif" font-size="14.00">warn</text>
101</g>
102<!-- loop&#45;&gt;warn -->
103<g id="edge20" class="edge"><title>loop&#45;&gt;warn</title>
104<path fill="none" stroke="black" d="M753.357,-199.767C760.401,-177.027 773.396,-147.441 796,-131 901.425,-54.3166 958.242,-112.935 1086,-87 1101.84,-83.7841 1119.02,-79.6061 1134.3,-75.6396"/>
105<polygon fill="black" stroke="black" points="1135.26,-79.0068 1144.04,-73.0757 1133.48,-72.2376 1135.26,-79.0068"/>
106<text text-anchor="middle" x="941" y="-136.4" font-family="Times Roman,serif" font-size="14.00">Read CONNWARN</text>
84</g>107</g>
85<!-- pong&#45;&gt;loop -->108<!-- pong&#45;&gt;loop -->
86<g id="edge12" class="edge"><title>pong&#45;&gt;loop</title>109<g id="edge12" class="edge"><title>pong&#45;&gt;loop</title>
87<path fill="none" stroke="black" d="M1147.19,-180.867C1129.44,-173.986 1106.92,-166.463 1086,-163 980.081,-145.465 853.051,-154.36 788.368,-160.981"/>110<path fill="none" stroke="black" d="M1148.22,-271.079C1130.39,-262.942 1107.48,-253.77 1086,-249 1030.54,-236.684 866.695,-232.715 788.482,-231.502"/>
88<polygon fill="black" stroke="black" points="787.736,-157.528 778.16,-162.06 788.472,-164.489 787.736,-157.528"/>111<polygon fill="black" stroke="black" points="788.085,-227.996 778.035,-231.348 787.982,-234.995 788.085,-227.996"/>
89<text text-anchor="middle" x="941" y="-168.4" font-family="Times Roman,serif" font-size="14.00">Write PONG</text>112<text text-anchor="middle" x="941" y="-254.4" font-family="Times Roman,serif" font-size="14.00">Write PONG</text>
90</g>113</g>
91<!-- broadcast&#45;&gt;loop -->114<!-- broadcast&#45;&gt;loop -->
92<g id="edge14" class="edge"><title>broadcast&#45;&gt;loop</title>115<g id="edge14" class="edge"><title>broadcast&#45;&gt;loop</title>
93<path fill="none" stroke="black" d="M1123.8,-67.0114C1044.83,-46.6166 899.156,-22.0001 796,-81 778.946,-90.7538 767.135,-108.842 759.293,-125.833"/>116<path fill="none" stroke="black" d="M1121.7,-168.205C1028.72,-156.837 851.665,-139.849 796,-167 784,-172.853 774.037,-183.132 766.245,-193.762"/>
94<polygon fill="black" stroke="black" points="756.044,-124.528 755.336,-135.099 762.482,-127.277 756.044,-124.528"/>117<polygon fill="black" stroke="black" points="763.182,-192.043 760.465,-202.284 768.975,-195.973 763.182,-192.043"/>
95<text text-anchor="middle" x="941" y="-86.4" font-family="Times Roman,serif" font-size="14.00">Write ACK</text>118<text text-anchor="middle" x="941" y="-172.4" font-family="Times Roman,serif" font-size="14.00">Write ACK</text>
119</g>
120<!-- warn&#45;&gt;loop -->
121<g id="edge22" class="edge"><title>warn&#45;&gt;loop</title>
122<path fill="none" stroke="black" d="M1144.07,-53.3553C1070.4,-35.8873 900.397,-7.71825 796,-87 779.313,-99.6722 764.14,-151.763 754.991,-189.659"/>
123<polygon fill="black" stroke="black" points="751.574,-188.904 752.686,-199.44 758.387,-190.51 751.574,-188.904"/>
96</g>124</g>
97</g>125</g>
98</svg>126</svg>
99127
=== modified file 'protocol/state-diag-session.gv'
--- protocol/state-diag-session.gv 2014-01-16 20:07:13 +0000
+++ protocol/state-diag-session.gv 2014-05-12 14:11:46 +0000
@@ -2,6 +2,7 @@
2 label = "State diagram for session";2 label = "State diagram for session";
3 size="12,6";3 size="12,6";
4 rankdir=LR;4 rankdir=LR;
5 node [shape = doublecircle]; stop;
5 node [shape = circle];6 node [shape = circle];
6 start1 -> start2 [ label = "Read wire version" ];7 start1 -> start2 [ label = "Read wire version" ];
7 start2 -> start3 [ label = "Read CONNECT" ];8 start2 -> start3 [ label = "Read CONNECT" ];
@@ -17,4 +18,13 @@
17 split_broadcast -> split_ack_wait [label = "Write split BROADCAST"];18 split_broadcast -> split_ack_wait [label = "Write split BROADCAST"];
18 split_ack_wait -> split_broadcast [label = "Read ACK"];19 split_ack_wait -> split_broadcast [label = "Read ACK"];
19 split_broadcast -> loop [label = "All split msgs written"];20 split_broadcast -> loop [label = "All split msgs written"];
21 // other
22 loop -> conn_broken [label = "Receive connbroken request"];
23 loop -> conn_warn [label = "Receive connwarn request"];
24 conn_broken -> stop [label = "Write CONNBROKEN"];
25 conn_warn -> loop [label = "Write CONNWARN"];
26 // timeouts
27 ack_wait -> stop [label = "Elapsed exchange timeout"];
28 split_ack_wait -> stop [label = "Elapsed exchange timeout"];
29 pong_wait -> stop [label = "Elapsed exchange timeout"];
20}30}
2131
=== modified file 'protocol/state-diag-session.svg'
--- protocol/state-diag-session.svg 2014-01-16 19:37:57 +0000
+++ protocol/state-diag-session.svg 2014-05-12 14:11:46 +0000
@@ -4,139 +4,197 @@
4<!-- Generated by graphviz version 2.26.3 (20100126.1600)4<!-- Generated by graphviz version 2.26.3 (20100126.1600)
5 -->5 -->
6<!-- Title: state_diagram_session Pages: 1 -->6<!-- Title: state_diagram_session Pages: 1 -->
7<svg width="864pt" height="208pt"7<svg width="864pt" height="266pt"
8 viewBox="0.00 0.00 864.00 207.94" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">8 viewBox="0.00 0.00 864.00 265.73" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
9<g id="graph1" class="graph" transform="scale(0.435923 0.435923) rotate(0) translate(4 473)">9<g id="graph1" class="graph" transform="scale(0.367035 0.367035) rotate(0) translate(4 720)">
10<title>state_diagram_session</title>10<title>state_diagram_session</title>
11<polygon fill="white" stroke="white" points="-4,5 -4,-473 1979,-473 1979,5 -4,5"/>11<polygon fill="white" stroke="white" points="-4,5 -4,-720 2351,-720 2351,5 -4,5"/>
12<text text-anchor="middle" x="987" y="-9.4" font-family="Times Roman,serif" font-size="14.00">State diagram for session</text>12<text text-anchor="middle" x="1173" y="-9.4" font-family="Times Roman,serif" font-size="14.00">State diagram for session</text>
13<!-- stop -->
14<g id="node1" class="node"><title>stop</title>
15<ellipse fill="none" stroke="black" cx="2309" cy="-335" rx="32.0813" ry="32.5269"/>
16<ellipse fill="none" stroke="black" cx="2309" cy="-335" rx="36.0265" ry="36.5269"/>
17<text text-anchor="middle" x="2309" y="-331.4" font-family="Times Roman,serif" font-size="14.00">stop</text>
18</g>
13<!-- start1 -->19<!-- start1 -->
14<g id="node1" class="node"><title>start1</title>20<g id="node2" class="node"><title>start1</title>
15<ellipse fill="none" stroke="black" cx="42" cy="-294" rx="41.2167" ry="41.7193"/>21<ellipse fill="none" stroke="black" cx="42" cy="-395" rx="41.2167" ry="41.7193"/>
16<text text-anchor="middle" x="42" y="-290.4" font-family="Times Roman,serif" font-size="14.00">start1</text>22<text text-anchor="middle" x="42" y="-391.4" font-family="Times Roman,serif" font-size="14.00">start1</text>
17</g>23</g>
18<!-- start2 -->24<!-- start2 -->
19<g id="node3" class="node"><title>start2</title>25<g id="node4" class="node"><title>start2</title>
20<ellipse fill="none" stroke="black" cx="286" cy="-294" rx="41.2167" ry="41.7193"/>26<ellipse fill="none" stroke="black" cx="286" cy="-395" rx="41.2167" ry="41.7193"/>
21<text text-anchor="middle" x="286" y="-290.4" font-family="Times Roman,serif" font-size="14.00">start2</text>27<text text-anchor="middle" x="286" y="-391.4" font-family="Times Roman,serif" font-size="14.00">start2</text>
22</g>28</g>
23<!-- start1&#45;&gt;start2 -->29<!-- start1&#45;&gt;start2 -->
24<g id="edge2" class="edge"><title>start1&#45;&gt;start2</title>30<g id="edge2" class="edge"><title>start1&#45;&gt;start2</title>
25<path fill="none" stroke="black" d="M83.6679,-294C125.213,-294 189.13,-294 233.981,-294"/>31<path fill="none" stroke="black" d="M83.6679,-395C125.213,-395 189.13,-395 233.981,-395"/>
26<polygon fill="black" stroke="black" points="234.096,-297.5 244.096,-294 234.096,-290.5 234.096,-297.5"/>32<polygon fill="black" stroke="black" points="234.096,-398.5 244.096,-395 234.096,-391.5 234.096,-398.5"/>
27<text text-anchor="middle" x="164" y="-299.4" font-family="Times Roman,serif" font-size="14.00">Read wire version</text>33<text text-anchor="middle" x="164" y="-400.4" font-family="Times Roman,serif" font-size="14.00">Read wire version</text>
28</g>34</g>
29<!-- start3 -->35<!-- start3 -->
30<g id="node5" class="node"><title>start3</title>36<g id="node6" class="node"><title>start3</title>
31<ellipse fill="none" stroke="black" cx="516" cy="-294" rx="41.2167" ry="41.7193"/>37<ellipse fill="none" stroke="black" cx="537" cy="-395" rx="41.2167" ry="41.7193"/>
32<text text-anchor="middle" x="516" y="-290.4" font-family="Times Roman,serif" font-size="14.00">start3</text>38<text text-anchor="middle" x="537" y="-391.4" font-family="Times Roman,serif" font-size="14.00">start3</text>
33</g>39</g>
34<!-- start2&#45;&gt;start3 -->40<!-- start2&#45;&gt;start3 -->
35<g id="edge4" class="edge"><title>start2&#45;&gt;start3</title>41<g id="edge4" class="edge"><title>start2&#45;&gt;start3</title>
36<path fill="none" stroke="black" d="M327.651,-294C365.959,-294 422.903,-294 464.145,-294"/>42<path fill="none" stroke="black" d="M327.729,-395C370.886,-395 438.364,-395 484.973,-395"/>
37<polygon fill="black" stroke="black" points="464.271,-297.5 474.271,-294 464.271,-290.5 464.271,-297.5"/>43<polygon fill="black" stroke="black" points="485.171,-398.5 495.171,-395 485.171,-391.5 485.171,-398.5"/>
38<text text-anchor="middle" x="401" y="-299.4" font-family="Times Roman,serif" font-size="14.00">Read CONNECT</text>44<text text-anchor="middle" x="401" y="-400.4" font-family="Times Roman,serif" font-size="14.00">Read CONNECT</text>
39</g>45</g>
40<!-- loop -->46<!-- loop -->
41<g id="node7" class="node"><title>loop</title>47<g id="node8" class="node"><title>loop</title>
42<ellipse fill="none" stroke="black" cx="740" cy="-294" rx="31.8198" ry="31.8198"/>48<ellipse fill="none" stroke="black" cx="790" cy="-395" rx="31.8198" ry="31.8198"/>
43<text text-anchor="middle" x="740" y="-290.4" font-family="Times Roman,serif" font-size="14.00">loop</text>49<text text-anchor="middle" x="790" y="-391.4" font-family="Times Roman,serif" font-size="14.00">loop</text>
44</g>50</g>
45<!-- start3&#45;&gt;loop -->51<!-- start3&#45;&gt;loop -->
46<g id="edge6" class="edge"><title>start3&#45;&gt;loop</title>52<g id="edge6" class="edge"><title>start3&#45;&gt;loop</title>
47<path fill="none" stroke="black" d="M557.608,-294C597.53,-294 657.517,-294 697.677,-294"/>53<path fill="none" stroke="black" d="M578.778,-395C625.49,-395 700.728,-395 747.665,-395"/>
48<polygon fill="black" stroke="black" points="697.687,-297.5 707.687,-294 697.687,-290.5 697.687,-297.5"/>54<polygon fill="black" stroke="black" points="747.805,-398.5 757.805,-395 747.805,-391.5 747.805,-398.5"/>
49<text text-anchor="middle" x="633" y="-299.4" font-family="Times Roman,serif" font-size="14.00">Write CONNACK</text>55<text text-anchor="middle" x="675" y="-400.4" font-family="Times Roman,serif" font-size="14.00">Write CONNACK</text>
50</g>56</g>
51<!-- ping -->57<!-- ping -->
52<g id="node9" class="node"><title>ping</title>58<g id="node10" class="node"><title>ping</title>
53<ellipse fill="none" stroke="black" cx="1063" cy="-416" rx="32.0265" ry="32.5269"/>59<ellipse fill="none" stroke="black" cx="1135" cy="-593" rx="32.0265" ry="32.5269"/>
54<text text-anchor="middle" x="1063" y="-412.4" font-family="Times Roman,serif" font-size="14.00">ping</text>60<text text-anchor="middle" x="1135" y="-589.4" font-family="Times Roman,serif" font-size="14.00">ping</text>
55</g>61</g>
56<!-- loop&#45;&gt;ping -->62<!-- loop&#45;&gt;ping -->
57<g id="edge8" class="edge"><title>loop&#45;&gt;ping</title>63<g id="edge8" class="edge"><title>loop&#45;&gt;ping</title>
58<path fill="none" stroke="black" d="M754.564,-322.853C763.046,-336.78 775.035,-352.491 790,-362 861.597,-407.491 963.396,-415.983 1020.29,-416.829"/>64<path fill="none" stroke="black" d="M800.39,-425.317C809.609,-448.006 825.187,-478.237 848,-497 920.691,-556.785 1032.18,-579.907 1092.58,-588.403"/>
59<polygon fill="black" stroke="black" points="1020.35,-420.33 1030.38,-416.906 1020.4,-413.33 1020.35,-420.33"/>65<polygon fill="black" stroke="black" points="1092.15,-591.877 1102.53,-589.734 1093.08,-584.939 1092.15,-591.877"/>
60<text text-anchor="middle" x="881" y="-418.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval</text>66<text text-anchor="middle" x="946" y="-583.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval</text>
61</g>67</g>
62<!-- broadcast -->68<!-- broadcast -->
63<g id="node11" class="node"><title>broadcast</title>69<g id="node12" class="node"><title>broadcast</title>
64<ellipse fill="none" stroke="black" cx="1063" cy="-200" rx="58.1882" ry="58.6899"/>70<ellipse fill="none" stroke="black" cx="1135" cy="-281" rx="58.1882" ry="58.6899"/>
65<text text-anchor="middle" x="1063" y="-196.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>71<text text-anchor="middle" x="1135" y="-277.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>
66</g>72</g>
67<!-- loop&#45;&gt;broadcast -->73<!-- loop&#45;&gt;broadcast -->
68<g id="edge10" class="edge"><title>loop&#45;&gt;broadcast</title>74<g id="edge10" class="edge"><title>loop&#45;&gt;broadcast</title>
69<path fill="none" stroke="black" d="M766.046,-274.934C773.498,-270.155 781.824,-265.421 790,-262 856.828,-234.035 938.382,-217.617 994.86,-208.779"/>75<path fill="none" stroke="black" d="M811.332,-370.953C821.492,-360.892 834.388,-349.946 848,-343 917.32,-307.628 1006.03,-292.395 1066.35,-285.86"/>
70<polygon fill="black" stroke="black" points="995.396,-212.238 1004.75,-207.269 994.34,-205.318 995.396,-212.238"/>76<polygon fill="black" stroke="black" points="1066.94,-289.319 1076.53,-284.811 1066.22,-282.355 1066.94,-289.319"/>
71<text text-anchor="middle" x="881" y="-267.4" font-family="Times Roman,serif" font-size="14.00">Receive broadcast request</text>77<text text-anchor="middle" x="946" y="-348.4" font-family="Times Roman,serif" font-size="14.00">Receive broadcast request</text>
78</g>
79<!-- conn_broken -->
80<g id="node26" class="node"><title>conn_broken</title>
81<ellipse fill="none" stroke="black" cx="1361" cy="-99" rx="73.0388" ry="73.5391"/>
82<text text-anchor="middle" x="1361" y="-95.4" font-family="Times Roman,serif" font-size="14.00">conn_broken</text>
83</g>
84<!-- loop&#45;&gt;conn_broken -->
85<g id="edge28" class="edge"><title>loop&#45;&gt;conn_broken</title>
86<path fill="none" stroke="black" d="M793.216,-363.054C799.833,-304.219 817.014,-182.243 848,-155 967.196,-50.2026 1167.08,-63.6291 1278.91,-81.8408"/>
87<polygon fill="black" stroke="black" points="1278.34,-85.2954 1288.79,-83.4998 1279.5,-78.392 1278.34,-85.2954"/>
88<text text-anchor="middle" x="946" y="-160.4" font-family="Times Roman,serif" font-size="14.00">Receive connbroken request</text>
89</g>
90<!-- conn_warn -->
91<g id="node28" class="node"><title>conn_warn</title>
92<ellipse fill="none" stroke="black" cx="1135" cy="-477" rx="65.7609" ry="65.7609"/>
93<text text-anchor="middle" x="1135" y="-473.4" font-family="Times Roman,serif" font-size="14.00">conn_warn</text>
94</g>
95<!-- loop&#45;&gt;conn_warn -->
96<g id="edge30" class="edge"><title>loop&#45;&gt;conn_warn</title>
97<path fill="none" stroke="black" d="M814.092,-416.512C823.957,-424.185 835.89,-432.126 848,-437 915.942,-464.343 999.421,-473.523 1058.8,-476.355"/>
98<polygon fill="black" stroke="black" points="1058.71,-479.855 1068.85,-476.786 1059.01,-472.861 1058.71,-479.855"/>
99<text text-anchor="middle" x="946" y="-480.4" font-family="Times Roman,serif" font-size="14.00">Receive connwarn request</text>
72</g>100</g>
73<!-- pong_wait -->101<!-- pong_wait -->
74<g id="node13" class="node"><title>pong_wait</title>102<g id="node14" class="node"><title>pong_wait</title>
75<ellipse fill="none" stroke="black" cx="1526" cy="-406" rx="62.9325" ry="62.9325"/>103<ellipse fill="none" stroke="black" cx="537" cy="-653" rx="62.9325" ry="62.9325"/>
76<text text-anchor="middle" x="1526" y="-402.4" font-family="Times Roman,serif" font-size="14.00">pong_wait</text>104<text text-anchor="middle" x="537" y="-649.4" font-family="Times Roman,serif" font-size="14.00">pong_wait</text>
77</g>105</g>
78<!-- ping&#45;&gt;pong_wait -->106<!-- ping&#45;&gt;pong_wait -->
79<g id="edge12" class="edge"><title>ping&#45;&gt;pong_wait</title>107<g id="edge12" class="edge"><title>ping&#45;&gt;pong_wait</title>
80<path fill="none" stroke="black" d="M1095.56,-415.297C1169.19,-413.707 1350.04,-409.8 1452.36,-407.591"/>108<path fill="none" stroke="black" d="M1103.4,-600.831C1035.81,-617.134 871.913,-654.261 732,-667 681.542,-671.594 668.481,-671.33 618,-667 615.134,-666.754 612.217,-666.46 609.275,-666.127"/>
81<polygon fill="black" stroke="black" points="1452.69,-411.084 1462.61,-407.369 1452.54,-404.086 1452.69,-411.084"/>109<polygon fill="black" stroke="black" points="609.573,-662.637 599.214,-664.858 608.697,-669.582 609.573,-662.637"/>
82<text text-anchor="middle" x="1289" y="-418.4" font-family="Times Roman,serif" font-size="14.00">Write PING</text>110<text text-anchor="middle" x="790" y="-670.4" font-family="Times Roman,serif" font-size="14.00">Write PING</text>
83</g>111</g>
84<!-- ack_wait -->112<!-- ack_wait -->
85<g id="node15" class="node"><title>ack_wait</title>113<g id="node16" class="node"><title>ack_wait</title>
86<ellipse fill="none" stroke="black" cx="1526" cy="-269" rx="55.1543" ry="55.1543"/>114<ellipse fill="none" stroke="black" cx="1598" cy="-373" rx="55.1543" ry="55.1543"/>
87<text text-anchor="middle" x="1526" y="-265.4" font-family="Times Roman,serif" font-size="14.00">ack_wait</text>115<text text-anchor="middle" x="1598" y="-369.4" font-family="Times Roman,serif" font-size="14.00">ack_wait</text>
88</g>116</g>
89<!-- broadcast&#45;&gt;ack_wait -->117<!-- broadcast&#45;&gt;ack_wait -->
90<g id="edge14" class="edge"><title>broadcast&#45;&gt;ack_wait</title>118<g id="edge14" class="edge"><title>broadcast&#45;&gt;ack_wait</title>
91<path fill="none" stroke="black" d="M1121.17,-208.669C1207.93,-221.599 1370.7,-245.856 1461.17,-259.339"/>119<path fill="none" stroke="black" d="M1193.25,-288.88C1264.95,-299.067 1390.23,-318.45 1496,-343 1508.9,-345.993 1522.57,-349.664 1535.58,-353.397"/>
92<polygon fill="black" stroke="black" points="1460.9,-262.837 1471.3,-260.849 1461.93,-255.913 1460.9,-262.837"/>120<polygon fill="black" stroke="black" points="1534.79,-356.813 1545.37,-356.254 1536.75,-350.093 1534.79,-356.813"/>
93<text text-anchor="middle" x="1289" y="-257.4" font-family="Times Roman,serif" font-size="14.00">Write BROADCAST [fits one wire msg]</text>121<text text-anchor="middle" x="1361" y="-348.4" font-family="Times Roman,serif" font-size="14.00">Write BROADCAST [fits one wire msg]</text>
94</g>122</g>
95<!-- split_broadcast -->123<!-- split_broadcast -->
96<g id="node17" class="node"><title>split_broadcast</title>124<g id="node18" class="node"><title>split_broadcast</title>
97<ellipse fill="none" stroke="black" cx="1526" cy="-110" rx="84.1457" ry="84.1457"/>125<ellipse fill="none" stroke="black" cx="1598" cy="-216" rx="84.1457" ry="84.1457"/>
98<text text-anchor="middle" x="1526" y="-106.4" font-family="Times Roman,serif" font-size="14.00">split_broadcast</text>126<text text-anchor="middle" x="1598" y="-212.4" font-family="Times Roman,serif" font-size="14.00">split_broadcast</text>
99</g>127</g>
100<!-- broadcast&#45;&gt;split_broadcast -->128<!-- broadcast&#45;&gt;split_broadcast -->
101<g id="edge16" class="edge"><title>broadcast&#45;&gt;split_broadcast</title>129<g id="edge16" class="edge"><title>broadcast&#45;&gt;split_broadcast</title>
102<path fill="none" stroke="black" d="M1120.7,-188.783C1199.06,-173.553 1340.01,-146.154 1433.29,-128.021"/>130<path fill="none" stroke="black" d="M1193.17,-272.834C1271.44,-261.846 1411.56,-242.174 1504.66,-229.104"/>
103<polygon fill="black" stroke="black" points="1434.15,-131.421 1443.29,-126.077 1432.81,-124.549 1434.15,-131.421"/>131<polygon fill="black" stroke="black" points="1505.23,-232.558 1514.65,-227.702 1504.26,-225.626 1505.23,-232.558"/>
104<text text-anchor="middle" x="1289" y="-185.4" font-family="Times Roman,serif" font-size="14.00">BROADCAST does not fit one wire msg</text>132<text text-anchor="middle" x="1361" y="-272.4" font-family="Times Roman,serif" font-size="14.00">BROADCAST does not fit one wire msg</text>
133</g>
134<!-- pong_wait&#45;&gt;stop -->
135<g id="edge40" class="edge"><title>pong_wait&#45;&gt;stop</title>
136<path fill="none" stroke="black" d="M600.164,-653C651.344,-653 725.322,-653 790,-653 790,-653 790,-653 1972,-653 2131.11,-653 2245.53,-463.168 2289.33,-376.844"/>
137<polygon fill="black" stroke="black" points="2292.5,-378.343 2293.84,-367.834 2286.24,-375.212 2292.5,-378.343"/>
138<text text-anchor="middle" x="1361" y="-658.4" font-family="Times Roman,serif" font-size="14.00">Elapsed exchange timeout</text>
105</g>139</g>
106<!-- pong_wait&#45;&gt;loop -->140<!-- pong_wait&#45;&gt;loop -->
107<g id="edge18" class="edge"><title>pong_wait&#45;&gt;loop</title>141<g id="edge18" class="edge"><title>pong_wait&#45;&gt;loop</title>
108<path fill="none" stroke="black" d="M1463.29,-398.59C1336,-383.27 1038.34,-346.004 790,-304 787.177,-303.523 784.269,-303.006 781.343,-302.468"/>142<path fill="none" stroke="black" d="M581.359,-607.765C632.774,-555.333 716.085,-470.376 760.273,-425.314"/>
109<polygon fill="black" stroke="black" points="781.898,-299.011 771.42,-300.582 780.59,-305.888 781.898,-299.011"/>143<polygon fill="black" stroke="black" points="762.774,-427.763 767.277,-418.172 757.776,-422.862 762.774,-427.763"/>
110<text text-anchor="middle" x="1063" y="-359.4" font-family="Times Roman,serif" font-size="14.00">Read PONG</text>144<text text-anchor="middle" x="675" y="-574.4" font-family="Times Roman,serif" font-size="14.00">Read PONG</text>
145</g>
146<!-- ack_wait&#45;&gt;stop -->
147<g id="edge36" class="edge"><title>ack_wait&#45;&gt;stop</title>
148<path fill="none" stroke="black" d="M1653.02,-372.757C1765.96,-371.776 2032.03,-366.986 2254,-344 2256.89,-343.701 2259.85,-343.348 2262.84,-342.959"/>
149<polygon fill="black" stroke="black" points="2263.58,-346.389 2272.99,-341.517 2262.59,-339.459 2263.58,-346.389"/>
150<text text-anchor="middle" x="1972" y="-373.4" font-family="Times Roman,serif" font-size="14.00">Elapsed exchange timeout</text>
111</g>151</g>
112<!-- ack_wait&#45;&gt;loop -->152<!-- ack_wait&#45;&gt;loop -->
113<g id="edge20" class="edge"><title>ack_wait&#45;&gt;loop</title>153<g id="edge20" class="edge"><title>ack_wait&#45;&gt;loop</title>
114<path fill="none" stroke="black" d="M1470.96,-271.898C1455.75,-272.644 1439.24,-273.404 1424,-274 1181.02,-283.507 889.323,-290.597 782.144,-293.057"/>154<path fill="none" stroke="black" d="M1542.83,-374.081C1445.66,-375.995 1237.64,-380.146 1062,-384 966.886,-386.087 942.947,-382.989 848,-389 842.829,-389.327 837.406,-389.764 832.04,-390.252"/>
115<polygon fill="black" stroke="black" points="781.977,-289.56 772.059,-293.288 782.136,-296.558 781.977,-289.56"/>155<polygon fill="black" stroke="black" points="831.485,-386.79 821.871,-391.242 832.163,-393.757 831.485,-386.79"/>
116<text text-anchor="middle" x="1063" y="-292.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>156<text text-anchor="middle" x="1135" y="-389.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>
117</g>157</g>
118<!-- split_broadcast&#45;&gt;loop -->158<!-- split_broadcast&#45;&gt;loop -->
119<g id="edge26" class="edge"><title>split_broadcast&#45;&gt;loop</title>159<g id="edge26" class="edge"><title>split_broadcast&#45;&gt;loop</title>
120<path fill="none" stroke="black" d="M1442.56,-99.9981C1336.06,-89.6718 1146.8,-79.5577 990,-115 894.383,-136.612 862.41,-141.921 790,-208 775.817,-220.943 764.522,-238.865 756.283,-254.999"/>160<path fill="none" stroke="black" d="M1515.33,-201.109C1409.22,-184.681 1219.98,-164.458 1062,-196 960.985,-216.168 924.079,-215.554 848,-285 827.351,-303.849 812.751,-331.776 803.364,-354.774"/>
121<polygon fill="black" stroke="black" points="753.014,-253.718 751.785,-264.241 759.308,-256.781 753.014,-253.718"/>161<polygon fill="black" stroke="black" points="800.027,-353.699 799.658,-364.287 806.549,-356.24 800.027,-353.699"/>
122<text text-anchor="middle" x="1063" y="-120.4" font-family="Times Roman,serif" font-size="14.00">All split msgs written</text>162<text text-anchor="middle" x="1135" y="-201.4" font-family="Times Roman,serif" font-size="14.00">All split msgs written</text>
123</g>163</g>
124<!-- split_ack_wait -->164<!-- split_ack_wait -->
125<g id="node21" class="node"><title>split_ack_wait</title>165<g id="node22" class="node"><title>split_ack_wait</title>
126<ellipse fill="none" stroke="black" cx="1893" cy="-110" rx="80.1095" ry="80.6102"/>166<ellipse fill="none" stroke="black" cx="1972" cy="-257" rx="80.1095" ry="80.6102"/>
127<text text-anchor="middle" x="1893" y="-106.4" font-family="Times Roman,serif" font-size="14.00">split_ack_wait</text>167<text text-anchor="middle" x="1972" y="-253.4" font-family="Times Roman,serif" font-size="14.00">split_ack_wait</text>
128</g>168</g>
129<!-- split_broadcast&#45;&gt;split_ack_wait -->169<!-- split_broadcast&#45;&gt;split_ack_wait -->
130<g id="edge22" class="edge"><title>split_broadcast&#45;&gt;split_ack_wait</title>170<g id="edge22" class="edge"><title>split_broadcast&#45;&gt;split_ack_wait</title>
131<path fill="none" stroke="black" d="M1610.2,-110C1667.61,-110 1743.59,-110 1802.33,-110"/>171<path fill="none" stroke="black" d="M1680.55,-232.126C1687.12,-233.18 1693.66,-234.156 1700,-235 1760.22,-243.022 1828.36,-248.528 1881.38,-252.025"/>
132<polygon fill="black" stroke="black" points="1802.35,-113.5 1812.35,-110 1802.34,-106.5 1802.35,-113.5"/>172<polygon fill="black" stroke="black" points="1881.23,-255.523 1891.43,-252.676 1881.68,-248.537 1881.23,-255.523"/>
133<text text-anchor="middle" x="1711" y="-115.4" font-family="Times Roman,serif" font-size="14.00">Write split BROADCAST</text>173<text text-anchor="middle" x="1783" y="-256.4" font-family="Times Roman,serif" font-size="14.00">Write split BROADCAST</text>
174</g>
175<!-- split_ack_wait&#45;&gt;stop -->
176<g id="edge38" class="edge"><title>split_ack_wait&#45;&gt;stop</title>
177<path fill="none" stroke="black" d="M2050.59,-275.189C2116.55,-290.456 2208.51,-311.74 2263.08,-324.372"/>
178<polygon fill="black" stroke="black" points="2262.62,-327.857 2273.15,-326.702 2264.2,-321.037 2262.62,-327.857"/>
179<text text-anchor="middle" x="2166" y="-327.4" font-family="Times Roman,serif" font-size="14.00">Elapsed exchange timeout</text>
134</g>180</g>
135<!-- split_ack_wait&#45;&gt;split_broadcast -->181<!-- split_ack_wait&#45;&gt;split_broadcast -->
136<g id="edge24" class="edge"><title>split_ack_wait&#45;&gt;split_broadcast</title>182<g id="edge24" class="edge"><title>split_ack_wait&#45;&gt;split_broadcast</title>
137<path fill="none" stroke="black" d="M1814.64,-90.9448C1807.69,-89.7544 1800.74,-88.7397 1794,-88 1720.66,-79.9496 1701.36,-80.1783 1628,-88 1624.71,-88.3505 1621.38,-88.7628 1618.01,-89.2264"/>183<path fill="none" stroke="black" d="M1899.33,-222.493C1888.37,-218.573 1877.04,-215.2 1866,-213 1808.89,-201.617 1743.56,-202.081 1691.71,-205.529"/>
138<polygon fill="black" stroke="black" points="1617.23,-85.8043 1607.87,-90.7602 1618.28,-92.7256 1617.23,-85.8043"/>184<polygon fill="black" stroke="black" points="1691.25,-202.053 1681.52,-206.256 1691.74,-209.035 1691.25,-202.053"/>
139<text text-anchor="middle" x="1711" y="-93.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>185<text text-anchor="middle" x="1783" y="-218.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>
186</g>
187<!-- conn_broken&#45;&gt;stop -->
188<g id="edge32" class="edge"><title>conn_broken&#45;&gt;stop</title>
189<path fill="none" stroke="black" d="M1434.61,-95.8477C1564.45,-92.5456 1841.16,-95.6985 2060,-168 2154.32,-199.163 2175.34,-218.326 2254,-279 2262.17,-285.305 2270.33,-292.782 2277.75,-300.185"/>
190<polygon fill="black" stroke="black" points="2275.42,-302.808 2284.91,-307.517 2280.43,-297.917 2275.42,-302.808"/>
191<text text-anchor="middle" x="1783" y="-127.4" font-family="Times Roman,serif" font-size="14.00">Write CONNBROKEN</text>
192</g>
193<!-- conn_warn&#45;&gt;loop -->
194<g id="edge34" class="edge"><title>conn_warn&#45;&gt;loop</title>
195<path fill="none" stroke="black" d="M1083.63,-435.301C1071.29,-427.246 1057.71,-419.822 1044,-415 972.758,-389.933 883.562,-389.406 832.05,-391.836"/>
196<polygon fill="black" stroke="black" points="831.758,-388.346 821.959,-392.373 832.131,-395.337 831.758,-388.346"/>
197<text text-anchor="middle" x="946" y="-420.4" font-family="Times Roman,serif" font-size="14.00">Write CONNWARN</text>
140</g>198</g>
141</g>199</g>
142</svg>200</svg>
143201
=== modified file 'server/acceptance/acceptance_test.go'
--- server/acceptance/acceptance_test.go 2014-04-07 19:39:19 +0000
+++ server/acceptance/acceptance_test.go 2014-05-12 14:11:46 +0000
@@ -59,3 +59,6 @@
5959
60// broadcast60// broadcast
61var _ = Suite(&suites.BroadcastAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}})61var _ = Suite(&suites.BroadcastAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}})
62
63// unicast
64var _ = Suite(&suites.UnicastAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}, nil})
6265
=== modified file 'server/acceptance/acceptanceclient.go'
--- server/acceptance/acceptanceclient.go 2014-04-09 19:30:53 +0000
+++ server/acceptance/acceptanceclient.go 2014-05-12 14:11:46 +0000
@@ -24,6 +24,7 @@
24 "errors"24 "errors"
25 "fmt"25 "fmt"
26 "net"26 "net"
27 "strings"
27 "time"28 "time"
2829
29 "launchpad.net/ubuntu-push/protocol"30 "launchpad.net/ubuntu-push/protocol"
@@ -44,6 +45,7 @@
44 Levels map[string]int6445 Levels map[string]int64
45 Insecure bool // don't verify certs46 Insecure bool // don't verify certs
46 Prefix string // prefix for events47 Prefix string // prefix for events
48 Auth string
47 // connection49 // connection
48 Connection net.Conn50 Connection net.Conn
49}51}
@@ -73,6 +75,7 @@
73 Type string `json:"T"`75 Type string `json:"T"`
74 protocol.BroadcastMsg76 protocol.BroadcastMsg
75 protocol.NotificationsMsg77 protocol.NotificationsMsg
78 protocol.ConnWarnMsg
76}79}
7780
78// Run the session with the server, emits a stream of events.81// Run the session with the server, emits a stream of events.
@@ -93,6 +96,7 @@
93 "device": sess.Model,96 "device": sess.Model,
94 "channel": sess.ImageChannel,97 "channel": sess.ImageChannel,
95 },98 },
99 Authorization: sess.Auth,
96 })100 })
97 if err != nil {101 if err != nil {
98 return err102 return err
@@ -125,9 +129,24 @@
125 if sess.ReportPings {129 if sess.ReportPings {
126 events <- sess.Prefix + "ping"130 events <- sess.Prefix + "ping"
127 }131 }
132 case "notifications":
133 conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
134 err := proto.WriteMessage(protocol.AckMsg{Type: "ack"})
135 if err != nil {
136 return err
137 }
138 parts := make([]string, len(recv.Notifications))
139 for i, notif := range recv.Notifications {
140 pack, err := json.Marshal(&notif.Payload)
141 if err != nil {
142 return err
143 }
144 parts[i] = fmt.Sprintf("app:%v payload:%s;", notif.AppId, pack)
145 }
146 events <- fmt.Sprintf("%sunicast %s", sess.Prefix, strings.Join(parts, " "))
128 case "broadcast":147 case "broadcast":
129 conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))148 conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
130 err := proto.WriteMessage(protocol.PingPongMsg{Type: "ack"})149 err := proto.WriteMessage(protocol.AckMsg{Type: "ack"})
131 if err != nil {150 if err != nil {
132 return err151 return err
133 }152 }
@@ -136,6 +155,8 @@
136 return err155 return err
137 }156 }
138 events <- fmt.Sprintf("%sbroadcast chan:%v app:%v topLevel:%d payloads:%s", sess.Prefix, recv.ChanId, recv.AppId, recv.TopLevel, pack)157 events <- fmt.Sprintf("%sbroadcast chan:%v app:%v topLevel:%d payloads:%s", sess.Prefix, recv.ChanId, recv.AppId, recv.TopLevel, pack)
158 case "warn":
159 events <- fmt.Sprintf("%swarn %s", sess.Prefix, recv.Reason)
139 }160 }
140 }161 }
141 return nil162 return nil
142163
=== modified file 'server/acceptance/suites/broadcast.go'
--- server/acceptance/suites/broadcast.go 2014-04-07 19:39:19 +0000
+++ server/acceptance/suites/broadcast.go 2014-05-12 14:11:46 +0000
@@ -29,14 +29,11 @@
29 "launchpad.net/ubuntu-push/server/api"29 "launchpad.net/ubuntu-push/server/api"
30)30)
3131
32// BroadCastAcceptanceSuite has tests about broadcast.32// BroadcastAcceptanceSuite has tests about broadcast.
33type BroadcastAcceptanceSuite struct {33type BroadcastAcceptanceSuite struct {
34 AcceptanceSuite34 AcceptanceSuite
35}35}
3636
37// Long after the end of the tests.
38var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
39
40func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) {37func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) {
41 events, errCh, stop := s.StartClient(c, "DEVB", nil)38 events, errCh, stop := s.StartClient(c, "DEVB", nil)
42 got, err := s.PostRequest("/broadcast", &api.Broadcast{39 got, err := s.PostRequest("/broadcast", &api.Broadcast{
@@ -265,7 +262,11 @@
265262
266func (s *BroadcastAcceptanceSuite) TestGetHosts(c *C) {263func (s *BroadcastAcceptanceSuite) TestGetHosts(c *C) {
267 gh := gethosts.New("", s.ServerAPIURL+"/delivery-hosts", 2*time.Second)264 gh := gethosts.New("", s.ServerAPIURL+"/delivery-hosts", 2*time.Second)
268 hosts, err := gh.Get()265 host, err := gh.Get()
269 c.Assert(err, IsNil)266 c.Assert(err, IsNil)
270 c.Check(hosts, DeepEquals, []string{s.ServerAddr})267 expected := &gethosts.Host{
268 Domain: "localhost",
269 Hosts: []string{s.ServerAddr},
270 }
271 c.Check(host, DeepEquals, expected)
271}272}
272273
=== modified file 'server/acceptance/suites/suite.go'
--- server/acceptance/suites/suite.go 2014-04-03 16:47:47 +0000
+++ server/acceptance/suites/suite.go 2014-05-12 14:11:46 +0000
@@ -44,10 +44,19 @@
4444
45// Start a client.45// Start a client.
46func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {46func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
47 return h.StartClientAuth(c, devId, levels, "")
48}
49
50// Start a client with auth.
51func (h *ServerHandle) StartClientAuth(c *C, devId string, levels map[string]int64, auth string) (events <-chan string, errorCh <-chan error, stop func()) {
47 errCh := make(chan error, 1)52 errCh := make(chan error, 1)
48 cliEvents := make(chan string, 10)53 cliEvents := make(chan string, 10)
49 sess := testClientSession(h.ServerAddr, devId, "m1", "img1", false)54 sess := testClientSession(h.ServerAddr, devId, "m1", "img1", false)
50 sess.Levels = levels55 sess.Levels = levels
56 sess.Auth = auth
57 if auth != "" {
58 sess.ExchangeTimeout = 5 * time.Second
59 }
51 err := sess.Dial()60 err := sess.Dial()
52 c.Assert(err, IsNil)61 c.Assert(err, IsNil)
53 clientShutdown := make(chan bool, 1) // abused as an atomic flag62 clientShutdown := make(chan bool, 1) // abused as an atomic flag
@@ -186,3 +195,6 @@
186 }195 }
187 return196 return
188}197}
198
199// Long after the end of the tests.
200var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
189201
=== added file 'server/acceptance/suites/unicast.go'
--- server/acceptance/suites/unicast.go 1970-01-01 00:00:00 +0000
+++ server/acceptance/suites/unicast.go 2014-05-12 14:11:46 +0000
@@ -0,0 +1,96 @@
1/*
2 Copyright 2013-2014 Canonical Ltd.
3
4 This program is free software: you can redistribute it and/or modify it
5 under the terms of the GNU General Public License version 3, as published
6 by the Free Software Foundation.
7
8 This program is distributed in the hope that it will be useful, but
9 WITHOUT ANY WARRANTY; without even the implied warranties of
10 MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along
14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/
16
17package suites
18
19import (
20 "encoding/json"
21 //"fmt"
22 //"strings"
23 //"time"
24
25 . "launchpad.net/gocheck"
26
27 //"launchpad.net/ubuntu-push/protocol"
28 "launchpad.net/ubuntu-push/server/api"
29)
30
31// UnicastAcceptanceSuite has tests about unicast.
32type UnicastAcceptanceSuite struct {
33 AcceptanceSuite
34 AssociatedAuth func(string) (string, string)
35}
36
37func (s *UnicastAcceptanceSuite) associatedAuth(deviceId string) (userId string, auth string) {
38 if s.AssociatedAuth != nil {
39 return s.AssociatedAuth(deviceId)
40 }
41 return deviceId, ""
42}
43
44func (s *UnicastAcceptanceSuite) TestUnicastToConnected(c *C) {
45 userId, auth := s.associatedAuth("DEV1")
46 events, errCh, stop := s.StartClientAuth(c, "DEV1", nil, auth)
47 got, err := s.PostRequest("/notify", &api.Unicast{
48 UserId: userId,
49 DeviceId: "DEV1",
50 AppId: "app1",
51 ExpireOn: future,
52 Data: json.RawMessage(`{"a": 42}`),
53 })
54 c.Assert(err, IsNil)
55 c.Assert(got, Matches, ".*ok.*")
56 c.Check(NextEvent(events, errCh), Equals, `unicast app:app1 payload:{"a":42};`)
57 stop()
58 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
59 c.Check(len(errCh), Equals, 0)
60}
61
62func (s *UnicastAcceptanceSuite) TestUnicastCorrectDistribution(c *C) {
63 userId1, auth1 := s.associatedAuth("DEV1")
64 userId2, auth2 := s.associatedAuth("DEV2")
65 // start 1st client
66 events1, errCh1, stop1 := s.StartClientAuth(c, "DEV1", nil, auth1)
67 // start 2nd client
68 events2, errCh2, stop2 := s.StartClientAuth(c, "DEV2", nil, auth2)
69 // unicast to one and the other
70 got, err := s.PostRequest("/notify", &api.Unicast{
71 UserId: userId1,
72 DeviceId: "DEV1",
73 AppId: "app1",
74 ExpireOn: future,
75 Data: json.RawMessage(`{"to": 1}`),
76 })
77 c.Assert(err, IsNil)
78 c.Assert(got, Matches, ".*ok.*")
79 got, err = s.PostRequest("/notify", &api.Unicast{
80 UserId: userId2,
81 DeviceId: "DEV2",
82 AppId: "app1",
83 ExpireOn: future,
84 Data: json.RawMessage(`{"to": 2}`),
85 })
86 c.Assert(err, IsNil)
87 c.Assert(got, Matches, ".*ok.*")
88 c.Check(NextEvent(events1, errCh1), Equals, `unicast app:app1 payload:{"to":1};`)
89 c.Check(NextEvent(events2, errCh2), Equals, `unicast app:app1 payload:{"to":2};`)
90 stop1()
91 stop2()
92 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
93 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
94 c.Check(len(errCh1), Equals, 0)
95 c.Check(len(errCh2), Equals, 0)
96}
097
=== modified file 'server/api/handlers.go'
--- server/api/handlers.go 2014-02-20 17:09:03 +0000
+++ server/api/handlers.go 2014-05-12 14:11:46 +0000
@@ -19,12 +19,15 @@
19package api19package api
2020
21import (21import (
22 "encoding/base64"
22 "encoding/json"23 "encoding/json"
23 "fmt"24 "fmt"
24 "io"25 "io"
25 "net/http"26 "net/http"
26 "time"27 "time"
2728
29 "launchpad.net/~ubuntu-push-hackers/ubuntu-push/go-uuid/uuid"
30
28 "launchpad.net/ubuntu-push/logger"31 "launchpad.net/ubuntu-push/logger"
29 "launchpad.net/ubuntu-push/server/broker"32 "launchpad.net/ubuntu-push/server/broker"
30 "launchpad.net/ubuntu-push/server/store"33 "launchpad.net/ubuntu-push/server/store"
@@ -93,6 +96,11 @@
93 ioError,96 ioError,
94 "Could not read request body",97 "Could not read request body",
95 }98 }
99 ErrMissingIdField = &APIError{
100 http.StatusBadRequest,
101 invalidRequest,
102 "Missing id field",
103 }
96 ErrMissingData = &APIError{104 ErrMissingData = &APIError{
97 http.StatusBadRequest,105 http.StatusBadRequest,
98 invalidRequest,106 invalidRequest,
@@ -130,10 +138,17 @@
130 }138 }
131)139)
132140
133type Message struct {141type castCommon struct {
134 Registration string `json:"registration"`142}
135 CoalesceTag string `json:"coalesce_tag"`143
136 Data json.RawMessage `json:"data"`144type Unicast struct {
145 UserId string `json:"userid"`
146 DeviceId string `json:"deviceid"`
147 AppId string `json:"appid"`
148 //Registration string `json:"registration"`
149 //CoalesceTag string `json:"coalesce_tag"`
150 ExpireOn string `json:"expire_on"`
151 Data json.RawMessage `json:"data"`
137}152}
138153
139// Broadcast request JSON object.154// Broadcast request JSON object.
@@ -198,11 +213,11 @@
198213
199var zeroTime = time.Time{}214var zeroTime = time.Time{}
200215
201func checkBroadcast(bcast *Broadcast) (time.Time, *APIError) {216func checkCastCommon(data json.RawMessage, expireOn string) (time.Time, *APIError) {
202 if len(bcast.Data) == 0 {217 if len(data) == 0 {
203 return zeroTime, ErrMissingData218 return zeroTime, ErrMissingData
204 }219 }
205 expire, err := time.Parse(time.RFC3339, bcast.ExpireOn)220 expire, err := time.Parse(time.RFC3339, expireOn)
206 if err != nil {221 if err != nil {
207 return zeroTime, ErrInvalidExpiration222 return zeroTime, ErrInvalidExpiration
208 }223 }
@@ -212,6 +227,10 @@
212 return expire, nil227 return expire, nil
213}228}
214229
230func checkBroadcast(bcast *Broadcast) (time.Time, *APIError) {
231 return checkCastCommon(bcast.Data, bcast.ExpireOn)
232}
233
215type StoreForRequest func(w http.ResponseWriter, request *http.Request) (store.PendingStore, error)234type StoreForRequest func(w http.ResponseWriter, request *http.Request) (store.PendingStore, error)
216235
217// context holds the interfaces to delegate to serving requests236// context holds the interfaces to delegate to serving requests
@@ -234,6 +253,20 @@
234 return sto, nil253 return sto, nil
235}254}
236255
256func (ctx *context) prepare(w http.ResponseWriter, request *http.Request, reqObj interface{}) (store.PendingStore, *APIError) {
257 body, apiErr := ReadBody(request, MaxRequestBodyBytes)
258 if apiErr != nil {
259 return nil, apiErr
260 }
261
262 err := json.Unmarshal(body, reqObj)
263 if err != nil {
264 return nil, ErrMalformedJSONObject
265 }
266
267 return ctx.getStore(w, request)
268}
269
237type BroadcastHandler struct {270type BroadcastHandler struct {
238 *context271 *context
239}272}
@@ -270,23 +303,13 @@
270 }303 }
271 }()304 }()
272305
273 body, apiErr := ReadBody(request, MaxRequestBodyBytes)
274 if apiErr != nil {
275 return
276 }
277
278 sto, apiErr := h.getStore(writer, request)
279 if apiErr != nil {
280 return
281 }
282 defer sto.Close()
283
284 broadcast := &Broadcast{}306 broadcast := &Broadcast{}
285 err := json.Unmarshal(body, broadcast)307
286 if err != nil {308 sto, apiErr := h.prepare(writer, request, broadcast)
287 apiErr = ErrMalformedJSONObject309 if apiErr != nil {
288 return310 return
289 }311 }
312 defer sto.Close()
290313
291 apiErr = h.doBroadcast(sto, broadcast)314 apiErr = h.doBroadcast(sto, broadcast)
292 if apiErr != nil {315 if apiErr != nil {
@@ -297,6 +320,64 @@
297 fmt.Fprintf(writer, `{"ok":true}`)320 fmt.Fprintf(writer, `{"ok":true}`)
298}321}
299322
323type UnicastHandler struct {
324 *context
325}
326
327func checkUnicast(ucast *Unicast) (time.Time, *APIError) {
328 if ucast.UserId == "" || ucast.DeviceId == "" || ucast.AppId == "" {
329 return zeroTime, ErrMissingIdField
330 }
331 return checkCastCommon(ucast.Data, ucast.ExpireOn)
332}
333
334// use a base64 encoded TimeUUID
335var generateMsgId = func() string {
336 return base64.StdEncoding.EncodeToString(uuid.NewUUID())
337}
338
339func (h *UnicastHandler) doUnicast(sto store.PendingStore, ucast *Unicast) *APIError {
340 expire, apiErr := checkUnicast(ucast)
341 if apiErr != nil {
342 return apiErr
343 }
344 chanId := store.UnicastInternalChannelId(ucast.UserId, ucast.DeviceId)
345 msgId := generateMsgId()
346 err := sto.AppendToUnicastChannel(chanId, ucast.AppId, ucast.Data, msgId, expire)
347 if err != nil {
348 h.logger.Errorf("could not store notification: %v", err)
349 return ErrCouldNotStoreNotification
350 }
351
352 h.broker.Unicast(chanId)
353 return nil
354}
355
356func (h *UnicastHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
357 var apiErr *APIError
358 defer func() {
359 if apiErr != nil {
360 RespondError(writer, apiErr)
361 }
362 }()
363
364 unicast := &Unicast{}
365
366 sto, apiErr := h.prepare(writer, request, unicast)
367 if apiErr != nil {
368 return
369 }
370 defer sto.Close()
371
372 apiErr = h.doUnicast(sto, unicast)
373 if apiErr != nil {
374 return
375 }
376
377 writer.Header().Set("Content-Type", "application/json")
378 fmt.Fprintf(writer, `{"ok":true}`)
379}
380
300// MakeHandlersMux makes a handler that dispatches for the various API endpoints.381// MakeHandlersMux makes a handler that dispatches for the various API endpoints.
301func MakeHandlersMux(storeForRequest StoreForRequest, broker broker.BrokerSending, logger logger.Logger) *http.ServeMux {382func MakeHandlersMux(storeForRequest StoreForRequest, broker broker.BrokerSending, logger logger.Logger) *http.ServeMux {
302 ctx := &context{383 ctx := &context{
@@ -306,5 +387,6 @@
306 }387 }
307 mux := http.NewServeMux()388 mux := http.NewServeMux()
308 mux.Handle("/broadcast", &BroadcastHandler{context: ctx})389 mux.Handle("/broadcast", &BroadcastHandler{context: ctx})
390 mux.Handle("/notify", &UnicastHandler{context: ctx})
309 return mux391 return mux
310}392}
311393
=== modified file 'server/api/handlers_test.go'
--- server/api/handlers_test.go 2014-02-21 11:32:38 +0000
+++ server/api/handlers_test.go 2014-05-12 14:11:46 +0000
@@ -18,6 +18,7 @@
1818
19import (19import (
20 "bytes"20 "bytes"
21 "encoding/base64"
21 "encoding/json"22 "encoding/json"
22 "errors"23 "errors"
23 "fmt"24 "fmt"
@@ -30,8 +31,9 @@
3031
31 . "launchpad.net/gocheck"32 . "launchpad.net/gocheck"
3233
34 "launchpad.net/ubuntu-push/protocol"
33 "launchpad.net/ubuntu-push/server/store"35 "launchpad.net/ubuntu-push/server/store"
34 helpers "launchpad.net/ubuntu-push/testing"36 help "launchpad.net/ubuntu-push/testing"
35)37)
3638
37func TestHandlers(t *testing.T) { TestingT(t) }39func TestHandlers(t *testing.T) { TestingT(t) }
@@ -41,14 +43,14 @@
41 json string43 json string
42 client *http.Client44 client *http.Client
43 c *C45 c *C
44 testlog *helpers.TestLogger46 testlog *help.TestLogger
45}47}
4648
47var _ = Suite(&handlersSuite{})49var _ = Suite(&handlersSuite{})
4850
49func (s *handlersSuite) SetUpTest(c *C) {51func (s *handlersSuite) SetUpTest(c *C) {
50 s.client = &http.Client{}52 s.client = &http.Client{}
51 s.testlog = helpers.NewTestLogger(c, "error")53 s.testlog = help.NewTestLogger(c, "error")
52}54}
5355
54func (s *handlersSuite) TestAPIError(c *C) {56func (s *handlersSuite) TestAPIError(c *C) {
@@ -98,7 +100,7 @@
98100
99var future = time.Now().Add(4 * time.Hour).Format(time.RFC3339)101var future = time.Now().Add(4 * time.Hour).Format(time.RFC3339)
100102
101func (s *handlersSuite) TestCheckBroadcast(c *C) {103func (s *handlersSuite) TestCheckCastBroadcastAndCommon(c *C) {
102 payload := json.RawMessage(`{"foo":"bar"}`)104 payload := json.RawMessage(`{"foo":"bar"}`)
103 broadcast := &Broadcast{105 broadcast := &Broadcast{
104 Channel: "system",106 Channel: "system",
@@ -134,19 +136,27 @@
134}136}
135137
136type checkBrokerSending struct {138type checkBrokerSending struct {
137 store store.PendingStore139 store store.PendingStore
138 chanId store.InternalChannelId140 chanId store.InternalChannelId
139 err error141 err error
140 top int64142 top int64
141 payloads []json.RawMessage143 notifications []protocol.Notification
142}144}
143145
144func (cbsend *checkBrokerSending) Broadcast(chanId store.InternalChannelId) {146func (cbsend *checkBrokerSending) Broadcast(chanId store.InternalChannelId) {
145 top, payloads, err := cbsend.store.GetChannelSnapshot(chanId)147 top, notifications, err := cbsend.store.GetChannelSnapshot(chanId)
146 cbsend.err = err148 cbsend.err = err
147 cbsend.chanId = chanId149 cbsend.chanId = chanId
148 cbsend.top = top150 cbsend.top = top
149 cbsend.payloads = payloads151 cbsend.notifications = notifications
152}
153
154func (cbsend *checkBrokerSending) Unicast(chanIds ...store.InternalChannelId) {
155 // for now
156 if len(chanIds) != 1 {
157 panic("not expecting many chan ids for now")
158 }
159 cbsend.Broadcast(chanIds[0])
150}160}
151161
152func (s *handlersSuite) TestDoBroadcast(c *C) {162func (s *handlersSuite) TestDoBroadcast(c *C) {
@@ -163,7 +173,7 @@
163 c.Check(bsend.err, IsNil)173 c.Check(bsend.err, IsNil)
164 c.Check(bsend.chanId, Equals, store.SystemInternalChannelId)174 c.Check(bsend.chanId, Equals, store.SystemInternalChannelId)
165 c.Check(bsend.top, Equals, int64(1))175 c.Check(bsend.top, Equals, int64(1))
166 c.Check(bsend.payloads, DeepEquals, []json.RawMessage{payload})176 c.Check(bsend.notifications, DeepEquals, help.Ns(payload))
167}177}
168178
169func (s *handlersSuite) TestDoBroadcastUnknownChannel(c *C) {179func (s *handlersSuite) TestDoBroadcastUnknownChannel(c *C) {
@@ -192,6 +202,11 @@
192 return isto.intercept("AppendToChannel", err)202 return isto.intercept("AppendToChannel", err)
193}203}
194204
205func (isto *interceptInMemoryPendingStore) AppendToUnicastChannel(chanId store.InternalChannelId, appId string, payload json.RawMessage, msgId string, expiration time.Time) error {
206 err := isto.InMemoryPendingStore.AppendToUnicastChannel(chanId, appId, payload, msgId, expiration)
207 return isto.intercept("AppendToUnicastChannel", err)
208}
209
195func (s *handlersSuite) TestDoBroadcastUnknownError(c *C) {210func (s *handlersSuite) TestDoBroadcastUnknownError(c *C) {
196 sto := &interceptInMemoryPendingStore{211 sto := &interceptInMemoryPendingStore{
197 store.NewInMemoryPendingStore(),212 store.NewInMemoryPendingStore(),
@@ -229,6 +244,115 @@
229 c.Check(s.testlog.Captured(), Equals, "ERROR could not store notification: fail\n")244 c.Check(s.testlog.Captured(), Equals, "ERROR could not store notification: fail\n")
230}245}
231246
247func (s *handlersSuite) TestCheckUnicast(c *C) {
248 payload := json.RawMessage(`{"foo":"bar"}`)
249 unicast := func() *Unicast {
250 return &Unicast{
251 UserId: "user1",
252 DeviceId: "DEV1",
253 AppId: "app1",
254 ExpireOn: future,
255 Data: payload,
256 }
257 }
258 u := unicast()
259 expire, apiErr := checkUnicast(u)
260 c.Assert(apiErr, IsNil)
261 c.Check(expire.Format(time.RFC3339), Equals, future)
262
263 u = unicast()
264 u.UserId = ""
265 expire, apiErr = checkUnicast(u)
266 c.Check(apiErr, Equals, ErrMissingIdField)
267
268 u = unicast()
269 u.AppId = ""
270 expire, apiErr = checkUnicast(u)
271 c.Check(apiErr, Equals, ErrMissingIdField)
272
273 u = unicast()
274 u.DeviceId = ""
275 expire, apiErr = checkUnicast(u)
276 c.Check(apiErr, Equals, ErrMissingIdField)
277
278 u = unicast()
279 u.Data = json.RawMessage(nil)
280 expire, apiErr = checkUnicast(u)
281 c.Check(apiErr, Equals, ErrMissingData)
282}
283
284func (s *handlersSuite) TestGenerateMsgId(c *C) {
285 msgId := generateMsgId()
286 decoded, err := base64.StdEncoding.DecodeString(msgId)
287 c.Assert(err, IsNil)
288 c.Check(decoded, HasLen, 16)
289}
290
291func (s *handlersSuite) TestDoUnicast(c *C) {
292 prevGenMsgId := generateMsgId
293 defer func() {
294 generateMsgId = prevGenMsgId
295 }()
296 generateMsgId = func() string {
297 return "MSG-ID"
298 }
299 sto := store.NewInMemoryPendingStore()
300 bsend := &checkBrokerSending{store: sto}
301 bh := &UnicastHandler{&context{nil, bsend, nil}}
302 payload := json.RawMessage(`{"a": 1}`)
303 apiErr := bh.doUnicast(sto, &Unicast{
304 UserId: "user1",
305 DeviceId: "DEV1",
306 AppId: "app1",
307 ExpireOn: future,
308 Data: payload,
309 })
310 c.Check(apiErr, IsNil)
311 c.Check(bsend.err, IsNil)
312 c.Check(bsend.chanId, Equals, store.UnicastInternalChannelId("user1", "DEV1"))
313 c.Check(bsend.top, Equals, int64(0))
314 c.Check(bsend.notifications, DeepEquals, []protocol.Notification{
315 protocol.Notification{
316 AppId: "app1",
317 MsgId: "MSG-ID",
318 Payload: payload,
319 },
320 })
321}
322
323func (s *handlersSuite) TestDoUnicastMissingIdField(c *C) {
324 sto := store.NewInMemoryPendingStore()
325 bh := &UnicastHandler{}
326 apiErr := bh.doUnicast(sto, &Unicast{
327 ExpireOn: future,
328 Data: json.RawMessage(`{"a": 1}`),
329 })
330 c.Check(apiErr, Equals, ErrMissingIdField)
331}
332
333func (s *handlersSuite) TestDoUnicastCouldNotStoreNotification(c *C) {
334 sto := &interceptInMemoryPendingStore{
335 store.NewInMemoryPendingStore(),
336 func(meth string, err error) error {
337 if meth == "AppendToUnicastChannel" {
338 return errors.New("fail")
339 }
340 return err
341 },
342 }
343 ctx := &context{logger: s.testlog}
344 bh := &UnicastHandler{ctx}
345 apiErr := bh.doUnicast(sto, &Unicast{
346 UserId: "user1",
347 DeviceId: "DEV1",
348 AppId: "app1",
349 ExpireOn: future,
350 Data: json.RawMessage(`{"a": 1}`),
351 })
352 c.Check(apiErr, Equals, ErrCouldNotStoreNotification)
353 c.Check(s.testlog.Captured(), Equals, "ERROR could not store notification: fail\n")
354}
355
232func newPostRequest(path string, message interface{}, server *httptest.Server) *http.Request {356func newPostRequest(path string, message interface{}, server *httptest.Server) *http.Request {
233 packedMessage, err := json.Marshal(message)357 packedMessage, err := json.Marshal(message)
234 if err != nil {358 if err != nil {
@@ -268,6 +392,14 @@
268 bsend.chanId <- chanId392 bsend.chanId <- chanId
269}393}
270394
395func (bsend testBrokerSending) Unicast(chanIds ...store.InternalChannelId) {
396 // for now
397 if len(chanIds) != 1 {
398 panic("not expecting many chan ids for now")
399 }
400 bsend.chanId <- chanIds[0]
401}
402
271func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) {403func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) {
272 sto := store.NewInMemoryPendingStore()404 sto := store.NewInMemoryPendingStore()
273 stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {405 stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
@@ -480,3 +612,73 @@
480612
481 checkError(c, response, ErrWrongRequestMethod)613 checkError(c, response, ErrWrongRequestMethod)
482}614}
615
616func (s *handlersSuite) TestRespondsUnicast(c *C) {
617 sto := store.NewInMemoryPendingStore()
618 stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
619 return sto, nil
620 }
621 bsend := testBrokerSending{make(chan store.InternalChannelId, 1)}
622 testServer := httptest.NewServer(MakeHandlersMux(stoForReq, bsend, nil))
623 defer testServer.Close()
624
625 payload := json.RawMessage(`{"foo":"bar"}`)
626
627 request := newPostRequest("/notify", &Unicast{
628 UserId: "user2",
629 DeviceId: "dev3",
630 AppId: "app2",
631 ExpireOn: future,
632 Data: payload,
633 }, testServer)
634
635 response, err := s.client.Do(request)
636 c.Assert(err, IsNil)
637
638 c.Check(response.StatusCode, Equals, http.StatusOK)
639 c.Check(response.Header.Get("Content-Type"), Equals, "application/json")
640 body, err := getResponseBody(response)
641 c.Assert(err, IsNil)
642 c.Check(string(body), Matches, ".*ok.*")
643
644 chanId := store.UnicastInternalChannelId("user2", "dev3")
645 c.Check(<-bsend.chanId, Equals, chanId)
646 top, notifications, err := sto.GetChannelSnapshot(chanId)
647 c.Assert(err, IsNil)
648 c.Check(top, Equals, int64(0))
649 c.Check(notifications, HasLen, 1)
650}
651
652func (s *handlersSuite) TestCannotUnicastTooBigMessages(c *C) {
653 testServer := httptest.NewServer(&UnicastHandler{})
654 defer testServer.Close()
655
656 bigString := strings.Repeat("a", MaxRequestBodyBytes)
657 dataString := fmt.Sprintf(`"%v"`, bigString)
658
659 request := newPostRequest("/", &Unicast{
660 ExpireOn: future,
661 Data: json.RawMessage([]byte(dataString)),
662 }, testServer)
663
664 response, err := s.client.Do(request)
665 c.Assert(err, IsNil)
666 checkError(c, response, ErrRequestBodyTooLarge)
667}
668
669func (s *handlersSuite) TestCannotUnicastWithMissingFields(c *C) {
670 stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
671 return store.NewInMemoryPendingStore(), nil
672 }
673 ctx := &context{stoForReq, nil, nil}
674 testServer := httptest.NewServer(&UnicastHandler{ctx})
675 defer testServer.Close()
676
677 request := newPostRequest("/", &Unicast{
678 Data: json.RawMessage(`{"foo":"bar"}`),
679 }, testServer)
680
681 response, err := s.client.Do(request)
682 c.Assert(err, IsNil)
683 checkError(c, response, ErrMissingIdField)
684}
483685
=== modified file 'server/broker/broker.go'
--- server/broker/broker.go 2014-04-04 09:58:34 +0000
+++ server/broker/broker.go 2014-05-12 14:11:46 +0000
@@ -30,7 +30,7 @@
30// through them.30// through them.
31type Broker interface {31type Broker interface {
32 // Register the session.32 // Register the session.
33 Register(*protocol.ConnectMsg) (BrokerSession, error)33 Register(connMsg *protocol.ConnectMsg, sessionId string) (BrokerSession, error)
34 // Unregister the session.34 // Unregister the session.
35 Unregister(BrokerSession)35 Unregister(BrokerSession)
36}36}
@@ -39,6 +39,8 @@
39type BrokerSending interface {39type BrokerSending interface {
40 // Broadcast channel.40 // Broadcast channel.
41 Broadcast(chanId store.InternalChannelId)41 Broadcast(chanId store.InternalChannelId)
42 // Unicast over channels.
43 Unicast(chanIds ...store.InternalChannelId)
42}44}
4345
44// Exchange leads the session through performing an exchange, typically delivery.46// Exchange leads the session through performing an exchange, typically delivery.
@@ -81,6 +83,10 @@
81 Levels() LevelsMap83 Levels() LevelsMap
82 // ExchangeScratchArea returns the scratch area for exchanges.84 // ExchangeScratchArea returns the scratch area for exchanges.
83 ExchangeScratchArea() *ExchangesScratchArea85 ExchangeScratchArea() *ExchangesScratchArea
86 // Get gets the content of the channel with chanId.
87 Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error)
88 // DropByMsgId drops notifications from the channel chanId by message id.
89 DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error
84}90}
8591
86// Session aborted error.92// Session aborted error.
8793
=== modified file 'server/broker/exchanges.go'
--- server/broker/exchanges.go 2014-04-04 13:19:10 +0000
+++ server/broker/exchanges.go 2014-05-12 14:11:46 +0000
@@ -28,18 +28,18 @@
2828
29// Scratch area for exchanges, sessions should hold one of these.29// Scratch area for exchanges, sessions should hold one of these.
30type ExchangesScratchArea struct {30type ExchangesScratchArea struct {
31 broadcastMsg protocol.BroadcastMsg31 broadcastMsg protocol.BroadcastMsg
32 ackMsg protocol.AckMsg32 notificationsMsg protocol.NotificationsMsg
33 connBrokenMsg protocol.ConnBrokenMsg33 ackMsg protocol.AckMsg
34}34}
3535
36// BroadcastExchange leads a session through delivering a BROADCAST.36// BroadcastExchange leads a session through delivering a BROADCAST.
37// For simplicity it is fully public.37// For simplicity it is fully public.
38type BroadcastExchange struct {38type BroadcastExchange struct {
39 ChanId store.InternalChannelId39 ChanId store.InternalChannelId
40 TopLevel int6440 TopLevel int64
41 NotificationPayloads []json.RawMessage41 Notifications []protocol.Notification
42 Decoded []map[string]interface{}42 Decoded []map[string]interface{}
43}43}
4444
45// check interface already here45// check interface already here
@@ -47,18 +47,18 @@
4747
48// Init ensures the BroadcastExchange is fully initialized for the sessions.48// Init ensures the BroadcastExchange is fully initialized for the sessions.
49func (sbe *BroadcastExchange) Init() {49func (sbe *BroadcastExchange) Init() {
50 decoded := make([]map[string]interface{}, len(sbe.NotificationPayloads))50 decoded := make([]map[string]interface{}, len(sbe.Notifications))
51 sbe.Decoded = decoded51 sbe.Decoded = decoded
52 for i, p := range sbe.NotificationPayloads {52 for i, notif := range sbe.Notifications {
53 err := json.Unmarshal(p, &decoded[i])53 err := json.Unmarshal(notif.Payload, &decoded[i])
54 if err != nil {54 if err != nil {
55 decoded[i] = nil55 decoded[i] = nil
56 }56 }
57 }57 }
58}58}
5959
60func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {60func filterByLevel(clientLevel, topLevel int64, notifs []protocol.Notification) []protocol.Notification {
61 c := int64(len(payloads))61 c := int64(len(notifs))
62 if c == 0 {62 if c == 0 {
63 return nil63 return nil
64 }64 }
@@ -67,32 +67,32 @@
67 delta = 167 delta = 1
68 }68 }
69 if delta < c {69 if delta < c {
70 return payloads[c-delta:]70 return notifs[c-delta:]
71 } else {71 } else {
72 return payloads72 return notifs
73 }73 }
74}74}
7575
76func channelFilter(tag string, chanId store.InternalChannelId, payloads []json.RawMessage, decoded []map[string]interface{}) []json.RawMessage {76func channelFilter(tag string, chanId store.InternalChannelId, notifs []protocol.Notification, decoded []map[string]interface{}) []json.RawMessage {
77 if len(payloads) != 0 && chanId == store.SystemInternalChannelId {77 if len(notifs) != 0 && chanId == store.SystemInternalChannelId {
78 decoded := decoded[len(decoded)-len(payloads):]78 decoded := decoded[len(decoded)-len(notifs):]
79 filtered := make([]json.RawMessage, 0)79 filtered := make([]json.RawMessage, 0)
80 for i, decoded1 := range decoded {80 for i, decoded1 := range decoded {
81 if _, ok := decoded1[tag]; ok {81 if _, ok := decoded1[tag]; ok {
82 filtered = append(filtered, payloads[i])82 filtered = append(filtered, notifs[i].Payload)
83 }83 }
84 }84 }
85 payloads = filtered85 return filtered
86 }86 }
87 return payloads87 return protocol.ExtractPayloads(notifs)
88}88}
8989
90// Prepare session for a BROADCAST.90// Prepare session for a BROADCAST.
91func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {91func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
92 clientLevel := sess.Levels()[sbe.ChanId]92 clientLevel := sess.Levels()[sbe.ChanId]
93 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)93 notifs := filterByLevel(clientLevel, sbe.TopLevel, sbe.Notifications)
94 tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())94 tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())
95 payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded)95 payloads := channelFilter(tag, sbe.ChanId, notifs, sbe.Decoded)
96 if len(payloads) == 0 && sbe.TopLevel >= clientLevel {96 if len(payloads) == 0 && sbe.TopLevel >= clientLevel {
97 // empty and don't need to force resync => do nothing97 // empty and don't need to force resync => do nothing
98 return nil, nil, ErrNop98 return nil, nil, ErrNop
@@ -119,23 +119,55 @@
119 return nil119 return nil
120}120}
121121
122// ConnBrokenExchange breaks a session giving a reason.122// ConnMetaExchange allows to send a CONNBROKEN or CONNWARN message.
123type ConnBrokenExchange struct {123type ConnMetaExchange struct {
124 Reason string124 Msg protocol.OnewayMsg
125}125}
126126
127// check interface already here127// check interface already here
128var _ Exchange = (*ConnBrokenExchange)(nil)128var _ Exchange = (*ConnMetaExchange)(nil)
129129
130// Prepare session for a CONNBROKEN.130// Prepare session for a CONNBROKEN/WARN.
131func (cbe *ConnBrokenExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {131func (cbe *ConnMetaExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
132 scratchArea := sess.ExchangeScratchArea()132 return cbe.Msg, nil, nil
133 scratchArea.connBrokenMsg.Type = "connbroken"133}
134 scratchArea.connBrokenMsg.Reason = cbe.Reason134
135 return &scratchArea.connBrokenMsg, nil, nil135// CONNBROKEN/WARN aren't acked.
136}136func (cbe *ConnMetaExchange) Acked(sess BrokerSession, done bool) error {
137137 panic("Acked should not get invoked on ConnMetaExchange")
138// CONNBROKEN isn't acked138}
139func (cbe *ConnBrokenExchange) Acked(sess BrokerSession, done bool) error {139
140 panic("Acked should not get invoked on ConnBrokenExchange")140// UnicastExchange leads a session through delivering a NOTIFICATIONS message.
141// For simplicity it is fully public.
142type UnicastExchange struct {
143 ChanId store.InternalChannelId
144}
145
146// check interface already here
147var _ Exchange = (*UnicastExchange)(nil)
148
149// Prepare session for a NOTIFICATIONS.
150func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
151 _, notifs, err := sess.Get(sue.ChanId, false)
152 if err != nil {
153 return nil, nil, err
154 }
155 scratchArea := sess.ExchangeScratchArea()
156 scratchArea.notificationsMsg.Reset()
157 scratchArea.notificationsMsg.Type = "notifications"
158 scratchArea.notificationsMsg.Notifications = notifs
159 return &scratchArea.notificationsMsg, &scratchArea.ackMsg, nil
160}
161
162// Acked deals with an ACK for a NOTIFICATIONS.
163func (sue *UnicastExchange) Acked(sess BrokerSession, done bool) error {
164 scratchArea := sess.ExchangeScratchArea()
165 if scratchArea.ackMsg.Type != "ack" {
166 return &ErrAbort{"expected ACK message"}
167 }
168 err := sess.DropByMsgId(sue.ChanId, scratchArea.notificationsMsg.Notifications)
169 if err != nil {
170 return err
171 }
172 return nil
141}173}
142174
=== modified file 'server/broker/exchanges_test.go'
--- server/broker/exchanges_test.go 2014-04-04 13:19:10 +0000
+++ server/broker/exchanges_test.go 2014-05-12 14:11:46 +0000
@@ -18,15 +18,18 @@
1818
19import (19import (
20 "encoding/json"20 "encoding/json"
21 "errors"
21 "fmt"22 "fmt"
22 "strings"23 "strings"
23 stdtesting "testing"24 stdtesting "testing"
2425
25 . "launchpad.net/gocheck"26 . "launchpad.net/gocheck"
2627
28 "launchpad.net/ubuntu-push/protocol"
27 "launchpad.net/ubuntu-push/server/broker"29 "launchpad.net/ubuntu-push/server/broker"
28 "launchpad.net/ubuntu-push/server/broker/testing"30 "launchpad.net/ubuntu-push/server/broker/testing"
29 "launchpad.net/ubuntu-push/server/store"31 "launchpad.net/ubuntu-push/server/store"
32 help "launchpad.net/ubuntu-push/testing"
30)33)
3134
32func TestBroker(t *stdtesting.T) { TestingT(t) }35func TestBroker(t *stdtesting.T) { TestingT(t) }
@@ -39,11 +42,11 @@
39 exchg := &broker.BroadcastExchange{42 exchg := &broker.BroadcastExchange{
40 ChanId: store.SystemInternalChannelId,43 ChanId: store.SystemInternalChannelId,
41 TopLevel: 3,44 TopLevel: 3,
42 NotificationPayloads: []json.RawMessage{45 Notifications: help.Ns(
43 json.RawMessage(`{"a":"x"}`),46 json.RawMessage(`{"a":"x"}`),
44 json.RawMessage(`[]`),47 json.RawMessage(`[]`),
45 json.RawMessage(`{"a":"y"}`),48 json.RawMessage(`{"a":"y"}`),
46 },49 ),
47 }50 }
48 exchg.Init()51 exchg.Init()
49 c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{52 c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{
@@ -62,10 +65,10 @@
62 exchg := &broker.BroadcastExchange{65 exchg := &broker.BroadcastExchange{
63 ChanId: store.SystemInternalChannelId,66 ChanId: store.SystemInternalChannelId,
64 TopLevel: 3,67 TopLevel: 3,
65 NotificationPayloads: []json.RawMessage{68 Notifications: help.Ns(
66 json.RawMessage(`{"img1/m1":100}`),69 json.RawMessage(`{"img1/m1":100}`),
67 json.RawMessage(`{"img2/m2":200}`),70 json.RawMessage(`{"img2/m2":200}`),
68 },71 ),
69 }72 }
70 exchg.Init()73 exchg.Init()
71 outMsg, inMsg, err := exchg.Prepare(sess)74 outMsg, inMsg, err := exchg.Prepare(sess)
@@ -88,9 +91,9 @@
88 ImageChannel: "img1",91 ImageChannel: "img1",
89 }92 }
90 exchg := &broker.BroadcastExchange{93 exchg := &broker.BroadcastExchange{
91 ChanId: store.SystemInternalChannelId,94 ChanId: store.SystemInternalChannelId,
92 TopLevel: 3,95 TopLevel: 3,
93 NotificationPayloads: []json.RawMessage{},96 Notifications: []protocol.Notification{},
94 }97 }
95 exchg.Init()98 exchg.Init()
96 outMsg, inMsg, err := exchg.Prepare(sess)99 outMsg, inMsg, err := exchg.Prepare(sess)
@@ -108,9 +111,9 @@
108 ImageChannel: "img1",111 ImageChannel: "img1",
109 }112 }
110 exchg := &broker.BroadcastExchange{113 exchg := &broker.BroadcastExchange{
111 ChanId: store.SystemInternalChannelId,114 ChanId: store.SystemInternalChannelId,
112 TopLevel: 3,115 TopLevel: 3,
113 NotificationPayloads: []json.RawMessage{},116 Notifications: []protocol.Notification{},
114 }117 }
115 exchg.Init()118 exchg.Init()
116 outMsg, inMsg, err := exchg.Prepare(sess)119 outMsg, inMsg, err := exchg.Prepare(sess)
@@ -133,9 +136,9 @@
133136
134 topLevel := int64(len(needsSplitting))137 topLevel := int64(len(needsSplitting))
135 exchg := &broker.BroadcastExchange{138 exchg := &broker.BroadcastExchange{
136 ChanId: store.SystemInternalChannelId,139 ChanId: store.SystemInternalChannelId,
137 TopLevel: topLevel,140 TopLevel: topLevel,
138 NotificationPayloads: needsSplitting,141 Notifications: help.Ns(needsSplitting...),
139 }142 }
140 exchg.Init()143 exchg.Init()
141 outMsg, _, err := exchg.Prepare(sess)144 outMsg, _, err := exchg.Prepare(sess)
@@ -152,10 +155,10 @@
152 exchg = &broker.BroadcastExchange{155 exchg = &broker.BroadcastExchange{
153 ChanId: store.SystemInternalChannelId,156 ChanId: store.SystemInternalChannelId,
154 TopLevel: topLevel + 2,157 TopLevel: topLevel + 2,
155 NotificationPayloads: []json.RawMessage{158 Notifications: help.Ns(
156 json.RawMessage(`{"img1/m1":"x"}`),159 json.RawMessage(`{"img1/m1":"x"}`),
157 json.RawMessage(`{"img1/m1":"y"}`),160 json.RawMessage(`{"img1/m1":"y"}`),
158 },161 ),
159 }162 }
160 exchg.Init()163 exchg.Init()
161 outMsg, _, err = exchg.Prepare(sess)164 outMsg, _, err = exchg.Prepare(sess)
@@ -173,9 +176,9 @@
173 exchg := &broker.BroadcastExchange{176 exchg := &broker.BroadcastExchange{
174 ChanId: store.SystemInternalChannelId,177 ChanId: store.SystemInternalChannelId,
175 TopLevel: 3,178 TopLevel: 3,
176 NotificationPayloads: []json.RawMessage{179 Notifications: help.Ns(
177 json.RawMessage(`{"img2/m1":1}`),180 json.RawMessage(`{"img2/m1":1}`),
178 },181 ),
179 }182 }
180 exchg.Init()183 exchg.Init()
181 outMsg, inMsg, err := exchg.Prepare(sess)184 outMsg, inMsg, err := exchg.Prepare(sess)
@@ -202,10 +205,10 @@
202 exchg := &broker.BroadcastExchange{205 exchg := &broker.BroadcastExchange{
203 ChanId: store.SystemInternalChannelId,206 ChanId: store.SystemInternalChannelId,
204 TopLevel: 3,207 TopLevel: 3,
205 NotificationPayloads: []json.RawMessage{208 Notifications: help.Ns(
206 json.RawMessage(`{"img1/m1":100}`),209 json.RawMessage(`{"img1/m1":100}`),
207 json.RawMessage(`{"img1/m1":101}`),210 json.RawMessage(`{"img1/m1":101}`),
208 },211 ),
209 }212 }
210 exchg.Init()213 exchg.Init()
211 outMsg, inMsg, err := exchg.Prepare(sess)214 outMsg, inMsg, err := exchg.Prepare(sess)
@@ -229,11 +232,11 @@
229 exchg := &broker.BroadcastExchange{232 exchg := &broker.BroadcastExchange{
230 ChanId: store.SystemInternalChannelId,233 ChanId: store.SystemInternalChannelId,
231 TopLevel: 5,234 TopLevel: 5,
232 NotificationPayloads: []json.RawMessage{235 Notifications: help.Ns(
233 json.RawMessage(`{"img1/m1":100}`),236 json.RawMessage(`{"img1/m1":100}`),
234 json.RawMessage(`{"img2/m2":200}`),237 json.RawMessage(`{"img2/m2":200}`),
235 json.RawMessage(`{"img1/m1":101}`),238 json.RawMessage(`{"img1/m1":101}`),
236 },239 ),
237 }240 }
238 exchg.Init()241 exchg.Init()
239 outMsg, inMsg, err := exchg.Prepare(sess)242 outMsg, inMsg, err := exchg.Prepare(sess)
@@ -249,16 +252,114 @@
249 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))252 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))
250}253}
251254
252func (s *exchangesSuite) TestConnBrokenExchange(c *C) {255func (s *exchangesSuite) TestConnMetaExchange(c *C) {
253 sess := &testing.TestBrokerSession{}256 sess := &testing.TestBrokerSession{}
254 cbe := &broker.ConnBrokenExchange{"REASON"}257 var msg protocol.OnewayMsg = &protocol.ConnWarnMsg{"connwarn", "REASON"}
258 cbe := &broker.ConnMetaExchange{msg}
255 outMsg, inMsg, err := cbe.Prepare(sess)259 outMsg, inMsg, err := cbe.Prepare(sess)
256 c.Assert(err, IsNil)260 c.Assert(err, IsNil)
261 c.Check(msg, Equals, outMsg)
257 c.Check(inMsg, IsNil) // no answer is expected262 c.Check(inMsg, IsNil) // no answer is expected
258 // check263 // check
259 marshalled, err := json.Marshal(outMsg)264 marshalled, err := json.Marshal(outMsg)
260 c.Assert(err, IsNil)265 c.Assert(err, IsNil)
261 c.Check(string(marshalled), Equals, `{"T":"connbroken","Reason":"REASON"}`)266 c.Check(string(marshalled), Equals, `{"T":"connwarn","Reason":"REASON"}`)
262267
263 c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnBrokenExchange")268 c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnMetaExchange")
269}
270
271func (s *exchangesSuite) TestUnicastExchange(c *C) {
272 chanId1 := store.UnicastInternalChannelId("u1", "d1")
273 notifs := []protocol.Notification{
274 protocol.Notification{
275 MsgId: "msg1",
276 AppId: "app1",
277 Payload: json.RawMessage(`{"m": 1}`),
278 },
279 protocol.Notification{
280 MsgId: "msg2",
281 AppId: "app2",
282 Payload: json.RawMessage(`{"m": 2}`),
283 },
284 }
285 dropped := make(chan []protocol.Notification, 2)
286 sess := &testing.TestBrokerSession{
287 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
288 c.Check(chanId, Equals, chanId1)
289 c.Check(cachedOk, Equals, false)
290 return 0, notifs, nil
291 },
292 DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
293 c.Check(chanId, Equals, chanId1)
294 dropped <- targets
295 return nil
296 },
297 }
298 exchg := &broker.UnicastExchange{chanId1}
299 outMsg, inMsg, err := exchg.Prepare(sess)
300 c.Assert(err, IsNil)
301 // check
302 marshalled, err := json.Marshal(outMsg)
303 c.Assert(err, IsNil)
304 c.Check(string(marshalled), Equals, `{"T":"notifications","Notifications":[{"A":"app1","M":"msg1","P":{"m":1}},{"A":"app2","M":"msg2","P":{"m":2}}]}`)
305 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
306 c.Assert(err, IsNil)
307 err = exchg.Acked(sess, true)
308 c.Assert(err, IsNil)
309 c.Check(dropped, HasLen, 1)
310 c.Check(<-dropped, DeepEquals, notifs)
311}
312
313func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) {
314 notifs := []protocol.Notification{}
315 dropped := make(chan []protocol.Notification, 2)
316 sess := &testing.TestBrokerSession{
317 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
318 return 0, notifs, nil
319 },
320 DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
321 dropped <- targets
322 return nil
323 },
324 }
325 exchg := &broker.UnicastExchange{}
326 _, inMsg, err := exchg.Prepare(sess)
327 c.Assert(err, IsNil)
328 err = json.Unmarshal([]byte(`{}`), inMsg)
329 c.Assert(err, IsNil)
330 err = exchg.Acked(sess, true)
331 c.Assert(err, Not(IsNil))
332 c.Check(dropped, HasLen, 0)
333}
334
335func (s *exchangesSuite) TestUnicastExchangeErrorOnPrepare(c *C) {
336 fail := errors.New("fail")
337 sess := &testing.TestBrokerSession{
338 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
339 return 0, nil, fail
340 },
341 }
342 exchg := &broker.UnicastExchange{}
343 _, _, err := exchg.Prepare(sess)
344 c.Assert(err, Equals, fail)
345}
346
347func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) {
348 notifs := []protocol.Notification{}
349 fail := errors.New("fail")
350 sess := &testing.TestBrokerSession{
351 DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
352 return 0, notifs, nil
353 },
354 DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
355 return fail
356 },
357 }
358 exchg := &broker.UnicastExchange{}
359 _, inMsg, err := exchg.Prepare(sess)
360 c.Assert(err, IsNil)
361 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
362 c.Assert(err, IsNil)
363 err = exchg.Acked(sess, true)
364 c.Assert(err, Equals, fail)
264}365}
265366
=== modified file 'server/broker/exchg_impl_test.go'
--- server/broker/exchg_impl_test.go 2014-04-03 16:00:53 +0000
+++ server/broker/exchg_impl_test.go 2014-05-12 14:11:46 +0000
@@ -22,6 +22,7 @@
22 . "launchpad.net/gocheck"22 . "launchpad.net/gocheck"
2323
24 "launchpad.net/ubuntu-push/server/store"24 "launchpad.net/ubuntu-push/server/store"
25 help "launchpad.net/ubuntu-push/testing"
25)26)
2627
27type exchangesImplSuite struct{}28type exchangesImplSuite struct{}
@@ -29,27 +30,27 @@
29var _ = Suite(&exchangesImplSuite{})30var _ = Suite(&exchangesImplSuite{})
3031
31func (s *exchangesImplSuite) TestFilterByLevel(c *C) {32func (s *exchangesImplSuite) TestFilterByLevel(c *C) {
32 payloads := []json.RawMessage{33 notifs := help.Ns(
33 json.RawMessage(`{"a": 3}`),34 json.RawMessage(`{"a": 3}`),
34 json.RawMessage(`{"a": 4}`),35 json.RawMessage(`{"a": 4}`),
35 json.RawMessage(`{"a": 5}`),36 json.RawMessage(`{"a": 5}`),
36 }37 )
37 res := filterByLevel(5, 5, payloads)38 res := filterByLevel(5, 5, notifs)
38 c.Check(len(res), Equals, 0)39 c.Check(len(res), Equals, 0)
39 res = filterByLevel(4, 5, payloads)40 res = filterByLevel(4, 5, notifs)
40 c.Check(len(res), Equals, 1)41 c.Check(len(res), Equals, 1)
41 c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))42 c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 5}`))
42 res = filterByLevel(3, 5, payloads)43 res = filterByLevel(3, 5, notifs)
43 c.Check(len(res), Equals, 2)44 c.Check(len(res), Equals, 2)
44 c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`))45 c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 4}`))
45 res = filterByLevel(2, 5, payloads)46 res = filterByLevel(2, 5, notifs)
46 c.Check(len(res), Equals, 3)47 c.Check(len(res), Equals, 3)
47 res = filterByLevel(1, 5, payloads)48 res = filterByLevel(1, 5, notifs)
48 c.Check(len(res), Equals, 3)49 c.Check(len(res), Equals, 3)
49 // too ahead, pick only last50 // too ahead, pick only last
50 res = filterByLevel(10, 5, payloads)51 res = filterByLevel(10, 5, notifs)
51 c.Check(len(res), Equals, 1)52 c.Check(len(res), Equals, 1)
52 c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))53 c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 5}`))
53}54}
5455
55func (s *exchangesImplSuite) TestFilterByLevelEmpty(c *C) {56func (s *exchangesImplSuite) TestFilterByLevelEmpty(c *C) {
@@ -71,18 +72,19 @@
71 err := json.Unmarshal(p, &decoded[i])72 err := json.Unmarshal(p, &decoded[i])
72 c.Assert(err, IsNil)73 c.Assert(err, IsNil)
73 }74 }
75 notifs := help.Ns(payloads...)
7476
75 other := store.InternalChannelId("1")77 other := store.InternalChannelId("1")
7678
77 c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil)79 c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil)
78 c.Check(channelFilter("", other, payloads[1:], decoded), DeepEquals, payloads[1:])80 c.Check(channelFilter("", other, notifs[1:], decoded), DeepEquals, payloads[1:])
7981
80 // use tag when channel is the sytem channel82 // use tag when channel is the sytem channel
8183
82 c.Check(channelFilter("c/z", store.SystemInternalChannelId, payloads, decoded), HasLen, 0)84 c.Check(channelFilter("c/z", store.SystemInternalChannelId, notifs, decoded), HasLen, 0)
8385
84 c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]})86 c.Check(channelFilter("a/x", store.SystemInternalChannelId, notifs, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]})
8587
86 c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]})88 c.Check(channelFilter("a/x", store.SystemInternalChannelId, notifs[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]})
8789
88}90}
8991
=== modified file 'server/broker/simple/simple.go'
--- server/broker/simple/simple.go 2014-04-11 08:47:18 +0000
+++ server/broker/simple/simple.go 2014-05-12 14:11:46 +0000
@@ -46,6 +46,7 @@
4646
47// simpleBrokerSession represents a session in the broker.47// simpleBrokerSession represents a session in the broker.
48type simpleBrokerSession struct {48type simpleBrokerSession struct {
49 broker *SimpleBroker
49 registered bool50 registered bool
50 deviceId string51 deviceId string
51 model string52 model string
@@ -61,6 +62,7 @@
6162
62const (63const (
63 broadcastDelivery deliveryKind = iota64 broadcastDelivery deliveryKind = iota
65 unicastDelivery
64)66)
6567
66// delivery holds all the information to request a delivery68// delivery holds all the information to request a delivery
@@ -93,6 +95,14 @@
93 return &sess.exchgScratch95 return &sess.exchgScratch
94}96}
9597
98func (sess *simpleBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
99 return sess.broker.get(chanId, cachedOk)
100}
101
102func (sess *simpleBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
103 return sess.broker.drop(chanId, targets)
104}
105
96// NewSimpleBroker makes a new SimpleBroker.106// NewSimpleBroker makes a new SimpleBroker.
97func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker {107func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker {
98 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())108 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())
@@ -144,7 +154,7 @@
144 // find relevant channels, for now only system154 // find relevant channels, for now only system
145 channels := []store.InternalChannelId{store.SystemInternalChannelId}155 channels := []store.InternalChannelId{store.SystemInternalChannelId}
146 for _, chanId := range channels {156 for _, chanId := range channels {
147 topLevel, payloads, err := b.sto.GetChannelSnapshot(chanId)157 topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId)
148 if err != nil {158 if err != nil {
149 // next broadcast will try again159 // next broadcast will try again
150 b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err)160 b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err)
@@ -153,9 +163,9 @@
153 clientLevel := sess.levels[chanId]163 clientLevel := sess.levels[chanId]
154 if clientLevel != topLevel {164 if clientLevel != topLevel {
155 broadcastExchg := &broker.BroadcastExchange{165 broadcastExchg := &broker.BroadcastExchange{
156 ChanId: chanId,166 ChanId: chanId,
157 TopLevel: topLevel,167 TopLevel: topLevel,
158 NotificationPayloads: payloads,168 Notifications: notifications,
159 }169 }
160 broadcastExchg.Init()170 broadcastExchg.Init()
161 sess.exchanges <- broadcastExchg171 sess.exchanges <- broadcastExchg
@@ -166,7 +176,7 @@
166176
167// Register registers a session with the broker. It feeds the session177// Register registers a session with the broker. It feeds the session
168// pending notifications as well.178// pending notifications as well.
169func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {179func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) {
170 // xxx sanity check DeviceId180 // xxx sanity check DeviceId
171 model, err := broker.GetInfoString(connect, "device", "?")181 model, err := broker.GetInfoString(connect, "device", "?")
172 if err != nil {182 if err != nil {
@@ -185,6 +195,7 @@
185 levels[id] = v195 levels[id] = v
186 }196 }
187 sess := &simpleBrokerSession{197 sess := &simpleBrokerSession{
198 broker: b,
188 deviceId: connect.DeviceId,199 deviceId: connect.DeviceId,
189 model: model,200 model: model,
190 imageChannel: imageChannel,201 imageChannel: imageChannel,
@@ -207,6 +218,24 @@
207 b.sessionCh <- sess218 b.sessionCh <- sess
208}219}
209220
221func (b *SimpleBroker) get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
222 topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId)
223 if err != nil {
224 b.logger.Errorf("unsuccessful, get channel snapshot for %v (cachedOk=%v): %v", chanId, cachedOk, err)
225 }
226 return topLevel, notifications, err
227
228}
229
230func (b *SimpleBroker) drop(chanId store.InternalChannelId, targets []protocol.Notification) error {
231 err := b.sto.DropByMsgId(chanId, targets)
232 if err != nil {
233 b.logger.Errorf("unsuccessful, drop from channel %v: %v", chanId, err)
234 }
235 return err
236
237}
238
210// run runs the agent logic of the broker.239// run runs the agent logic of the broker.
211func (b *SimpleBroker) run() {240func (b *SimpleBroker) run() {
212Loop:241Loop:
@@ -224,7 +253,7 @@
224 } else { // register253 } else { // register
225 prev := b.registry[sess.deviceId]254 prev := b.registry[sess.deviceId]
226 if prev != nil { // kick it255 if prev != nil { // kick it
227 close(prev.exchanges)256 prev.exchanges <- nil
228 }257 }
229 b.registry[sess.deviceId] = sess258 b.registry[sess.deviceId] = sess
230 sess.registered = true259 sess.registered = true
@@ -233,21 +262,27 @@
233 case delivery := <-b.deliveryCh:262 case delivery := <-b.deliveryCh:
234 switch delivery.kind {263 switch delivery.kind {
235 case broadcastDelivery:264 case broadcastDelivery:
236 topLevel, payloads, err := b.sto.GetChannelSnapshot(delivery.chanId)265 topLevel, notifications, err := b.get(delivery.chanId, false)
237 if err != nil {266 if err != nil {
238 // next broadcast will try again267 // next broadcast will try again
239 b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err)
240 continue Loop268 continue Loop
241 }269 }
242 broadcastExchg := &broker.BroadcastExchange{270 broadcastExchg := &broker.BroadcastExchange{
243 ChanId: delivery.chanId,271 ChanId: delivery.chanId,
244 TopLevel: topLevel,272 TopLevel: topLevel,
245 NotificationPayloads: payloads,273 Notifications: notifications,
246 }274 }
247 broadcastExchg.Init()275 broadcastExchg.Init()
248 for _, sess := range b.registry {276 for _, sess := range b.registry {
249 sess.exchanges <- broadcastExchg277 sess.exchanges <- broadcastExchg
250 }278 }
279 case unicastDelivery:
280 chanId := delivery.chanId
281 _, devId := chanId.UnicastUserAndDevice()
282 sess := b.registry[devId]
283 if sess != nil {
284 sess.exchanges <- &broker.UnicastExchange{chanId}
285 }
251 }286 }
252 }287 }
253 }288 }
@@ -260,3 +295,13 @@
260 chanId: chanId,295 chanId: chanId,
261 }296 }
262}297}
298
299// Unicast requests unicast for the channels.
300func (b *SimpleBroker) Unicast(chanIds ...store.InternalChannelId) {
301 for _, chanId := range chanIds {
302 b.deliveryCh <- &delivery{
303 kind: unicastDelivery,
304 chanId: chanId,
305 }
306 }
307}
263308
=== modified file 'server/broker/simple/simple_test.go'
--- server/broker/simple/simple_test.go 2014-04-03 16:00:53 +0000
+++ server/broker/simple/simple_test.go 2014-05-12 14:11:46 +0000
@@ -26,6 +26,7 @@
26 "launchpad.net/ubuntu-push/server/broker"26 "launchpad.net/ubuntu-push/server/broker"
27 "launchpad.net/ubuntu-push/server/broker/testing"27 "launchpad.net/ubuntu-push/server/broker/testing"
28 "launchpad.net/ubuntu-push/server/store"28 "launchpad.net/ubuntu-push/server/store"
29 help "launchpad.net/ubuntu-push/testing"
29)30)
3031
31func TestSimple(t *stdtesting.T) { TestingT(t) }32func TestSimple(t *stdtesting.T) { TestingT(t) }
@@ -58,10 +59,10 @@
58 c.Assert(len(sess.exchanges), Equals, 1)59 c.Assert(len(sess.exchanges), Equals, 1)
59 exchg1 := <-sess.exchanges60 exchg1 := <-sess.exchanges
60 c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{61 c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{
61 ChanId: store.SystemInternalChannelId,62 ChanId: store.SystemInternalChannelId,
62 TopLevel: 1,63 TopLevel: 1,
63 NotificationPayloads: []json.RawMessage{notification1},64 Notifications: help.Ns(notification1),
64 Decoded: []map[string]interface{}{decoded1},65 Decoded: []map[string]interface{}{decoded1},
65 })66 })
66}67}
6768
6869
=== modified file 'server/broker/simple/suite_test.go'
--- server/broker/simple/suite_test.go 2014-02-10 23:19:08 +0000
+++ server/broker/simple/suite_test.go 2014-05-12 14:11:46 +0000
@@ -42,4 +42,7 @@
42 RevealBroadcastExchange: func(exchg broker.Exchange) *broker.BroadcastExchange {42 RevealBroadcastExchange: func(exchg broker.Exchange) *broker.BroadcastExchange {
43 return exchg.(*broker.BroadcastExchange)43 return exchg.(*broker.BroadcastExchange)
44 },44 },
45 RevealUnicastExchange: func(exchg broker.Exchange) *broker.UnicastExchange {
46 return exchg.(*broker.UnicastExchange)
47 },
45}})48}})
4649
=== modified file 'server/broker/testing/impls.go'
--- server/broker/testing/impls.go 2014-04-03 14:31:10 +0000
+++ server/broker/testing/impls.go 2014-05-12 14:11:46 +0000
@@ -18,7 +18,9 @@
18package testing18package testing
1919
20import (20import (
21 "launchpad.net/ubuntu-push/protocol"
21 "launchpad.net/ubuntu-push/server/broker"22 "launchpad.net/ubuntu-push/server/broker"
23 "launchpad.net/ubuntu-push/server/store"
22)24)
2325
24// Test implementation of BrokerSession.26// Test implementation of BrokerSession.
@@ -29,6 +31,9 @@
29 Exchanges chan broker.Exchange31 Exchanges chan broker.Exchange
30 LevelsMap broker.LevelsMap32 LevelsMap broker.LevelsMap
31 exchgScratch broker.ExchangesScratchArea33 exchgScratch broker.ExchangesScratchArea
34 // hooks
35 DoGet func(store.InternalChannelId, bool) (int64, []protocol.Notification, error)
36 DoDropByMsgId func(store.InternalChannelId, []protocol.Notification) error
32}37}
3338
34func (tbs *TestBrokerSession) DeviceIdentifier() string {39func (tbs *TestBrokerSession) DeviceIdentifier() string {
@@ -55,6 +60,14 @@
55 return &tbs.exchgScratch60 return &tbs.exchgScratch
56}61}
5762
63func (tbs *TestBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
64 return tbs.DoGet(chanId, cachedOk)
65}
66
67func (tbs *TestBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
68 return tbs.DoDropByMsgId(chanId, targets)
69}
70
58// Test implementation of BrokerConfig.71// Test implementation of BrokerConfig.
59type TestBrokerConfig struct {72type TestBrokerConfig struct {
60 ConfigSessionQueueSize uint73 ConfigSessionQueueSize uint
6174
=== modified file 'server/broker/testsuite/suite.go'
--- server/broker/testsuite/suite.go 2014-04-11 08:47:18 +0000
+++ server/broker/testsuite/suite.go 2014-05-12 14:11:46 +0000
@@ -30,7 +30,7 @@
30 "launchpad.net/ubuntu-push/server/broker"30 "launchpad.net/ubuntu-push/server/broker"
31 "launchpad.net/ubuntu-push/server/broker/testing"31 "launchpad.net/ubuntu-push/server/broker/testing"
32 "launchpad.net/ubuntu-push/server/store"32 "launchpad.net/ubuntu-push/server/store"
33 helpers "launchpad.net/ubuntu-push/testing"33 help "launchpad.net/ubuntu-push/testing"
34)34)
3535
36// The expected interface for tested brokers.36// The expected interface for tested brokers.
@@ -50,12 +50,14 @@
50 RevealSession func(broker.Broker, string) broker.BrokerSession50 RevealSession func(broker.Broker, string) broker.BrokerSession
51 // Let us get to a broker.BroadcastExchange from an Exchange.51 // Let us get to a broker.BroadcastExchange from an Exchange.
52 RevealBroadcastExchange func(broker.Exchange) *broker.BroadcastExchange52 RevealBroadcastExchange func(broker.Exchange) *broker.BroadcastExchange
53 // Let us get to a broker.UnicastExchange from an Exchange.
54 RevealUnicastExchange func(broker.Exchange) *broker.UnicastExchange
53 // private55 // private
54 testlog *helpers.TestLogger56 testlog *help.TestLogger
55}57}
5658
57func (s *CommonBrokerSuite) SetUpTest(c *C) {59func (s *CommonBrokerSuite) SetUpTest(c *C) {
58 s.testlog = helpers.NewTestLogger(c, "error")60 s.testlog = help.NewTestLogger(c, "error")
59}61}
6062
61var testBrokerConfig = &testing.TestBrokerConfig{10, 5}63var testBrokerConfig = &testing.TestBrokerConfig{10, 5}
@@ -89,7 +91,7 @@
89 "device": "model",91 "device": "model",
90 "channel": "daily",92 "channel": "daily",
91 },93 },
92 })94 }, "s1")
93 c.Assert(err, IsNil)95 c.Assert(err, IsNil)
94 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess)96 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess)
95 c.Assert(sess.DeviceIdentifier(), Equals, "dev-1")97 c.Assert(sess.DeviceIdentifier(), Equals, "dev-1")
@@ -101,7 +103,7 @@
101 }))103 }))
102 b.Unregister(sess)104 b.Unregister(sess)
103 // just to make sure the unregister was processed105 // just to make sure the unregister was processed
104 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""})106 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}, "s2")
105 c.Assert(err, IsNil)107 c.Assert(err, IsNil)
106 c.Check(s.RevealSession(b, "dev-1"), IsNil)108 c.Check(s.RevealSession(b, "dev-1"), IsNil)
107}109}
@@ -111,7 +113,7 @@
111 b := s.MakeBroker(sto, testBrokerConfig, nil)113 b := s.MakeBroker(sto, testBrokerConfig, nil)
112 b.Start()114 b.Start()
113 defer b.Stop()115 defer b.Stop()
114 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"z": 5}})116 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"z": 5}}, "s1")
115 c.Check(err, FitsTypeOf, &broker.ErrAbort{})117 c.Check(err, FitsTypeOf, &broker.ErrAbort{})
116}118}
117119
@@ -123,11 +125,11 @@
123 info := map[string]interface{}{125 info := map[string]interface{}{
124 "device": -1,126 "device": -1,
125 }127 }
126 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})128 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info}, "s1")
127 c.Check(err, Equals, broker.ErrUnexpectedValue)129 c.Check(err, Equals, broker.ErrUnexpectedValue)
128 info["device"] = "m"130 info["device"] = "m"
129 info["channel"] = -1131 info["channel"] = -1
130 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})132 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info}, "s2")
131 c.Check(err, Equals, broker.ErrUnexpectedValue)133 c.Check(err, Equals, broker.ErrUnexpectedValue)
132}134}
133135
@@ -139,7 +141,7 @@
139 b := s.MakeBroker(sto, testBrokerConfig, nil)141 b := s.MakeBroker(sto, testBrokerConfig, nil)
140 b.Start()142 b.Start()
141 defer b.Stop()143 defer b.Stop()
142 sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})144 sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
143 c.Assert(err, IsNil)145 c.Assert(err, IsNil)
144 c.Check(len(sess.SessionChannel()), Equals, 1)146 c.Check(len(sess.SessionChannel()), Equals, 1)
145}147}
@@ -149,7 +151,7 @@
149 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)151 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
150 b.Start()152 b.Start()
151 defer b.Stop()153 defer b.Stop()
152 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})154 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
153 c.Assert(err, IsNil)155 c.Assert(err, IsNil)
154 // but156 // but
155 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n")157 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n")
@@ -160,22 +162,25 @@
160 b := s.MakeBroker(sto, testBrokerConfig, nil)162 b := s.MakeBroker(sto, testBrokerConfig, nil)
161 b.Start()163 b.Start()
162 defer b.Stop()164 defer b.Stop()
163 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})165 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
164 c.Assert(err, IsNil)166 c.Assert(err, IsNil)
165 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})167 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2")
166 c.Assert(err, IsNil)168 c.Assert(err, IsNil)
167 checkAndFalse := false169 // previous session got signaled by sending nil on its channel
168 // previous session got signaled by closing its channel170 var sentinel broker.Exchange
171 got := false
169 select {172 select {
170 case _, ok := <-sess1.SessionChannel():173 case sentinel = <-sess1.SessionChannel():
171 checkAndFalse = ok == false174 got = true
172 default:175 case <-time.After(5 * time.Second):
176 c.Fatal("taking too long to get sentinel")
173 }177 }
174 c.Check(checkAndFalse, Equals, true)178 c.Check(got, Equals, true)
179 c.Check(sentinel, IsNil)
175 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess2)180 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess2)
176 b.Unregister(sess1)181 b.Unregister(sess1)
177 // just to make sure the unregister was processed182 // just to make sure the unregister was processed
178 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""})183 _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}, "s3")
179 c.Assert(err, IsNil)184 c.Assert(err, IsNil)
180 c.Check(s.RevealSession(b, "dev-1"), Equals, sess2)185 c.Check(s.RevealSession(b, "dev-1"), Equals, sess2)
181}186}
@@ -187,9 +192,9 @@
187 b := s.MakeBroker(sto, testBrokerConfig, nil)192 b := s.MakeBroker(sto, testBrokerConfig, nil)
188 b.Start()193 b.Start()
189 defer b.Stop()194 defer b.Stop()
190 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})195 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
191 c.Assert(err, IsNil)196 c.Assert(err, IsNil)
192 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"})197 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2")
193 c.Assert(err, IsNil)198 c.Assert(err, IsNil)
194 // add notification to channel *after* the registrations199 // add notification to channel *after* the registrations
195 muchLater := time.Now().Add(10 * time.Minute)200 muchLater := time.Now().Add(10 * time.Minute)
@@ -200,10 +205,10 @@
200 c.Fatal("taking too long to get broadcast exchange")205 c.Fatal("taking too long to get broadcast exchange")
201 case exchg1 := <-sess1.SessionChannel():206 case exchg1 := <-sess1.SessionChannel():
202 c.Check(s.RevealBroadcastExchange(exchg1), DeepEquals, &broker.BroadcastExchange{207 c.Check(s.RevealBroadcastExchange(exchg1), DeepEquals, &broker.BroadcastExchange{
203 ChanId: store.SystemInternalChannelId,208 ChanId: store.SystemInternalChannelId,
204 TopLevel: 1,209 TopLevel: 1,
205 NotificationPayloads: []json.RawMessage{notification1},210 Notifications: help.Ns(notification1),
206 Decoded: []map[string]interface{}{decoded1},211 Decoded: []map[string]interface{}{decoded1},
207 })212 })
208 }213 }
209 select {214 select {
@@ -211,10 +216,10 @@
211 c.Fatal("taking too long to get broadcast exchange")216 c.Fatal("taking too long to get broadcast exchange")
212 case exchg2 := <-sess2.SessionChannel():217 case exchg2 := <-sess2.SessionChannel():
213 c.Check(s.RevealBroadcastExchange(exchg2), DeepEquals, &broker.BroadcastExchange{218 c.Check(s.RevealBroadcastExchange(exchg2), DeepEquals, &broker.BroadcastExchange{
214 ChanId: store.SystemInternalChannelId,219 ChanId: store.SystemInternalChannelId,
215 TopLevel: 1,220 TopLevel: 1,
216 NotificationPayloads: []json.RawMessage{notification1},221 Notifications: help.Ns(notification1),
217 Decoded: []map[string]interface{}{decoded1},222 Decoded: []map[string]interface{}{decoded1},
218 })223 })
219 }224 }
220}225}
@@ -224,7 +229,7 @@
224 countdownToFail int229 countdownToFail int
225}230}
226231
227func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []json.RawMessage, error) {232func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []protocol.Notification, error) {
228 if sto.countdownToFail == 0 {233 if sto.countdownToFail == 0 {
229 return 0, nil, errors.New("get channel snapshot fail")234 return 0, nil, errors.New("get channel snapshot fail")
230 }235 }
@@ -232,6 +237,10 @@
232 return 0, nil, nil237 return 0, nil, nil
233}238}
234239
240func (sto *testFailingStore) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
241 return errors.New("drop fail")
242}
243
235func (s *CommonBrokerSuite) TestBroadcastFail(c *C) {244func (s *CommonBrokerSuite) TestBroadcastFail(c *C) {
236 logged := make(chan bool, 1)245 logged := make(chan bool, 1)
237 s.testlog.SetLogEventCb(func(string) {246 s.testlog.SetLogEventCb(func(string) {
@@ -241,7 +250,7 @@
241 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)250 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
242 b.Start()251 b.Start()
243 defer b.Stop()252 defer b.Stop()
244 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})253 _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
245 c.Assert(err, IsNil)254 c.Assert(err, IsNil)
246 b.Broadcast(store.SystemInternalChannelId)255 b.Broadcast(store.SystemInternalChannelId)
247 select {256 select {
@@ -249,5 +258,83 @@
249 c.Fatal("taking too long to log error")258 c.Fatal("taking too long to log error")
250 case <-logged:259 case <-logged:
251 }260 }
252 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful broadcast, get channel snapshot for 0: get channel snapshot fail\n")261 c.Check(s.testlog.Captured(), Matches, "ERROR.*: get channel snapshot fail\n")
262}
263
264func (s *CommonBrokerSuite) TestUnicast(c *C) {
265 sto := store.NewInMemoryPendingStore()
266 notification1 := json.RawMessage(`{"m": "M1"}`)
267 notification2 := json.RawMessage(`{"m": "M2"}`)
268 chanId1 := store.UnicastInternalChannelId("dev1", "dev1")
269 chanId2 := store.UnicastInternalChannelId("dev2", "dev2")
270 b := s.MakeBroker(sto, testBrokerConfig, nil)
271 b.Start()
272 defer b.Stop()
273 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1")
274 c.Assert(err, IsNil)
275 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2")
276 c.Assert(err, IsNil)
277 // add notification to channel *after* the registrations
278 muchLater := time.Now().Add(10 * time.Minute)
279 sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
280 sto.AppendToUnicastChannel(chanId2, "app2", notification2, "msg2", muchLater)
281 b.Unicast(chanId2, chanId1)
282 select {
283 case <-time.After(5 * time.Second):
284 c.Fatal("taking too long to get unicast exchange")
285 case exchg1 := <-sess1.SessionChannel():
286 u1 := s.RevealUnicastExchange(exchg1)
287 c.Check(u1.ChanId, Equals, chanId1)
288 }
289 select {
290 case <-time.After(5 * time.Second):
291 c.Fatal("taking too long to get unicast exchange")
292 case exchg2 := <-sess2.SessionChannel():
293 u2 := s.RevealUnicastExchange(exchg2)
294 c.Check(u2.ChanId, Equals, chanId2)
295 }
296}
297
298func (s *CommonBrokerSuite) TestGetAndDrop(c *C) {
299 sto := store.NewInMemoryPendingStore()
300 notification1 := json.RawMessage(`{"m": "M1"}`)
301 chanId1 := store.UnicastInternalChannelId("dev3", "dev3")
302 b := s.MakeBroker(sto, testBrokerConfig, nil)
303 b.Start()
304 defer b.Stop()
305 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
306 c.Assert(err, IsNil)
307 muchLater := time.Now().Add(10 * time.Minute)
308 sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
309 _, expected, err := sto.GetChannelSnapshot(chanId1)
310 c.Assert(err, IsNil)
311 _, notifs, err := sess1.Get(chanId1, false)
312 c.Check(notifs, HasLen, 1)
313 c.Check(notifs, DeepEquals, expected)
314 err = sess1.DropByMsgId(chanId1, notifs)
315 c.Assert(err, IsNil)
316 _, notifs, err = sess1.Get(chanId1, true)
317 c.Check(notifs, HasLen, 0)
318 _, expected, err = sto.GetChannelSnapshot(chanId1)
319 c.Assert(err, IsNil)
320 c.Check(expected, HasLen, 0)
321
322}
323
324func (s *CommonBrokerSuite) TestGetAndDropErrors(c *C) {
325 chanId1 := store.UnicastInternalChannelId("dev3", "dev3")
326 sto := &testFailingStore{countdownToFail: 1}
327 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
328 b.Start()
329 defer b.Stop()
330 sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
331 c.Assert(err, IsNil)
332 _, _, err = sess1.Get(chanId1, false)
333 c.Assert(err, ErrorMatches, "get channel snapshot fail")
334 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for Udev3:dev3 \\(cachedOk=false\\): get channel snapshot fail\n")
335 s.testlog.ResetCapture()
336
337 err = sess1.DropByMsgId(chanId1, nil)
338 c.Assert(err, ErrorMatches, "drop fail")
339 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n")
253}340}
254341
=== modified file 'server/session/session.go'
--- server/session/session.go 2014-04-11 08:47:18 +0000
+++ server/session/session.go 2014-05-12 14:11:46 +0000
@@ -18,6 +18,7 @@
18package session18package session
1919
20import (20import (
21 "errors"
21 "net"22 "net"
22 "time"23 "time"
2324
@@ -35,7 +36,7 @@
35}36}
3637
37// sessionStart manages the start of the protocol session.38// sessionStart manages the start of the protocol session.
38func sessionStart(proto protocol.Protocol, brkr broker.Broker, cfg SessionConfig) (broker.BrokerSession, error) {39func sessionStart(proto protocol.Protocol, brkr broker.Broker, cfg SessionConfig, sessionId string) (broker.BrokerSession, error) {
39 var connMsg protocol.ConnectMsg40 var connMsg protocol.ConnectMsg
40 proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout()))41 proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout()))
41 err := proto.ReadMessage(&connMsg)42 err := proto.ReadMessage(&connMsg)
@@ -52,9 +53,11 @@
52 if err != nil {53 if err != nil {
53 return nil, err54 return nil, err
54 }55 }
55 return brkr.Register(&connMsg)56 return brkr.Register(&connMsg, sessionId)
56}57}
5758
59var errOneway = errors.New("oneway")
60
58// exchange writes outMsg message, reads answer in inMsg61// exchange writes outMsg message, reads answer in inMsg
59func exchange(proto protocol.Protocol, outMsg, inMsg interface{}, exchangeTimeout time.Duration) error {62func exchange(proto protocol.Protocol, outMsg, inMsg interface{}, exchangeTimeout time.Duration) error {
60 proto.SetDeadline(time.Now().Add(exchangeTimeout))63 proto.SetDeadline(time.Now().Add(exchangeTimeout))
@@ -62,7 +65,10 @@
62 if err != nil {65 if err != nil {
63 return err66 return err
64 }67 }
65 if inMsg == nil { // no answer expected, breaking connection68 if inMsg == nil { // no answer expected
69 if outMsg.(protocol.OnewayMsg).OnewayContinue() {
70 return errOneway
71 }
66 return &broker.ErrAbort{"session broken for reason"}72 return &broker.ErrAbort{"session broken for reason"}
67 }73 }
68 err = proto.ReadMessage(inMsg)74 err = proto.ReadMessage(inMsg)
@@ -78,6 +84,10 @@
78 exchangeTimeout := cfg.ExchangeTimeout()84 exchangeTimeout := cfg.ExchangeTimeout()
79 pingTimer := time.NewTimer(pingInterval)85 pingTimer := time.NewTimer(pingInterval)
80 intervalStart := time.Now()86 intervalStart := time.Now()
87 pingTimerReset := func() {
88 pingTimer.Reset(pingInterval)
89 intervalStart = time.Now()
90 }
81 ch := sess.SessionChannel()91 ch := sess.SessionChannel()
82Loop:92Loop:
83 for {93 for {
@@ -93,16 +103,15 @@
93 if pongMsg.Type != "pong" {103 if pongMsg.Type != "pong" {
94 return &broker.ErrAbort{"expected PONG message"}104 return &broker.ErrAbort{"expected PONG message"}
95 }105 }
96 pingTimer.Reset(pingInterval)106 pingTimerReset()
97 case exchg, ok := <-ch:107 case exchg := <-ch:
98 pingTimer.Stop()108 pingTimer.Stop()
99 if !ok {109 if exchg == nil {
100 return &broker.ErrAbort{"terminated"}110 return &broker.ErrAbort{"terminated"}
101 }111 }
102 outMsg, inMsg, err := exchg.Prepare(sess)112 outMsg, inMsg, err := exchg.Prepare(sess)
103 if err == broker.ErrNop { // nothing to do113 if err == broker.ErrNop { // nothing to do
104 pingTimer.Reset(pingInterval)114 pingTimerReset()
105 intervalStart = time.Now()
106 continue Loop115 continue Loop
107 }116 }
108 if err != nil {117 if err != nil {
@@ -111,12 +120,15 @@
111 for {120 for {
112 done := outMsg.Split()121 done := outMsg.Split()
113 err = exchange(proto, outMsg, inMsg, exchangeTimeout)122 err = exchange(proto, outMsg, inMsg, exchangeTimeout)
123 if err == errOneway {
124 pingTimerReset()
125 continue Loop
126 }
114 if err != nil {127 if err != nil {
115 return err128 return err
116 }129 }
117 if done {130 if done {
118 pingTimer.Reset(pingInterval)131 pingTimerReset()
119 intervalStart = time.Now()
120 }132 }
121 err = exchg.Acked(sess, done)133 err = exchg.Acked(sess, done)
122 if err != nil {134 if err != nil {
@@ -142,7 +154,7 @@
142 return track.End(&broker.ErrAbort{"unexpected wire format version"})154 return track.End(&broker.ErrAbort{"unexpected wire format version"})
143 }155 }
144 proto := protocol.NewProtocol0(conn)156 proto := protocol.NewProtocol0(conn)
145 sess, err := sessionStart(proto, brkr, cfg)157 sess, err := sessionStart(proto, brkr, cfg, track.SessionId())
146 if err != nil {158 if err != nil {
147 return track.End(err)159 return track.End(err)
148 }160 }
149161
=== modified file 'server/session/session_test.go'
--- server/session/session_test.go 2014-04-11 08:47:18 +0000
+++ server/session/session_test.go 2014-05-12 14:11:46 +0000
@@ -130,8 +130,8 @@
130 return &testBroker{registration: make(chan interface{}, 2)}130 return &testBroker{registration: make(chan interface{}, 2)}
131}131}
132132
133func (tb *testBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {133func (tb *testBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) {
134 tb.registration <- "register " + connect.DeviceId134 tb.registration <- fmt.Sprintf("register %s %s", connect.DeviceId, sessionId)
135 return &testing.TestBrokerSession{DeviceId: connect.DeviceId}, tb.err135 return &testing.TestBrokerSession{DeviceId: connect.DeviceId}, tb.err
136}136}
137137
@@ -148,7 +148,7 @@
148 brkr := newTestBroker()148 brkr := newTestBroker()
149 go func() {149 go func() {
150 var err error150 var err error
151 sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout)151 sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout, "s1")
152 errCh <- err152 errCh <- err
153 }()153 }()
154 c.Check(takeNext(down), Equals, "deadline 5ms")154 c.Check(takeNext(down), Equals, "deadline 5ms")
@@ -160,7 +160,7 @@
160 up <- nil // no write error160 up <- nil // no write error
161 err := <-errCh161 err := <-errCh
162 c.Check(err, IsNil)162 c.Check(err, IsNil)
163 c.Check(takeNext(brkr.registration), Equals, "register dev-1")163 c.Check(takeNext(brkr.registration), Equals, "register dev-1 s1")
164 c.Check(sess.DeviceIdentifier(), Equals, "dev-1")164 c.Check(sess.DeviceIdentifier(), Equals, "dev-1")
165}165}
166166
@@ -175,7 +175,7 @@
175 brkr.err = errRegister175 brkr.err = errRegister
176 go func() {176 go func() {
177 var err error177 var err error
178 sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout)178 sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout, "s2")
179 errCh <- err179 errCh <- err
180 }()180 }()
181 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}181 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}
@@ -190,7 +190,7 @@
190 down := make(chan interface{}, 5)190 down := make(chan interface{}, 5)
191 tp := &testProtocol{up, down}191 tp := &testProtocol{up, down}
192 up <- io.ErrUnexpectedEOF192 up <- io.ErrUnexpectedEOF
193 _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)193 _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout, "s3")
194 c.Check(err, Equals, io.ErrUnexpectedEOF)194 c.Check(err, Equals, io.ErrUnexpectedEOF)
195}195}
196196
@@ -200,7 +200,7 @@
200 tp := &testProtocol{up, down}200 tp := &testProtocol{up, down}
201 up <- protocol.ConnectMsg{Type: "connect"}201 up <- protocol.ConnectMsg{Type: "connect"}
202 up <- io.ErrUnexpectedEOF202 up <- io.ErrUnexpectedEOF
203 _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)203 _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout, "s4")
204 c.Check(err, Equals, io.ErrUnexpectedEOF)204 c.Check(err, Equals, io.ErrUnexpectedEOF)
205 // sanity205 // sanity
206 c.Check(takeNext(down), Matches, "deadline.*")206 c.Check(takeNext(down), Matches, "deadline.*")
@@ -212,7 +212,7 @@
212 down := make(chan interface{}, 5)212 down := make(chan interface{}, 5)
213 tp := &testProtocol{up, down}213 tp := &testProtocol{up, down}
214 up <- protocol.ConnectMsg{Type: "what"}214 up <- protocol.ConnectMsg{Type: "what"}
215 _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)215 _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout, "s5")
216 c.Check(err, DeepEquals, &broker.ErrAbort{"expected CONNECT message"})216 c.Check(err, DeepEquals, &broker.ErrAbort{"expected CONNECT message"})
217}217}
218218
@@ -222,14 +222,14 @@
222}222}
223223
224func (s *sessionSuite) TestSessionLoop(c *C) {224func (s *sessionSuite) TestSessionLoop(c *C) {
225 nopTrack := NewTracker(s.testlog)225 track := &testTracker{NewTracker(s.testlog), make(chan interface{}, 2)}
226 errCh := make(chan error, 1)226 errCh := make(chan error, 1)
227 up := make(chan interface{}, 5)227 up := make(chan interface{}, 5)
228 down := make(chan interface{}, 5)228 down := make(chan interface{}, 5)
229 tp := &testProtocol{up, down}229 tp := &testProtocol{up, down}
230 sess := &testing.TestBrokerSession{}230 sess := &testing.TestBrokerSession{}
231 go func() {231 go func() {
232 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)232 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, track)
233 }()233 }()
234 c.Check(takeNext(down), Equals, "deadline 2ms")234 c.Check(takeNext(down), Equals, "deadline 2ms")
235 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})235 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
@@ -241,6 +241,9 @@
241 up <- io.ErrUnexpectedEOF241 up <- io.ErrUnexpectedEOF
242 err := <-errCh242 err := <-errCh
243 c.Check(err, Equals, io.ErrUnexpectedEOF)243 c.Check(err, Equals, io.ErrUnexpectedEOF)
244 c.Check(track.interval, HasLen, 2)
245 c.Check((<-track.interval).(time.Duration) <= 8*time.Millisecond, Equals, true)
246 c.Check((<-track.interval).(time.Duration) <= 8*time.Millisecond, Equals, true)
244}247}
245248
246func (s *sessionSuite) TestSessionLoopWriteError(c *C) {249func (s *sessionSuite) TestSessionLoopWriteError(c *C) {
@@ -357,7 +360,7 @@
357 go func() {360 go func() {
358 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)361 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
359 }()362 }()
360 close(exchanges)363 exchanges <- nil
361 err := <-errCh364 err := <-errCh
362 c.Check(err, DeepEquals, &broker.ErrAbort{"terminated"})365 c.Check(err, DeepEquals, &broker.ErrAbort{"terminated"})
363}366}
@@ -477,18 +480,44 @@
477 down := make(chan interface{}, 5)480 down := make(chan interface{}, 5)
478 tp := &testProtocol{up, down}481 tp := &testProtocol{up, down}
479 exchanges := make(chan broker.Exchange, 1)482 exchanges := make(chan broker.Exchange, 1)
480 exchanges <- &broker.ConnBrokenExchange{"REASON"}483 msg := &protocol.ConnBrokenMsg{"connbroken", "BREASON"}
484 exchanges <- &broker.ConnMetaExchange{msg}
481 sess := &testing.TestBrokerSession{Exchanges: exchanges}485 sess := &testing.TestBrokerSession{Exchanges: exchanges}
482 go func() {486 go func() {
483 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)487 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
484 }()488 }()
485 c.Check(takeNext(down), Equals, "deadline 2ms")489 c.Check(takeNext(down), Equals, "deadline 2ms")
486 c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "REASON"})490 c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "BREASON"})
487 up <- nil // no write error491 up <- nil // no write error
488 err := <-errCh492 err := <-errCh
489 c.Check(err, DeepEquals, &broker.ErrAbort{"session broken for reason"})493 c.Check(err, DeepEquals, &broker.ErrAbort{"session broken for reason"})
490}494}
491495
// TestSessionLoopConnWarnExchange checks that a ConnWarnMsg delivered through
// a ConnMetaExchange is written to the client and that, unlike the connbroken
// case, sessionLoop keeps running afterwards: the next thing seen downstream
// is a regular ping, and the loop only returns once io.EOF is fed in.
496func (s *sessionSuite) TestSessionLoopConnWarnExchange(c *C) {
497 nopTrack := NewTracker(s.testlog)
498 errCh := make(chan error, 1)
499 up := make(chan interface{}, 5)
500 down := make(chan interface{}, 5)
501 tp := &testProtocol{up, down}
// queue the warning exchange before the loop starts (the channel has room for one)
502 exchanges := make(chan broker.Exchange, 1)
503 msg := &protocol.ConnWarnMsg{"connwarn", "WREASON"}
504 exchanges <- &broker.ConnMetaExchange{msg}
505 sess := &testing.TestBrokerSession{Exchanges: exchanges}
// run the loop in the background; its return value is collected via errCh
506 go func() {
507 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
508 }()
// the warning is written downstream, preceded by a deadline marker
509 c.Check(takeNext(down), Equals, "deadline 2ms")
510 c.Check(takeNext(down), DeepEquals, protocol.ConnWarnMsg{"connwarn", "WREASON"})
511 up <- nil // no write error
512 // session continues
513 c.Check(takeNext(down), Equals, "deadline 2ms")
514 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
515 up <- nil // no write error
// feed io.EOF so the loop terminates with that error
516 up <- io.EOF
517 err := <-errCh
518 c.Check(err, Equals, io.EOF)
519}
520
492type testTracker struct {521type testTracker struct {
493 SessionTracker522 SessionTracker
494 interval chan interface{}523 interval chan interface{}
@@ -593,7 +622,7 @@
593 msg, err = downStream.ReadBytes(byte('}'))622 msg, err = downStream.ReadBytes(byte('}'))
594 c.Check(err, IsNil)623 c.Check(err, IsNil)
595 c.Check(msg, DeepEquals, []byte("\x00\x0c{\"T\":\"ping\"}"))624 c.Check(msg, DeepEquals, []byte("\x00\x0c{\"T\":\"ping\"}"))
596 c.Check(takeNext(brkr.registration), Equals, "register DEV")625 c.Check(takeNext(brkr.registration), Equals, "register DEV "+track.SessionId())
597 c.Check(len(brkr.registration), Equals, 0) // not yet unregistered626 c.Check(len(brkr.registration), Equals, 0) // not yet unregistered
598 cli.Close()627 cli.Close()
599 err = <-errCh628 err = <-errCh
600629
=== modified file 'server/session/tracker.go'
--- server/session/tracker.go 2014-02-10 23:19:08 +0000
+++ server/session/tracker.go 2014-05-12 14:11:46 +0000
@@ -17,6 +17,7 @@
17package session17package session
1818
19import (19import (
20 "fmt"
20 "net"21 "net"
21 "time"22 "time"
2223
@@ -29,6 +30,8 @@
29 logger.Logger30 logger.Logger
30 // Session got started.31 // Session got started.
31 Start(WithRemoteAddr)32 Start(WithRemoteAddr)
33 // SessionId
34 SessionId() string
32 // Session got registered with broker as sess BrokerSession.35 // Session got registered with broker as sess BrokerSession.
33 Registered(sess broker.BrokerSession)36 Registered(sess broker.BrokerSession)
34 // Report effective elapsed ping interval.37 // Report effective elapsed ping interval.
@@ -47,7 +50,7 @@
47// Tracker implements SessionTracker simply.50// Tracker implements SessionTracker simply.
48type tracker struct {51type tracker struct {
49 logger.Logger52 logger.Logger
50 sessionId int64 // xxx use timeuuid later53 sessionId string
51}54}
5255
53func NewTracker(logger logger.Logger) SessionTracker {56func NewTracker(logger logger.Logger) SessionTracker {
@@ -55,18 +58,22 @@
55}58}
5659
57func (trk *tracker) Start(conn WithRemoteAddr) {60func (trk *tracker) Start(conn WithRemoteAddr) {
58 trk.sessionId = time.Now().UnixNano() - sessionsEpoch61 trk.sessionId = fmt.Sprintf("%x", time.Now().UnixNano()-sessionsEpoch)
59 trk.Debugf("session(%x) connected %v", trk.sessionId, conn.RemoteAddr())62 trk.Debugf("session(%s) connected %v", trk.sessionId, conn.RemoteAddr())
63}
64
65func (trk *tracker) SessionId() string {
66 return trk.sessionId
60}67}
6168
62func (trk *tracker) Registered(sess broker.BrokerSession) {69func (trk *tracker) Registered(sess broker.BrokerSession) {
63 trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceIdentifier())70 trk.Infof("session(%s) registered %v", trk.sessionId, sess.DeviceIdentifier())
64}71}
6572
66func (trk *tracker) EffectivePingInterval(time.Duration) {73func (trk *tracker) EffectivePingInterval(time.Duration) {
67}74}
6875
69func (trk *tracker) End(err error) error {76func (trk *tracker) End(err error) error {
70 trk.Debugf("session(%x) ended with: %v", trk.sessionId, err)77 trk.Debugf("session(%s) ended with: %v", trk.sessionId, err)
71 return err78 return err
72}79}
7380
=== modified file 'server/session/tracker_test.go'
--- server/session/tracker_test.go 2014-02-10 23:19:08 +0000
+++ server/session/tracker_test.go 2014-05-12 14:11:46 +0000
@@ -46,8 +46,8 @@
46func (s *trackerSuite) TestSessionTrackStart(c *C) {46func (s *trackerSuite) TestSessionTrackStart(c *C) {
47 track := NewTracker(s.testlog)47 track := NewTracker(s.testlog)
48 track.Start(&testRemoteAddrable{})48 track.Start(&testRemoteAddrable{})
49 c.Check(track.(*tracker).sessionId, Not(Equals), 0)49 c.Check(track.SessionId(), Not(Equals), "")
50 regExpected := fmt.Sprintf(`DEBUG session\(%x\) connected 127\.0\.0\.1:9999\n`, track.(*tracker).sessionId)50 regExpected := fmt.Sprintf(`DEBUG session\(%s\) connected 127\.0\.0\.1:9999\n`, track.SessionId())
51 c.Check(s.testlog.Captured(), Matches, regExpected)51 c.Check(s.testlog.Captured(), Matches, regExpected)
52}52}
5353
@@ -55,7 +55,7 @@
55 track := NewTracker(s.testlog)55 track := NewTracker(s.testlog)
56 track.Start(&testRemoteAddrable{})56 track.Start(&testRemoteAddrable{})
57 track.Registered(&testing.TestBrokerSession{DeviceId: "DEV-ID"})57 track.Registered(&testing.TestBrokerSession{DeviceId: "DEV-ID"})
58 regExpected := fmt.Sprintf(`.*connected.*\nINFO session\(%x\) registered DEV-ID\n`, track.(*tracker).sessionId)58 regExpected := fmt.Sprintf(`.*connected.*\nINFO session\(%s\) registered DEV-ID\n`, track.SessionId())
59 c.Check(s.testlog.Captured(), Matches, regExpected)59 c.Check(s.testlog.Captured(), Matches, regExpected)
60}60}
6161
@@ -63,6 +63,6 @@
63 track := NewTracker(s.testlog)63 track := NewTracker(s.testlog)
64 track.Start(&testRemoteAddrable{})64 track.Start(&testRemoteAddrable{})
65 track.End(&broker.ErrAbort{})65 track.End(&broker.ErrAbort{})
66 regExpected := fmt.Sprintf(`.*connected.*\nDEBUG session\(%x\) ended with: session aborted \(\)\n`, track.(*tracker).sessionId)66 regExpected := fmt.Sprintf(`.*connected.*\nDEBUG session\(%s\) ended with: session aborted \(\)\n`, track.SessionId())
67 c.Check(s.testlog.Captured(), Matches, regExpected)67 c.Check(s.testlog.Captured(), Matches, regExpected)
68}68}
6969
=== modified file 'server/store/inmemory.go'
--- server/store/inmemory.go 2014-02-18 14:19:05 +0000
+++ server/store/inmemory.go 2014-05-12 14:11:46 +0000
@@ -20,18 +20,15 @@
20 "encoding/json"20 "encoding/json"
21 "sync"21 "sync"
22 "time"22 "time"
23
24 "launchpad.net/ubuntu-push/protocol"
23)25)
2426
25// one stored notification
26type notification struct {
27 payload json.RawMessage
28 expiration time.Time
29}
30
31// one stored channel27// one stored channel
32type channel struct {28type channel struct {
33 topLevel int6429 topLevel int64
34 notifications []notification30 notifications []protocol.Notification
31 expirations []time.Time
35}32}
3633
37// InMemoryPendingStore is a basic in-memory pending notification store.34// InMemoryPendingStore is a basic in-memory pending notification store.
@@ -54,23 +51,35 @@
54 return InternalChannelId(""), ErrUnknownChannel51 return InternalChannelId(""), ErrUnknownChannel
55}52}
5653
57func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error {54func (sto *InMemoryPendingStore) appendToChannel(chanId InternalChannelId, newNotification protocol.Notification, inc int64, expiration time.Time) error {
58 sto.lock.Lock()55 sto.lock.Lock()
59 defer sto.lock.Unlock()56 defer sto.lock.Unlock()
60 prev := sto.store[chanId]57 prev := sto.store[chanId]
61 if prev == nil {58 if prev == nil {
62 prev = &channel{}59 prev = &channel{}
63 }60 }
64 prev.topLevel++61 prev.topLevel += inc
65 prev.notifications = append(prev.notifications, notification{62 prev.notifications = append(prev.notifications, newNotification)
66 payload: notificationPayload,63 prev.expirations = append(prev.expirations, expiration)
67 expiration: expiration,
68 })
69 sto.store[chanId] = prev64 sto.store[chanId] = prev
70 return nil65 return nil
71}66}
7267
73func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []json.RawMessage, error) {68func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error {
69 newNotification := protocol.Notification{Payload: notificationPayload}
70 return sto.appendToChannel(chanId, newNotification, 1, expiration)
71}
72
73func (sto *InMemoryPendingStore) AppendToUnicastChannel(chanId InternalChannelId, appId string, notificationPayload json.RawMessage, msgId string, expiration time.Time) error {
74 newNotification := protocol.Notification{
75 Payload: notificationPayload,
76 AppId: appId,
77 MsgId: msgId,
78 }
79 return sto.appendToChannel(chanId, newNotification, 0, expiration)
80}
81
82func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []protocol.Notification, error) {
74 sto.lock.Lock()83 sto.lock.Lock()
75 defer sto.lock.Unlock()84 defer sto.lock.Unlock()
76 channel, ok := sto.store[chanId]85 channel, ok := sto.store[chanId]
@@ -79,14 +88,19 @@
79 }88 }
80 topLevel := channel.topLevel89 topLevel := channel.topLevel
81 n := len(channel.notifications)90 n := len(channel.notifications)
82 res := make([]json.RawMessage, 0, n)91 res := make([]protocol.Notification, 0, n)
92 exps := make([]time.Time, 0, n)
83 now := time.Now()93 now := time.Now()
84 for _, notification := range channel.notifications {94 for i, expiration := range channel.expirations {
85 if notification.expiration.Before(now) {95 if expiration.Before(now) {
86 continue96 continue
87 }97 }
88 res = append(res, notification.payload)98 res = append(res, channel.notifications[i])
99 exps = append(exps, expiration)
89 }100 }
101 // store as well
102 channel.notifications = res
103 channel.expirations = exps
90 return topLevel, res, nil104 return topLevel, res, nil
91}105}
92106
@@ -94,5 +108,25 @@
94 // ignored108 // ignored
95}109}
96110
111func (sto *InMemoryPendingStore) DropByMsgId(chanId InternalChannelId, targets []protocol.Notification) error {
112 sto.lock.Lock()
113 defer sto.lock.Unlock()
114 channel, ok := sto.store[chanId]
115 if !ok {
116 return nil
117 }
118 expById := make(map[string]time.Time, len(channel.notifications))
119 for i, notif := range channel.notifications {
120 expById[notif.MsgId] = channel.expirations[i]
121 }
122 channel.notifications = FilterOutByMsgId(channel.notifications, targets)
123 exps := make([]time.Time, len(channel.notifications))
124 for i, notif := range channel.notifications {
125 exps[i] = expById[notif.MsgId]
126 }
127 channel.expirations = exps
128 return nil
129}
130
97// sanity check we implement the interface131// sanity check we implement the interface
98var _ PendingStore = (*InMemoryPendingStore)(nil)132var _ PendingStore = (*InMemoryPendingStore)(nil)
99133
=== modified file 'server/store/inmemory_test.go'
--- server/store/inmemory_test.go 2014-02-14 12:38:38 +0000
+++ server/store/inmemory_test.go 2014-05-12 14:11:46 +0000
@@ -21,6 +21,9 @@
21 "time"21 "time"
2222
23 . "launchpad.net/gocheck"23 . "launchpad.net/gocheck"
24
25 "launchpad.net/ubuntu-push/protocol"
26 help "launchpad.net/ubuntu-push/testing"
24)27)
2528
26type inMemorySuite struct{}29type inMemorySuite struct{}
@@ -45,7 +48,7 @@
45 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)48 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
46 c.Assert(err, IsNil)49 c.Assert(err, IsNil)
47 c.Check(top, Equals, int64(0))50 c.Check(top, Equals, int64(0))
48 c.Check(res, DeepEquals, []json.RawMessage(nil))51 c.Check(res, DeepEquals, []protocol.Notification(nil))
49}52}
5053
51func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshot(c *C) {54func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshot(c *C) {
@@ -61,7 +64,33 @@
61 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)64 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
62 c.Assert(err, IsNil)65 c.Assert(err, IsNil)
63 c.Check(top, Equals, int64(2))66 c.Check(top, Equals, int64(2))
64 c.Check(res, DeepEquals, []json.RawMessage{notification1, notification2})67 c.Check(res, DeepEquals, help.Ns(notification1, notification2))
68}
69
70func (s *inMemorySuite) TestAppendToUnicastChannelAndGetChannelSnapshot(c *C) {
71 sto := NewInMemoryPendingStore()
72
73 chanId := UnicastInternalChannelId("user", "dev1")
74 notification1 := json.RawMessage(`{"a":1}`)
75 notification2 := json.RawMessage(`{"b":2}`)
76 app1 := "app1"
77 app2 := "app2"
78 msg1 := "msg1"
79 msg2 := "msg2"
80
81 muchLater := time.Now().Add(time.Minute)
82
83 err := sto.AppendToUnicastChannel(chanId, app1, notification1, msg1, muchLater)
84 c.Assert(err, IsNil)
85 err = sto.AppendToUnicastChannel(chanId, app2, notification2, msg2, muchLater)
86 c.Assert(err, IsNil)
87 top, res, err := sto.GetChannelSnapshot(chanId)
88 c.Assert(err, IsNil)
89 c.Check(res, DeepEquals, []protocol.Notification{
90 protocol.Notification{Payload: notification1, AppId: app1, MsgId: msg1},
91 protocol.Notification{Payload: notification2, AppId: app2, MsgId: msg2},
92 })
93 c.Check(top, Equals, int64(0))
65}94}
6695
67func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshotWithExpiration(c *C) {96func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshotWithExpiration(c *C) {
@@ -81,5 +110,45 @@
81 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)110 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
82 c.Assert(err, IsNil)111 c.Assert(err, IsNil)
83 c.Check(top, Equals, int64(2))112 c.Check(top, Equals, int64(2))
84 c.Check(res, DeepEquals, []json.RawMessage{notification1})113 c.Check(res, DeepEquals, help.Ns(notification1))
114}
115
116func (s *inMemorySuite) TestDropByMsgId(c *C) {
117 sto := NewInMemoryPendingStore()
118
119 chanId := UnicastInternalChannelId("user", "dev2")
120
121 // nothing to do is fine
122 err := sto.DropByMsgId(chanId, nil)
123 c.Assert(err, IsNil)
124
125 notification1 := json.RawMessage(`{"a":1}`)
126 notification2 := json.RawMessage(`{"b":2}`)
127 notification3 := json.RawMessage(`{"a":2}`)
128 app1 := "app1"
129 app2 := "app2"
130 msg1 := "msg1"
131 msg2 := "msg2"
132 msg3 := "msg3"
133
134 muchLater := time.Now().Add(time.Minute)
135
136 err = sto.AppendToUnicastChannel(chanId, app1, notification1, msg1, muchLater)
137 c.Assert(err, IsNil)
138 err = sto.AppendToUnicastChannel(chanId, app2, notification2, msg2, muchLater)
139 c.Assert(err, IsNil)
140 err = sto.AppendToUnicastChannel(chanId, app1, notification3, msg3, muchLater)
141 c.Assert(err, IsNil)
142 _, res, err := sto.GetChannelSnapshot(chanId)
143 c.Assert(err, IsNil)
144
145 err = sto.DropByMsgId(chanId, res[:2])
146 c.Assert(err, IsNil)
147
148 _, res, err = sto.GetChannelSnapshot(chanId)
149 c.Assert(err, IsNil)
150 c.Check(res, HasLen, 1)
151 c.Check(res, DeepEquals, []protocol.Notification{
152 protocol.Notification{Payload: notification3, AppId: app1, MsgId: msg3},
153 })
85}154}
86155
=== modified file 'server/store/store.go'
--- server/store/store.go 2014-02-18 13:43:07 +0000
+++ server/store/store.go 2014-05-12 14:11:46 +0000
@@ -21,11 +21,36 @@
21 "encoding/hex"21 "encoding/hex"
22 "encoding/json"22 "encoding/json"
23 "errors"23 "errors"
24 "fmt"
25 "strings"
24 "time"26 "time"
27
28 "launchpad.net/ubuntu-push/protocol"
25)29)
2630
// InternalChannelId is the store-internal identifier of a channel. Its first
// byte is a marker for the kind of channel: 'B' for broadcast channels, 'U'
// for unicast channels; '0' is also treated as broadcast (presumably the
// system channel id -- confirm against SystemInternalChannelId).
type InternalChannelId string

// BroadcastChannel returns whether the id represents a broadcast channel.
// An empty id is not a broadcast channel.
func (icid InternalChannelId) BroadcastChannel() bool {
	if len(icid) == 0 {
		// no marker byte to inspect; indexing would panic
		return false
	}
	marker := icid[0]
	return marker == 'B' || marker == '0'
}

// UnicastChannel returns whether the id represents a unicast channel.
// An empty id is not a unicast channel.
func (icid InternalChannelId) UnicastChannel() bool {
	if len(icid) == 0 {
		// no marker byte to inspect; indexing would panic
		return false
	}
	return icid[0] == 'U'
}

// UnicastUserAndDevice returns the user and device ids of a unicast channel.
// The id is split at the first ':' after the 'U' marker, so any further ':'
// ends up in deviceId. It panics if the id is not a unicast channel id or
// if it carries no ':' separator.
func (icid InternalChannelId) UnicastUserAndDevice() (userId, deviceId string) {
	if !icid.UnicastChannel() {
		panic("UnicastUserAndDevice is for unicast channels")
	}
	parts := strings.SplitN(string(icid)[1:], ":", 2)
	if len(parts) != 2 {
		// fail with a meaningful message instead of an index out of range
		panic("UnicastUserAndDevice: malformed unicast channel id")
	}
	return parts[0], parts[1]
}
53
29var ErrUnknownChannel = errors.New("unknown channel name")54var ErrUnknownChannel = errors.New("unknown channel name")
30var ErrFull = errors.New("channel is full")55var ErrFull = errors.New("channel is full")
31var ErrExpected128BitsHexRepr = errors.New("expected 128 bits hex repr")56var ErrExpected128BitsHexRepr = errors.New("expected 128 bits hex repr")
@@ -36,7 +61,10 @@
36 if chanId == SystemInternalChannelId {61 if chanId == SystemInternalChannelId {
37 return "0"62 return "0"
38 }63 }
39 panic("general InternalChannelIdToHex not implemeted yet")64 if !chanId.BroadcastChannel() {
65 panic("InternalChannelIdToHex is for broadcast channels")
66 }
67 return string(chanId)[1:]
40}68}
4169
42var zero128 [16]byte70var zero128 [16]byte
@@ -58,7 +86,14 @@
58 if idbytes == zero128 {86 if idbytes == zero128 {
59 return SystemInternalChannelId, nil87 return SystemInternalChannelId, nil
60 }88 }
61 return InternalChannelId(idbytes[:]), nil89 // mark with B(roadcast) prefix
90 s := "B" + hexRepr
91 return InternalChannelId(s), nil
92}
93
94// UnicastInternalChannelId builds a channel id for the userId, deviceId pair.
95func UnicastInternalChannelId(userId, deviceId string) InternalChannelId {
96 return InternalChannelId(fmt.Sprintf("U%s:%s", userId, deviceId))
62}97}
6398
64// PendingStore let store notifications into channels.99// PendingStore let store notifications into channels.
@@ -68,9 +103,45 @@
68 GetInternalChannelId(name string) (InternalChannelId, error)103 GetInternalChannelId(name string) (InternalChannelId, error)
69 // AppendToChannel appends a notification to the channel.104 // AppendToChannel appends a notification to the channel.
70 AppendToChannel(chanId InternalChannelId, notification json.RawMessage, expiration time.Time) error105 AppendToChannel(chanId InternalChannelId, notification json.RawMessage, expiration time.Time) error
	// AppendToUnicastChannel appends a notification to the unicast channel.
	AppendToUnicastChannel(chanId InternalChannelId, appId string, notification json.RawMessage, msgId string, expiration time.Time) error
	// GetChannelSnapshot gets all the current notifications and
	// current top level in the channel.
73 GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, payloads []json.RawMessage, err error)110 GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, notifications []protocol.Notification, err error)
111 // DropByMsgId drops notifications from a unicast channel based on message ids.
112 DropByMsgId(chanId InternalChannelId, targets []protocol.Notification) error
74 // Close is to be called when done with the store.113 // Close is to be called when done with the store.
75 Close()114 Close()
76}115}
116
117// FilterOutByMsgId returns the notifications from orig whose msg id is not
118// mentioned in targets.
119func FilterOutByMsgId(orig, targets []protocol.Notification) []protocol.Notification {
120 n := len(orig)
121 t := len(targets)
122 // common case, removing the continuous head
123 if t > 0 && n >= t {
124 if targets[0].MsgId == orig[0].MsgId {
125 for i := t - 1; i >= 0; i-- {
126 if i == 0 {
127 return orig[t:]
128 }
129 if targets[i].MsgId != orig[i].MsgId {
130 break
131 }
132 }
133 }
134 }
135 // slow way
136 ids := make(map[string]bool, t)
137 for _, target := range targets {
138 ids[target.MsgId] = true
139 }
140 acc := make([]protocol.Notification, 0, n)
141 for _, notif := range orig {
142 if !ids[notif.MsgId] {
143 acc = append(acc, notif)
144 }
145 }
146 return acc
147}
77148
=== modified file 'server/store/store_test.go'
--- server/store/store_test.go 2014-02-10 23:19:08 +0000
+++ server/store/store_test.go 2014-05-12 14:11:46 +0000
@@ -33,6 +33,8 @@
3333
34func (s *storeSuite) TestInternalChannelIdToHex(c *C) {34func (s *storeSuite) TestInternalChannelIdToHex(c *C) {
35 c.Check(InternalChannelIdToHex(SystemInternalChannelId), Equals, protocol.SystemChannelId)35 c.Check(InternalChannelIdToHex(SystemInternalChannelId), Equals, protocol.SystemChannelId)
36 c.Check(InternalChannelIdToHex(InternalChannelId("Bf1c9bf7096084cb2a154979ce00c7f50")), Equals, "f1c9bf7096084cb2a154979ce00c7f50")
37 c.Check(func() { InternalChannelIdToHex(InternalChannelId("U")) }, PanicMatches, "InternalChannelIdToHex is for broadcast channels")
36}38}
3739
38func (s *storeSuite) TestHexToInternalChannelId(c *C) {40func (s *storeSuite) TestHexToInternalChannelId(c *C) {
@@ -42,9 +44,11 @@
42 i1, err := HexToInternalChannelId("00000000000000000000000000000000")44 i1, err := HexToInternalChannelId("00000000000000000000000000000000")
43 c.Check(err, IsNil)45 c.Check(err, IsNil)
44 c.Check(i1, Equals, SystemInternalChannelId)46 c.Check(i1, Equals, SystemInternalChannelId)
47 c.Check(i1.BroadcastChannel(), Equals, true)
45 i2, err := HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50")48 i2, err := HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50")
46 c.Check(err, IsNil)49 c.Check(err, IsNil)
47 c.Check(i2, Equals, InternalChannelId("\xf1\xc9\xbf\x70\x96\x08\x4c\xb2\xa1\x54\x97\x9c\xe0\x0c\x7f\x50"))50 c.Check(i2.BroadcastChannel(), Equals, true)
51 c.Check(i2, Equals, InternalChannelId("Bf1c9bf7096084cb2a154979ce00c7f50"))
48 _, err = HexToInternalChannelId("01")52 _, err = HexToInternalChannelId("01")
49 c.Check(err, Equals, ErrExpected128BitsHexRepr)53 c.Check(err, Equals, ErrExpected128BitsHexRepr)
50 _, err = HexToInternalChannelId("abceddddddddddddddddzeeeeeeeeeee")54 _, err = HexToInternalChannelId("abceddddddddddddddddzeeeeeeeeeee")
@@ -52,3 +56,44 @@
52 _, err = HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50ff")56 _, err = HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50ff")
53 c.Check(err, Equals, ErrExpected128BitsHexRepr)57 c.Check(err, Equals, ErrExpected128BitsHexRepr)
54}58}
59
60func (s *storeSuite) TestUnicastInternalChannelId(c *C) {
61 chanId := UnicastInternalChannelId("user1", "dev2")
62 c.Check(chanId.BroadcastChannel(), Equals, false)
63 c.Check(chanId.UnicastChannel(), Equals, true)
64 u, d := chanId.UnicastUserAndDevice()
65 c.Check(u, Equals, "user1")
66 c.Check(d, Equals, "dev2")
67 c.Check(func() { SystemInternalChannelId.UnicastUserAndDevice() }, PanicMatches, "UnicastUserAndDevice is for unicast channels")
68}
69
70func (s *storeSuite) TestFilterOutByMsgId(c *C) {
71 orig := []protocol.Notification{
72 protocol.Notification{MsgId: "a"},
73 protocol.Notification{MsgId: "b"},
74 protocol.Notification{MsgId: "c"},
75 protocol.Notification{MsgId: "d"},
76 }
77 // removing the continuous head
78 res := FilterOutByMsgId(orig, orig[:3])
79 c.Check(res, DeepEquals, orig[3:])
80
81 // random removal
82 res = FilterOutByMsgId(orig, orig[1:2])
83 c.Check(res, DeepEquals, []protocol.Notification{
84 protocol.Notification{MsgId: "a"},
85 protocol.Notification{MsgId: "c"},
86 protocol.Notification{MsgId: "d"},
87 })
88
89 // looks like removing the continuous head, but it isn't
90 res = FilterOutByMsgId(orig, []protocol.Notification{
91 protocol.Notification{MsgId: "a"},
92 protocol.Notification{MsgId: "c"},
93 protocol.Notification{MsgId: "d"},
94 })
95 c.Check(res, DeepEquals, []protocol.Notification{
96 protocol.Notification{MsgId: "b"},
97 })
98
99}
55100
=== added directory 'signing-helper'
=== added file 'signing-helper/CMakeLists.txt'
--- signing-helper/CMakeLists.txt 1970-01-01 00:00:00 +0000
+++ signing-helper/CMakeLists.txt 2014-05-12 14:11:46 +0000
@@ -0,0 +1,39 @@
# Build script for the signing-helper executable.
cmake_minimum_required(VERSION 2.8)
SET (EXAMPLES_TARGET ubuntuoneauth-examples)

SET (SIGNING_EXE "signing-helper")

# Compile flags for the ubuntuoneauth-2.0 library come from pkg-config.
find_package (PkgConfig REQUIRED)
pkg_check_modules(UBUNTUONE REQUIRED ubuntuoneauth-2.0)
add_definitions(${UBUNTUONE_CFLAGS} ${UBUNTUONE_CFLAGS_OTHER})


# Qt5 bits
SET (CMAKE_INCLUDE_CURRENT_DIR ON)
SET (CMAKE_AUTOMOC ON)
find_package(Qt5Core REQUIRED)

# Every signing*.cpp / signing*.h file in this directory is part of the helper.
FILE (GLOB SIGNING_SOURCES signing*.cpp)
FILE (GLOB SIGNING_HEADERS signing*.h)

add_executable (${SIGNING_EXE}
    ${SIGNING_SOURCES}
    ${SIGNING_HEADERS})
qt5_use_modules (${SIGNING_EXE} DBus Network)

target_link_libraries (${SIGNING_EXE}
    ${UBUNTUONE_LDFLAGS})



# Convenience targets running the helper under valgrind. The command and its
# arguments are passed as separate, unquoted words: a single quoted string
# would be handed over as one program name containing spaces.
add_custom_target(examples-valgrind
    COMMAND valgrind --tool=memcheck ${CMAKE_CURRENT_BINARY_DIR}/${SIGNING_EXE}
    DEPENDS ${SIGNING_EXE}
)

add_custom_target(examples-valgrind-leaks
    COMMAND valgrind --tool=memcheck --track-origins=yes --num-callers=40 --leak-resolution=high --leak-check=full ${CMAKE_CURRENT_BINARY_DIR}/${SIGNING_EXE}
    DEPENDS ${SIGNING_EXE}
)

INSTALL_TARGETS( "lib/ubuntu-push-client/" ${SIGNING_EXE})
040
=== added file 'signing-helper/signing-helper.cpp'
--- signing-helper/signing-helper.cpp 1970-01-01 00:00:00 +0000
+++ signing-helper/signing-helper.cpp 2014-05-12 14:11:46 +0000
@@ -0,0 +1,97 @@
1/*
2 * Copyright (C) 2013-2014 Canonical Ltd.
3 *
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 3, as published
6 * by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranties of
10 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 * PURPOSE. See the GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * In addition, as a special exception, the copyright holders give
17 * permission to link the code of portions of this program with the
18 * OpenSSL library under certain conditions as described in each
19 * individual source file, and distribute linked combinations
20 * including the two.
21 * You must obey the GNU General Public License in all respects
22 * for all of the code used other than OpenSSL. If you modify
23 * file(s) with this exception, you may extend this exception to your
24 * version of the file(s), but you are not obligated to do so. If you
25 * do not wish to do so, delete this exception statement from your
26 * version. If you delete this exception statement from all source
27 * files in the program, then also delete it here.
28 */
29
30#include <iostream>
31#include <QCoreApplication>
32#include <QDebug>
33#include <QObject>
34#include <QString>
35#include <QTimer>
36
37#include "ssoservice.h"
38#include "token.h"
39
40#include "signing.h"
41
// Implementation of SigningExample: asks the SSO service for credentials and,
// when they are found, prints the signed header for the configured URL on
// standard output.
42namespace UbuntuOne {
43
// Stores the URL to sign and wires the service's credentialsFound /
// credentialsNotFound signals to the matching handlers below.
// NOTE(review): `service` and `url` are presumably members declared in
// signing.h -- confirm there.
44 SigningExample::SigningExample(QObject *parent, QString url) :
45 QObject(parent)
46 {
47 QObject::connect(&service, SIGNAL(credentialsFound(const Token&)),
48 this, SLOT(handleCredentialsFound(Token)));
49 QObject::connect(&service, SIGNAL(credentialsNotFound()),
50 this, SLOT(handleCredentialsNotFound()));
51 this->url = url;
52
53 }
54
// Nothing to release explicitly.
55 SigningExample::~SigningExample(){
56 }
57
// Starts the credentials lookup; the outcome is reported through one of the
// two handlers connected in the constructor.
58 void SigningExample::doExample()
59 {
60 service.getCredentials();
61 }
62
// Signs the stored URL as a GET request with the received token, writes the
// result followed by a newline to standard output and exits the application
// with code 0.
63 void SigningExample::handleCredentialsFound(Token token)
64 {
65 qDebug() << "Credentials found, signing url.";
66
// NOTE(review): meaning of the third signUrl argument (true) is not visible
// here -- check the Token API.
67 QString authHeader = token.signUrl(this->url, QStringLiteral("GET"), true);
68
69 std::cout << authHeader.toStdString() << "\n";
70 QCoreApplication::instance()->exit(0);
71
72 }
73
// Logs that no credentials exist and exits the application with code 1.
74 void SigningExample::handleCredentialsNotFound()
75 {
76 qDebug() << "No credentials were found.";
77 QCoreApplication::instance()->exit(1);
78 }
79
80
81} // namespace UbuntuOne
82
83
// Entry point: creates the Qt core application and a SigningExample owned by
// it, schedules doExample() to run once the event loop starts, and returns
// the event loop's exit code.
84int main(int argc, char *argv[])
85{
86 QCoreApplication a(argc, argv);
87
// No URL is passed, so the constructor's default URL argument is used.
88 UbuntuOne::SigningExample *example = new UbuntuOne::SigningExample(&a);
89
// NOTE(review): nothing in this file emits finished(); the handlers end the
// application through QCoreApplication::exit() instead.
90 QObject::connect(example, SIGNAL(finished()), &a, SLOT(quit()));
91
// A zero-delay single shot defers doExample() until a.exec() is running.
92 QTimer::singleShot(0, example, SLOT(doExample()));
93
94 return a.exec();
95}
96
97
098
=== added file 'signing-helper/signing.h'
--- signing-helper/signing.h 1970-01-01 00:00:00 +0000
+++ signing-helper/signing.h 2014-05-12 14:11:46 +0000
@@ -0,0 +1,76 @@
1/*
2 * Copyright (C) 2013-2014 Canonical Ltd.
3 *
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 3, as published
6 * by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranties of
10 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 * PURPOSE. See the GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * In addition, as a special exception, the copyright holders give
17 * permission to link the code of portions of this program with the
18 * OpenSSL library under certain conditions as described in each
19 * individual source file, and distribute linked combinations
20 * including the two.
21 * You must obey the GNU General Public License in all respects
22 * for all of the code used other than OpenSSL. If you modify
23 * file(s) with this exception, you may extend this exception to your
24 * version of the file(s), but you are not obligated to do so. If you
25 * do not wish to do so, delete this exception statement from your
26 * version. If you delete this exception statement from all source
27 * files in the program, then also delete it here.
28 */
29
30#ifndef _SIGNING_H_
31#define _SIGNING_H_
32
33#include <QDebug>
34#include <QNetworkReply>
35#include <QObject>
36#include <QString>
37
38#include "ssoservice.h"
39#include "token.h"
40#include "requests.h"
41#include "errormessages.h"
42
43namespace UbuntuOne {
44
/**
 * Small QObject that asks the SSO service for credentials and reacts to the
 * outcome through its private slots.
 */
class SigningExample : public QObject
{
    Q_OBJECT

public:
    /**
     * @param parent optional QObject parent.
     * @param url    URL to sign; defaults to the Ubuntu One account API.
     */
    explicit SigningExample(QObject *parent = 0,
                            QString url="https://one.ubuntu.com/api/account/");
    ~SigningExample();

public slots:
    /** Kick off the credential lookup. */
    void doExample();

signals:
    /**
     * Completion signal.
     * NOTE(review): declared here, but no emitter is visible in this
     * header -- confirm whether it is still needed.
     */
    void finished();

private slots:
    /** Called with the token when credentials are available. */
    void handleCredentialsFound(Token token);
    /** Called when the service has no credentials. */
    void handleCredentialsNotFound();

private:
    SSOService service;          // source of the credentialsFound/NotFound signals
    QNetworkAccessManager nam;   // NOTE(review): no use visible in this header -- confirm
    QString url;                 // URL to be signed
};
74
75}
76#endif /* _SIGNING_H_ */
077
=== modified file 'testing/helpers.go'
--- testing/helpers.go 2014-02-21 16:04:44 +0000
+++ testing/helpers.go 2014-05-12 14:11:46 +0000
@@ -18,6 +18,7 @@
18package testing18package testing
1919
20import (20import (
21 "encoding/json"
21 "fmt"22 "fmt"
22 "os"23 "os"
23 "path/filepath"24 "path/filepath"
@@ -26,6 +27,7 @@
26 "sync"27 "sync"
2728
28 "launchpad.net/ubuntu-push/logger"29 "launchpad.net/ubuntu-push/logger"
30 "launchpad.net/ubuntu-push/protocol"
29)31)
3032
31type captureHelper struct {33type captureHelper struct {
@@ -122,3 +124,12 @@
122 }124 }
123 return filepath.Join(dir, relativePath)125 return filepath.Join(dir, relativePath)
124}126}
127
128// Ns makes a []Notification from just payloads.
129func Ns(payloads ...json.RawMessage) []protocol.Notification {
130 res := make([]protocol.Notification, len(payloads))
131 for i := 0; i < len(payloads); i++ {
132 res[i].Payload = payloads[i]
133 }
134 return res
135}
125136
=== modified file 'ubuntu-push-client.go'
--- ubuntu-push-client.go 2014-04-15 11:27:38 +0000
+++ ubuntu-push-client.go 2014-05-12 14:11:46 +0000
@@ -58,6 +58,7 @@
58 if err != nil {58 if err != nil {
59 log.Fatalf("unable to open the levels database: %v", err)59 log.Fatalf("unable to open the levels database: %v", err)
60 }60 }
61
61 cli := client.NewPushClient(cfgFname, lvlFname)62 cli := client.NewPushClient(cfgFname, lvlFname)
62 err = cli.Start()63 err = cli.Start()
63 if err != nil {64 if err != nil {

Subscribers

People subscribed via source and target branches