Merge lp:~chipaca/ubuntu-push/endpoint-names into lp:ubuntu-push

Proposed by John Lenton
Status: Superseded
Proposed branch: lp:~chipaca/ubuntu-push/endpoint-names
Merge into: lp:ubuntu-push
Diff against target: 4098 lines (+1990/-393)
50 files modified
Makefile (+3/-0)
README (+8/-2)
bus/connectivity/connectivity.go (+8/-7)
bus/connectivity/connectivity_test.go (+78/-3)
bus/endpoint.go (+15/-1)
bus/testing/testing_endpoint.go (+12/-1)
bus/testing/testing_endpoint_test.go (+15/-0)
client/client_test.go (+5/-2)
client/session/session.go (+40/-12)
client/session/session_test.go (+63/-5)
debian/changelog (+6/-0)
debian/control (+2/-0)
debian/rules (+5/-1)
debian/ubuntu-push-client.install (+1/-0)
dependencies.tsv (+1/-0)
protocol/messages.go (+56/-0)
protocol/messages_test.go (+18/-2)
protocol/state-diag-client.gv (+4/-1)
protocol/state-diag-client.svg (+77/-49)
protocol/state-diag-session.gv (+10/-0)
protocol/state-diag-session.svg (+132/-74)
server/acceptance/acceptance_test.go (+3/-0)
server/acceptance/acceptanceclient.go (+22/-1)
server/acceptance/suites/broadcast.go (+7/-6)
server/acceptance/suites/suite.go (+12/-0)
server/acceptance/suites/unicast.go (+96/-0)
server/api/handlers.go (+103/-21)
server/api/handlers_test.go (+214/-12)
server/broker/broker.go (+7/-1)
server/broker/exchanges.go (+73/-41)
server/broker/exchanges_test.go (+127/-26)
server/broker/exchg_impl_test.go (+19/-17)
server/broker/simple/simple.go (+56/-11)
server/broker/simple/simple_test.go (+5/-4)
server/broker/simple/suite_test.go (+3/-0)
server/broker/testing/impls.go (+13/-0)
server/broker/testsuite/suite.go (+121/-34)
server/session/session.go (+23/-11)
server/session/session_test.go (+43/-14)
server/session/tracker.go (+12/-5)
server/session/tracker_test.go (+4/-4)
server/store/inmemory.go (+52/-18)
server/store/inmemory_test.go (+72/-3)
server/store/store.go (+74/-3)
server/store/store_test.go (+46/-1)
signing-helper/CMakeLists.txt (+39/-0)
signing-helper/signing-helper.cpp (+97/-0)
signing-helper/signing.h (+76/-0)
testing/helpers.go (+11/-0)
ubuntu-push-client.go (+1/-0)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/endpoint-names
Reviewer Review Type Date Requested Status
Ubuntu Push Hackers Pending
Review via email: mp+219202@code.launchpad.net

This proposal has been superseded by a proposal from 2014-05-12.

Commit message

Added endpoint.GrabName

Description of the change

Added endpoint.GrabName

To post a comment you must log in.

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'Makefile'
2--- Makefile 2014-03-31 17:58:54 +0000
3+++ Makefile 2014-05-12 14:11:46 +0000
4@@ -11,10 +11,12 @@
5 GODEPS += launchpad.net/go-dbus/v1
6 GODEPS += launchpad.net/go-xdg/v0
7 GODEPS += code.google.com/p/gosqlite/sqlite3
8+GODEPS += launchpad.net/~ubuntu-push-hackers/ubuntu-push/go-uuid/uuid
9
10 TOTEST = $(shell env GOPATH=$(GOPATH) go list $(PROJECT)/...|grep -v acceptance|grep -v http13client )
11
12 bootstrap:
13+ $(RM) -r $(GOPATH)/pkg
14 mkdir -p $(GOPATH)/bin
15 mkdir -p $(GOPATH)/pkg
16 go get -u launchpad.net/godeps
17@@ -33,6 +35,7 @@
18
19 build-client:
20 go build ubuntu-push-client.go
21+ (cd signing-helper && cmake . && make)
22
23 build-server-dev:
24 go build -o push-server-dev launchpad.net/ubuntu-push/server/dev
25
26=== modified file 'README'
27--- README 2014-03-31 17:58:54 +0000
28+++ README 2014-05-12 14:11:46 +0000
29@@ -6,11 +6,17 @@
30 The code expects to be checked out as launchpad.net/ubuntu-push in a Go
31 workspace, see "go help gopath".
32
33-To setup Go dependencies, install libsqlite3-dev and run:
34+To setup Go dependencies, install the following dependencies:
35+
36+ build-essential
37+ libsqlite3-dev
38+
39+and run:
40
41 make bootstrap
42
43-To run tests, install libgcrypt11-dev and libwhoopsie-dev and run:
44+To run tests, install libglib2.0-dev, libgcrypt11-dev, libwhoopsie-dev,
45+and run:
46
47 make check
48
49
50=== modified file 'bus/connectivity/connectivity.go'
51--- bus/connectivity/connectivity.go 2014-04-04 11:08:28 +0000
52+++ bus/connectivity/connectivity.go 2014-05-12 14:11:46 +0000
53@@ -72,19 +72,20 @@
54 cs.connAttempts += ar.Redial()
55 nm := networkmanager.New(cs.endp, cs.log)
56
57+ // set up the watch
58+ stateCh, err = nm.WatchState()
59+ if err != nil {
60+ cs.log.Debugf("failed to set up the state watch: %s", err)
61+ goto Continue
62+ }
63+
64 // Get the current state.
65 initial = nm.GetState()
66 if initial == networkmanager.Unknown {
67 cs.log.Debugf("Failed to get state.")
68 goto Continue
69 }
70-
71- // set up the watch
72- stateCh, err = nm.WatchState()
73- if err != nil {
74- cs.log.Debugf("failed to set up the state watch: %s", err)
75- goto Continue
76- }
77+ cs.log.Debugf("got initial state of %s", initial)
78
79 primary = nm.GetPrimaryConnection()
80 cs.log.Debugf("primary connection starts as %#v", primary)
81
82=== modified file 'bus/connectivity/connectivity_test.go'
83--- bus/connectivity/connectivity_test.go 2014-04-04 12:01:42 +0000
84+++ bus/connectivity/connectivity_test.go 2014-05-12 14:11:46 +0000
85@@ -17,8 +17,15 @@
86 package connectivity
87
88 import (
89+ "net/http/httptest"
90+ "sync"
91+ "testing"
92+ "time"
93+
94 "launchpad.net/go-dbus/v1"
95 . "launchpad.net/gocheck"
96+
97+ "launchpad.net/ubuntu-push/bus"
98 "launchpad.net/ubuntu-push/bus/networkmanager"
99 testingbus "launchpad.net/ubuntu-push/bus/testing"
100 "launchpad.net/ubuntu-push/config"
101@@ -26,9 +33,6 @@
102 helpers "launchpad.net/ubuntu-push/testing"
103 "launchpad.net/ubuntu-push/testing/condition"
104 "launchpad.net/ubuntu-push/util"
105- "net/http/httptest"
106- "testing"
107- "time"
108 )
109
110 // hook up gocheck
111@@ -115,6 +119,77 @@
112 c.Check(<-cs.networkStateCh, Equals, networkmanager.ConnectedGlobal)
113 }
114
115+// a racyEndpoint is an endpoint that behaves differently depending on
116+// how much time passes between getting the state and setting up the
117+// watch
118+type racyEndpoint struct {
119+ stateGot bool
120+ maxTime time.Time
121+ delta time.Duration
122+ lock sync.RWMutex
123+}
124+
125+func (rep *racyEndpoint) GetProperty(prop string) (interface{}, error) {
126+ switch prop {
127+ case "state":
128+ rep.lock.Lock()
129+ defer rep.lock.Unlock()
130+ rep.stateGot = true
131+ rep.maxTime = time.Now().Add(rep.delta)
132+ return uint32(networkmanager.Connecting), nil
133+ case "PrimaryConnection":
134+ return dbus.ObjectPath("/something"), nil
135+ default:
136+ return nil, nil
137+ }
138+}
139+
140+func (rep *racyEndpoint) WatchSignal(member string, f func(...interface{}), d func()) error {
141+ if member == "StateChanged" {
142+ // we count never having gotten the state as happening "after" now.
143+ rep.lock.RLock()
144+ defer rep.lock.RUnlock()
145+ ok := !rep.stateGot || time.Now().Before(rep.maxTime)
146+ go func() {
147+ if ok {
148+ f(uint32(networkmanager.ConnectedGlobal))
149+ }
150+ d()
151+ }()
152+ }
153+ return nil
154+}
155+
156+func (*racyEndpoint) Close() {}
157+func (*racyEndpoint) Dial() error { return nil }
158+func (*racyEndpoint) String() string { return "racyEndpoint" }
159+func (*racyEndpoint) Call(string, []interface{}, ...interface{}) error { return nil }
160+func (*racyEndpoint) GrabName(bool) <-chan error { return nil }
161+
162+var _ bus.Endpoint = (*racyEndpoint)(nil)
163+
164+// takeNext takes a value from given channel with a 1s timeout
165+func takeNext(ch <-chan networkmanager.State) networkmanager.State {
166+ select {
167+ case <-time.After(time.Second):
168+ panic("channel stuck: too long waiting")
169+ case v := <-ch:
170+ return v
171+ }
172+}
173+
174+// test that if the nm state goes from connecting to connected very
175+// shortly after calling GetState, we don't lose the event.
176+func (s *ConnSuite) TestStartAvoidsRace(c *C) {
177+ for delta := time.Second; delta > 1; delta /= 2 {
178+ rep := &racyEndpoint{delta: delta}
179+ cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: rep}
180+ f := Commentf("when delta=%s", delta)
181+ c.Assert(cs.start(), Equals, networkmanager.Connecting, f)
182+ c.Assert(takeNext(cs.networkStateCh), Equals, networkmanager.ConnectedGlobal, f)
183+ }
184+}
185+
186 /*
187 tests for connectedStateStep()
188 */
189
190=== modified file 'bus/endpoint.go'
191--- bus/endpoint.go 2014-04-02 08:23:15 +0000
192+++ bus/endpoint.go 2014-05-12 14:11:46 +0000
193@@ -31,6 +31,7 @@
194
195 // bus.Endpoint represents the DBus connection itself.
196 type Endpoint interface {
197+ GrabName(allowReplacement bool) <-chan error
198 WatchSignal(member string, f func(...interface{}), d func()) error
199 Call(member string, args []interface{}, rvs ...interface{}) error
200 GetProperty(property string) (interface{}, error)
201@@ -53,7 +54,7 @@
202 }
203
204 // ensure endpoint implements Endpoint
205-var _ Endpoint = &endpoint{}
206+var _ Endpoint = (*endpoint)(nil)
207
208 /*
209 public methods
210@@ -173,6 +174,19 @@
211 return fmt.Sprintf("<Connection to %s %#v>", endp.bus, endp.addr)
212 }
213
214+// GrabName(...) takes over the name on the bus, reporting errors over the
215+// returned channel.
216+//
217+// While the first result will be nil on success, successive results would
218+// typically indicate another process trying to take over the name.
219+func (endp *endpoint) GrabName(allowReplacement bool) <-chan error {
220+ flags := dbus.NameFlagAllowReplacement | dbus.NameFlagReplaceExisting
221+ if !allowReplacement {
222+ flags = 0
223+ }
224+ return endp.bus.RequestName(endp.addr.Name, flags).C
225+}
226+
227 /*
228 private methods
229 */
230
231=== modified file 'bus/testing/testing_endpoint.go'
232--- bus/testing/testing_endpoint.go 2014-04-04 11:08:28 +0000
233+++ bus/testing/testing_endpoint.go 2014-05-12 14:11:46 +0000
234@@ -169,5 +169,16 @@
235 // see Endpoint's Close. This one does nothing.
236 func (tc *testingEndpoint) Close() {}
237
238+func (tc *testingEndpoint) GrabName(allowReplacement bool) <-chan error {
239+ tc.callArgsLck.Lock()
240+ defer tc.callArgsLck.Unlock()
241+
242+ args := callArgs{Member: "::GrabName"}
243+ args.Args = append(args.Args, allowReplacement)
244+ tc.callArgs = append(tc.callArgs, args)
245+
246+ return nil
247+}
248+
249 // ensure testingEndpoint implements bus.Endpoint
250-var _ bus.Endpoint = &testingEndpoint{}
251+var _ bus.Endpoint = (*testingEndpoint)(nil)
252
253=== modified file 'bus/testing/testing_endpoint_test.go'
254--- bus/testing/testing_endpoint_test.go 2014-04-02 08:23:15 +0000
255+++ bus/testing/testing_endpoint_test.go 2014-05-12 14:11:46 +0000
256@@ -173,3 +173,18 @@
257 endp := NewTestingEndpoint(condition.Fail2Work(2), nil, "hello there")
258 c.Check(endp.String(), Matches, ".*Still Broken.*hello there.*")
259 }
260+
261+// Test that GrabName updates callArgs
262+func (s *TestingEndpointSuite) TestGrabNameUpdatesCallArgs(c *C) {
263+ endp := NewTestingEndpoint(nil, condition.Work(true))
264+ endp.GrabName(false)
265+ endp.GrabName(true)
266+ c.Check(GetCallArgs(endp), DeepEquals, []callArgs{
267+ {
268+ Member: "::GrabName",
269+ Args: []interface{}{false},
270+ }, {
271+ Member: "::GrabName",
272+ Args: []interface{}{true},
273+ }})
274+}
275
276=== modified file 'client/client_test.go'
277--- client/client_test.go 2014-04-18 16:31:04 +0000
278+++ client/client_test.go 2014-05-12 14:11:46 +0000
279@@ -265,8 +265,9 @@
280 ExchangeTimeout: 10 * time.Millisecond,
281 HostsCachingExpiryTime: 1 * time.Hour,
282 ExpectAllRepairedTime: 30 * time.Minute,
283- PEM: cli.pem,
284- Info: info,
285+ PEM: cli.pem,
286+ Info: info,
287+ AuthHelper: []string{},
288 }
289 // sanity check that we are looking at all fields
290 vExpected := reflect.ValueOf(expected)
291@@ -276,6 +277,8 @@
292 // field isn't empty/zero
293 c.Assert(fv.Interface(), Not(DeepEquals), reflect.Zero(fv.Type()).Interface(), Commentf("forgot about: %s", vExpected.Type().Field(i).Name))
294 }
295+ // but AuthHelper really should be nil for now
296+ expected.AuthHelper = nil
297 // finally compare
298 conf := cli.deriveSessionConfig(info)
299 c.Check(conf, DeepEquals, expected)
300
301=== modified file 'client/session/session.go'
302--- client/session/session.go 2014-04-18 16:37:31 +0000
303+++ client/session/session.go 2014-05-12 14:11:46 +0000
304@@ -26,6 +26,7 @@
305 "fmt"
306 "math/rand"
307 "net"
308+ "os/exec"
309 "strings"
310 "sync"
311 "sync/atomic"
312@@ -38,7 +39,9 @@
313 "launchpad.net/ubuntu-push/util"
314 )
315
316-var wireVersionBytes = []byte{protocol.ProtocolWireVersion}
317+var (
318+ wireVersionBytes = []byte{protocol.ProtocolWireVersion}
319+)
320
321 type Notification struct {
322 TopLevel int64
323@@ -84,6 +87,7 @@
324 ExpectAllRepairedTime time.Duration
325 PEM []byte
326 Info map[string]interface{}
327+ AuthHelper []string
328 }
329
330 // ClientSession holds a client<->server session and its configuration.
331@@ -115,6 +119,8 @@
332 stateP *uint32
333 ErrCh chan error
334 MsgCh chan *Notification
335+ // authorization
336+ auth string
337 // autoredial knobs
338 shouldDelayP *uint32
339 lastAutoRedial time.Time
340@@ -234,6 +240,27 @@
341 return nil
342 }
343
344+// addAuthorization gets the authorization blob to send to the server
345+// and adds it to the session.
346+func (sess *ClientSession) addAuthorization() error {
347+ sess.Log.Debugf("adding authorization")
348+ // using a helper, for now at least
349+ if len(sess.AuthHelper) == 0 {
350+ // do nothing if helper is unset or empty
351+ return nil
352+ }
353+
354+ auth, err := exec.Command(sess.AuthHelper[0], sess.AuthHelper[1:]...).Output()
355+ if err != nil {
356+ // For now we just log the error, as we don't want to block unauthorized users
357+ sess.Log.Errorf("unable to get the authorization token from the account: %v", err)
358+ } else {
359+ sess.auth = strings.TrimSpace(string(auth))
360+ }
361+
362+ return nil
363+}
364+
365 func (sess *ClientSession) resetHosts() {
366 sess.deliveryHosts = nil
367 }
368@@ -457,10 +484,9 @@
369 return err
370 }
371 err = proto.WriteMessage(protocol.ConnectMsg{
372- Type: "connect",
373- DeviceId: sess.DeviceId,
374- // xxx get the SSO Authorization string from the phone
375- Authorization: "",
376+ Type: "connect",
377+ DeviceId: sess.DeviceId,
378+ Authorization: sess.auth,
379 Levels: levels,
380 Info: sess.Info,
381 })
382@@ -495,13 +521,15 @@
383
384 // run calls connect, and if it works it calls start, and if it works
385 // it runs loop in a goroutine, and ships its return value over ErrCh.
386-func (sess *ClientSession) run(closer func(), hostGetter, connecter, starter, looper func() error) error {
387+func (sess *ClientSession) run(closer func(), authChecker, hostGetter, connecter, starter, looper func() error) error {
388 closer()
389- err := hostGetter()
390- if err != nil {
391- return err
392- }
393- err = connecter()
394+ if err := authChecker(); err != nil {
395+ return err
396+ }
397+ if err := hostGetter(); err != nil {
398+ return err
399+ }
400+ err := connecter()
401 if err == nil {
402 err = starter()
403 if err == nil {
404@@ -531,7 +559,7 @@
405 // keep on trying.
406 panic("can't Dial() without a protocol constructor.")
407 }
408- return sess.run(sess.doClose, sess.getHosts, sess.connect, sess.start, sess.loop)
409+ return sess.run(sess.doClose, sess.addAuthorization, sess.getHosts, sess.connect, sess.start, sess.loop)
410 }
411
412 func init() {
413
414=== modified file 'client/session/session_test.go'
415--- client/session/session_test.go 2014-04-18 16:37:31 +0000
416+++ client/session/session_test.go 2014-05-12 14:11:46 +0000
417@@ -34,7 +34,6 @@
418
419 "launchpad.net/ubuntu-push/client/gethosts"
420 "launchpad.net/ubuntu-push/client/session/levelmap"
421- "launchpad.net/ubuntu-push/logger"
422 "launchpad.net/ubuntu-push/protocol"
423 helpers "launchpad.net/ubuntu-push/testing"
424 "launchpad.net/ubuntu-push/testing/condition"
425@@ -166,7 +165,7 @@
426 /////
427
428 type clientSessionSuite struct {
429- log logger.Logger
430+ log *helpers.TestLogger
431 lvls func() (levelmap.LevelMap, error)
432 }
433
434@@ -347,6 +346,43 @@
435 }
436
437 /****************************************************************
438+ addAuthorization() tests
439+****************************************************************/
440+
441+func (cs *clientSessionSuite) TestAddAuthorizationAddsAuthorization(c *C) {
442+ sess := &ClientSession{Log: cs.log}
443+ sess.AuthHelper = []string{"echo", "some auth"}
444+ c.Assert(sess.auth, Equals, "")
445+ err := sess.addAuthorization()
446+ c.Assert(err, IsNil)
447+ c.Check(sess.auth, Equals, "some auth")
448+}
449+
450+func (cs *clientSessionSuite) TestAddAuthorizationIgnoresErrors(c *C) {
451+ sess := &ClientSession{Log: cs.log}
452+ sess.AuthHelper = []string{"sh", "-c", "echo hello; false"}
453+
454+ c.Assert(sess.auth, Equals, "")
455+ err := sess.addAuthorization()
456+ c.Assert(err, IsNil)
457+ c.Check(sess.auth, Equals, "")
458+}
459+
460+func (cs *clientSessionSuite) TestAddAuthorizationSkipsIfUnsetOrNil(c *C) {
461+ sess := &ClientSession{Log: cs.log}
462+ sess.AuthHelper = nil
463+ c.Assert(sess.auth, Equals, "")
464+ err := sess.addAuthorization()
465+ c.Assert(err, IsNil)
466+ c.Check(sess.auth, Equals, "")
467+
468+ sess.AuthHelper = []string{}
469+ err = sess.addAuthorization()
470+ c.Assert(err, IsNil)
471+ c.Check(sess.auth, Equals, "")
472+}
473+
474+/****************************************************************
475 startConnectionAttempt()/nextHostToTry()/started tests
476 ****************************************************************/
477
478@@ -931,9 +967,10 @@
479
480 c.Check(takeNext(downCh), Equals, "deadline 0")
481 c.Check(takeNext(downCh), DeepEquals, protocol.ConnectMsg{
482- Type: "connect",
483- DeviceId: sess.DeviceId,
484- Levels: map[string]int64{},
485+ Type: "connect",
486+ DeviceId: sess.DeviceId,
487+ Levels: map[string]int64{},
488+ Authorization: "",
489 })
490 upCh <- errors.New("Overflow error in /dev/null")
491 err = <-errCh
492@@ -1038,6 +1075,7 @@
493 msg, ok := takeNext(downCh).(protocol.ConnectMsg)
494 c.Check(ok, Equals, true)
495 c.Check(msg.DeviceId, Equals, "wah")
496+ c.Check(msg.Authorization, Equals, "")
497 c.Check(msg.Info, DeepEquals, info)
498 upCh <- nil // no error
499 upCh <- protocol.ConnAckMsg{
500@@ -1054,6 +1092,22 @@
501 run() tests
502 ****************************************************************/
503
504+func (cs *clientSessionSuite) TestRunBailsIfAuthCheckFails(c *C) {
505+ sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
506+ c.Assert(err, IsNil)
507+ failure := errors.New("TestRunBailsIfAuthCheckFails")
508+ has_closed := false
509+ err = sess.run(
510+ func() { has_closed = true },
511+ func() error { return failure },
512+ nil,
513+ nil,
514+ nil,
515+ nil)
516+ c.Check(err, Equals, failure)
517+ c.Check(has_closed, Equals, true)
518+}
519+
520 func (cs *clientSessionSuite) TestRunBailsIfHostGetterFails(c *C) {
521 sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log)
522 c.Assert(err, IsNil)
523@@ -1061,6 +1115,7 @@
524 has_closed := false
525 err = sess.run(
526 func() { has_closed = true },
527+ func() error { return nil },
528 func() error { return failure },
529 nil,
530 nil,
531@@ -1076,6 +1131,7 @@
532 err = sess.run(
533 func() {},
534 func() error { return nil },
535+ func() error { return nil },
536 func() error { return failure },
537 nil,
538 nil)
539@@ -1090,6 +1146,7 @@
540 func() {},
541 func() error { return nil },
542 func() error { return nil },
543+ func() error { return nil },
544 func() error { return failure },
545 nil)
546 c.Check(err, Equals, failure)
547@@ -1109,6 +1166,7 @@
548 func() error { return nil },
549 func() error { return nil },
550 func() error { return nil },
551+ func() error { return nil },
552 func() error { sess.MsgCh <- notf; return <-failureCh })
553 c.Check(err, Equals, nil)
554 // if run doesn't error it sets up the channels
555
556=== modified file 'debian/changelog'
557--- debian/changelog 2014-04-23 11:54:00 +0000
558+++ debian/changelog 2014-05-12 14:11:46 +0000
559@@ -1,3 +1,9 @@
560+ubuntu-push (0.21-0.ubuntu1) UNRELEASED; urgency=medium
561+
562+ * New upstream release: first auth bits, and Qt dependency.
563+
564+ -- John Lenton <john.lenton@canonical.com> Tue, 15 Apr 2014 14:04:35 +0100
565+
566 ubuntu-push (0.2.1+14.04.20140423.1-0ubuntu1) trusty; urgency=high
567
568 [ Samuele Pedroni ]
569
570=== modified file 'debian/control'
571--- debian/control 2014-03-25 16:26:20 +0000
572+++ debian/control 2014-05-12 14:11:46 +0000
573@@ -14,6 +14,8 @@
574 libgcrypt11-dev,
575 libglib2.0-dev (>= 2.31.6),
576 libwhoopsie-dev,
577+ libubuntuoneauth-2.0-dev,
578+ cmake,
579 Standards-Version: 3.9.5
580 Homepage: http://launchpad.net/ubuntu-push
581 Vcs-Bzr: lp:ubuntu-push
582
583=== modified file 'debian/rules'
584--- debian/rules 2014-03-24 12:22:55 +0000
585+++ debian/rules 2014-05-12 14:11:46 +0000
586@@ -2,9 +2,13 @@
587 # -*- makefile -*-
588
589 export DH_GOPKG := launchpad.net/ubuntu-push
590-export DEB_BUILD_OPTIONS := nostrip
591 export UBUNTU_PUSH_TEST_RESOURCES_ROOT := $(CURDIR)
592
593+override_dh_auto_build:
594+ cd $$( find ./ -type d -regex '\./[^/]*/src/launchpad.net' -printf "%h\n" | head -n1)
595+ dh_auto_build --buildsystem=golang
596+ (cd signing-helper && cmake . && make)
597+
598 override_dh_install:
599 dh_install -Xusr/bin/cmd -Xusr/bin/dev --fail-missing
600
601
602=== modified file 'debian/ubuntu-push-client.install'
603--- debian/ubuntu-push-client.install 2014-03-26 16:27:19 +0000
604+++ debian/ubuntu-push-client.install 2014-05-12 14:11:46 +0000
605@@ -1,4 +1,5 @@
606 #!/usr/bin/dh-exec
607 debian/config.json /etc/xdg/ubuntu-push-client
608 debian/ubuntu-push-client.conf /usr/share/upstart/sessions
609+signing-helper/signing-helper /usr/lib/ubuntu-push-client
610 usr/bin/ubuntu-push => /usr/lib/ubuntu-push-client/ubuntu-push-client
611
612=== modified file 'dependencies.tsv'
613--- dependencies.tsv 2014-03-12 13:23:26 +0000
614+++ dependencies.tsv 2014-05-12 14:11:46 +0000
615@@ -2,3 +2,4 @@
616 launchpad.net/go-dbus/v1 bzr james@jamesh.id.au-20140206110213-pbzcr6ucaz3rqmnw 125
617 launchpad.net/go-xdg/v0 bzr john.lenton@canonical.com-20140208094800-gubd5md7cro3mtxa 10
618 launchpad.net/gocheck bzr gustavo@niemeyer.net-20140127131816-zshobk1qqme626xw 86
619+launchpad.net/~ubuntu-push-hackers/ubuntu-push/go-uuid bzr samuele.pedroni@canonical.com-20140130122455-pm9h8etl4owp90lg 1
620
621=== modified file 'protocol/messages.go'
622--- protocol/messages.go 2014-04-04 13:54:45 +0000
623+++ protocol/messages.go 2014-05-12 14:11:46 +0000
624@@ -54,6 +54,14 @@
625 Split() (done bool)
626 }
627
628+// OnewayMsg are messages that are not to be followed by a response,
629+// after sending them the session either aborts or continues.
630+type OnewayMsg interface {
631+ SplittableMsg
632+ // continue session after the message?
633+ OnewayContinue() bool
634+}
635+
636 // CONNBROKEN message, server side is breaking the connection for reason.
637 type ConnBrokenMsg struct {
638 Type string `json:"T"`
639@@ -65,11 +73,35 @@
640 return true
641 }
642
643+func (m *ConnBrokenMsg) OnewayContinue() bool {
644+ return false
645+}
646+
647 // CONNBROKEN reasons
648 const (
649 BrokenHostMismatch = "host-mismatch"
650 )
651
652+// CONNWARN message, server side is warning about partial functionality
653+// because reason.
654+type ConnWarnMsg struct {
655+ Type string `json:"T"`
656+ // reason
657+ Reason string
658+}
659+
660+func (m *ConnWarnMsg) Split() bool {
661+ return true
662+}
663+func (m *ConnWarnMsg) OnewayContinue() bool {
664+ return true
665+}
666+
667+// CONNWARN reasons
668+const (
669+ WarnUnauthorized = "unauthorized"
670+)
671+
672 // PING/PONG messages
673 type PingPongMsg struct {
674 Type string `json:"T"`
675@@ -122,6 +154,17 @@
676 Notifications []Notification
677 }
678
679+// Reset resets the splitting state if the message storage is to be
680+// reused.
681+func (m *NotificationsMsg) Reset() {
682+ // xxx
683+}
684+
685+func (m *NotificationsMsg) Split() bool {
686+ // xxx
687+ return true
688+}
689+
690 // A single unicast notification
691 type Notification struct {
692 AppId string `json:"A"`
693@@ -130,6 +173,19 @@
694 Payload json.RawMessage `json:"P"`
695 }
696
697+// ExtractPayloads gets only the payloads out of a slice of notications.
698+func ExtractPayloads(notifications []Notification) []json.RawMessage {
699+ n := len(notifications)
700+ if n == 0 {
701+ return nil
702+ }
703+ payloads := make([]json.RawMessage, n)
704+ for i := 0; i < n; i++ {
705+ payloads[i] = notifications[i].Payload
706+ }
707+ return payloads
708+}
709+
710 // ACKnowledgement message
711 type AckMsg struct {
712 Type string `json:"T"`
713
714=== modified file 'protocol/messages_test.go'
715--- protocol/messages_test.go 2014-04-04 13:19:10 +0000
716+++ protocol/messages_test.go 2014-05-12 14:11:46 +0000
717@@ -104,6 +104,22 @@
718 c.Check(b.splitting, Equals, 0)
719 }
720
721-func (s *messagesSuite) TestSplitConnBrokenMsg(c *C) {
722- c.Check((&ConnBrokenMsg{}).Split(), Equals, true)
723+func (s *messagesSuite) TestConnBrokenMsg(c *C) {
724+ m := &ConnBrokenMsg{}
725+ c.Check(m.Split(), Equals, true)
726+ c.Check(m.OnewayContinue(), Equals, false)
727+}
728+
729+func (s *messagesSuite) TestConnWarnMsg(c *C) {
730+ m := &ConnWarnMsg{}
731+ c.Check(m.Split(), Equals, true)
732+ c.Check(m.OnewayContinue(), Equals, true)
733+}
734+
735+func (s *messagesSuite) TestExtractPayloads(c *C) {
736+ c.Check(ExtractPayloads(nil), IsNil)
737+ p1 := json.RawMessage(`{"a":1}`)
738+ p2 := json.RawMessage(`{"b":2}`)
739+ ns := []Notification{Notification{Payload: p1}, Notification{Payload: p2}}
740+ c.Check(ExtractPayloads(ns), DeepEquals, []json.RawMessage{p1, p2})
741 }
742
743=== modified file 'protocol/state-diag-client.gv'
744--- protocol/state-diag-client.gv 2014-01-16 20:07:13 +0000
745+++ protocol/state-diag-client.gv 2014-05-12 14:11:46 +0000
746@@ -2,7 +2,7 @@
747 label = "State diagram for client";
748 size="12,6";
749 rankdir=LR;
750- node [shape = doublecircle]; pingTimeout;
751+ node [shape = doublecircle]; pingTimeout; connBroken;
752 node [shape = circle];
753 start1 -> start2 [ label = "Write wire version" ];
754 start2 -> start3 [ label = "Write CONNECT" ];
755@@ -13,4 +13,7 @@
756 broadcast -> loop [label = "Write ACK"];
757 loop -> pingTimeout [
758 label = "Elapsed ping interval + exchange interval"];
759+ loop -> connBroken [label = "Read CONNBROKEN"];
760+ loop -> warn [label = "Read CONNWARN"];
761+ warn -> loop;
762 }
763
764=== modified file 'protocol/state-diag-client.svg'
765--- protocol/state-diag-client.svg 2014-01-16 19:37:57 +0000
766+++ protocol/state-diag-client.svg 2014-05-12 14:11:46 +0000
767@@ -4,95 +4,123 @@
768 <!-- Generated by graphviz version 2.26.3 (20100126.1600)
769 -->
770 <!-- Title: state_diagram_client Pages: 1 -->
771-<svg width="864pt" height="279pt"
772- viewBox="0.00 0.00 864.00 278.89" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
773-<g id="graph1" class="graph" transform="scale(0.683544 0.683544) rotate(0) translate(4 404)">
774+<svg width="822pt" height="432pt"
775+ viewBox="0.00 0.00 822.36 432.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
776+<g id="graph1" class="graph" transform="scale(0.650602 0.650602) rotate(0) translate(4 660)">
777 <title>state_diagram_client</title>
778-<polygon fill="white" stroke="white" points="-4,5 -4,-404 1261,-404 1261,5 -4,5"/>
779+<polygon fill="white" stroke="white" points="-4,5 -4,-660 1261,-660 1261,5 -4,5"/>
780 <text text-anchor="middle" x="628" y="-9.4" font-family="Times Roman,serif" font-size="14.00">State diagram for client</text>
781 <!-- pingTimeout -->
782 <g id="node1" class="node"><title>pingTimeout</title>
783-<ellipse fill="none" stroke="black" cx="1180" cy="-324" rx="72.1249" ry="72.1249"/>
784-<ellipse fill="none" stroke="black" cx="1180" cy="-324" rx="76.1249" ry="76.1249"/>
785-<text text-anchor="middle" x="1180" y="-320.4" font-family="Times Roman,serif" font-size="14.00">pingTimeout</text>
786+<ellipse fill="none" stroke="black" cx="1180" cy="-580" rx="72.1249" ry="72.1249"/>
787+<ellipse fill="none" stroke="black" cx="1180" cy="-580" rx="76.1249" ry="76.1249"/>
788+<text text-anchor="middle" x="1180" y="-576.4" font-family="Times Roman,serif" font-size="14.00">pingTimeout</text>
789+</g>
790+<!-- connBroken -->
791+<g id="node2" class="node"><title>connBroken</title>
792+<ellipse fill="none" stroke="black" cx="1180" cy="-413" rx="68.8251" ry="69.2965"/>
793+<ellipse fill="none" stroke="black" cx="1180" cy="-413" rx="72.7978" ry="73.2965"/>
794+<text text-anchor="middle" x="1180" y="-409.4" font-family="Times Roman,serif" font-size="14.00">connBroken</text>
795 </g>
796 <!-- start1 -->
797-<g id="node2" class="node"><title>start1</title>
798-<ellipse fill="none" stroke="black" cx="42" cy="-166" rx="41.2167" ry="41.7193"/>
799-<text text-anchor="middle" x="42" y="-162.4" font-family="Times Roman,serif" font-size="14.00">start1</text>
800+<g id="node3" class="node"><title>start1</title>
801+<ellipse fill="none" stroke="black" cx="42" cy="-231" rx="41.2167" ry="41.7193"/>
802+<text text-anchor="middle" x="42" y="-227.4" font-family="Times Roman,serif" font-size="14.00">start1</text>
803 </g>
804 <!-- start2 -->
805-<g id="node4" class="node"><title>start2</title>
806-<ellipse fill="none" stroke="black" cx="292" cy="-166" rx="41.2167" ry="41.7193"/>
807-<text text-anchor="middle" x="292" y="-162.4" font-family="Times Roman,serif" font-size="14.00">start2</text>
808+<g id="node5" class="node"><title>start2</title>
809+<ellipse fill="none" stroke="black" cx="292" cy="-231" rx="41.2167" ry="41.7193"/>
810+<text text-anchor="middle" x="292" y="-227.4" font-family="Times Roman,serif" font-size="14.00">start2</text>
811 </g>
812 <!-- start1&#45;&gt;start2 -->
813 <g id="edge2" class="edge"><title>start1&#45;&gt;start2</title>
814-<path fill="none" stroke="black" d="M83.5631,-166C126.547,-166 193.757,-166 240.181,-166"/>
815-<polygon fill="black" stroke="black" points="240.338,-169.5 250.338,-166 240.338,-162.5 240.338,-169.5"/>
816-<text text-anchor="middle" x="167" y="-171.4" font-family="Times Roman,serif" font-size="14.00">Write wire version</text>
817+<path fill="none" stroke="black" d="M83.5631,-231C126.547,-231 193.757,-231 240.181,-231"/>
818+<polygon fill="black" stroke="black" points="240.338,-234.5 250.338,-231 240.338,-227.5 240.338,-234.5"/>
819+<text text-anchor="middle" x="167" y="-236.4" font-family="Times Roman,serif" font-size="14.00">Write wire version</text>
820 </g>
821 <!-- start3 -->
822-<g id="node6" class="node"><title>start3</title>
823-<ellipse fill="none" stroke="black" cx="526" cy="-166" rx="41.2167" ry="41.7193"/>
824-<text text-anchor="middle" x="526" y="-162.4" font-family="Times Roman,serif" font-size="14.00">start3</text>
825+<g id="node7" class="node"><title>start3</title>
826+<ellipse fill="none" stroke="black" cx="526" cy="-231" rx="41.2167" ry="41.7193"/>
827+<text text-anchor="middle" x="526" y="-227.4" font-family="Times Roman,serif" font-size="14.00">start3</text>
828 </g>
829 <!-- start2&#45;&gt;start3 -->
830 <g id="edge4" class="edge"><title>start2&#45;&gt;start3</title>
831-<path fill="none" stroke="black" d="M333.565,-166C372.875,-166 431.992,-166 474.321,-166"/>
832-<polygon fill="black" stroke="black" points="474.429,-169.5 484.429,-166 474.429,-162.5 474.429,-169.5"/>
833-<text text-anchor="middle" x="409" y="-171.4" font-family="Times Roman,serif" font-size="14.00">Write CONNECT</text>
834+<path fill="none" stroke="black" d="M333.565,-231C372.875,-231 431.992,-231 474.321,-231"/>
835+<polygon fill="black" stroke="black" points="474.429,-234.5 484.429,-231 474.429,-227.5 474.429,-234.5"/>
836+<text text-anchor="middle" x="409" y="-236.4" font-family="Times Roman,serif" font-size="14.00">Write CONNECT</text>
837 </g>
838 <!-- loop -->
839-<g id="node8" class="node"><title>loop</title>
840-<ellipse fill="none" stroke="black" cx="746" cy="-166" rx="31.8198" ry="31.8198"/>
841-<text text-anchor="middle" x="746" y="-162.4" font-family="Times Roman,serif" font-size="14.00">loop</text>
842+<g id="node9" class="node"><title>loop</title>
843+<ellipse fill="none" stroke="black" cx="746" cy="-231" rx="31.8198" ry="31.8198"/>
844+<text text-anchor="middle" x="746" y="-227.4" font-family="Times Roman,serif" font-size="14.00">loop</text>
845 </g>
846 <!-- start3&#45;&gt;loop -->
847 <g id="edge6" class="edge"><title>start3&#45;&gt;loop</title>
848-<path fill="none" stroke="black" d="M567.639,-166C606.633,-166 664.616,-166 703.793,-166"/>
849-<polygon fill="black" stroke="black" points="703.818,-169.5 713.818,-166 703.818,-162.5 703.818,-169.5"/>
850-<text text-anchor="middle" x="641" y="-171.4" font-family="Times Roman,serif" font-size="14.00">Read CONNACK</text>
851+<path fill="none" stroke="black" d="M567.639,-231C606.633,-231 664.616,-231 703.793,-231"/>
852+<polygon fill="black" stroke="black" points="703.818,-234.5 713.818,-231 703.818,-227.5 703.818,-234.5"/>
853+<text text-anchor="middle" x="641" y="-236.4" font-family="Times Roman,serif" font-size="14.00">Read CONNACK</text>
854 </g>
855 <!-- loop&#45;&gt;pingTimeout -->
856 <g id="edge16" class="edge"><title>loop&#45;&gt;pingTimeout</title>
857-<path fill="none" stroke="black" d="M763.666,-192.937C772.211,-204.042 783.361,-216.128 796,-224 888.06,-281.339 1012.12,-305.973 1094,-316.443"/>
858-<polygon fill="black" stroke="black" points="1093.67,-319.928 1104.02,-317.68 1094.53,-312.981 1093.67,-319.928"/>
859-<text text-anchor="middle" x="941" y="-319.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval + exchange interval</text>
860+<path fill="none" stroke="black" d="M750.211,-262.971C757.458,-313.528 773.689,-408.79 796,-434 872.806,-520.784 1006.81,-556.22 1094.46,-570.528"/>
861+<polygon fill="black" stroke="black" points="1093.96,-573.992 1104.39,-572.09 1095.05,-567.078 1093.96,-573.992"/>
862+<text text-anchor="middle" x="941" y="-572.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval + exchange interval</text>
863+</g>
864+<!-- loop&#45;&gt;connBroken -->
865+<g id="edge18" class="edge"><title>loop&#45;&gt;connBroken</title>
866+<path fill="none" stroke="black" d="M755.1,-261.824C762.755,-282.438 775.756,-308.526 796,-324 883.382,-390.791 1012.39,-408.797 1096.33,-412.948"/>
867+<polygon fill="black" stroke="black" points="1096.19,-416.445 1106.33,-413.388 1096.5,-409.452 1096.19,-416.445"/>
868+<text text-anchor="middle" x="941" y="-417.4" font-family="Times Roman,serif" font-size="14.00">Read CONNBROKEN</text>
869 </g>
870 <!-- pong -->
871-<g id="node10" class="node"><title>pong</title>
872-<ellipse fill="none" stroke="black" cx="1180" cy="-195" rx="34.8574" ry="35.3553"/>
873-<text text-anchor="middle" x="1180" y="-191.4" font-family="Times Roman,serif" font-size="14.00">pong</text>
874+<g id="node11" class="node"><title>pong</title>
875+<ellipse fill="none" stroke="black" cx="1180" cy="-287" rx="34.8574" ry="35.3553"/>
876+<text text-anchor="middle" x="1180" y="-283.4" font-family="Times Roman,serif" font-size="14.00">pong</text>
877 </g>
878 <!-- loop&#45;&gt;pong -->
879 <g id="edge8" class="edge"><title>loop&#45;&gt;pong</title>
880-<path fill="none" stroke="black" d="M775.392,-179.044C782.046,-181.465 789.167,-183.653 796,-185 916.362,-208.722 1062.02,-203.515 1134.48,-198.706"/>
881-<polygon fill="black" stroke="black" points="1134.89,-202.186 1144.62,-198.003 1134.4,-195.203 1134.89,-202.186"/>
882-<text text-anchor="middle" x="941" y="-207.4" font-family="Times Roman,serif" font-size="14.00">Read PING</text>
883+<path fill="none" stroke="black" d="M768.467,-253.959C776.476,-260.698 786.005,-267.259 796,-271 911.696,-314.31 1060.9,-303.343 1134.62,-293.955"/>
884+<polygon fill="black" stroke="black" points="1135.49,-297.371 1144.94,-292.588 1134.57,-290.432 1135.49,-297.371"/>
885+<text text-anchor="middle" x="941" y="-307.4" font-family="Times Roman,serif" font-size="14.00">Read PING</text>
886 </g>
887 <!-- broadcast -->
888-<g id="node12" class="node"><title>broadcast</title>
889-<ellipse fill="none" stroke="black" cx="1180" cy="-84" rx="58.1882" ry="58.6899"/>
890-<text text-anchor="middle" x="1180" y="-80.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>
891+<g id="node13" class="node"><title>broadcast</title>
892+<ellipse fill="none" stroke="black" cx="1180" cy="-176" rx="58.1882" ry="58.6899"/>
893+<text text-anchor="middle" x="1180" y="-172.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>
894 </g>
895 <!-- loop&#45;&gt;broadcast -->
896 <g id="edge10" class="edge"><title>loop&#45;&gt;broadcast</title>
897-<path fill="none" stroke="black" d="M770.52,-145.1C778.217,-139.607 787.053,-134.301 796,-131 917.482,-86.1746 957.924,-122.075 1086,-103 1094.61,-101.717 1103.63,-100.165 1112.53,-98.5074"/>
898-<polygon fill="black" stroke="black" points="1113.34,-101.917 1122.5,-96.5998 1112.02,-95.0419 1113.34,-101.917"/>
899-<text text-anchor="middle" x="941" y="-136.4" font-family="Times Roman,serif" font-size="14.00">Read BROADCAST</text>
900+<path fill="none" stroke="black" d="M775.45,-218.228C782.1,-215.791 789.205,-213.528 796,-212 922.145,-183.64 957.464,-202.973 1086,-189 1094.36,-188.091 1103.12,-187.028 1111.79,-185.909"/>
901+<polygon fill="black" stroke="black" points="1112.44,-189.353 1121.9,-184.574 1111.53,-182.413 1112.44,-189.353"/>
902+<text text-anchor="middle" x="941" y="-217.4" font-family="Times Roman,serif" font-size="14.00">Read BROADCAST</text>
903+</g>
904+<!-- warn -->
905+<g id="node19" class="node"><title>warn</title>
906+<ellipse fill="none" stroke="black" cx="1180" cy="-63" rx="36.7696" ry="36.7696"/>
907+<text text-anchor="middle" x="1180" y="-59.4" font-family="Times Roman,serif" font-size="14.00">warn</text>
908+</g>
909+<!-- loop&#45;&gt;warn -->
910+<g id="edge20" class="edge"><title>loop&#45;&gt;warn</title>
911+<path fill="none" stroke="black" d="M753.357,-199.767C760.401,-177.027 773.396,-147.441 796,-131 901.425,-54.3166 958.242,-112.935 1086,-87 1101.84,-83.7841 1119.02,-79.6061 1134.3,-75.6396"/>
912+<polygon fill="black" stroke="black" points="1135.26,-79.0068 1144.04,-73.0757 1133.48,-72.2376 1135.26,-79.0068"/>
913+<text text-anchor="middle" x="941" y="-136.4" font-family="Times Roman,serif" font-size="14.00">Read CONNWARN</text>
914 </g>
915 <!-- pong&#45;&gt;loop -->
916 <g id="edge12" class="edge"><title>pong&#45;&gt;loop</title>
917-<path fill="none" stroke="black" d="M1147.19,-180.867C1129.44,-173.986 1106.92,-166.463 1086,-163 980.081,-145.465 853.051,-154.36 788.368,-160.981"/>
918-<polygon fill="black" stroke="black" points="787.736,-157.528 778.16,-162.06 788.472,-164.489 787.736,-157.528"/>
919-<text text-anchor="middle" x="941" y="-168.4" font-family="Times Roman,serif" font-size="14.00">Write PONG</text>
920+<path fill="none" stroke="black" d="M1148.22,-271.079C1130.39,-262.942 1107.48,-253.77 1086,-249 1030.54,-236.684 866.695,-232.715 788.482,-231.502"/>
921+<polygon fill="black" stroke="black" points="788.085,-227.996 778.035,-231.348 787.982,-234.995 788.085,-227.996"/>
922+<text text-anchor="middle" x="941" y="-254.4" font-family="Times Roman,serif" font-size="14.00">Write PONG</text>
923 </g>
924 <!-- broadcast&#45;&gt;loop -->
925 <g id="edge14" class="edge"><title>broadcast&#45;&gt;loop</title>
926-<path fill="none" stroke="black" d="M1123.8,-67.0114C1044.83,-46.6166 899.156,-22.0001 796,-81 778.946,-90.7538 767.135,-108.842 759.293,-125.833"/>
927-<polygon fill="black" stroke="black" points="756.044,-124.528 755.336,-135.099 762.482,-127.277 756.044,-124.528"/>
928-<text text-anchor="middle" x="941" y="-86.4" font-family="Times Roman,serif" font-size="14.00">Write ACK</text>
929+<path fill="none" stroke="black" d="M1121.7,-168.205C1028.72,-156.837 851.665,-139.849 796,-167 784,-172.853 774.037,-183.132 766.245,-193.762"/>
930+<polygon fill="black" stroke="black" points="763.182,-192.043 760.465,-202.284 768.975,-195.973 763.182,-192.043"/>
931+<text text-anchor="middle" x="941" y="-172.4" font-family="Times Roman,serif" font-size="14.00">Write ACK</text>
932+</g>
933+<!-- warn&#45;&gt;loop -->
934+<g id="edge22" class="edge"><title>warn&#45;&gt;loop</title>
935+<path fill="none" stroke="black" d="M1144.07,-53.3553C1070.4,-35.8873 900.397,-7.71825 796,-87 779.313,-99.6722 764.14,-151.763 754.991,-189.659"/>
936+<polygon fill="black" stroke="black" points="751.574,-188.904 752.686,-199.44 758.387,-190.51 751.574,-188.904"/>
937 </g>
938 </g>
939 </svg>
940
941=== modified file 'protocol/state-diag-session.gv'
942--- protocol/state-diag-session.gv 2014-01-16 20:07:13 +0000
943+++ protocol/state-diag-session.gv 2014-05-12 14:11:46 +0000
944@@ -2,6 +2,7 @@
945 label = "State diagram for session";
946 size="12,6";
947 rankdir=LR;
948+ node [shape = doublecircle]; stop;
949 node [shape = circle];
950 start1 -> start2 [ label = "Read wire version" ];
951 start2 -> start3 [ label = "Read CONNECT" ];
952@@ -17,4 +18,13 @@
953 split_broadcast -> split_ack_wait [label = "Write split BROADCAST"];
954 split_ack_wait -> split_broadcast [label = "Read ACK"];
955 split_broadcast -> loop [label = "All split msgs written"];
956+ // other
957+ loop -> conn_broken [label = "Receive connbroken request"];
958+ loop -> conn_warn [label = "Receive connwarn request"];
959+ conn_broken -> stop [label = "Write CONNBROKEN"];
960+ conn_warn -> loop [label = "Write CONNWARN"];
961+ // timeouts
962+ ack_wait -> stop [label = "Elapsed exhange timeout"];
963+ split_ack_wait -> stop [label = "Elapsed exhange timeout"];
964+ pong_wait -> stop [label = "Elapsed exhange timeout"];
965 }
966
967=== modified file 'protocol/state-diag-session.svg'
968--- protocol/state-diag-session.svg 2014-01-16 19:37:57 +0000
969+++ protocol/state-diag-session.svg 2014-05-12 14:11:46 +0000
970@@ -4,139 +4,197 @@
971 <!-- Generated by graphviz version 2.26.3 (20100126.1600)
972 -->
973 <!-- Title: state_diagram_session Pages: 1 -->
974-<svg width="864pt" height="208pt"
975- viewBox="0.00 0.00 864.00 207.94" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
976-<g id="graph1" class="graph" transform="scale(0.435923 0.435923) rotate(0) translate(4 473)">
977+<svg width="864pt" height="266pt"
978+ viewBox="0.00 0.00 864.00 265.73" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
979+<g id="graph1" class="graph" transform="scale(0.367035 0.367035) rotate(0) translate(4 720)">
980 <title>state_diagram_session</title>
981-<polygon fill="white" stroke="white" points="-4,5 -4,-473 1979,-473 1979,5 -4,5"/>
982-<text text-anchor="middle" x="987" y="-9.4" font-family="Times Roman,serif" font-size="14.00">State diagram for session</text>
983+<polygon fill="white" stroke="white" points="-4,5 -4,-720 2351,-720 2351,5 -4,5"/>
984+<text text-anchor="middle" x="1173" y="-9.4" font-family="Times Roman,serif" font-size="14.00">State diagram for session</text>
985+<!-- stop -->
986+<g id="node1" class="node"><title>stop</title>
987+<ellipse fill="none" stroke="black" cx="2309" cy="-335" rx="32.0813" ry="32.5269"/>
988+<ellipse fill="none" stroke="black" cx="2309" cy="-335" rx="36.0265" ry="36.5269"/>
989+<text text-anchor="middle" x="2309" y="-331.4" font-family="Times Roman,serif" font-size="14.00">stop</text>
990+</g>
991 <!-- start1 -->
992-<g id="node1" class="node"><title>start1</title>
993-<ellipse fill="none" stroke="black" cx="42" cy="-294" rx="41.2167" ry="41.7193"/>
994-<text text-anchor="middle" x="42" y="-290.4" font-family="Times Roman,serif" font-size="14.00">start1</text>
995+<g id="node2" class="node"><title>start1</title>
996+<ellipse fill="none" stroke="black" cx="42" cy="-395" rx="41.2167" ry="41.7193"/>
997+<text text-anchor="middle" x="42" y="-391.4" font-family="Times Roman,serif" font-size="14.00">start1</text>
998 </g>
999 <!-- start2 -->
1000-<g id="node3" class="node"><title>start2</title>
1001-<ellipse fill="none" stroke="black" cx="286" cy="-294" rx="41.2167" ry="41.7193"/>
1002-<text text-anchor="middle" x="286" y="-290.4" font-family="Times Roman,serif" font-size="14.00">start2</text>
1003+<g id="node4" class="node"><title>start2</title>
1004+<ellipse fill="none" stroke="black" cx="286" cy="-395" rx="41.2167" ry="41.7193"/>
1005+<text text-anchor="middle" x="286" y="-391.4" font-family="Times Roman,serif" font-size="14.00">start2</text>
1006 </g>
1007 <!-- start1&#45;&gt;start2 -->
1008 <g id="edge2" class="edge"><title>start1&#45;&gt;start2</title>
1009-<path fill="none" stroke="black" d="M83.6679,-294C125.213,-294 189.13,-294 233.981,-294"/>
1010-<polygon fill="black" stroke="black" points="234.096,-297.5 244.096,-294 234.096,-290.5 234.096,-297.5"/>
1011-<text text-anchor="middle" x="164" y="-299.4" font-family="Times Roman,serif" font-size="14.00">Read wire version</text>
1012+<path fill="none" stroke="black" d="M83.6679,-395C125.213,-395 189.13,-395 233.981,-395"/>
1013+<polygon fill="black" stroke="black" points="234.096,-398.5 244.096,-395 234.096,-391.5 234.096,-398.5"/>
1014+<text text-anchor="middle" x="164" y="-400.4" font-family="Times Roman,serif" font-size="14.00">Read wire version</text>
1015 </g>
1016 <!-- start3 -->
1017-<g id="node5" class="node"><title>start3</title>
1018-<ellipse fill="none" stroke="black" cx="516" cy="-294" rx="41.2167" ry="41.7193"/>
1019-<text text-anchor="middle" x="516" y="-290.4" font-family="Times Roman,serif" font-size="14.00">start3</text>
1020+<g id="node6" class="node"><title>start3</title>
1021+<ellipse fill="none" stroke="black" cx="537" cy="-395" rx="41.2167" ry="41.7193"/>
1022+<text text-anchor="middle" x="537" y="-391.4" font-family="Times Roman,serif" font-size="14.00">start3</text>
1023 </g>
1024 <!-- start2&#45;&gt;start3 -->
1025 <g id="edge4" class="edge"><title>start2&#45;&gt;start3</title>
1026-<path fill="none" stroke="black" d="M327.651,-294C365.959,-294 422.903,-294 464.145,-294"/>
1027-<polygon fill="black" stroke="black" points="464.271,-297.5 474.271,-294 464.271,-290.5 464.271,-297.5"/>
1028-<text text-anchor="middle" x="401" y="-299.4" font-family="Times Roman,serif" font-size="14.00">Read CONNECT</text>
1029+<path fill="none" stroke="black" d="M327.729,-395C370.886,-395 438.364,-395 484.973,-395"/>
1030+<polygon fill="black" stroke="black" points="485.171,-398.5 495.171,-395 485.171,-391.5 485.171,-398.5"/>
1031+<text text-anchor="middle" x="401" y="-400.4" font-family="Times Roman,serif" font-size="14.00">Read CONNECT</text>
1032 </g>
1033 <!-- loop -->
1034-<g id="node7" class="node"><title>loop</title>
1035-<ellipse fill="none" stroke="black" cx="740" cy="-294" rx="31.8198" ry="31.8198"/>
1036-<text text-anchor="middle" x="740" y="-290.4" font-family="Times Roman,serif" font-size="14.00">loop</text>
1037+<g id="node8" class="node"><title>loop</title>
1038+<ellipse fill="none" stroke="black" cx="790" cy="-395" rx="31.8198" ry="31.8198"/>
1039+<text text-anchor="middle" x="790" y="-391.4" font-family="Times Roman,serif" font-size="14.00">loop</text>
1040 </g>
1041 <!-- start3&#45;&gt;loop -->
1042 <g id="edge6" class="edge"><title>start3&#45;&gt;loop</title>
1043-<path fill="none" stroke="black" d="M557.608,-294C597.53,-294 657.517,-294 697.677,-294"/>
1044-<polygon fill="black" stroke="black" points="697.687,-297.5 707.687,-294 697.687,-290.5 697.687,-297.5"/>
1045-<text text-anchor="middle" x="633" y="-299.4" font-family="Times Roman,serif" font-size="14.00">Write CONNACK</text>
1046+<path fill="none" stroke="black" d="M578.778,-395C625.49,-395 700.728,-395 747.665,-395"/>
1047+<polygon fill="black" stroke="black" points="747.805,-398.5 757.805,-395 747.805,-391.5 747.805,-398.5"/>
1048+<text text-anchor="middle" x="675" y="-400.4" font-family="Times Roman,serif" font-size="14.00">Write CONNACK</text>
1049 </g>
1050 <!-- ping -->
1051-<g id="node9" class="node"><title>ping</title>
1052-<ellipse fill="none" stroke="black" cx="1063" cy="-416" rx="32.0265" ry="32.5269"/>
1053-<text text-anchor="middle" x="1063" y="-412.4" font-family="Times Roman,serif" font-size="14.00">ping</text>
1054+<g id="node10" class="node"><title>ping</title>
1055+<ellipse fill="none" stroke="black" cx="1135" cy="-593" rx="32.0265" ry="32.5269"/>
1056+<text text-anchor="middle" x="1135" y="-589.4" font-family="Times Roman,serif" font-size="14.00">ping</text>
1057 </g>
1058 <!-- loop&#45;&gt;ping -->
1059 <g id="edge8" class="edge"><title>loop&#45;&gt;ping</title>
1060-<path fill="none" stroke="black" d="M754.564,-322.853C763.046,-336.78 775.035,-352.491 790,-362 861.597,-407.491 963.396,-415.983 1020.29,-416.829"/>
1061-<polygon fill="black" stroke="black" points="1020.35,-420.33 1030.38,-416.906 1020.4,-413.33 1020.35,-420.33"/>
1062-<text text-anchor="middle" x="881" y="-418.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval</text>
1063+<path fill="none" stroke="black" d="M800.39,-425.317C809.609,-448.006 825.187,-478.237 848,-497 920.691,-556.785 1032.18,-579.907 1092.58,-588.403"/>
1064+<polygon fill="black" stroke="black" points="1092.15,-591.877 1102.53,-589.734 1093.08,-584.939 1092.15,-591.877"/>
1065+<text text-anchor="middle" x="946" y="-583.4" font-family="Times Roman,serif" font-size="14.00">Elapsed ping interval</text>
1066 </g>
1067 <!-- broadcast -->
1068-<g id="node11" class="node"><title>broadcast</title>
1069-<ellipse fill="none" stroke="black" cx="1063" cy="-200" rx="58.1882" ry="58.6899"/>
1070-<text text-anchor="middle" x="1063" y="-196.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>
1071+<g id="node12" class="node"><title>broadcast</title>
1072+<ellipse fill="none" stroke="black" cx="1135" cy="-281" rx="58.1882" ry="58.6899"/>
1073+<text text-anchor="middle" x="1135" y="-277.4" font-family="Times Roman,serif" font-size="14.00">broadcast</text>
1074 </g>
1075 <!-- loop&#45;&gt;broadcast -->
1076 <g id="edge10" class="edge"><title>loop&#45;&gt;broadcast</title>
1077-<path fill="none" stroke="black" d="M766.046,-274.934C773.498,-270.155 781.824,-265.421 790,-262 856.828,-234.035 938.382,-217.617 994.86,-208.779"/>
1078-<polygon fill="black" stroke="black" points="995.396,-212.238 1004.75,-207.269 994.34,-205.318 995.396,-212.238"/>
1079-<text text-anchor="middle" x="881" y="-267.4" font-family="Times Roman,serif" font-size="14.00">Receive broadcast request</text>
1080+<path fill="none" stroke="black" d="M811.332,-370.953C821.492,-360.892 834.388,-349.946 848,-343 917.32,-307.628 1006.03,-292.395 1066.35,-285.86"/>
1081+<polygon fill="black" stroke="black" points="1066.94,-289.319 1076.53,-284.811 1066.22,-282.355 1066.94,-289.319"/>
1082+<text text-anchor="middle" x="946" y="-348.4" font-family="Times Roman,serif" font-size="14.00">Receive broadcast request</text>
1083+</g>
1084+<!-- conn_broken -->
1085+<g id="node26" class="node"><title>conn_broken</title>
1086+<ellipse fill="none" stroke="black" cx="1361" cy="-99" rx="73.0388" ry="73.5391"/>
1087+<text text-anchor="middle" x="1361" y="-95.4" font-family="Times Roman,serif" font-size="14.00">conn_broken</text>
1088+</g>
1089+<!-- loop&#45;&gt;conn_broken -->
1090+<g id="edge28" class="edge"><title>loop&#45;&gt;conn_broken</title>
1091+<path fill="none" stroke="black" d="M793.216,-363.054C799.833,-304.219 817.014,-182.243 848,-155 967.196,-50.2026 1167.08,-63.6291 1278.91,-81.8408"/>
1092+<polygon fill="black" stroke="black" points="1278.34,-85.2954 1288.79,-83.4998 1279.5,-78.392 1278.34,-85.2954"/>
1093+<text text-anchor="middle" x="946" y="-160.4" font-family="Times Roman,serif" font-size="14.00">Receive connbroken request</text>
1094+</g>
1095+<!-- conn_warn -->
1096+<g id="node28" class="node"><title>conn_warn</title>
1097+<ellipse fill="none" stroke="black" cx="1135" cy="-477" rx="65.7609" ry="65.7609"/>
1098+<text text-anchor="middle" x="1135" y="-473.4" font-family="Times Roman,serif" font-size="14.00">conn_warn</text>
1099+</g>
1100+<!-- loop&#45;&gt;conn_warn -->
1101+<g id="edge30" class="edge"><title>loop&#45;&gt;conn_warn</title>
1102+<path fill="none" stroke="black" d="M814.092,-416.512C823.957,-424.185 835.89,-432.126 848,-437 915.942,-464.343 999.421,-473.523 1058.8,-476.355"/>
1103+<polygon fill="black" stroke="black" points="1058.71,-479.855 1068.85,-476.786 1059.01,-472.861 1058.71,-479.855"/>
1104+<text text-anchor="middle" x="946" y="-480.4" font-family="Times Roman,serif" font-size="14.00">Receive connwarn request</text>
1105 </g>
1106 <!-- pong_wait -->
1107-<g id="node13" class="node"><title>pong_wait</title>
1108-<ellipse fill="none" stroke="black" cx="1526" cy="-406" rx="62.9325" ry="62.9325"/>
1109-<text text-anchor="middle" x="1526" y="-402.4" font-family="Times Roman,serif" font-size="14.00">pong_wait</text>
1110+<g id="node14" class="node"><title>pong_wait</title>
1111+<ellipse fill="none" stroke="black" cx="537" cy="-653" rx="62.9325" ry="62.9325"/>
1112+<text text-anchor="middle" x="537" y="-649.4" font-family="Times Roman,serif" font-size="14.00">pong_wait</text>
1113 </g>
1114 <!-- ping&#45;&gt;pong_wait -->
1115 <g id="edge12" class="edge"><title>ping&#45;&gt;pong_wait</title>
1116-<path fill="none" stroke="black" d="M1095.56,-415.297C1169.19,-413.707 1350.04,-409.8 1452.36,-407.591"/>
1117-<polygon fill="black" stroke="black" points="1452.69,-411.084 1462.61,-407.369 1452.54,-404.086 1452.69,-411.084"/>
1118-<text text-anchor="middle" x="1289" y="-418.4" font-family="Times Roman,serif" font-size="14.00">Write PING</text>
1119+<path fill="none" stroke="black" d="M1103.4,-600.831C1035.81,-617.134 871.913,-654.261 732,-667 681.542,-671.594 668.481,-671.33 618,-667 615.134,-666.754 612.217,-666.46 609.275,-666.127"/>
1120+<polygon fill="black" stroke="black" points="609.573,-662.637 599.214,-664.858 608.697,-669.582 609.573,-662.637"/>
1121+<text text-anchor="middle" x="790" y="-670.4" font-family="Times Roman,serif" font-size="14.00">Write PING</text>
1122 </g>
1123 <!-- ack_wait -->
1124-<g id="node15" class="node"><title>ack_wait</title>
1125-<ellipse fill="none" stroke="black" cx="1526" cy="-269" rx="55.1543" ry="55.1543"/>
1126-<text text-anchor="middle" x="1526" y="-265.4" font-family="Times Roman,serif" font-size="14.00">ack_wait</text>
1127+<g id="node16" class="node"><title>ack_wait</title>
1128+<ellipse fill="none" stroke="black" cx="1598" cy="-373" rx="55.1543" ry="55.1543"/>
1129+<text text-anchor="middle" x="1598" y="-369.4" font-family="Times Roman,serif" font-size="14.00">ack_wait</text>
1130 </g>
1131 <!-- broadcast&#45;&gt;ack_wait -->
1132 <g id="edge14" class="edge"><title>broadcast&#45;&gt;ack_wait</title>
1133-<path fill="none" stroke="black" d="M1121.17,-208.669C1207.93,-221.599 1370.7,-245.856 1461.17,-259.339"/>
1134-<polygon fill="black" stroke="black" points="1460.9,-262.837 1471.3,-260.849 1461.93,-255.913 1460.9,-262.837"/>
1135-<text text-anchor="middle" x="1289" y="-257.4" font-family="Times Roman,serif" font-size="14.00">Write BROADCAST [fits one wire msg]</text>
1136+<path fill="none" stroke="black" d="M1193.25,-288.88C1264.95,-299.067 1390.23,-318.45 1496,-343 1508.9,-345.993 1522.57,-349.664 1535.58,-353.397"/>
1137+<polygon fill="black" stroke="black" points="1534.79,-356.813 1545.37,-356.254 1536.75,-350.093 1534.79,-356.813"/>
1138+<text text-anchor="middle" x="1361" y="-348.4" font-family="Times Roman,serif" font-size="14.00">Write BROADCAST [fits one wire msg]</text>
1139 </g>
1140 <!-- split_broadcast -->
1141-<g id="node17" class="node"><title>split_broadcast</title>
1142-<ellipse fill="none" stroke="black" cx="1526" cy="-110" rx="84.1457" ry="84.1457"/>
1143-<text text-anchor="middle" x="1526" y="-106.4" font-family="Times Roman,serif" font-size="14.00">split_broadcast</text>
1144+<g id="node18" class="node"><title>split_broadcast</title>
1145+<ellipse fill="none" stroke="black" cx="1598" cy="-216" rx="84.1457" ry="84.1457"/>
1146+<text text-anchor="middle" x="1598" y="-212.4" font-family="Times Roman,serif" font-size="14.00">split_broadcast</text>
1147 </g>
1148 <!-- broadcast&#45;&gt;split_broadcast -->
1149 <g id="edge16" class="edge"><title>broadcast&#45;&gt;split_broadcast</title>
1150-<path fill="none" stroke="black" d="M1120.7,-188.783C1199.06,-173.553 1340.01,-146.154 1433.29,-128.021"/>
1151-<polygon fill="black" stroke="black" points="1434.15,-131.421 1443.29,-126.077 1432.81,-124.549 1434.15,-131.421"/>
1152-<text text-anchor="middle" x="1289" y="-185.4" font-family="Times Roman,serif" font-size="14.00">BROADCAST does not fit one wire msg</text>
1153+<path fill="none" stroke="black" d="M1193.17,-272.834C1271.44,-261.846 1411.56,-242.174 1504.66,-229.104"/>
1154+<polygon fill="black" stroke="black" points="1505.23,-232.558 1514.65,-227.702 1504.26,-225.626 1505.23,-232.558"/>
1155+<text text-anchor="middle" x="1361" y="-272.4" font-family="Times Roman,serif" font-size="14.00">BROADCAST does not fit one wire msg</text>
1156+</g>
1157+<!-- pong_wait&#45;&gt;stop -->
1158+<g id="edge40" class="edge"><title>pong_wait&#45;&gt;stop</title>
1159+<path fill="none" stroke="black" d="M600.164,-653C651.344,-653 725.322,-653 790,-653 790,-653 790,-653 1972,-653 2131.11,-653 2245.53,-463.168 2289.33,-376.844"/>
1160+<polygon fill="black" stroke="black" points="2292.5,-378.343 2293.84,-367.834 2286.24,-375.212 2292.5,-378.343"/>
1161+<text text-anchor="middle" x="1361" y="-658.4" font-family="Times Roman,serif" font-size="14.00">Elapsed exhange timeout</text>
1162 </g>
1163 <!-- pong_wait&#45;&gt;loop -->
1164 <g id="edge18" class="edge"><title>pong_wait&#45;&gt;loop</title>
1165-<path fill="none" stroke="black" d="M1463.29,-398.59C1336,-383.27 1038.34,-346.004 790,-304 787.177,-303.523 784.269,-303.006 781.343,-302.468"/>
1166-<polygon fill="black" stroke="black" points="781.898,-299.011 771.42,-300.582 780.59,-305.888 781.898,-299.011"/>
1167-<text text-anchor="middle" x="1063" y="-359.4" font-family="Times Roman,serif" font-size="14.00">Read PONG</text>
1168+<path fill="none" stroke="black" d="M581.359,-607.765C632.774,-555.333 716.085,-470.376 760.273,-425.314"/>
1169+<polygon fill="black" stroke="black" points="762.774,-427.763 767.277,-418.172 757.776,-422.862 762.774,-427.763"/>
1170+<text text-anchor="middle" x="675" y="-574.4" font-family="Times Roman,serif" font-size="14.00">Read PONG</text>
1171+</g>
1172+<!-- ack_wait&#45;&gt;stop -->
1173+<g id="edge36" class="edge"><title>ack_wait&#45;&gt;stop</title>
1174+<path fill="none" stroke="black" d="M1653.02,-372.757C1765.96,-371.776 2032.03,-366.986 2254,-344 2256.89,-343.701 2259.85,-343.348 2262.84,-342.959"/>
1175+<polygon fill="black" stroke="black" points="2263.58,-346.389 2272.99,-341.517 2262.59,-339.459 2263.58,-346.389"/>
1176+<text text-anchor="middle" x="1972" y="-373.4" font-family="Times Roman,serif" font-size="14.00">Elapsed exhange timeout</text>
1177 </g>
1178 <!-- ack_wait&#45;&gt;loop -->
1179 <g id="edge20" class="edge"><title>ack_wait&#45;&gt;loop</title>
1180-<path fill="none" stroke="black" d="M1470.96,-271.898C1455.75,-272.644 1439.24,-273.404 1424,-274 1181.02,-283.507 889.323,-290.597 782.144,-293.057"/>
1181-<polygon fill="black" stroke="black" points="781.977,-289.56 772.059,-293.288 782.136,-296.558 781.977,-289.56"/>
1182-<text text-anchor="middle" x="1063" y="-292.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>
1183+<path fill="none" stroke="black" d="M1542.83,-374.081C1445.66,-375.995 1237.64,-380.146 1062,-384 966.886,-386.087 942.947,-382.989 848,-389 842.829,-389.327 837.406,-389.764 832.04,-390.252"/>
1184+<polygon fill="black" stroke="black" points="831.485,-386.79 821.871,-391.242 832.163,-393.757 831.485,-386.79"/>
1185+<text text-anchor="middle" x="1135" y="-389.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>
1186 </g>
1187 <!-- split_broadcast&#45;&gt;loop -->
1188 <g id="edge26" class="edge"><title>split_broadcast&#45;&gt;loop</title>
1189-<path fill="none" stroke="black" d="M1442.56,-99.9981C1336.06,-89.6718 1146.8,-79.5577 990,-115 894.383,-136.612 862.41,-141.921 790,-208 775.817,-220.943 764.522,-238.865 756.283,-254.999"/>
1190-<polygon fill="black" stroke="black" points="753.014,-253.718 751.785,-264.241 759.308,-256.781 753.014,-253.718"/>
1191-<text text-anchor="middle" x="1063" y="-120.4" font-family="Times Roman,serif" font-size="14.00">All split msgs written</text>
1192+<path fill="none" stroke="black" d="M1515.33,-201.109C1409.22,-184.681 1219.98,-164.458 1062,-196 960.985,-216.168 924.079,-215.554 848,-285 827.351,-303.849 812.751,-331.776 803.364,-354.774"/>
1193+<polygon fill="black" stroke="black" points="800.027,-353.699 799.658,-364.287 806.549,-356.24 800.027,-353.699"/>
1194+<text text-anchor="middle" x="1135" y="-201.4" font-family="Times Roman,serif" font-size="14.00">All split msgs written</text>
1195 </g>
1196 <!-- split_ack_wait -->
1197-<g id="node21" class="node"><title>split_ack_wait</title>
1198-<ellipse fill="none" stroke="black" cx="1893" cy="-110" rx="80.1095" ry="80.6102"/>
1199-<text text-anchor="middle" x="1893" y="-106.4" font-family="Times Roman,serif" font-size="14.00">split_ack_wait</text>
1200+<g id="node22" class="node"><title>split_ack_wait</title>
1201+<ellipse fill="none" stroke="black" cx="1972" cy="-257" rx="80.1095" ry="80.6102"/>
1202+<text text-anchor="middle" x="1972" y="-253.4" font-family="Times Roman,serif" font-size="14.00">split_ack_wait</text>
1203 </g>
1204 <!-- split_broadcast&#45;&gt;split_ack_wait -->
1205 <g id="edge22" class="edge"><title>split_broadcast&#45;&gt;split_ack_wait</title>
1206-<path fill="none" stroke="black" d="M1610.2,-110C1667.61,-110 1743.59,-110 1802.33,-110"/>
1207-<polygon fill="black" stroke="black" points="1802.35,-113.5 1812.35,-110 1802.34,-106.5 1802.35,-113.5"/>
1208-<text text-anchor="middle" x="1711" y="-115.4" font-family="Times Roman,serif" font-size="14.00">Write split BROADCAST</text>
1209+<path fill="none" stroke="black" d="M1680.55,-232.126C1687.12,-233.18 1693.66,-234.156 1700,-235 1760.22,-243.022 1828.36,-248.528 1881.38,-252.025"/>
1210+<polygon fill="black" stroke="black" points="1881.23,-255.523 1891.43,-252.676 1881.68,-248.537 1881.23,-255.523"/>
1211+<text text-anchor="middle" x="1783" y="-256.4" font-family="Times Roman,serif" font-size="14.00">Write split BROADCAST</text>
1212+</g>
1213+<!-- split_ack_wait&#45;&gt;stop -->
1214+<g id="edge38" class="edge"><title>split_ack_wait&#45;&gt;stop</title>
1215+<path fill="none" stroke="black" d="M2050.59,-275.189C2116.55,-290.456 2208.51,-311.74 2263.08,-324.372"/>
1216+<polygon fill="black" stroke="black" points="2262.62,-327.857 2273.15,-326.702 2264.2,-321.037 2262.62,-327.857"/>
1217+<text text-anchor="middle" x="2166" y="-327.4" font-family="Times Roman,serif" font-size="14.00">Elapsed exhange timeout</text>
1218 </g>
1219 <!-- split_ack_wait&#45;&gt;split_broadcast -->
1220 <g id="edge24" class="edge"><title>split_ack_wait&#45;&gt;split_broadcast</title>
1221-<path fill="none" stroke="black" d="M1814.64,-90.9448C1807.69,-89.7544 1800.74,-88.7397 1794,-88 1720.66,-79.9496 1701.36,-80.1783 1628,-88 1624.71,-88.3505 1621.38,-88.7628 1618.01,-89.2264"/>
1222-<polygon fill="black" stroke="black" points="1617.23,-85.8043 1607.87,-90.7602 1618.28,-92.7256 1617.23,-85.8043"/>
1223-<text text-anchor="middle" x="1711" y="-93.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>
1224+<path fill="none" stroke="black" d="M1899.33,-222.493C1888.37,-218.573 1877.04,-215.2 1866,-213 1808.89,-201.617 1743.56,-202.081 1691.71,-205.529"/>
1225+<polygon fill="black" stroke="black" points="1691.25,-202.053 1681.52,-206.256 1691.74,-209.035 1691.25,-202.053"/>
1226+<text text-anchor="middle" x="1783" y="-218.4" font-family="Times Roman,serif" font-size="14.00">Read ACK</text>
1227+</g>
1228+<!-- conn_broken&#45;&gt;stop -->
1229+<g id="edge32" class="edge"><title>conn_broken&#45;&gt;stop</title>
1230+<path fill="none" stroke="black" d="M1434.61,-95.8477C1564.45,-92.5456 1841.16,-95.6985 2060,-168 2154.32,-199.163 2175.34,-218.326 2254,-279 2262.17,-285.305 2270.33,-292.782 2277.75,-300.185"/>
1231+<polygon fill="black" stroke="black" points="2275.42,-302.808 2284.91,-307.517 2280.43,-297.917 2275.42,-302.808"/>
1232+<text text-anchor="middle" x="1783" y="-127.4" font-family="Times Roman,serif" font-size="14.00">Write CONNBROKEN</text>
1233+</g>
1234+<!-- conn_warn&#45;&gt;loop -->
1235+<g id="edge34" class="edge"><title>conn_warn&#45;&gt;loop</title>
1236+<path fill="none" stroke="black" d="M1083.63,-435.301C1071.29,-427.246 1057.71,-419.822 1044,-415 972.758,-389.933 883.562,-389.406 832.05,-391.836"/>
1237+<polygon fill="black" stroke="black" points="831.758,-388.346 821.959,-392.373 832.131,-395.337 831.758,-388.346"/>
1238+<text text-anchor="middle" x="946" y="-420.4" font-family="Times Roman,serif" font-size="14.00">Write CONNWARN</text>
1239 </g>
1240 </g>
1241 </svg>
1242
1243=== modified file 'server/acceptance/acceptance_test.go'
1244--- server/acceptance/acceptance_test.go 2014-04-07 19:39:19 +0000
1245+++ server/acceptance/acceptance_test.go 2014-05-12 14:11:46 +0000
1246@@ -59,3 +59,6 @@
1247
1248 // broadcast
1249 var _ = Suite(&suites.BroadcastAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}})
1250+
1251+// unicast
1252+var _ = Suite(&suites.UnicastAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}, nil})
1253
1254=== modified file 'server/acceptance/acceptanceclient.go'
1255--- server/acceptance/acceptanceclient.go 2014-04-09 19:30:53 +0000
1256+++ server/acceptance/acceptanceclient.go 2014-05-12 14:11:46 +0000
1257@@ -24,6 +24,7 @@
1258 "errors"
1259 "fmt"
1260 "net"
1261+ "strings"
1262 "time"
1263
1264 "launchpad.net/ubuntu-push/protocol"
1265@@ -44,6 +45,7 @@
1266 Levels map[string]int64
1267 Insecure bool // don't verify certs
1268 Prefix string // prefix for events
1269+ Auth string
1270 // connection
1271 Connection net.Conn
1272 }
1273@@ -73,6 +75,7 @@
1274 Type string `json:"T"`
1275 protocol.BroadcastMsg
1276 protocol.NotificationsMsg
1277+ protocol.ConnWarnMsg
1278 }
1279
1280 // Run the session with the server, emits a stream of events.
1281@@ -93,6 +96,7 @@
1282 "device": sess.Model,
1283 "channel": sess.ImageChannel,
1284 },
1285+ Authorization: sess.Auth,
1286 })
1287 if err != nil {
1288 return err
1289@@ -125,9 +129,24 @@
1290 if sess.ReportPings {
1291 events <- sess.Prefix + "ping"
1292 }
1293+ case "notifications":
1294+ conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
1295+ err := proto.WriteMessage(protocol.AckMsg{Type: "ack"})
1296+ if err != nil {
1297+ return err
1298+ }
1299+ parts := make([]string, len(recv.Notifications))
1300+ for i, notif := range recv.Notifications {
1301+ pack, err := json.Marshal(&notif.Payload)
1302+ if err != nil {
1303+ return err
1304+ }
1305+ parts[i] = fmt.Sprintf("app:%v payload:%s;", notif.AppId, pack)
1306+ }
1307+ events <- fmt.Sprintf("%sunicast %s", sess.Prefix, strings.Join(parts, " "))
1308 case "broadcast":
1309 conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
1310- err := proto.WriteMessage(protocol.PingPongMsg{Type: "ack"})
1311+ err := proto.WriteMessage(protocol.AckMsg{Type: "ack"})
1312 if err != nil {
1313 return err
1314 }
1315@@ -136,6 +155,8 @@
1316 return err
1317 }
1318 events <- fmt.Sprintf("%sbroadcast chan:%v app:%v topLevel:%d payloads:%s", sess.Prefix, recv.ChanId, recv.AppId, recv.TopLevel, pack)
1319+ case "warn":
1320+ events <- fmt.Sprintf("%swarn %s", sess.Prefix, recv.Reason)
1321 }
1322 }
1323 return nil
1324
1325=== modified file 'server/acceptance/suites/broadcast.go'
1326--- server/acceptance/suites/broadcast.go 2014-04-07 19:39:19 +0000
1327+++ server/acceptance/suites/broadcast.go 2014-05-12 14:11:46 +0000
1328@@ -29,14 +29,11 @@
1329 "launchpad.net/ubuntu-push/server/api"
1330 )
1331
1332-// BroadCastAcceptanceSuite has tests about broadcast.
1333+// BroadcastAcceptanceSuite has tests about broadcast.
1334 type BroadcastAcceptanceSuite struct {
1335 AcceptanceSuite
1336 }
1337
1338-// Long after the end of the tests.
1339-var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
1340-
1341 func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) {
1342 events, errCh, stop := s.StartClient(c, "DEVB", nil)
1343 got, err := s.PostRequest("/broadcast", &api.Broadcast{
1344@@ -265,7 +262,11 @@
1345
1346 func (s *BroadcastAcceptanceSuite) TestGetHosts(c *C) {
1347 gh := gethosts.New("", s.ServerAPIURL+"/delivery-hosts", 2*time.Second)
1348- hosts, err := gh.Get()
1349+ host, err := gh.Get()
1350 c.Assert(err, IsNil)
1351- c.Check(hosts, DeepEquals, []string{s.ServerAddr})
1352+ expected := &gethosts.Host{
1353+ Domain: "localhost",
1354+ Hosts: []string{s.ServerAddr},
1355+ }
1356+ c.Check(host, DeepEquals, expected)
1357 }
1358
1359=== modified file 'server/acceptance/suites/suite.go'
1360--- server/acceptance/suites/suite.go 2014-04-03 16:47:47 +0000
1361+++ server/acceptance/suites/suite.go 2014-05-12 14:11:46 +0000
1362@@ -44,10 +44,19 @@
1363
1364 // Start a client.
1365 func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
1366+ return h.StartClientAuth(c, devId, levels, "")
1367+}
1368+
1369+// Start a client with auth.
1370+func (h *ServerHandle) StartClientAuth(c *C, devId string, levels map[string]int64, auth string) (events <-chan string, errorCh <-chan error, stop func()) {
1371 errCh := make(chan error, 1)
1372 cliEvents := make(chan string, 10)
1373 sess := testClientSession(h.ServerAddr, devId, "m1", "img1", false)
1374 sess.Levels = levels
1375+ sess.Auth = auth
1376+ if auth != "" {
1377+ sess.ExchangeTimeout = 5 * time.Second
1378+ }
1379 err := sess.Dial()
1380 c.Assert(err, IsNil)
1381 clientShutdown := make(chan bool, 1) // abused as an atomic flag
1382@@ -186,3 +195,6 @@
1383 }
1384 return
1385 }
1386+
1387+// Long after the end of the tests.
1388+var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
1389
1390=== added file 'server/acceptance/suites/unicast.go'
1391--- server/acceptance/suites/unicast.go 1970-01-01 00:00:00 +0000
1392+++ server/acceptance/suites/unicast.go 2014-05-12 14:11:46 +0000
1393@@ -0,0 +1,96 @@
1394+/*
1395+ Copyright 2013-2014 Canonical Ltd.
1396+
1397+ This program is free software: you can redistribute it and/or modify it
1398+ under the terms of the GNU General Public License version 3, as published
1399+ by the Free Software Foundation.
1400+
1401+ This program is distributed in the hope that it will be useful, but
1402+ WITHOUT ANY WARRANTY; without even the implied warranties of
1403+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1404+ PURPOSE. See the GNU General Public License for more details.
1405+
1406+ You should have received a copy of the GNU General Public License along
1407+ with this program. If not, see <http://www.gnu.org/licenses/>.
1408+*/
1409+
1410+package suites
1411+
1412+import (
1413+ "encoding/json"
1414+ //"fmt"
1415+ //"strings"
1416+ //"time"
1417+
1418+ . "launchpad.net/gocheck"
1419+
1420+ //"launchpad.net/ubuntu-push/protocol"
1421+ "launchpad.net/ubuntu-push/server/api"
1422+)
1423+
1424+// UnicastAcceptanceSuite has tests about unicast.
1425+type UnicastAcceptanceSuite struct {
1426+ AcceptanceSuite
1427+ AssociatedAuth func(string) (string, string)
1428+}
1429+
1430+func (s *UnicastAcceptanceSuite) associatedAuth(deviceId string) (userId string, auth string) {
1431+ if s.AssociatedAuth != nil {
1432+ return s.AssociatedAuth(deviceId)
1433+ }
1434+ return deviceId, ""
1435+}
1436+
1437+func (s *UnicastAcceptanceSuite) TestUnicastToConnected(c *C) {
1438+ userId, auth := s.associatedAuth("DEV1")
1439+ events, errCh, stop := s.StartClientAuth(c, "DEV1", nil, auth)
1440+ got, err := s.PostRequest("/notify", &api.Unicast{
1441+ UserId: userId,
1442+ DeviceId: "DEV1",
1443+ AppId: "app1",
1444+ ExpireOn: future,
1445+ Data: json.RawMessage(`{"a": 42}`),
1446+ })
1447+ c.Assert(err, IsNil)
1448+ c.Assert(got, Matches, ".*ok.*")
1449+ c.Check(NextEvent(events, errCh), Equals, `unicast app:app1 payload:{"a":42};`)
1450+ stop()
1451+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
1452+ c.Check(len(errCh), Equals, 0)
1453+}
1454+
1455+func (s *UnicastAcceptanceSuite) TestUnicastCorrectDistribution(c *C) {
1456+ userId1, auth1 := s.associatedAuth("DEV1")
1457+ userId2, auth2 := s.associatedAuth("DEV2")
1458+ // start 1st client
1459+ events1, errCh1, stop1 := s.StartClientAuth(c, "DEV1", nil, auth1)
1460+ // start 2nd client
1461+ events2, errCh2, stop2 := s.StartClientAuth(c, "DEV2", nil, auth2)
1462+ // unicast to one and the other
1463+ got, err := s.PostRequest("/notify", &api.Unicast{
1464+ UserId: userId1,
1465+ DeviceId: "DEV1",
1466+ AppId: "app1",
1467+ ExpireOn: future,
1468+ Data: json.RawMessage(`{"to": 1}`),
1469+ })
1470+ c.Assert(err, IsNil)
1471+ c.Assert(got, Matches, ".*ok.*")
1472+ got, err = s.PostRequest("/notify", &api.Unicast{
1473+ UserId: userId2,
1474+ DeviceId: "DEV2",
1475+ AppId: "app1",
1476+ ExpireOn: future,
1477+ Data: json.RawMessage(`{"to": 2}`),
1478+ })
1479+ c.Assert(err, IsNil)
1480+ c.Assert(got, Matches, ".*ok.*")
1481+ c.Check(NextEvent(events1, errCh1), Equals, `unicast app:app1 payload:{"to":1};`)
1482+ c.Check(NextEvent(events2, errCh2), Equals, `unicast app:app1 payload:{"to":2};`)
1483+ stop1()
1484+ stop2()
1485+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
1486+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
1487+ c.Check(len(errCh1), Equals, 0)
1488+ c.Check(len(errCh2), Equals, 0)
1489+}
1490
1491=== modified file 'server/api/handlers.go'
1492--- server/api/handlers.go 2014-02-20 17:09:03 +0000
1493+++ server/api/handlers.go 2014-05-12 14:11:46 +0000
1494@@ -19,12 +19,15 @@
1495 package api
1496
1497 import (
1498+ "encoding/base64"
1499 "encoding/json"
1500 "fmt"
1501 "io"
1502 "net/http"
1503 "time"
1504
1505+ "launchpad.net/~ubuntu-push-hackers/ubuntu-push/go-uuid/uuid"
1506+
1507 "launchpad.net/ubuntu-push/logger"
1508 "launchpad.net/ubuntu-push/server/broker"
1509 "launchpad.net/ubuntu-push/server/store"
1510@@ -93,6 +96,11 @@
1511 ioError,
1512 "Could not read request body",
1513 }
1514+ ErrMissingIdField = &APIError{
1515+ http.StatusBadRequest,
1516+ invalidRequest,
1517+ "Missing id field",
1518+ }
1519 ErrMissingData = &APIError{
1520 http.StatusBadRequest,
1521 invalidRequest,
1522@@ -130,10 +138,17 @@
1523 }
1524 )
1525
1526-type Message struct {
1527- Registration string `json:"registration"`
1528- CoalesceTag string `json:"coalesce_tag"`
1529- Data json.RawMessage `json:"data"`
1530+type castCommon struct {
1531+}
1532+
1533+type Unicast struct {
1534+ UserId string `json:"userid"`
1535+ DeviceId string `json:"deviceid"`
1536+ AppId string `json:"appid"`
1537+ //Registration string `json:"registration"`
1538+ //CoalesceTag string `json:"coalesce_tag"`
1539+ ExpireOn string `json:"expire_on"`
1540+ Data json.RawMessage `json:"data"`
1541 }
1542
1543 // Broadcast request JSON object.
1544@@ -198,11 +213,11 @@
1545
1546 var zeroTime = time.Time{}
1547
1548-func checkBroadcast(bcast *Broadcast) (time.Time, *APIError) {
1549- if len(bcast.Data) == 0 {
1550+func checkCastCommon(data json.RawMessage, expireOn string) (time.Time, *APIError) {
1551+ if len(data) == 0 {
1552 return zeroTime, ErrMissingData
1553 }
1554- expire, err := time.Parse(time.RFC3339, bcast.ExpireOn)
1555+ expire, err := time.Parse(time.RFC3339, expireOn)
1556 if err != nil {
1557 return zeroTime, ErrInvalidExpiration
1558 }
1559@@ -212,6 +227,10 @@
1560 return expire, nil
1561 }
1562
1563+func checkBroadcast(bcast *Broadcast) (time.Time, *APIError) {
1564+ return checkCastCommon(bcast.Data, bcast.ExpireOn)
1565+}
1566+
1567 type StoreForRequest func(w http.ResponseWriter, request *http.Request) (store.PendingStore, error)
1568
1569 // context holds the interfaces to delegate to serving requests
1570@@ -234,6 +253,20 @@
1571 return sto, nil
1572 }
1573
1574+func (ctx *context) prepare(w http.ResponseWriter, request *http.Request, reqObj interface{}) (store.PendingStore, *APIError) {
1575+ body, apiErr := ReadBody(request, MaxRequestBodyBytes)
1576+ if apiErr != nil {
1577+ return nil, apiErr
1578+ }
1579+
1580+ err := json.Unmarshal(body, reqObj)
1581+ if err != nil {
1582+ return nil, ErrMalformedJSONObject
1583+ }
1584+
1585+ return ctx.getStore(w, request)
1586+}
1587+
1588 type BroadcastHandler struct {
1589 *context
1590 }
1591@@ -270,23 +303,13 @@
1592 }
1593 }()
1594
1595- body, apiErr := ReadBody(request, MaxRequestBodyBytes)
1596- if apiErr != nil {
1597- return
1598- }
1599-
1600- sto, apiErr := h.getStore(writer, request)
1601- if apiErr != nil {
1602- return
1603- }
1604- defer sto.Close()
1605-
1606 broadcast := &Broadcast{}
1607- err := json.Unmarshal(body, broadcast)
1608- if err != nil {
1609- apiErr = ErrMalformedJSONObject
1610+
1611+ sto, apiErr := h.prepare(writer, request, broadcast)
1612+ if apiErr != nil {
1613 return
1614 }
1615+ defer sto.Close()
1616
1617 apiErr = h.doBroadcast(sto, broadcast)
1618 if apiErr != nil {
1619@@ -297,6 +320,64 @@
1620 fmt.Fprintf(writer, `{"ok":true}`)
1621 }
1622
1623+type UnicastHandler struct {
1624+ *context
1625+}
1626+
1627+func checkUnicast(ucast *Unicast) (time.Time, *APIError) {
1628+ if ucast.UserId == "" || ucast.DeviceId == "" || ucast.AppId == "" {
1629+ return zeroTime, ErrMissingIdField
1630+ }
1631+ return checkCastCommon(ucast.Data, ucast.ExpireOn)
1632+}
1633+
1634+// use a base64 encoded TimeUUID
1635+var generateMsgId = func() string {
1636+ return base64.StdEncoding.EncodeToString(uuid.NewUUID())
1637+}
1638+
1639+func (h *UnicastHandler) doUnicast(sto store.PendingStore, ucast *Unicast) *APIError {
1640+ expire, apiErr := checkUnicast(ucast)
1641+ if apiErr != nil {
1642+ return apiErr
1643+ }
1644+ chanId := store.UnicastInternalChannelId(ucast.UserId, ucast.DeviceId)
1645+ msgId := generateMsgId()
1646+ err := sto.AppendToUnicastChannel(chanId, ucast.AppId, ucast.Data, msgId, expire)
1647+ if err != nil {
1648+ h.logger.Errorf("could not store notification: %v", err)
1649+ return ErrCouldNotStoreNotification
1650+ }
1651+
1652+ h.broker.Unicast(chanId)
1653+ return nil
1654+}
1655+
1656+func (h *UnicastHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
1657+ var apiErr *APIError
1658+ defer func() {
1659+ if apiErr != nil {
1660+ RespondError(writer, apiErr)
1661+ }
1662+ }()
1663+
1664+ unicast := &Unicast{}
1665+
1666+ sto, apiErr := h.prepare(writer, request, unicast)
1667+ if apiErr != nil {
1668+ return
1669+ }
1670+ defer sto.Close()
1671+
1672+ apiErr = h.doUnicast(sto, unicast)
1673+ if apiErr != nil {
1674+ return
1675+ }
1676+
1677+ writer.Header().Set("Content-Type", "application/json")
1678+ fmt.Fprintf(writer, `{"ok":true}`)
1679+}
1680+
1681 // MakeHandlersMux makes a handler that dispatches for the various API endpoints.
1682 func MakeHandlersMux(storeForRequest StoreForRequest, broker broker.BrokerSending, logger logger.Logger) *http.ServeMux {
1683 ctx := &context{
1684@@ -306,5 +387,6 @@
1685 }
1686 mux := http.NewServeMux()
1687 mux.Handle("/broadcast", &BroadcastHandler{context: ctx})
1688+ mux.Handle("/notify", &UnicastHandler{context: ctx})
1689 return mux
1690 }
1691
1692=== modified file 'server/api/handlers_test.go'
1693--- server/api/handlers_test.go 2014-02-21 11:32:38 +0000
1694+++ server/api/handlers_test.go 2014-05-12 14:11:46 +0000
1695@@ -18,6 +18,7 @@
1696
1697 import (
1698 "bytes"
1699+ "encoding/base64"
1700 "encoding/json"
1701 "errors"
1702 "fmt"
1703@@ -30,8 +31,9 @@
1704
1705 . "launchpad.net/gocheck"
1706
1707+ "launchpad.net/ubuntu-push/protocol"
1708 "launchpad.net/ubuntu-push/server/store"
1709- helpers "launchpad.net/ubuntu-push/testing"
1710+ help "launchpad.net/ubuntu-push/testing"
1711 )
1712
1713 func TestHandlers(t *testing.T) { TestingT(t) }
1714@@ -41,14 +43,14 @@
1715 json string
1716 client *http.Client
1717 c *C
1718- testlog *helpers.TestLogger
1719+ testlog *help.TestLogger
1720 }
1721
1722 var _ = Suite(&handlersSuite{})
1723
1724 func (s *handlersSuite) SetUpTest(c *C) {
1725 s.client = &http.Client{}
1726- s.testlog = helpers.NewTestLogger(c, "error")
1727+ s.testlog = help.NewTestLogger(c, "error")
1728 }
1729
1730 func (s *handlersSuite) TestAPIError(c *C) {
1731@@ -98,7 +100,7 @@
1732
1733 var future = time.Now().Add(4 * time.Hour).Format(time.RFC3339)
1734
1735-func (s *handlersSuite) TestCheckBroadcast(c *C) {
1736+func (s *handlersSuite) TestCheckCastBroadcastAndCommon(c *C) {
1737 payload := json.RawMessage(`{"foo":"bar"}`)
1738 broadcast := &Broadcast{
1739 Channel: "system",
1740@@ -134,19 +136,27 @@
1741 }
1742
1743 type checkBrokerSending struct {
1744- store store.PendingStore
1745- chanId store.InternalChannelId
1746- err error
1747- top int64
1748- payloads []json.RawMessage
1749+ store store.PendingStore
1750+ chanId store.InternalChannelId
1751+ err error
1752+ top int64
1753+ notifications []protocol.Notification
1754 }
1755
1756 func (cbsend *checkBrokerSending) Broadcast(chanId store.InternalChannelId) {
1757- top, payloads, err := cbsend.store.GetChannelSnapshot(chanId)
1758+ top, notifications, err := cbsend.store.GetChannelSnapshot(chanId)
1759 cbsend.err = err
1760 cbsend.chanId = chanId
1761 cbsend.top = top
1762- cbsend.payloads = payloads
1763+ cbsend.notifications = notifications
1764+}
1765+
1766+func (cbsend *checkBrokerSending) Unicast(chanIds ...store.InternalChannelId) {
1767+ // for now
1768+ if len(chanIds) != 1 {
1769+ panic("not expecting many chan ids for now")
1770+ }
1771+ cbsend.Broadcast(chanIds[0])
1772 }
1773
1774 func (s *handlersSuite) TestDoBroadcast(c *C) {
1775@@ -163,7 +173,7 @@
1776 c.Check(bsend.err, IsNil)
1777 c.Check(bsend.chanId, Equals, store.SystemInternalChannelId)
1778 c.Check(bsend.top, Equals, int64(1))
1779- c.Check(bsend.payloads, DeepEquals, []json.RawMessage{payload})
1780+ c.Check(bsend.notifications, DeepEquals, help.Ns(payload))
1781 }
1782
1783 func (s *handlersSuite) TestDoBroadcastUnknownChannel(c *C) {
1784@@ -192,6 +202,11 @@
1785 return isto.intercept("AppendToChannel", err)
1786 }
1787
1788+func (isto *interceptInMemoryPendingStore) AppendToUnicastChannel(chanId store.InternalChannelId, appId string, payload json.RawMessage, msgId string, expiration time.Time) error {
1789+ err := isto.InMemoryPendingStore.AppendToUnicastChannel(chanId, appId, payload, msgId, expiration)
1790+ return isto.intercept("AppendToUnicastChannel", err)
1791+}
1792+
1793 func (s *handlersSuite) TestDoBroadcastUnknownError(c *C) {
1794 sto := &interceptInMemoryPendingStore{
1795 store.NewInMemoryPendingStore(),
1796@@ -229,6 +244,115 @@
1797 c.Check(s.testlog.Captured(), Equals, "ERROR could not store notification: fail\n")
1798 }
1799
1800+func (s *handlersSuite) TestCheckUnicast(c *C) {
1801+ payload := json.RawMessage(`{"foo":"bar"}`)
1802+ unicast := func() *Unicast {
1803+ return &Unicast{
1804+ UserId: "user1",
1805+ DeviceId: "DEV1",
1806+ AppId: "app1",
1807+ ExpireOn: future,
1808+ Data: payload,
1809+ }
1810+ }
1811+ u := unicast()
1812+ expire, apiErr := checkUnicast(u)
1813+ c.Assert(apiErr, IsNil)
1814+ c.Check(expire.Format(time.RFC3339), Equals, future)
1815+
1816+ u = unicast()
1817+ u.UserId = ""
1818+ expire, apiErr = checkUnicast(u)
1819+ c.Check(apiErr, Equals, ErrMissingIdField)
1820+
1821+ u = unicast()
1822+ u.AppId = ""
1823+ expire, apiErr = checkUnicast(u)
1824+ c.Check(apiErr, Equals, ErrMissingIdField)
1825+
1826+ u = unicast()
1827+ u.DeviceId = ""
1828+ expire, apiErr = checkUnicast(u)
1829+ c.Check(apiErr, Equals, ErrMissingIdField)
1830+
1831+ u = unicast()
1832+ u.Data = json.RawMessage(nil)
1833+ expire, apiErr = checkUnicast(u)
1834+ c.Check(apiErr, Equals, ErrMissingData)
1835+}
1836+
1837+func (s *handlersSuite) TestGenerateMsgId(c *C) {
1838+ msgId := generateMsgId()
1839+ decoded, err := base64.StdEncoding.DecodeString(msgId)
1840+ c.Assert(err, IsNil)
1841+ c.Check(decoded, HasLen, 16)
1842+}
1843+
1844+func (s *handlersSuite) TestDoUnicast(c *C) {
1845+ prevGenMsgId := generateMsgId
1846+ defer func() {
1847+ generateMsgId = prevGenMsgId
1848+ }()
1849+ generateMsgId = func() string {
1850+ return "MSG-ID"
1851+ }
1852+ sto := store.NewInMemoryPendingStore()
1853+ bsend := &checkBrokerSending{store: sto}
1854+ bh := &UnicastHandler{&context{nil, bsend, nil}}
1855+ payload := json.RawMessage(`{"a": 1}`)
1856+ apiErr := bh.doUnicast(sto, &Unicast{
1857+ UserId: "user1",
1858+ DeviceId: "DEV1",
1859+ AppId: "app1",
1860+ ExpireOn: future,
1861+ Data: payload,
1862+ })
1863+ c.Check(apiErr, IsNil)
1864+ c.Check(bsend.err, IsNil)
1865+ c.Check(bsend.chanId, Equals, store.UnicastInternalChannelId("user1", "DEV1"))
1866+ c.Check(bsend.top, Equals, int64(0))
1867+ c.Check(bsend.notifications, DeepEquals, []protocol.Notification{
1868+ protocol.Notification{
1869+ AppId: "app1",
1870+ MsgId: "MSG-ID",
1871+ Payload: payload,
1872+ },
1873+ })
1874+}
1875+
1876+func (s *handlersSuite) TestDoUnicastMissingIdField(c *C) {
1877+ sto := store.NewInMemoryPendingStore()
1878+ bh := &UnicastHandler{}
1879+ apiErr := bh.doUnicast(sto, &Unicast{
1880+ ExpireOn: future,
1881+ Data: json.RawMessage(`{"a": 1}`),
1882+ })
1883+ c.Check(apiErr, Equals, ErrMissingIdField)
1884+}
1885+
1886+func (s *handlersSuite) TestDoUnicastCouldNotStoreNotification(c *C) {
1887+ sto := &interceptInMemoryPendingStore{
1888+ store.NewInMemoryPendingStore(),
1889+ func(meth string, err error) error {
1890+ if meth == "AppendToUnicastChannel" {
1891+ return errors.New("fail")
1892+ }
1893+ return err
1894+ },
1895+ }
1896+ ctx := &context{logger: s.testlog}
1897+ bh := &UnicastHandler{ctx}
1898+ apiErr := bh.doUnicast(sto, &Unicast{
1899+ UserId: "user1",
1900+ DeviceId: "DEV1",
1901+ AppId: "app1",
1902+ ExpireOn: future,
1903+ Data: json.RawMessage(`{"a": 1}`),
1904+ })
1905+ c.Check(apiErr, Equals, ErrCouldNotStoreNotification)
1906+ c.Check(s.testlog.Captured(), Equals, "ERROR could not store notification: fail\n")
1907+}
1908+
1909 func newPostRequest(path string, message interface{}, server *httptest.Server) *http.Request {
1910 packedMessage, err := json.Marshal(message)
1911 if err != nil {
1912@@ -268,6 +392,14 @@
1913 bsend.chanId <- chanId
1914 }
1915
1916+func (bsend testBrokerSending) Unicast(chanIds ...store.InternalChannelId) {
1917+ // for now
1918+ if len(chanIds) != 1 {
1919+ panic("not expecting many chan ids for now")
1920+ }
1921+ bsend.chanId <- chanIds[0]
1922+}
1923+
1924 func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) {
1925 sto := store.NewInMemoryPendingStore()
1926 stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
1927@@ -480,3 +612,73 @@
1928
1929 checkError(c, response, ErrWrongRequestMethod)
1930 }
1931+
1932+func (s *handlersSuite) TestRespondsUnicast(c *C) {
1933+ sto := store.NewInMemoryPendingStore()
1934+ stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
1935+ return sto, nil
1936+ }
1937+ bsend := testBrokerSending{make(chan store.InternalChannelId, 1)}
1938+ testServer := httptest.NewServer(MakeHandlersMux(stoForReq, bsend, nil))
1939+ defer testServer.Close()
1940+
1941+ payload := json.RawMessage(`{"foo":"bar"}`)
1942+
1943+ request := newPostRequest("/notify", &Unicast{
1944+ UserId: "user2",
1945+ DeviceId: "dev3",
1946+ AppId: "app2",
1947+ ExpireOn: future,
1948+ Data: payload,
1949+ }, testServer)
1950+
1951+ response, err := s.client.Do(request)
1952+ c.Assert(err, IsNil)
1953+
1954+ c.Check(response.StatusCode, Equals, http.StatusOK)
1955+ c.Check(response.Header.Get("Content-Type"), Equals, "application/json")
1956+ body, err := getResponseBody(response)
1957+ c.Assert(err, IsNil)
1958+ c.Check(string(body), Matches, ".*ok.*")
1959+
1960+ chanId := store.UnicastInternalChannelId("user2", "dev3")
1961+ c.Check(<-bsend.chanId, Equals, chanId)
1962+ top, notifications, err := sto.GetChannelSnapshot(chanId)
1963+ c.Assert(err, IsNil)
1964+ c.Check(top, Equals, int64(0))
1965+ c.Check(notifications, HasLen, 1)
1966+}
1967+
1968+func (s *handlersSuite) TestCannotUnicastTooBigMessages(c *C) {
1969+ testServer := httptest.NewServer(&UnicastHandler{})
1970+ defer testServer.Close()
1971+
1972+ bigString := strings.Repeat("a", MaxRequestBodyBytes)
1973+ dataString := fmt.Sprintf(`"%v"`, bigString)
1974+
1975+ request := newPostRequest("/", &Unicast{
1976+ ExpireOn: future,
1977+ Data: json.RawMessage([]byte(dataString)),
1978+ }, testServer)
1979+
1980+ response, err := s.client.Do(request)
1981+ c.Assert(err, IsNil)
1982+ checkError(c, response, ErrRequestBodyTooLarge)
1983+}
1984+
1985+func (s *handlersSuite) TestCannotUnicastWithMissingFields(c *C) {
1986+ stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
1987+ return store.NewInMemoryPendingStore(), nil
1988+ }
1989+ ctx := &context{stoForReq, nil, nil}
1990+ testServer := httptest.NewServer(&UnicastHandler{ctx})
1991+ defer testServer.Close()
1992+
1993+ request := newPostRequest("/", &Unicast{
1994+ Data: json.RawMessage(`{"foo":"bar"}`),
1995+ }, testServer)
1996+
1997+ response, err := s.client.Do(request)
1998+ c.Assert(err, IsNil)
1999+ checkError(c, response, ErrMissingIdField)
2000+}
2001
2002=== modified file 'server/broker/broker.go'
2003--- server/broker/broker.go 2014-04-04 09:58:34 +0000
2004+++ server/broker/broker.go 2014-05-12 14:11:46 +0000
2005@@ -30,7 +30,7 @@
2006 // through them.
2007 type Broker interface {
2008 // Register the session.
2009- Register(*protocol.ConnectMsg) (BrokerSession, error)
2010+ Register(connMsg *protocol.ConnectMsg, sessionId string) (BrokerSession, error)
2011 // Unregister the session.
2012 Unregister(BrokerSession)
2013 }
2014@@ -39,6 +39,8 @@
2015 type BrokerSending interface {
2016 // Broadcast channel.
2017 Broadcast(chanId store.InternalChannelId)
2018+ // Unicast over channels.
2019+ Unicast(chanIds ...store.InternalChannelId)
2020 }
2021
2022 // Exchange leads the session through performing an exchange, typically delivery.
2023@@ -81,6 +83,10 @@
2024 Levels() LevelsMap
2025 // ExchangeScratchArea returns the scratch area for exchanges.
2026 ExchangeScratchArea() *ExchangesScratchArea
2027+ // Get gets the content of the channel with chanId.
2028+ Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error)
2029+ // DropByMsgId drops notifications from the channel chanId by message id.
2030+ DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error
2031 }
2032
2033 // Session aborted error.
2034
2035=== modified file 'server/broker/exchanges.go'
2036--- server/broker/exchanges.go 2014-04-04 13:19:10 +0000
2037+++ server/broker/exchanges.go 2014-05-12 14:11:46 +0000
2038@@ -28,18 +28,18 @@
2039
2040 // Scratch area for exchanges, sessions should hold one of these.
2041 type ExchangesScratchArea struct {
2042- broadcastMsg protocol.BroadcastMsg
2043- ackMsg protocol.AckMsg
2044- connBrokenMsg protocol.ConnBrokenMsg
2045+ broadcastMsg protocol.BroadcastMsg
2046+ notificationsMsg protocol.NotificationsMsg
2047+ ackMsg protocol.AckMsg
2048 }
2049
2050 // BroadcastExchange leads a session through delivering a BROADCAST.
2051 // For simplicity it is fully public.
2052 type BroadcastExchange struct {
2053- ChanId store.InternalChannelId
2054- TopLevel int64
2055- NotificationPayloads []json.RawMessage
2056- Decoded []map[string]interface{}
2057+ ChanId store.InternalChannelId
2058+ TopLevel int64
2059+ Notifications []protocol.Notification
2060+ Decoded []map[string]interface{}
2061 }
2062
2063 // check interface already here
2064@@ -47,18 +47,18 @@
2065
2066 // Init ensures the BroadcastExchange is fully initialized for the sessions.
2067 func (sbe *BroadcastExchange) Init() {
2068- decoded := make([]map[string]interface{}, len(sbe.NotificationPayloads))
2069+ decoded := make([]map[string]interface{}, len(sbe.Notifications))
2070 sbe.Decoded = decoded
2071- for i, p := range sbe.NotificationPayloads {
2072- err := json.Unmarshal(p, &decoded[i])
2073+ for i, notif := range sbe.Notifications {
2074+ err := json.Unmarshal(notif.Payload, &decoded[i])
2075 if err != nil {
2076 decoded[i] = nil
2077 }
2078 }
2079 }
2080
2081-func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
2082- c := int64(len(payloads))
2083+func filterByLevel(clientLevel, topLevel int64, notifs []protocol.Notification) []protocol.Notification {
2084+ c := int64(len(notifs))
2085 if c == 0 {
2086 return nil
2087 }
2088@@ -67,32 +67,32 @@
2089 delta = 1
2090 }
2091 if delta < c {
2092- return payloads[c-delta:]
2093+ return notifs[c-delta:]
2094 } else {
2095- return payloads
2096+ return notifs
2097 }
2098 }
2099
2100-func channelFilter(tag string, chanId store.InternalChannelId, payloads []json.RawMessage, decoded []map[string]interface{}) []json.RawMessage {
2101- if len(payloads) != 0 && chanId == store.SystemInternalChannelId {
2102- decoded := decoded[len(decoded)-len(payloads):]
2103+func channelFilter(tag string, chanId store.InternalChannelId, notifs []protocol.Notification, decoded []map[string]interface{}) []json.RawMessage {
2104+ if len(notifs) != 0 && chanId == store.SystemInternalChannelId {
2105+ decoded := decoded[len(decoded)-len(notifs):]
2106 filtered := make([]json.RawMessage, 0)
2107 for i, decoded1 := range decoded {
2108 if _, ok := decoded1[tag]; ok {
2109- filtered = append(filtered, payloads[i])
2110+ filtered = append(filtered, notifs[i].Payload)
2111 }
2112 }
2113- payloads = filtered
2114+ return filtered
2115 }
2116- return payloads
2117+ return protocol.ExtractPayloads(notifs)
2118 }
2119
2120 // Prepare session for a BROADCAST.
2121 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
2122 clientLevel := sess.Levels()[sbe.ChanId]
2123- payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
2124+ notifs := filterByLevel(clientLevel, sbe.TopLevel, sbe.Notifications)
2125 tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())
2126- payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded)
2127+ payloads := channelFilter(tag, sbe.ChanId, notifs, sbe.Decoded)
2128 if len(payloads) == 0 && sbe.TopLevel >= clientLevel {
2129 // empty and don't need to force resync => do nothing
2130 return nil, nil, ErrNop
2131@@ -119,23 +119,55 @@
2132 return nil
2133 }
2134
2135-// ConnBrokenExchange breaks a session giving a reason.
2136-type ConnBrokenExchange struct {
2137- Reason string
2138-}
2139-
2140-// check interface already here
2141-var _ Exchange = (*ConnBrokenExchange)(nil)
2142-
2143-// Prepare session for a CONNBROKEN.
2144-func (cbe *ConnBrokenExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
2145- scratchArea := sess.ExchangeScratchArea()
2146- scratchArea.connBrokenMsg.Type = "connbroken"
2147- scratchArea.connBrokenMsg.Reason = cbe.Reason
2148- return &scratchArea.connBrokenMsg, nil, nil
2149-}
2150-
2151-// CONNBROKEN isn't acked
2152-func (cbe *ConnBrokenExchange) Acked(sess BrokerSession, done bool) error {
2153- panic("Acked should not get invoked on ConnBrokenExchange")
2154+// ConnMetaExchange allows to send a CONNBROKEN or CONNWARN message.
2155+type ConnMetaExchange struct {
2156+ Msg protocol.OnewayMsg
2157+}
2158+
2159+// check interface already here
2160+var _ Exchange = (*ConnMetaExchange)(nil)
2161+
2162+// Prepare session for a CONNBROKEN/WARN.
2163+func (cbe *ConnMetaExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
2164+ return cbe.Msg, nil, nil
2165+}
2166+
2167+// CONNBROKEN/WARN aren't acked.
2168+func (cbe *ConnMetaExchange) Acked(sess BrokerSession, done bool) error {
2169+ panic("Acked should not get invoked on ConnMetaExchange")
2170+}
2171+
2172+// UnicastExchange leads a session through delivering a NOTIFICATIONS message.
2173+// For simplicity it is fully public.
2174+type UnicastExchange struct {
2175+ ChanId store.InternalChannelId
2176+}
2177+
2178+// check interface already here
2179+var _ Exchange = (*UnicastExchange)(nil)
2180+
2181+// Prepare session for a NOTIFICATIONS.
2182+func (sue *UnicastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
2183+ _, notifs, err := sess.Get(sue.ChanId, false)
2184+ if err != nil {
2185+ return nil, nil, err
2186+ }
2187+ scratchArea := sess.ExchangeScratchArea()
2188+ scratchArea.notificationsMsg.Reset()
2189+ scratchArea.notificationsMsg.Type = "notifications"
2190+ scratchArea.notificationsMsg.Notifications = notifs
2191+ return &scratchArea.notificationsMsg, &scratchArea.ackMsg, nil
2192+}
2193+
2194+// Acked deals with an ACK for a NOTIFICATIONS.
2195+func (sue *UnicastExchange) Acked(sess BrokerSession, done bool) error {
2196+ scratchArea := sess.ExchangeScratchArea()
2197+ if scratchArea.ackMsg.Type != "ack" {
2198+ return &ErrAbort{"expected ACK message"}
2199+ }
2200+ err := sess.DropByMsgId(sue.ChanId, scratchArea.notificationsMsg.Notifications)
2201+ if err != nil {
2202+ return err
2203+ }
2204+ return nil
2205 }
2206
2207=== modified file 'server/broker/exchanges_test.go'
2208--- server/broker/exchanges_test.go 2014-04-04 13:19:10 +0000
2209+++ server/broker/exchanges_test.go 2014-05-12 14:11:46 +0000
2210@@ -18,15 +18,18 @@
2211
2212 import (
2213 "encoding/json"
2214+ "errors"
2215 "fmt"
2216 "strings"
2217 stdtesting "testing"
2218
2219 . "launchpad.net/gocheck"
2220
2221+ "launchpad.net/ubuntu-push/protocol"
2222 "launchpad.net/ubuntu-push/server/broker"
2223 "launchpad.net/ubuntu-push/server/broker/testing"
2224 "launchpad.net/ubuntu-push/server/store"
2225+ help "launchpad.net/ubuntu-push/testing"
2226 )
2227
2228 func TestBroker(t *stdtesting.T) { TestingT(t) }
2229@@ -39,11 +42,11 @@
2230 exchg := &broker.BroadcastExchange{
2231 ChanId: store.SystemInternalChannelId,
2232 TopLevel: 3,
2233- NotificationPayloads: []json.RawMessage{
2234+ Notifications: help.Ns(
2235 json.RawMessage(`{"a":"x"}`),
2236 json.RawMessage(`[]`),
2237 json.RawMessage(`{"a":"y"}`),
2238- },
2239+ ),
2240 }
2241 exchg.Init()
2242 c.Check(exchg.Decoded, DeepEquals, []map[string]interface{}{
2243@@ -62,10 +65,10 @@
2244 exchg := &broker.BroadcastExchange{
2245 ChanId: store.SystemInternalChannelId,
2246 TopLevel: 3,
2247- NotificationPayloads: []json.RawMessage{
2248+ Notifications: help.Ns(
2249 json.RawMessage(`{"img1/m1":100}`),
2250 json.RawMessage(`{"img2/m2":200}`),
2251- },
2252+ ),
2253 }
2254 exchg.Init()
2255 outMsg, inMsg, err := exchg.Prepare(sess)
2256@@ -88,9 +91,9 @@
2257 ImageChannel: "img1",
2258 }
2259 exchg := &broker.BroadcastExchange{
2260- ChanId: store.SystemInternalChannelId,
2261- TopLevel: 3,
2262- NotificationPayloads: []json.RawMessage{},
2263+ ChanId: store.SystemInternalChannelId,
2264+ TopLevel: 3,
2265+ Notifications: []protocol.Notification{},
2266 }
2267 exchg.Init()
2268 outMsg, inMsg, err := exchg.Prepare(sess)
2269@@ -108,9 +111,9 @@
2270 ImageChannel: "img1",
2271 }
2272 exchg := &broker.BroadcastExchange{
2273- ChanId: store.SystemInternalChannelId,
2274- TopLevel: 3,
2275- NotificationPayloads: []json.RawMessage{},
2276+ ChanId: store.SystemInternalChannelId,
2277+ TopLevel: 3,
2278+ Notifications: []protocol.Notification{},
2279 }
2280 exchg.Init()
2281 outMsg, inMsg, err := exchg.Prepare(sess)
2282@@ -133,9 +136,9 @@
2283
2284 topLevel := int64(len(needsSplitting))
2285 exchg := &broker.BroadcastExchange{
2286- ChanId: store.SystemInternalChannelId,
2287- TopLevel: topLevel,
2288- NotificationPayloads: needsSplitting,
2289+ ChanId: store.SystemInternalChannelId,
2290+ TopLevel: topLevel,
2291+ Notifications: help.Ns(needsSplitting...),
2292 }
2293 exchg.Init()
2294 outMsg, _, err := exchg.Prepare(sess)
2295@@ -152,10 +155,10 @@
2296 exchg = &broker.BroadcastExchange{
2297 ChanId: store.SystemInternalChannelId,
2298 TopLevel: topLevel + 2,
2299- NotificationPayloads: []json.RawMessage{
2300+ Notifications: help.Ns(
2301 json.RawMessage(`{"img1/m1":"x"}`),
2302 json.RawMessage(`{"img1/m1":"y"}`),
2303- },
2304+ ),
2305 }
2306 exchg.Init()
2307 outMsg, _, err = exchg.Prepare(sess)
2308@@ -173,9 +176,9 @@
2309 exchg := &broker.BroadcastExchange{
2310 ChanId: store.SystemInternalChannelId,
2311 TopLevel: 3,
2312- NotificationPayloads: []json.RawMessage{
2313+ Notifications: help.Ns(
2314 json.RawMessage(`{"img2/m1":1}`),
2315- },
2316+ ),
2317 }
2318 exchg.Init()
2319 outMsg, inMsg, err := exchg.Prepare(sess)
2320@@ -202,10 +205,10 @@
2321 exchg := &broker.BroadcastExchange{
2322 ChanId: store.SystemInternalChannelId,
2323 TopLevel: 3,
2324- NotificationPayloads: []json.RawMessage{
2325+ Notifications: help.Ns(
2326 json.RawMessage(`{"img1/m1":100}`),
2327 json.RawMessage(`{"img1/m1":101}`),
2328- },
2329+ ),
2330 }
2331 exchg.Init()
2332 outMsg, inMsg, err := exchg.Prepare(sess)
2333@@ -229,11 +232,11 @@
2334 exchg := &broker.BroadcastExchange{
2335 ChanId: store.SystemInternalChannelId,
2336 TopLevel: 5,
2337- NotificationPayloads: []json.RawMessage{
2338+ Notifications: help.Ns(
2339 json.RawMessage(`{"img1/m1":100}`),
2340 json.RawMessage(`{"img2/m2":200}`),
2341 json.RawMessage(`{"img1/m1":101}`),
2342- },
2343+ ),
2344 }
2345 exchg.Init()
2346 outMsg, inMsg, err := exchg.Prepare(sess)
2347@@ -249,16 +252,114 @@
2348 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))
2349 }
2350
2351-func (s *exchangesSuite) TestConnBrokenExchange(c *C) {
2352+func (s *exchangesSuite) TestConnMetaExchange(c *C) {
2353 sess := &testing.TestBrokerSession{}
2354- cbe := &broker.ConnBrokenExchange{"REASON"}
2355+ var msg protocol.OnewayMsg = &protocol.ConnWarnMsg{"connwarn", "REASON"}
2356+ cbe := &broker.ConnMetaExchange{msg}
2357 outMsg, inMsg, err := cbe.Prepare(sess)
2358 c.Assert(err, IsNil)
2359+ c.Check(msg, Equals, outMsg)
2360 c.Check(inMsg, IsNil) // no answer is expected
2361 // check
2362 marshalled, err := json.Marshal(outMsg)
2363 c.Assert(err, IsNil)
2364- c.Check(string(marshalled), Equals, `{"T":"connbroken","Reason":"REASON"}`)
2365-
2366- c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnBrokenExchange")
2367+ c.Check(string(marshalled), Equals, `{"T":"connwarn","Reason":"REASON"}`)
2368+
2369+ c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnMetaExchange")
2370+}
2371+
2372+func (s *exchangesSuite) TestUnicastExchange(c *C) {
2373+ chanId1 := store.UnicastInternalChannelId("u1", "d1")
2374+ notifs := []protocol.Notification{
2375+ protocol.Notification{
2376+ MsgId: "msg1",
2377+ AppId: "app1",
2378+ Payload: json.RawMessage(`{"m": 1}`),
2379+ },
2380+ protocol.Notification{
2381+ MsgId: "msg2",
2382+ AppId: "app2",
2383+ Payload: json.RawMessage(`{"m": 2}`),
2384+ },
2385+ }
2386+ dropped := make(chan []protocol.Notification, 2)
2387+ sess := &testing.TestBrokerSession{
2388+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
2389+ c.Check(chanId, Equals, chanId1)
2390+ c.Check(cachedOk, Equals, false)
2391+ return 0, notifs, nil
2392+ },
2393+ DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
2394+ c.Check(chanId, Equals, chanId1)
2395+ dropped <- targets
2396+ return nil
2397+ },
2398+ }
2399+ exchg := &broker.UnicastExchange{chanId1}
2400+ outMsg, inMsg, err := exchg.Prepare(sess)
2401+ c.Assert(err, IsNil)
2402+ // check
2403+ marshalled, err := json.Marshal(outMsg)
2404+ c.Assert(err, IsNil)
2405+ c.Check(string(marshalled), Equals, `{"T":"notifications","Notifications":[{"A":"app1","M":"msg1","P":{"m":1}},{"A":"app2","M":"msg2","P":{"m":2}}]}`)
2406+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
2407+ c.Assert(err, IsNil)
2408+ err = exchg.Acked(sess, true)
2409+ c.Assert(err, IsNil)
2410+ c.Check(dropped, HasLen, 1)
2411+ c.Check(<-dropped, DeepEquals, notifs)
2412+}
2413+
2414+func (s *exchangesSuite) TestUnicastExchangeAckMismatch(c *C) {
2415+ notifs := []protocol.Notification{}
2416+ dropped := make(chan []protocol.Notification, 2)
2417+ sess := &testing.TestBrokerSession{
2418+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
2419+ return 0, notifs, nil
2420+ },
2421+ DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
2422+ dropped <- targets
2423+ return nil
2424+ },
2425+ }
2426+ exchg := &broker.UnicastExchange{}
2427+ _, inMsg, err := exchg.Prepare(sess)
2428+ c.Assert(err, IsNil)
2429+ err = json.Unmarshal([]byte(`{}`), inMsg)
2430+ c.Assert(err, IsNil)
2431+ err = exchg.Acked(sess, true)
2432+ c.Assert(err, Not(IsNil))
2433+ c.Check(dropped, HasLen, 0)
2434+}
2435+
2436+func (s *exchangesSuite) TestUnicastExchangeErrorOnPrepare(c *C) {
2437+ fail := errors.New("fail")
2438+ sess := &testing.TestBrokerSession{
2439+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
2440+ return 0, nil, fail
2441+ },
2442+ }
2443+ exchg := &broker.UnicastExchange{}
2444+ _, _, err := exchg.Prepare(sess)
2445+ c.Assert(err, Equals, fail)
2446+}
2447+
2448+func (s *exchangesSuite) TestUnicastExchangeErrorOnAcked(c *C) {
2449+ notifs := []protocol.Notification{}
2450+ fail := errors.New("fail")
2451+ sess := &testing.TestBrokerSession{
2452+ DoGet: func(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
2453+ return 0, notifs, nil
2454+ },
2455+ DoDropByMsgId: func(chanId store.InternalChannelId, targets []protocol.Notification) error {
2456+ return fail
2457+ },
2458+ }
2459+ exchg := &broker.UnicastExchange{}
2460+ _, inMsg, err := exchg.Prepare(sess)
2461+ c.Assert(err, IsNil)
2462+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
2463+ c.Assert(err, IsNil)
2464+ err = exchg.Acked(sess, true)
2465+ c.Assert(err, Equals, fail)
2466 }
2467
2468=== modified file 'server/broker/exchg_impl_test.go'
2469--- server/broker/exchg_impl_test.go 2014-04-03 16:00:53 +0000
2470+++ server/broker/exchg_impl_test.go 2014-05-12 14:11:46 +0000
2471@@ -22,6 +22,7 @@
2472 . "launchpad.net/gocheck"
2473
2474 "launchpad.net/ubuntu-push/server/store"
2475+ help "launchpad.net/ubuntu-push/testing"
2476 )
2477
2478 type exchangesImplSuite struct{}
2479@@ -29,27 +30,27 @@
2480 var _ = Suite(&exchangesImplSuite{})
2481
2482 func (s *exchangesImplSuite) TestFilterByLevel(c *C) {
2483- payloads := []json.RawMessage{
2484+ notifs := help.Ns(
2485 json.RawMessage(`{"a": 3}`),
2486 json.RawMessage(`{"a": 4}`),
2487 json.RawMessage(`{"a": 5}`),
2488- }
2489- res := filterByLevel(5, 5, payloads)
2490+ )
2491+ res := filterByLevel(5, 5, notifs)
2492 c.Check(len(res), Equals, 0)
2493- res = filterByLevel(4, 5, payloads)
2494+ res = filterByLevel(4, 5, notifs)
2495 c.Check(len(res), Equals, 1)
2496- c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))
2497- res = filterByLevel(3, 5, payloads)
2498+ c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 5}`))
2499+ res = filterByLevel(3, 5, notifs)
2500 c.Check(len(res), Equals, 2)
2501- c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`))
2502- res = filterByLevel(2, 5, payloads)
2503+ c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 4}`))
2504+ res = filterByLevel(2, 5, notifs)
2505 c.Check(len(res), Equals, 3)
2506- res = filterByLevel(1, 5, payloads)
2507+ res = filterByLevel(1, 5, notifs)
2508 c.Check(len(res), Equals, 3)
2509 // too ahead, pick only last
2510- res = filterByLevel(10, 5, payloads)
2511+ res = filterByLevel(10, 5, notifs)
2512 c.Check(len(res), Equals, 1)
2513- c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))
2514+ c.Check(res[0].Payload, DeepEquals, json.RawMessage(`{"a": 5}`))
2515 }
2516
2517 func (s *exchangesImplSuite) TestFilterByLevelEmpty(c *C) {
2518@@ -71,18 +72,19 @@
2519 err := json.Unmarshal(p, &decoded[i])
2520 c.Assert(err, IsNil)
2521 }
2522+ notifs := help.Ns(payloads...)
2523
2524 other := store.InternalChannelId("1")
2525
2526 c.Check(channelFilter("", store.SystemInternalChannelId, nil, nil), IsNil)
2527- c.Check(channelFilter("", other, payloads[1:], decoded), DeepEquals, payloads[1:])
2528+ c.Check(channelFilter("", other, notifs[1:], decoded), DeepEquals, payloads[1:])
2529
2530 // use tag when channel is the sytem channel
2531
2532- c.Check(channelFilter("c/z", store.SystemInternalChannelId, payloads, decoded), HasLen, 0)
2533-
2534- c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]})
2535-
2536- c.Check(channelFilter("a/x", store.SystemInternalChannelId, payloads[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]})
2537+ c.Check(channelFilter("c/z", store.SystemInternalChannelId, notifs, decoded), HasLen, 0)
2538+
2539+ c.Check(channelFilter("a/x", store.SystemInternalChannelId, notifs, decoded), DeepEquals, []json.RawMessage{payloads[0], payloads[3]})
2540+
2541+ c.Check(channelFilter("a/x", store.SystemInternalChannelId, notifs[1:], decoded), DeepEquals, []json.RawMessage{payloads[3]})
2542
2543 }
2544
2545=== modified file 'server/broker/simple/simple.go'
2546--- server/broker/simple/simple.go 2014-04-11 08:47:18 +0000
2547+++ server/broker/simple/simple.go 2014-05-12 14:11:46 +0000
2548@@ -46,6 +46,7 @@
2549
2550 // simpleBrokerSession represents a session in the broker.
2551 type simpleBrokerSession struct {
2552+ broker *SimpleBroker
2553 registered bool
2554 deviceId string
2555 model string
2556@@ -61,6 +62,7 @@
2557
2558 const (
2559 broadcastDelivery deliveryKind = iota
2560+ unicastDelivery
2561 )
2562
2563 // delivery holds all the information to request a delivery
2564@@ -93,6 +95,14 @@
2565 return &sess.exchgScratch
2566 }
2567
2568+func (sess *simpleBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
2569+ return sess.broker.get(chanId, cachedOk)
2570+}
2571+
2572+func (sess *simpleBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
2573+ return sess.broker.drop(chanId, targets)
2574+}
2575+
2576 // NewSimpleBroker makes a new SimpleBroker.
2577 func NewSimpleBroker(sto store.PendingStore, cfg broker.BrokerConfig, logger logger.Logger) *SimpleBroker {
2578 sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())
2579@@ -144,7 +154,7 @@
2580 // find relevant channels, for now only system
2581 channels := []store.InternalChannelId{store.SystemInternalChannelId}
2582 for _, chanId := range channels {
2583- topLevel, payloads, err := b.sto.GetChannelSnapshot(chanId)
2584+ topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId)
2585 if err != nil {
2586 // next broadcast will try again
2587 b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err)
2588@@ -153,9 +163,9 @@
2589 clientLevel := sess.levels[chanId]
2590 if clientLevel != topLevel {
2591 broadcastExchg := &broker.BroadcastExchange{
2592- ChanId: chanId,
2593- TopLevel: topLevel,
2594- NotificationPayloads: payloads,
2595+ ChanId: chanId,
2596+ TopLevel: topLevel,
2597+ Notifications: notifications,
2598 }
2599 broadcastExchg.Init()
2600 sess.exchanges <- broadcastExchg
2601@@ -166,7 +176,7 @@
2602
2603 // Register registers a session with the broker. It feeds the session
2604 // pending notifications as well.
2605-func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {
2606+func (b *SimpleBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) {
2607 // xxx sanity check DeviceId
2608 model, err := broker.GetInfoString(connect, "device", "?")
2609 if err != nil {
2610@@ -185,6 +195,7 @@
2611 levels[id] = v
2612 }
2613 sess := &simpleBrokerSession{
2614+ broker: b,
2615 deviceId: connect.DeviceId,
2616 model: model,
2617 imageChannel: imageChannel,
2618@@ -207,6 +218,24 @@
2619 b.sessionCh <- sess
2620 }
2621
2622+func (b *SimpleBroker) get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
2623+ topLevel, notifications, err := b.sto.GetChannelSnapshot(chanId)
2624+ if err != nil {
2625+ b.logger.Errorf("unsuccessful, get channel snapshot for %v (cachedOk=%v): %v", chanId, cachedOk, err)
2626+ }
2627+ return topLevel, notifications, err
2628+
2629+}
2630+
2631+func (b *SimpleBroker) drop(chanId store.InternalChannelId, targets []protocol.Notification) error {
2632+ err := b.sto.DropByMsgId(chanId, targets)
2633+ if err != nil {
2634+ b.logger.Errorf("unsuccessful, drop from channel %v: %v", chanId, err)
2635+ }
2636+ return err
2637+
2638+}
2639+
2640 // run runs the agent logic of the broker.
2641 func (b *SimpleBroker) run() {
2642 Loop:
2643@@ -224,7 +253,7 @@
2644 } else { // register
2645 prev := b.registry[sess.deviceId]
2646 if prev != nil { // kick it
2647- close(prev.exchanges)
2648+ prev.exchanges <- nil
2649 }
2650 b.registry[sess.deviceId] = sess
2651 sess.registered = true
2652@@ -233,21 +262,27 @@
2653 case delivery := <-b.deliveryCh:
2654 switch delivery.kind {
2655 case broadcastDelivery:
2656- topLevel, payloads, err := b.sto.GetChannelSnapshot(delivery.chanId)
2657+ topLevel, notifications, err := b.get(delivery.chanId, false)
2658 if err != nil {
2659 // next broadcast will try again
2660- b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err)
2661 continue Loop
2662 }
2663 broadcastExchg := &broker.BroadcastExchange{
2664- ChanId: delivery.chanId,
2665- TopLevel: topLevel,
2666- NotificationPayloads: payloads,
2667+ ChanId: delivery.chanId,
2668+ TopLevel: topLevel,
2669+ Notifications: notifications,
2670 }
2671 broadcastExchg.Init()
2672 for _, sess := range b.registry {
2673 sess.exchanges <- broadcastExchg
2674 }
2675+ case unicastDelivery:
2676+ chanId := delivery.chanId
2677+ _, devId := chanId.UnicastUserAndDevice()
2678+ sess := b.registry[devId]
2679+ if sess != nil {
2680+ sess.exchanges <- &broker.UnicastExchange{chanId}
2681+ }
2682 }
2683 }
2684 }
2685@@ -260,3 +295,13 @@
2686 chanId: chanId,
2687 }
2688 }
2689+
2690+// Unicast requests unicast for the channels.
2691+func (b *SimpleBroker) Unicast(chanIds ...store.InternalChannelId) {
2692+ for _, chanId := range chanIds {
2693+ b.deliveryCh <- &delivery{
2694+ kind: unicastDelivery,
2695+ chanId: chanId,
2696+ }
2697+ }
2698+}
2699
2700=== modified file 'server/broker/simple/simple_test.go'
2701--- server/broker/simple/simple_test.go 2014-04-03 16:00:53 +0000
2702+++ server/broker/simple/simple_test.go 2014-05-12 14:11:46 +0000
2703@@ -26,6 +26,7 @@
2704 "launchpad.net/ubuntu-push/server/broker"
2705 "launchpad.net/ubuntu-push/server/broker/testing"
2706 "launchpad.net/ubuntu-push/server/store"
2707+ help "launchpad.net/ubuntu-push/testing"
2708 )
2709
2710 func TestSimple(t *stdtesting.T) { TestingT(t) }
2711@@ -58,10 +59,10 @@
2712 c.Assert(len(sess.exchanges), Equals, 1)
2713 exchg1 := <-sess.exchanges
2714 c.Check(exchg1, DeepEquals, &broker.BroadcastExchange{
2715- ChanId: store.SystemInternalChannelId,
2716- TopLevel: 1,
2717- NotificationPayloads: []json.RawMessage{notification1},
2718- Decoded: []map[string]interface{}{decoded1},
2719+ ChanId: store.SystemInternalChannelId,
2720+ TopLevel: 1,
2721+ Notifications: help.Ns(notification1),
2722+ Decoded: []map[string]interface{}{decoded1},
2723 })
2724 }
2725
2726
2727=== modified file 'server/broker/simple/suite_test.go'
2728--- server/broker/simple/suite_test.go 2014-02-10 23:19:08 +0000
2729+++ server/broker/simple/suite_test.go 2014-05-12 14:11:46 +0000
2730@@ -42,4 +42,7 @@
2731 RevealBroadcastExchange: func(exchg broker.Exchange) *broker.BroadcastExchange {
2732 return exchg.(*broker.BroadcastExchange)
2733 },
2734+ RevealUnicastExchange: func(exchg broker.Exchange) *broker.UnicastExchange {
2735+ return exchg.(*broker.UnicastExchange)
2736+ },
2737 }})
2738
2739=== modified file 'server/broker/testing/impls.go'
2740--- server/broker/testing/impls.go 2014-04-03 14:31:10 +0000
2741+++ server/broker/testing/impls.go 2014-05-12 14:11:46 +0000
2742@@ -18,7 +18,9 @@
2743 package testing
2744
2745 import (
2746+ "launchpad.net/ubuntu-push/protocol"
2747 "launchpad.net/ubuntu-push/server/broker"
2748+ "launchpad.net/ubuntu-push/server/store"
2749 )
2750
2751 // Test implementation of BrokerSession.
2752@@ -29,6 +31,9 @@
2753 Exchanges chan broker.Exchange
2754 LevelsMap broker.LevelsMap
2755 exchgScratch broker.ExchangesScratchArea
2756+ // hooks
2757+ DoGet func(store.InternalChannelId, bool) (int64, []protocol.Notification, error)
2758+ DoDropByMsgId func(store.InternalChannelId, []protocol.Notification) error
2759 }
2760
2761 func (tbs *TestBrokerSession) DeviceIdentifier() string {
2762@@ -55,6 +60,14 @@
2763 return &tbs.exchgScratch
2764 }
2765
2766+func (tbs *TestBrokerSession) Get(chanId store.InternalChannelId, cachedOk bool) (int64, []protocol.Notification, error) {
2767+ return tbs.DoGet(chanId, cachedOk)
2768+}
2769+
2770+func (tbs *TestBrokerSession) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
2771+ return tbs.DoDropByMsgId(chanId, targets)
2772+}
2773+
2774 // Test implementation of BrokerConfig.
2775 type TestBrokerConfig struct {
2776 ConfigSessionQueueSize uint
2777
2778=== modified file 'server/broker/testsuite/suite.go'
2779--- server/broker/testsuite/suite.go 2014-04-11 08:47:18 +0000
2780+++ server/broker/testsuite/suite.go 2014-05-12 14:11:46 +0000
2781@@ -30,7 +30,7 @@
2782 "launchpad.net/ubuntu-push/server/broker"
2783 "launchpad.net/ubuntu-push/server/broker/testing"
2784 "launchpad.net/ubuntu-push/server/store"
2785- helpers "launchpad.net/ubuntu-push/testing"
2786+ help "launchpad.net/ubuntu-push/testing"
2787 )
2788
2789 // The expected interface for tested brokers.
2790@@ -50,12 +50,14 @@
2791 RevealSession func(broker.Broker, string) broker.BrokerSession
2792 // Let us get to a broker.BroadcastExchange from an Exchange.
2793 RevealBroadcastExchange func(broker.Exchange) *broker.BroadcastExchange
2794+ // Let us get to a broker.UnicastExchange from an Exchange.
2795+ RevealUnicastExchange func(broker.Exchange) *broker.UnicastExchange
2796 // private
2797- testlog *helpers.TestLogger
2798+ testlog *help.TestLogger
2799 }
2800
2801 func (s *CommonBrokerSuite) SetUpTest(c *C) {
2802- s.testlog = helpers.NewTestLogger(c, "error")
2803+ s.testlog = help.NewTestLogger(c, "error")
2804 }
2805
2806 var testBrokerConfig = &testing.TestBrokerConfig{10, 5}
2807@@ -89,7 +91,7 @@
2808 "device": "model",
2809 "channel": "daily",
2810 },
2811- })
2812+ }, "s1")
2813 c.Assert(err, IsNil)
2814 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess)
2815 c.Assert(sess.DeviceIdentifier(), Equals, "dev-1")
2816@@ -101,7 +103,7 @@
2817 }))
2818 b.Unregister(sess)
2819 // just to make sure the unregister was processed
2820- _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""})
2821+ _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}, "s2")
2822 c.Assert(err, IsNil)
2823 c.Check(s.RevealSession(b, "dev-1"), IsNil)
2824 }
2825@@ -111,7 +113,7 @@
2826 b := s.MakeBroker(sto, testBrokerConfig, nil)
2827 b.Start()
2828 defer b.Stop()
2829- _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"z": 5}})
2830+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"z": 5}}, "s1")
2831 c.Check(err, FitsTypeOf, &broker.ErrAbort{})
2832 }
2833
2834@@ -123,11 +125,11 @@
2835 info := map[string]interface{}{
2836 "device": -1,
2837 }
2838- _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
2839+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", Info: info}, "s1")
2840 c.Check(err, Equals, broker.ErrUnexpectedValue)
2841 info["device"] = "m"
2842 info["channel"] = -1
2843- _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info})
2844+ _, err = b.Register(&protocol.ConnectMsg{Type: "connect", Info: info}, "s2")
2845 c.Check(err, Equals, broker.ErrUnexpectedValue)
2846 }
2847
2848@@ -139,7 +141,7 @@
2849 b := s.MakeBroker(sto, testBrokerConfig, nil)
2850 b.Start()
2851 defer b.Stop()
2852- sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
2853+ sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
2854 c.Assert(err, IsNil)
2855 c.Check(len(sess.SessionChannel()), Equals, 1)
2856 }
2857@@ -149,7 +151,7 @@
2858 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
2859 b.Start()
2860 defer b.Stop()
2861- _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
2862+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
2863 c.Assert(err, IsNil)
2864 // but
2865 c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n")
2866@@ -160,22 +162,25 @@
2867 b := s.MakeBroker(sto, testBrokerConfig, nil)
2868 b.Start()
2869 defer b.Stop()
2870- sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
2871- c.Assert(err, IsNil)
2872- sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
2873- c.Assert(err, IsNil)
2874- checkAndFalse := false
2875- // previous session got signaled by closing its channel
2876+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
2877+ c.Assert(err, IsNil)
2878+ sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s2")
2879+ c.Assert(err, IsNil)
2880+ // previous session got signaled by sending nil on its channel
2881+ var sentinel broker.Exchange
2882+ got := false
2883 select {
2884- case _, ok := <-sess1.SessionChannel():
2885- checkAndFalse = ok == false
2886- default:
2887+ case sentinel = <-sess1.SessionChannel():
2888+ got = true
2889+ case <-time.After(5 * time.Second):
2890+ c.Fatal("taking too long to get sentinel")
2891 }
2892- c.Check(checkAndFalse, Equals, true)
2893+ c.Check(got, Equals, true)
2894+ c.Check(sentinel, IsNil)
2895 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess2)
2896 b.Unregister(sess1)
2897 // just to make sure the unregister was processed
2898- _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""})
2899+ _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}, "s3")
2900 c.Assert(err, IsNil)
2901 c.Check(s.RevealSession(b, "dev-1"), Equals, sess2)
2902 }
2903@@ -187,9 +192,9 @@
2904 b := s.MakeBroker(sto, testBrokerConfig, nil)
2905 b.Start()
2906 defer b.Stop()
2907- sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
2908+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
2909 c.Assert(err, IsNil)
2910- sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"})
2911+ sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}, "s2")
2912 c.Assert(err, IsNil)
2913 // add notification to channel *after* the registrations
2914 muchLater := time.Now().Add(10 * time.Minute)
2915@@ -200,10 +205,10 @@
2916 c.Fatal("taking too long to get broadcast exchange")
2917 case exchg1 := <-sess1.SessionChannel():
2918 c.Check(s.RevealBroadcastExchange(exchg1), DeepEquals, &broker.BroadcastExchange{
2919- ChanId: store.SystemInternalChannelId,
2920- TopLevel: 1,
2921- NotificationPayloads: []json.RawMessage{notification1},
2922- Decoded: []map[string]interface{}{decoded1},
2923+ ChanId: store.SystemInternalChannelId,
2924+ TopLevel: 1,
2925+ Notifications: help.Ns(notification1),
2926+ Decoded: []map[string]interface{}{decoded1},
2927 })
2928 }
2929 select {
2930@@ -211,10 +216,10 @@
2931 c.Fatal("taking too long to get broadcast exchange")
2932 case exchg2 := <-sess2.SessionChannel():
2933 c.Check(s.RevealBroadcastExchange(exchg2), DeepEquals, &broker.BroadcastExchange{
2934- ChanId: store.SystemInternalChannelId,
2935- TopLevel: 1,
2936- NotificationPayloads: []json.RawMessage{notification1},
2937- Decoded: []map[string]interface{}{decoded1},
2938+ ChanId: store.SystemInternalChannelId,
2939+ TopLevel: 1,
2940+ Notifications: help.Ns(notification1),
2941+ Decoded: []map[string]interface{}{decoded1},
2942 })
2943 }
2944 }
2945@@ -224,7 +229,7 @@
2946 countdownToFail int
2947 }
2948
2949-func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []json.RawMessage, error) {
2950+func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []protocol.Notification, error) {
2951 if sto.countdownToFail == 0 {
2952 return 0, nil, errors.New("get channel snapshot fail")
2953 }
2954@@ -232,6 +237,10 @@
2955 return 0, nil, nil
2956 }
2957
2958+func (sto *testFailingStore) DropByMsgId(chanId store.InternalChannelId, targets []protocol.Notification) error {
2959+ return errors.New("drop fail")
2960+}
2961+
2962 func (s *CommonBrokerSuite) TestBroadcastFail(c *C) {
2963 logged := make(chan bool, 1)
2964 s.testlog.SetLogEventCb(func(string) {
2965@@ -241,7 +250,7 @@
2966 b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
2967 b.Start()
2968 defer b.Stop()
2969- _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
2970+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}, "s1")
2971 c.Assert(err, IsNil)
2972 b.Broadcast(store.SystemInternalChannelId)
2973 select {
2974@@ -249,5 +258,83 @@
2975 c.Fatal("taking too long to log error")
2976 case <-logged:
2977 }
2978- c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful broadcast, get channel snapshot for 0: get channel snapshot fail\n")
2979+ c.Check(s.testlog.Captured(), Matches, "ERROR.*: get channel snapshot fail\n")
2980+}
2981+
2982+func (s *CommonBrokerSuite) TestUnicast(c *C) {
2983+ sto := store.NewInMemoryPendingStore()
2984+ notification1 := json.RawMessage(`{"m": "M1"}`)
2985+ notification2 := json.RawMessage(`{"m": "M2"}`)
2986+ chanId1 := store.UnicastInternalChannelId("dev1", "dev1")
2987+ chanId2 := store.UnicastInternalChannelId("dev2", "dev2")
2988+ b := s.MakeBroker(sto, testBrokerConfig, nil)
2989+ b.Start()
2990+ defer b.Stop()
2991+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev1"}, "s1")
2992+ c.Assert(err, IsNil)
2993+ sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev2"}, "s2")
2994+ c.Assert(err, IsNil)
2995+ // add notification to channel *after* the registrations
2996+ muchLater := time.Now().Add(10 * time.Minute)
2997+ sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
2998+ sto.AppendToUnicastChannel(chanId2, "app2", notification2, "msg2", muchLater)
2999+ b.Unicast(chanId2, chanId1)
3000+ select {
3001+ case <-time.After(5 * time.Second):
3002+ c.Fatal("taking too long to get unicast exchange")
3003+ case exchg1 := <-sess1.SessionChannel():
3004+ u1 := s.RevealUnicastExchange(exchg1)
3005+ c.Check(u1.ChanId, Equals, chanId1)
3006+ }
3007+ select {
3008+ case <-time.After(5 * time.Second):
3009+ c.Fatal("taking too long to get unicast exchange")
3010+ case exchg2 := <-sess2.SessionChannel():
3011+ u2 := s.RevealUnicastExchange(exchg2)
3012+ c.Check(u2.ChanId, Equals, chanId2)
3013+ }
3014+}
3015+
3016+func (s *CommonBrokerSuite) TestGetAndDrop(c *C) {
3017+ sto := store.NewInMemoryPendingStore()
3018+ notification1 := json.RawMessage(`{"m": "M1"}`)
3019+ chanId1 := store.UnicastInternalChannelId("dev3", "dev3")
3020+ b := s.MakeBroker(sto, testBrokerConfig, nil)
3021+ b.Start()
3022+ defer b.Stop()
3023+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
3024+ c.Assert(err, IsNil)
3025+ muchLater := time.Now().Add(10 * time.Minute)
3026+ sto.AppendToUnicastChannel(chanId1, "app1", notification1, "msg1", muchLater)
3027+ _, expected, err := sto.GetChannelSnapshot(chanId1)
3028+ c.Assert(err, IsNil)
3029+ _, notifs, err := sess1.Get(chanId1, false)
3030+ c.Check(notifs, HasLen, 1)
3031+ c.Check(notifs, DeepEquals, expected)
3032+ err = sess1.DropByMsgId(chanId1, notifs)
3033+ c.Assert(err, IsNil)
3034+ _, notifs, err = sess1.Get(chanId1, true)
3035+ c.Check(notifs, HasLen, 0)
3036+ _, expected, err = sto.GetChannelSnapshot(chanId1)
3037+ c.Assert(err, IsNil)
3038+ c.Check(expected, HasLen, 0)
3039+
3040+}
3041+
3042+func (s *CommonBrokerSuite) TestGetAndDropErrors(c *C) {
3043+ chanId1 := store.UnicastInternalChannelId("dev3", "dev3")
3044+ sto := &testFailingStore{countdownToFail: 1}
3045+ b := s.MakeBroker(sto, testBrokerConfig, s.testlog)
3046+ b.Start()
3047+ defer b.Stop()
3048+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev3"}, "s1")
3049+ c.Assert(err, IsNil)
3050+ _, _, err = sess1.Get(chanId1, false)
3051+ c.Assert(err, ErrorMatches, "get channel snapshot fail")
3052+ c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, get channel snapshot for Udev3:dev3 \\(cachedOk=false\\): get channel snapshot fail\n")
3053+ s.testlog.ResetCapture()
3054+
3055+ err = sess1.DropByMsgId(chanId1, nil)
3056+ c.Assert(err, ErrorMatches, "drop fail")
3057+ c.Check(s.testlog.Captured(), Matches, "ERROR unsuccessful, drop from channel Udev3:dev3: drop fail\n")
3058 }
3059
3060=== modified file 'server/session/session.go'
3061--- server/session/session.go 2014-04-11 08:47:18 +0000
3062+++ server/session/session.go 2014-05-12 14:11:46 +0000
3063@@ -18,6 +18,7 @@
3064 package session
3065
3066 import (
3067+ "errors"
3068 "net"
3069 "time"
3070
3071@@ -35,7 +36,7 @@
3072 }
3073
3074 // sessionStart manages the start of the protocol session.
3075-func sessionStart(proto protocol.Protocol, brkr broker.Broker, cfg SessionConfig) (broker.BrokerSession, error) {
3076+func sessionStart(proto protocol.Protocol, brkr broker.Broker, cfg SessionConfig, sessionId string) (broker.BrokerSession, error) {
3077 var connMsg protocol.ConnectMsg
3078 proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout()))
3079 err := proto.ReadMessage(&connMsg)
3080@@ -52,9 +53,11 @@
3081 if err != nil {
3082 return nil, err
3083 }
3084- return brkr.Register(&connMsg)
3085+ return brkr.Register(&connMsg, sessionId)
3086 }
3087
3088+var errOneway = errors.New("oneway")
3089+
3090 // exchange writes outMsg message, reads answer in inMsg
3091 func exchange(proto protocol.Protocol, outMsg, inMsg interface{}, exchangeTimeout time.Duration) error {
3092 proto.SetDeadline(time.Now().Add(exchangeTimeout))
3093@@ -62,7 +65,10 @@
3094 if err != nil {
3095 return err
3096 }
3097- if inMsg == nil { // no answer expected, breaking connection
3098+ if inMsg == nil { // no answer expected
3099+ if outMsg.(protocol.OnewayMsg).OnewayContinue() {
3100+ return errOneway
3101+ }
3102 return &broker.ErrAbort{"session broken for reason"}
3103 }
3104 err = proto.ReadMessage(inMsg)
3105@@ -78,6 +84,10 @@
3106 exchangeTimeout := cfg.ExchangeTimeout()
3107 pingTimer := time.NewTimer(pingInterval)
3108 intervalStart := time.Now()
3109+ pingTimerReset := func() {
3110+ pingTimer.Reset(pingInterval)
3111+ intervalStart = time.Now()
3112+ }
3113 ch := sess.SessionChannel()
3114 Loop:
3115 for {
3116@@ -93,16 +103,15 @@
3117 if pongMsg.Type != "pong" {
3118 return &broker.ErrAbort{"expected PONG message"}
3119 }
3120- pingTimer.Reset(pingInterval)
3121- case exchg, ok := <-ch:
3122+ pingTimerReset()
3123+ case exchg := <-ch:
3124 pingTimer.Stop()
3125- if !ok {
3126+ if exchg == nil {
3127 return &broker.ErrAbort{"terminated"}
3128 }
3129 outMsg, inMsg, err := exchg.Prepare(sess)
3130 if err == broker.ErrNop { // nothing to do
3131- pingTimer.Reset(pingInterval)
3132- intervalStart = time.Now()
3133+ pingTimerReset()
3134 continue Loop
3135 }
3136 if err != nil {
3137@@ -111,12 +120,15 @@
3138 for {
3139 done := outMsg.Split()
3140 err = exchange(proto, outMsg, inMsg, exchangeTimeout)
3141+ if err == errOneway {
3142+ pingTimerReset()
3143+ continue Loop
3144+ }
3145 if err != nil {
3146 return err
3147 }
3148 if done {
3149- pingTimer.Reset(pingInterval)
3150- intervalStart = time.Now()
3151+ pingTimerReset()
3152 }
3153 err = exchg.Acked(sess, done)
3154 if err != nil {
3155@@ -142,7 +154,7 @@
3156 return track.End(&broker.ErrAbort{"unexpected wire format version"})
3157 }
3158 proto := protocol.NewProtocol0(conn)
3159- sess, err := sessionStart(proto, brkr, cfg)
3160+ sess, err := sessionStart(proto, brkr, cfg, track.SessionId())
3161 if err != nil {
3162 return track.End(err)
3163 }
3164
3165=== modified file 'server/session/session_test.go'
3166--- server/session/session_test.go 2014-04-11 08:47:18 +0000
3167+++ server/session/session_test.go 2014-05-12 14:11:46 +0000
3168@@ -130,8 +130,8 @@
3169 return &testBroker{registration: make(chan interface{}, 2)}
3170 }
3171
3172-func (tb *testBroker) Register(connect *protocol.ConnectMsg) (broker.BrokerSession, error) {
3173- tb.registration <- "register " + connect.DeviceId
3174+func (tb *testBroker) Register(connect *protocol.ConnectMsg, sessionId string) (broker.BrokerSession, error) {
3175+ tb.registration <- fmt.Sprintf("register %s %s", connect.DeviceId, sessionId)
3176 return &testing.TestBrokerSession{DeviceId: connect.DeviceId}, tb.err
3177 }
3178
3179@@ -148,7 +148,7 @@
3180 brkr := newTestBroker()
3181 go func() {
3182 var err error
3183- sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout)
3184+ sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout, "s1")
3185 errCh <- err
3186 }()
3187 c.Check(takeNext(down), Equals, "deadline 5ms")
3188@@ -160,7 +160,7 @@
3189 up <- nil // no write error
3190 err := <-errCh
3191 c.Check(err, IsNil)
3192- c.Check(takeNext(brkr.registration), Equals, "register dev-1")
3193+ c.Check(takeNext(brkr.registration), Equals, "register dev-1 s1")
3194 c.Check(sess.DeviceIdentifier(), Equals, "dev-1")
3195 }
3196
3197@@ -175,7 +175,7 @@
3198 brkr.err = errRegister
3199 go func() {
3200 var err error
3201- sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout)
3202+ sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout, "s2")
3203 errCh <- err
3204 }()
3205 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}
3206@@ -190,7 +190,7 @@
3207 down := make(chan interface{}, 5)
3208 tp := &testProtocol{up, down}
3209 up <- io.ErrUnexpectedEOF
3210- _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)
3211+ _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout, "s3")
3212 c.Check(err, Equals, io.ErrUnexpectedEOF)
3213 }
3214
3215@@ -200,7 +200,7 @@
3216 tp := &testProtocol{up, down}
3217 up <- protocol.ConnectMsg{Type: "connect"}
3218 up <- io.ErrUnexpectedEOF
3219- _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)
3220+ _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout, "s4")
3221 c.Check(err, Equals, io.ErrUnexpectedEOF)
3222 // sanity
3223 c.Check(takeNext(down), Matches, "deadline.*")
3224@@ -212,7 +212,7 @@
3225 down := make(chan interface{}, 5)
3226 tp := &testProtocol{up, down}
3227 up <- protocol.ConnectMsg{Type: "what"}
3228- _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)
3229+ _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout, "s5")
3230 c.Check(err, DeepEquals, &broker.ErrAbort{"expected CONNECT message"})
3231 }
3232
3233@@ -222,14 +222,14 @@
3234 }
3235
3236 func (s *sessionSuite) TestSessionLoop(c *C) {
3237- nopTrack := NewTracker(s.testlog)
3238+ track := &testTracker{NewTracker(s.testlog), make(chan interface{}, 2)}
3239 errCh := make(chan error, 1)
3240 up := make(chan interface{}, 5)
3241 down := make(chan interface{}, 5)
3242 tp := &testProtocol{up, down}
3243 sess := &testing.TestBrokerSession{}
3244 go func() {
3245- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
3246+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, track)
3247 }()
3248 c.Check(takeNext(down), Equals, "deadline 2ms")
3249 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
3250@@ -241,6 +241,9 @@
3251 up <- io.ErrUnexpectedEOF
3252 err := <-errCh
3253 c.Check(err, Equals, io.ErrUnexpectedEOF)
3254+ c.Check(track.interval, HasLen, 2)
3255+ c.Check((<-track.interval).(time.Duration) <= 8*time.Millisecond, Equals, true)
3256+ c.Check((<-track.interval).(time.Duration) <= 8*time.Millisecond, Equals, true)
3257 }
3258
3259 func (s *sessionSuite) TestSessionLoopWriteError(c *C) {
3260@@ -357,7 +360,7 @@
3261 go func() {
3262 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
3263 }()
3264- close(exchanges)
3265+ exchanges <- nil
3266 err := <-errCh
3267 c.Check(err, DeepEquals, &broker.ErrAbort{"terminated"})
3268 }
3269@@ -477,18 +480,44 @@
3270 down := make(chan interface{}, 5)
3271 tp := &testProtocol{up, down}
3272 exchanges := make(chan broker.Exchange, 1)
3273- exchanges <- &broker.ConnBrokenExchange{"REASON"}
3274+ msg := &protocol.ConnBrokenMsg{"connbroken", "BREASON"}
3275+ exchanges <- &broker.ConnMetaExchange{msg}
3276 sess := &testing.TestBrokerSession{Exchanges: exchanges}
3277 go func() {
3278 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
3279 }()
3280 c.Check(takeNext(down), Equals, "deadline 2ms")
3281- c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "REASON"})
3282+ c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "BREASON"})
3283 up <- nil // no write error
3284 err := <-errCh
3285 c.Check(err, DeepEquals, &broker.ErrAbort{"session broken for reason"})
3286 }
3287
3288+func (s *sessionSuite) TestSessionLoopConnWarnExchange(c *C) {
3289+ nopTrack := NewTracker(s.testlog)
3290+ errCh := make(chan error, 1)
3291+ up := make(chan interface{}, 5)
3292+ down := make(chan interface{}, 5)
3293+ tp := &testProtocol{up, down}
3294+ exchanges := make(chan broker.Exchange, 1)
3295+ msg := &protocol.ConnWarnMsg{"connwarn", "WREASON"}
3296+ exchanges <- &broker.ConnMetaExchange{msg}
3297+ sess := &testing.TestBrokerSession{Exchanges: exchanges}
3298+ go func() {
3299+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
3300+ }()
3301+ c.Check(takeNext(down), Equals, "deadline 2ms")
3302+ c.Check(takeNext(down), DeepEquals, protocol.ConnWarnMsg{"connwarn", "WREASON"})
3303+ up <- nil // no write error
3304+ // session continues
3305+ c.Check(takeNext(down), Equals, "deadline 2ms")
3306+ c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
3307+ up <- nil // no write error
3308+ up <- io.EOF
3309+ err := <-errCh
3310+ c.Check(err, Equals, io.EOF)
3311+}
3312+
3313 type testTracker struct {
3314 SessionTracker
3315 interval chan interface{}
3316@@ -593,7 +622,7 @@
3317 msg, err = downStream.ReadBytes(byte('}'))
3318 c.Check(err, IsNil)
3319 c.Check(msg, DeepEquals, []byte("\x00\x0c{\"T\":\"ping\"}"))
3320- c.Check(takeNext(brkr.registration), Equals, "register DEV")
3321+ c.Check(takeNext(brkr.registration), Equals, "register DEV "+track.SessionId())
3322 c.Check(len(brkr.registration), Equals, 0) // not yet unregistered
3323 cli.Close()
3324 err = <-errCh
3325
3326=== modified file 'server/session/tracker.go'
3327--- server/session/tracker.go 2014-02-10 23:19:08 +0000
3328+++ server/session/tracker.go 2014-05-12 14:11:46 +0000
3329@@ -17,6 +17,7 @@
3330 package session
3331
3332 import (
3333+ "fmt"
3334 "net"
3335 "time"
3336
3337@@ -29,6 +30,8 @@
3338 logger.Logger
3339 // Session got started.
3340 Start(WithRemoteAddr)
3341+ // SessionId
3342+ SessionId() string
3343 // Session got registered with broker as sess BrokerSession.
3344 Registered(sess broker.BrokerSession)
3345 // Report effective elapsed ping interval.
3346@@ -47,7 +50,7 @@
3347 // Tracker implements SessionTracker simply.
3348 type tracker struct {
3349 logger.Logger
3350- sessionId int64 // xxx use timeuuid later
3351+ sessionId string
3352 }
3353
3354 func NewTracker(logger logger.Logger) SessionTracker {
3355@@ -55,18 +58,22 @@
3356 }
3357
3358 func (trk *tracker) Start(conn WithRemoteAddr) {
3359- trk.sessionId = time.Now().UnixNano() - sessionsEpoch
3360- trk.Debugf("session(%x) connected %v", trk.sessionId, conn.RemoteAddr())
3361+ trk.sessionId = fmt.Sprintf("%x", time.Now().UnixNano()-sessionsEpoch)
3362+ trk.Debugf("session(%s) connected %v", trk.sessionId, conn.RemoteAddr())
3363+}
3364+
3365+func (trk *tracker) SessionId() string {
3366+ return trk.sessionId
3367 }
3368
3369 func (trk *tracker) Registered(sess broker.BrokerSession) {
3370- trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceIdentifier())
3371+ trk.Infof("session(%s) registered %v", trk.sessionId, sess.DeviceIdentifier())
3372 }
3373
3374 func (trk *tracker) EffectivePingInterval(time.Duration) {
3375 }
3376
3377 func (trk *tracker) End(err error) error {
3378- trk.Debugf("session(%x) ended with: %v", trk.sessionId, err)
3379+ trk.Debugf("session(%s) ended with: %v", trk.sessionId, err)
3380 return err
3381 }
3382
3383=== modified file 'server/session/tracker_test.go'
3384--- server/session/tracker_test.go 2014-02-10 23:19:08 +0000
3385+++ server/session/tracker_test.go 2014-05-12 14:11:46 +0000
3386@@ -46,8 +46,8 @@
3387 func (s *trackerSuite) TestSessionTrackStart(c *C) {
3388 track := NewTracker(s.testlog)
3389 track.Start(&testRemoteAddrable{})
3390- c.Check(track.(*tracker).sessionId, Not(Equals), 0)
3391- regExpected := fmt.Sprintf(`DEBUG session\(%x\) connected 127\.0\.0\.1:9999\n`, track.(*tracker).sessionId)
3392+ c.Check(track.SessionId(), Not(Equals), "")
3393+ regExpected := fmt.Sprintf(`DEBUG session\(%s\) connected 127\.0\.0\.1:9999\n`, track.SessionId())
3394 c.Check(s.testlog.Captured(), Matches, regExpected)
3395 }
3396
3397@@ -55,7 +55,7 @@
3398 track := NewTracker(s.testlog)
3399 track.Start(&testRemoteAddrable{})
3400 track.Registered(&testing.TestBrokerSession{DeviceId: "DEV-ID"})
3401- regExpected := fmt.Sprintf(`.*connected.*\nINFO session\(%x\) registered DEV-ID\n`, track.(*tracker).sessionId)
3402+ regExpected := fmt.Sprintf(`.*connected.*\nINFO session\(%s\) registered DEV-ID\n`, track.SessionId())
3403 c.Check(s.testlog.Captured(), Matches, regExpected)
3404 }
3405
3406@@ -63,6 +63,6 @@
3407 track := NewTracker(s.testlog)
3408 track.Start(&testRemoteAddrable{})
3409 track.End(&broker.ErrAbort{})
3410- regExpected := fmt.Sprintf(`.*connected.*\nDEBUG session\(%x\) ended with: session aborted \(\)\n`, track.(*tracker).sessionId)
3411+ regExpected := fmt.Sprintf(`.*connected.*\nDEBUG session\(%s\) ended with: session aborted \(\)\n`, track.SessionId())
3412 c.Check(s.testlog.Captured(), Matches, regExpected)
3413 }
3414
3415=== modified file 'server/store/inmemory.go'
3416--- server/store/inmemory.go 2014-02-18 14:19:05 +0000
3417+++ server/store/inmemory.go 2014-05-12 14:11:46 +0000
3418@@ -20,18 +20,15 @@
3419 "encoding/json"
3420 "sync"
3421 "time"
3422+
3423+ "launchpad.net/ubuntu-push/protocol"
3424 )
3425
3426-// one stored notification
3427-type notification struct {
3428- payload json.RawMessage
3429- expiration time.Time
3430-}
3431-
3432 // one stored channel
3433 type channel struct {
3434 topLevel int64
3435- notifications []notification
3436+ notifications []protocol.Notification
3437+ expirations []time.Time
3438 }
3439
3440 // InMemoryPendingStore is a basic in-memory pending notification store.
3441@@ -54,23 +51,35 @@
3442 return InternalChannelId(""), ErrUnknownChannel
3443 }
3444
3445-func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error {
3446+func (sto *InMemoryPendingStore) appendToChannel(chanId InternalChannelId, newNotification protocol.Notification, inc int64, expiration time.Time) error {
3447 sto.lock.Lock()
3448 defer sto.lock.Unlock()
3449 prev := sto.store[chanId]
3450 if prev == nil {
3451 prev = &channel{}
3452 }
3453- prev.topLevel++
3454- prev.notifications = append(prev.notifications, notification{
3455- payload: notificationPayload,
3456- expiration: expiration,
3457- })
3458+ prev.topLevel += inc
3459+ prev.notifications = append(prev.notifications, newNotification)
3460+ prev.expirations = append(prev.expirations, expiration)
3461 sto.store[chanId] = prev
3462 return nil
3463 }
3464
3465-func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []json.RawMessage, error) {
3466+func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error {
3467+ newNotification := protocol.Notification{Payload: notificationPayload}
3468+ return sto.appendToChannel(chanId, newNotification, 1, expiration)
3469+}
3470+
3471+func (sto *InMemoryPendingStore) AppendToUnicastChannel(chanId InternalChannelId, appId string, notificationPayload json.RawMessage, msgId string, expiration time.Time) error {
3472+ newNotification := protocol.Notification{
3473+ Payload: notificationPayload,
3474+ AppId: appId,
3475+ MsgId: msgId,
3476+ }
3477+ return sto.appendToChannel(chanId, newNotification, 0, expiration)
3478+}
3479+
3480+func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []protocol.Notification, error) {
3481 sto.lock.Lock()
3482 defer sto.lock.Unlock()
3483 channel, ok := sto.store[chanId]
3484@@ -79,14 +88,19 @@
3485 }
3486 topLevel := channel.topLevel
3487 n := len(channel.notifications)
3488- res := make([]json.RawMessage, 0, n)
3489+ res := make([]protocol.Notification, 0, n)
3490+ exps := make([]time.Time, 0, n)
3491 now := time.Now()
3492- for _, notification := range channel.notifications {
3493- if notification.expiration.Before(now) {
3494+ for i, expiration := range channel.expirations {
3495+ if expiration.Before(now) {
3496 continue
3497 }
3498- res = append(res, notification.payload)
3499+ res = append(res, channel.notifications[i])
3500+ exps = append(exps, expiration)
3501 }
3502+ // store as well
3503+ channel.notifications = res
3504+ channel.expirations = exps
3505 return topLevel, res, nil
3506 }
3507
3508@@ -94,5 +108,25 @@
3509 // ignored
3510 }
3511
3512+func (sto *InMemoryPendingStore) DropByMsgId(chanId InternalChannelId, targets []protocol.Notification) error {
3513+ sto.lock.Lock()
3514+ defer sto.lock.Unlock()
3515+ channel, ok := sto.store[chanId]
3516+ if !ok {
3517+ return nil
3518+ }
3519+ expById := make(map[string]time.Time, len(channel.notifications))
3520+ for i, notif := range channel.notifications {
3521+ expById[notif.MsgId] = channel.expirations[i]
3522+ }
3523+ channel.notifications = FilterOutByMsgId(channel.notifications, targets)
3524+ exps := make([]time.Time, len(channel.notifications))
3525+ for i, notif := range channel.notifications {
3526+ exps[i] = expById[notif.MsgId]
3527+ }
3528+ channel.expirations = exps
3529+ return nil
3530+}
3531+
3532 // sanity check we implement the interface
3533 var _ PendingStore = (*InMemoryPendingStore)(nil)
3534
3535=== modified file 'server/store/inmemory_test.go'
3536--- server/store/inmemory_test.go 2014-02-14 12:38:38 +0000
3537+++ server/store/inmemory_test.go 2014-05-12 14:11:46 +0000
3538@@ -21,6 +21,9 @@
3539 "time"
3540
3541 . "launchpad.net/gocheck"
3542+
3543+ "launchpad.net/ubuntu-push/protocol"
3544+ help "launchpad.net/ubuntu-push/testing"
3545 )
3546
3547 type inMemorySuite struct{}
3548@@ -45,7 +48,7 @@
3549 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
3550 c.Assert(err, IsNil)
3551 c.Check(top, Equals, int64(0))
3552- c.Check(res, DeepEquals, []json.RawMessage(nil))
3553+ c.Check(res, DeepEquals, []protocol.Notification(nil))
3554 }
3555
3556 func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshot(c *C) {
3557@@ -61,7 +64,33 @@
3558 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
3559 c.Assert(err, IsNil)
3560 c.Check(top, Equals, int64(2))
3561- c.Check(res, DeepEquals, []json.RawMessage{notification1, notification2})
3562+ c.Check(res, DeepEquals, help.Ns(notification1, notification2))
3563+}
3564+
3565+func (s *inMemorySuite) TestAppendToUnicastChannelAndGetChannelSnapshot(c *C) {
3566+ sto := NewInMemoryPendingStore()
3567+
3568+ chanId := UnicastInternalChannelId("user", "dev1")
3569+ notification1 := json.RawMessage(`{"a":1}`)
3570+ notification2 := json.RawMessage(`{"b":2}`)
3571+ app1 := "app1"
3572+ app2 := "app2"
3573+ msg1 := "msg1"
3574+ msg2 := "msg2"
3575+
3576+ muchLater := time.Now().Add(time.Minute)
3577+
3578+ err := sto.AppendToUnicastChannel(chanId, app1, notification1, msg1, muchLater)
3579+ c.Assert(err, IsNil)
3580+ err = sto.AppendToUnicastChannel(chanId, app2, notification2, msg2, muchLater)
3581+ c.Assert(err, IsNil)
3582+ top, res, err := sto.GetChannelSnapshot(chanId)
3583+ c.Assert(err, IsNil)
3584+ c.Check(res, DeepEquals, []protocol.Notification{
3585+ protocol.Notification{Payload: notification1, AppId: app1, MsgId: msg1},
3586+ protocol.Notification{Payload: notification2, AppId: app2, MsgId: msg2},
3587+ })
3588+ c.Check(top, Equals, int64(0))
3589 }
3590
3591 func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshotWithExpiration(c *C) {
3592@@ -81,5 +110,45 @@
3593 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
3594 c.Assert(err, IsNil)
3595 c.Check(top, Equals, int64(2))
3596- c.Check(res, DeepEquals, []json.RawMessage{notification1})
3597+ c.Check(res, DeepEquals, help.Ns(notification1))
3598+}
3599+
3600+func (s *inMemorySuite) TestDropByMsgId(c *C) {
3601+ sto := NewInMemoryPendingStore()
3602+
3603+ chanId := UnicastInternalChannelId("user", "dev2")
3604+
3605+ // nothing to do is fine
3606+ err := sto.DropByMsgId(chanId, nil)
3607+ c.Assert(err, IsNil)
3608+
3609+ notification1 := json.RawMessage(`{"a":1}`)
3610+ notification2 := json.RawMessage(`{"b":2}`)
3611+ notification3 := json.RawMessage(`{"a":2}`)
3612+ app1 := "app1"
3613+ app2 := "app2"
3614+ msg1 := "msg1"
3615+ msg2 := "msg2"
3616+ msg3 := "msg3"
3617+
3618+ muchLater := time.Now().Add(time.Minute)
3619+
3620+ err = sto.AppendToUnicastChannel(chanId, app1, notification1, msg1, muchLater)
3621+ c.Assert(err, IsNil)
3622+ err = sto.AppendToUnicastChannel(chanId, app2, notification2, msg2, muchLater)
3623+ c.Assert(err, IsNil)
3624+ err = sto.AppendToUnicastChannel(chanId, app1, notification3, msg3, muchLater)
3625+ c.Assert(err, IsNil)
3626+ _, res, err := sto.GetChannelSnapshot(chanId)
3627+ c.Assert(err, IsNil)
3628+
3629+ err = sto.DropByMsgId(chanId, res[:2])
3630+ c.Assert(err, IsNil)
3631+
3632+ _, res, err = sto.GetChannelSnapshot(chanId)
3633+ c.Assert(err, IsNil)
3634+ c.Check(res, HasLen, 1)
3635+ c.Check(res, DeepEquals, []protocol.Notification{
3636+ protocol.Notification{Payload: notification3, AppId: app1, MsgId: msg3},
3637+ })
3638 }
3639
3640=== modified file 'server/store/store.go'
3641--- server/store/store.go 2014-02-18 13:43:07 +0000
3642+++ server/store/store.go 2014-05-12 14:11:46 +0000
3643@@ -21,11 +21,36 @@
3644 "encoding/hex"
3645 "encoding/json"
3646 "errors"
3647+ "fmt"
3648+ "strings"
3649 "time"
3650+
3651+ "launchpad.net/ubuntu-push/protocol"
3652 )
3653
3654 type InternalChannelId string
3655
3656+// BroadcastChannel returns whether the id represents a broadcast channel.
3657+func (icid InternalChannelId) BroadcastChannel() bool {
3658+ marker := icid[0]
3659+ return marker == 'B' || marker == '0'
3660+}
3661+
3662+// UnicastChannel returns whether the id represents a unicast channel.
3663+func (icid InternalChannelId) UnicastChannel() bool {
3664+ marker := icid[0]
3665+ return marker == 'U'
3666+}
3667+
3668+// UnicastUserAndDevice returns the user and device ids of a unicast channel.
3669+func (icid InternalChannelId) UnicastUserAndDevice() (userId, deviceId string) {
3670+ if !icid.UnicastChannel() {
3671+ panic("UnicastUserAndDevice is for unicast channels")
3672+ }
3673+ parts := strings.SplitN(string(icid)[1:], ":", 2)
3674+ return parts[0], parts[1]
3675+}
3676+
3677 var ErrUnknownChannel = errors.New("unknown channel name")
3678 var ErrFull = errors.New("channel is full")
3679 var ErrExpected128BitsHexRepr = errors.New("expected 128 bits hex repr")
3680@@ -36,7 +61,10 @@
3681 if chanId == SystemInternalChannelId {
3682 return "0"
3683 }
3684- panic("general InternalChannelIdToHex not implemeted yet")
3685+ if !chanId.BroadcastChannel() {
3686+ panic("InternalChannelIdToHex is for broadcast channels")
3687+ }
3688+ return string(chanId)[1:]
3689 }
3690
3691 var zero128 [16]byte
3692@@ -58,7 +86,14 @@
3693 if idbytes == zero128 {
3694 return SystemInternalChannelId, nil
3695 }
3696- return InternalChannelId(idbytes[:]), nil
3697+ // mark with B(roadcast) prefix
3698+ s := "B" + hexRepr
3699+ return InternalChannelId(s), nil
3700+}
3701+
3702+// UnicastInternalChannelId builds a channel id for the userId, deviceId pair.
3703+func UnicastInternalChannelId(userId, deviceId string) InternalChannelId {
3704+ return InternalChannelId(fmt.Sprintf("U%s:%s", userId, deviceId))
3705 }
3706
3707 // PendingStore let store notifications into channels.
3708@@ -68,9 +103,45 @@
3709 GetInternalChannelId(name string) (InternalChannelId, error)
3710 // AppendToChannel appends a notification to the channel.
3711 AppendToChannel(chanId InternalChannelId, notification json.RawMessage, expiration time.Time) error
3712+ // AppendToUnicastChannel appends a notification to the unicast channel.
3713 // GetChannelSnapshot gets all the current notifications and
3714+ AppendToUnicastChannel(chanId InternalChannelId, appId string, notification json.RawMessage, msgId string, expiration time.Time) error
3715 // current top level in the channel.
3716- GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, payloads []json.RawMessage, err error)
3717+ GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, notifications []protocol.Notification, err error)
3718+ // DropByMsgId drops notifications from a unicast channel based on message ids.
3719+ DropByMsgId(chanId InternalChannelId, targets []protocol.Notification) error
3720 // Close is to be called when done with the store.
3721 Close()
3722 }
3723+
3724+// FilterOutByMsgId returns the notifications from orig whose msg id is not
3725+// mentioned in targets.
3726+func FilterOutByMsgId(orig, targets []protocol.Notification) []protocol.Notification {
3727+ n := len(orig)
3728+ t := len(targets)
3729+ // common case, removing the continuous head
3730+ if t > 0 && n >= t {
3731+ if targets[0].MsgId == orig[0].MsgId {
3732+ for i := t - 1; i >= 0; i-- {
3733+ if i == 0 {
3734+ return orig[t:]
3735+ }
3736+ if targets[i].MsgId != orig[i].MsgId {
3737+ break
3738+ }
3739+ }
3740+ }
3741+ }
3742+ // slow way
3743+ ids := make(map[string]bool, t)
3744+ for _, target := range targets {
3745+ ids[target.MsgId] = true
3746+ }
3747+ acc := make([]protocol.Notification, 0, n)
3748+ for _, notif := range orig {
3749+ if !ids[notif.MsgId] {
3750+ acc = append(acc, notif)
3751+ }
3752+ }
3753+ return acc
3754+}
3755
3756=== modified file 'server/store/store_test.go'
3757--- server/store/store_test.go 2014-02-10 23:19:08 +0000
3758+++ server/store/store_test.go 2014-05-12 14:11:46 +0000
3759@@ -33,6 +33,8 @@
3760
3761 func (s *storeSuite) TestInternalChannelIdToHex(c *C) {
3762 c.Check(InternalChannelIdToHex(SystemInternalChannelId), Equals, protocol.SystemChannelId)
3763+ c.Check(InternalChannelIdToHex(InternalChannelId("Bf1c9bf7096084cb2a154979ce00c7f50")), Equals, "f1c9bf7096084cb2a154979ce00c7f50")
3764+ c.Check(func() { InternalChannelIdToHex(InternalChannelId("U")) }, PanicMatches, "InternalChannelIdToHex is for broadcast channels")
3765 }
3766
3767 func (s *storeSuite) TestHexToInternalChannelId(c *C) {
3768@@ -42,9 +44,11 @@
3769 i1, err := HexToInternalChannelId("00000000000000000000000000000000")
3770 c.Check(err, IsNil)
3771 c.Check(i1, Equals, SystemInternalChannelId)
3772+ c.Check(i1.BroadcastChannel(), Equals, true)
3773 i2, err := HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50")
3774 c.Check(err, IsNil)
3775- c.Check(i2, Equals, InternalChannelId("\xf1\xc9\xbf\x70\x96\x08\x4c\xb2\xa1\x54\x97\x9c\xe0\x0c\x7f\x50"))
3776+ c.Check(i2.BroadcastChannel(), Equals, true)
3777+ c.Check(i2, Equals, InternalChannelId("Bf1c9bf7096084cb2a154979ce00c7f50"))
3778 _, err = HexToInternalChannelId("01")
3779 c.Check(err, Equals, ErrExpected128BitsHexRepr)
3780 _, err = HexToInternalChannelId("abceddddddddddddddddzeeeeeeeeeee")
3781@@ -52,3 +56,44 @@
3782 _, err = HexToInternalChannelId("f1c9bf7096084cb2a154979ce00c7f50ff")
3783 c.Check(err, Equals, ErrExpected128BitsHexRepr)
3784 }
3785+
3786+func (s *storeSuite) TestUnicastInternalChannelId(c *C) {
3787+ chanId := UnicastInternalChannelId("user1", "dev2")
3788+ c.Check(chanId.BroadcastChannel(), Equals, false)
3789+ c.Check(chanId.UnicastChannel(), Equals, true)
3790+ u, d := chanId.UnicastUserAndDevice()
3791+ c.Check(u, Equals, "user1")
3792+ c.Check(d, Equals, "dev2")
3793+ c.Check(func() { SystemInternalChannelId.UnicastUserAndDevice() }, PanicMatches, "UnicastUserAndDevice is for unicast channels")
3794+}
3795+
3796+func (s *storeSuite) TestFilterOutByMsgId(c *C) {
3797+ orig := []protocol.Notification{
3798+ protocol.Notification{MsgId: "a"},
3799+ protocol.Notification{MsgId: "b"},
3800+ protocol.Notification{MsgId: "c"},
3801+ protocol.Notification{MsgId: "d"},
3802+ }
3803+ // removing the continuous head
3804+ res := FilterOutByMsgId(orig, orig[:3])
3805+ c.Check(res, DeepEquals, orig[3:])
3806+
3807+ // random removal
3808+ res = FilterOutByMsgId(orig, orig[1:2])
3809+ c.Check(res, DeepEquals, []protocol.Notification{
3810+ protocol.Notification{MsgId: "a"},
3811+ protocol.Notification{MsgId: "c"},
3812+ protocol.Notification{MsgId: "d"},
3813+ })
3814+
3815+ // looks like removing the continuous head, but it isn't
3816+ res = FilterOutByMsgId(orig, []protocol.Notification{
3817+ protocol.Notification{MsgId: "a"},
3818+ protocol.Notification{MsgId: "c"},
3819+ protocol.Notification{MsgId: "d"},
3820+ })
3821+ c.Check(res, DeepEquals, []protocol.Notification{
3822+ protocol.Notification{MsgId: "b"},
3823+ })
3824+
3825+}
3826
3827=== added directory 'signing-helper'
3828=== added file 'signing-helper/CMakeLists.txt'
3829--- signing-helper/CMakeLists.txt 1970-01-01 00:00:00 +0000
3830+++ signing-helper/CMakeLists.txt 2014-05-12 14:11:46 +0000
3831@@ -0,0 +1,39 @@
3832+cmake_minimum_required(VERSION 2.8)
3833+SET (EXAMPLES_TARGET ubuntuoneauth-examples)
3834+
3835+SET (SIGNING_EXE "signing-helper")
3836+
3837+find_package (PkgConfig REQUIRED)
3838+pkg_check_modules(UBUNTUONE REQUIRED ubuntuoneauth-2.0)
3839+add_definitions(${UBUNTUONE_CFLAGS} ${UBUNTUONE_CFLAGS_OTHER})
3840+
3841+
3842+# Qt5 bits
3843+SET (CMAKE_INCLUDE_CURRENT_DIR ON)
3844+SET (CMAKE_AUTOMOC ON)
3845+find_package(Qt5Core REQUIRED)
3846+
3847+FILE (GLOB SIGNING_SOURCES signing*.cpp)
3848+FILE (GLOB SIGNING_HEADERS signing*.h)
3849+
3850+add_executable (${SIGNING_EXE}
3851+ ${SIGNING_SOURCES}
3852+ ${SIGNING_HEADERS})
3853+qt5_use_modules (${SIGNING_EXE} DBus Network)
3854+
3855+target_link_libraries (${SIGNING_EXE}
3856+ ${UBUNTUONE_LDFLAGS})
3857+
3858+
3859+
3860+add_custom_target(examples-valgrind
3861+ COMMAND "valgrind --tool=memcheck ${CMAKE_CURRENT_BINARY_DIR}/${SIGNING_EXE}"
3862+ DEPENDS ${SIGNING_EXE}
3863+)
3864+
3865+add_custom_target(examples-valgrind-leaks
3866+ COMMAND "valgrind --tool=memcheck --track-origins=yes --num-callers=40 --leak-resolution=high --leak-check=full ${CMAKE_CURRENT_BINARY_DIR}/${SIGNING_EXE}"
3867+ DEPENDS ${SIGNING_EXE}
3868+)
3869+
3870+INSTALL_TARGETS( "lib/ubuntu-push-client/" ${SIGNING_EXE})
3871
3872=== added file 'signing-helper/signing-helper.cpp'
3873--- signing-helper/signing-helper.cpp 1970-01-01 00:00:00 +0000
3874+++ signing-helper/signing-helper.cpp 2014-05-12 14:11:46 +0000
3875@@ -0,0 +1,97 @@
3876+/*
3877+ * Copyright (C) 2013-2014 Canonical Ltd.
3878+ *
3879+ * This program is free software: you can redistribute it and/or modify it
3880+ * under the terms of the GNU General Public License version 3, as published
3881+ * by the Free Software Foundation.
3882+ *
3883+ * This program is distributed in the hope that it will be useful, but
3884+ * WITHOUT ANY WARRANTY; without even the implied warranties of
3885+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
3886+ * PURPOSE. See the GNU General Public License for more details.
3887+ *
3888+ * You should have received a copy of the GNU General Public License along
3889+ * with this program. If not, see <http://www.gnu.org/licenses/>.
3890+ *
3891+ * In addition, as a special exception, the copyright holders give
3892+ * permission to link the code of portions of this program with the
3893+ * OpenSSL library under certain conditions as described in each
3894+ * individual source file, and distribute linked combinations
3895+ * including the two.
3896+ * You must obey the GNU General Public License in all respects
3897+ * for all of the code used other than OpenSSL. If you modify
3898+ * file(s) with this exception, you may extend this exception to your
3899+ * version of the file(s), but you are not obligated to do so. If you
3900+ * do not wish to do so, delete this exception statement from your
3901+ * version. If you delete this exception statement from all source
3902+ * files in the program, then also delete it here.
3903+ */
3904+
3905+#include <iostream>
3906+#include <QCoreApplication>
3907+#include <QDebug>
3908+#include <QObject>
3909+#include <QString>
3910+#include <QTimer>
3911+
3912+#include "ssoservice.h"
3913+#include "token.h"
3914+
3915+#include "signing.h"
3916+
3917+namespace UbuntuOne {
3918+
3919+ SigningExample::SigningExample(QObject *parent, QString url) :
3920+ QObject(parent)
3921+ {
3922+ QObject::connect(&service, SIGNAL(credentialsFound(const Token&)),
3923+ this, SLOT(handleCredentialsFound(Token)));
3924+ QObject::connect(&service, SIGNAL(credentialsNotFound()),
3925+ this, SLOT(handleCredentialsNotFound()));
3926+ this->url = url;
3927+
3928+ }
3929+
3930+ SigningExample::~SigningExample(){
3931+ }
3932+
3933+ void SigningExample::doExample()
3934+ {
3935+ service.getCredentials();
3936+ }
3937+
3938+ void SigningExample::handleCredentialsFound(Token token)
3939+ {
3940+ qDebug() << "Credentials found, signing url.";
3941+
3942+ QString authHeader = token.signUrl(this->url, QStringLiteral("GET"), true);
3943+
3944+ std::cout << authHeader.toStdString() << "\n";
3945+ QCoreApplication::instance()->exit(0);
3946+
3947+ }
3948+
3949+ void SigningExample::handleCredentialsNotFound()
3950+ {
3951+ qDebug() << "No credentials were found.";
3952+ QCoreApplication::instance()->exit(1);
3953+ }
3954+
3955+
3956+} // namespace UbuntuOne
3957+
3958+
3959+int main(int argc, char *argv[])
3960+{
3961+ QCoreApplication a(argc, argv);
3962+
3963+ UbuntuOne::SigningExample *example = new UbuntuOne::SigningExample(&a);
3964+
3965+ QObject::connect(example, SIGNAL(finished()), &a, SLOT(quit()));
3966+
3967+ QTimer::singleShot(0, example, SLOT(doExample()));
3968+
3969+ return a.exec();
3970+}
3971+
3972+
3973
3974=== added file 'signing-helper/signing.h'
3975--- signing-helper/signing.h 1970-01-01 00:00:00 +0000
3976+++ signing-helper/signing.h 2014-05-12 14:11:46 +0000
3977@@ -0,0 +1,76 @@
3978+/*
3979+ * Copyright (C) 2013-2014 Canonical Ltd.
3980+ *
3981+ * This program is free software: you can redistribute it and/or modify it
3982+ * under the terms of the GNU General Public License version 3, as published
3983+ * by the Free Software Foundation.
3984+ *
3985+ * This program is distributed in the hope that it will be useful, but
3986+ * WITHOUT ANY WARRANTY; without even the implied warranties of
3987+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
3988+ * PURPOSE. See the GNU General Public License for more details.
3989+ *
3990+ * You should have received a copy of the GNU General Public License along
3991+ * with this program. If not, see <http://www.gnu.org/licenses/>.
3992+ *
3993+ * In addition, as a special exception, the copyright holders give
3994+ * permission to link the code of portions of this program with the
3995+ * OpenSSL library under certain conditions as described in each
3996+ * individual source file, and distribute linked combinations
3997+ * including the two.
3998+ * You must obey the GNU General Public License in all respects
3999+ * for all of the code used other than OpenSSL. If you modify
4000+ * file(s) with this exception, you may extend this exception to your
4001+ * version of the file(s), but you are not obligated to do so. If you
4002+ * do not wish to do so, delete this exception statement from your
4003+ * version. If you delete this exception statement from all source
4004+ * files in the program, then also delete it here.
4005+ */
4006+
4007+#ifndef _SIGNING_H_
4008+#define _SIGNING_H_
4009+
4010+#include <QDebug>
4011+#include <QNetworkReply>
4012+#include <QObject>
4013+#include <QString>
4014+
4015+#include "ssoservice.h"
4016+#include "token.h"
4017+#include "requests.h"
4018+#include "errormessages.h"
4019+
4020+namespace UbuntuOne {
4021+
4022+class SigningExample : public QObject
4023+{
4024+ Q_OBJECT
4025+
4026+public:
4027+
4028+ explicit SigningExample(QObject *parent = 0, QString url="https://one.ubuntu.com/api/account/");
4029+ ~SigningExample();
4030+
4031+public slots:
4032+
4033+ void doExample();
4034+
4035+signals:
4036+
4037+ void finished();
4038+
4039+private slots:
4040+
4041+ void handleCredentialsFound(Token token);
4042+ void handleCredentialsNotFound();
4043+
4044+private:
4045+
4046+ SSOService service;
4047+ QNetworkAccessManager nam;
4048+ QString url;
4049+
4050+};
4051+
4052+}
4053+#endif /* _SIGNING_H_ */
4054
4055=== modified file 'testing/helpers.go'
4056--- testing/helpers.go 2014-02-21 16:04:44 +0000
4057+++ testing/helpers.go 2014-05-12 14:11:46 +0000
4058@@ -18,6 +18,7 @@
4059 package testing
4060
4061 import (
4062+ "encoding/json"
4063 "fmt"
4064 "os"
4065 "path/filepath"
4066@@ -26,6 +27,7 @@
4067 "sync"
4068
4069 "launchpad.net/ubuntu-push/logger"
4070+ "launchpad.net/ubuntu-push/protocol"
4071 )
4072
4073 type captureHelper struct {
4074@@ -122,3 +124,12 @@
4075 }
4076 return filepath.Join(dir, relativePath)
4077 }
4078+
4079+// Ns makes a []Notification from just payloads.
4080+func Ns(payloads ...json.RawMessage) []protocol.Notification {
4081+ res := make([]protocol.Notification, len(payloads))
4082+ for i := 0; i < len(payloads); i++ {
4083+ res[i].Payload = payloads[i]
4084+ }
4085+ return res
4086+}
4087
4088=== modified file 'ubuntu-push-client.go'
4089--- ubuntu-push-client.go 2014-04-15 11:27:38 +0000
4090+++ ubuntu-push-client.go 2014-05-12 14:11:46 +0000
4091@@ -58,6 +58,7 @@
4092 if err != nil {
4093 log.Fatalf("unable to open the levels database: %v", err)
4094 }
4095+
4096 cli := client.NewPushClient(cfgFname, lvlFname)
4097 err = cli.Start()
4098 if err != nil {

Subscribers

People subscribed via source and target branches