Merge lp:ubuntu-push/automatic into lp:ubuntu-push

Proposed by John Lenton
Status: Merged
Merged at revision: 99
Proposed branch: lp:ubuntu-push/automatic
Merge into: lp:ubuntu-push
Diff against target: 918 lines (+365/-45) (has conflicts)
22 files modified
client/client.go (+10/-5)
client/client_test.go (+13/-7)
client/session/session.go (+21/-1)
client/session/session_test.go (+69/-4)
debian/changelog (+9/-0)
debian/copyright (+1/-1)
protocol/messages.go (+21/-5)
protocol/messages_test.go (+4/-0)
sampleconfigs/dev.json (+2/-1)
server/acceptance/acceptance_test.go (+1/-0)
server/acceptance/acceptanceclient.go (+5/-4)
server/acceptance/cmd/acceptanceclient.go (+10/-3)
server/acceptance/suites/broadcast.go (+10/-2)
server/acceptance/suites/pingpong.go (+3/-3)
server/broker/broker.go (+4/-0)
server/broker/exchanges.go (+32/-6)
server/broker/exchanges_test.go (+52/-0)
server/broker/simple/simple.go (+4/-0)
server/broker/testsuite/suite.go (+8/-0)
server/dev/server.go (+18/-1)
server/session/session.go (+13/-2)
server/session/session_test.go (+55/-0)
Text conflict in debian/changelog
To merge this branch: bzr merge lp:ubuntu-push/automatic
Reviewer | Review Type | Date Requested | Status
Ubuntu Push Hackers | | | Pending
Review via email: mp+215479@code.launchpad.net

Description of the change

Merging the automatic branch (lp:ubuntu-push/automatic) into lp:ubuntu-push.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'client/client.go'
2--- client/client.go 2014-04-03 20:56:25 +0000
3+++ client/client.go 2014-04-11 18:18:05 +0000
4@@ -81,6 +81,8 @@
5 sessionConnectedCh chan uint32
6 }
7
8+var ACTION_ID_SNOWFLAKE = "::ubuntu-push-client::"
9+
10 // Creates a new Ubuntu Push Notifications client-side daemon that will use
11 // the given configuration file.
12 func NewPushClient(configPath string, leveldbPath string) *PushClient {
13@@ -278,7 +280,7 @@
14 if !client.filterNotification(msg) {
15 return nil
16 }
17- action_id := "dummy_id"
18+ action_id := ACTION_ID_SNOWFLAKE
19 a := []string{action_id, "Go get it!"} // action value not visible on the phone
20 h := map[string]*dbus.Variant{"x-canonical-switch-to-application": &dbus.Variant{true}}
21 nots := notifications.Raw(client.notificationsEndp, client.log)
22@@ -305,20 +307,23 @@
23 }
24
25 // handleClick deals with the user clicking a notification
26-func (client *PushClient) handleClick() error {
27+func (client *PushClient) handleClick(action_id string) error {
28+ if action_id != ACTION_ID_SNOWFLAKE {
29+ return nil
30+ }
31 // it doesn't get much simpler...
32 urld := urldispatcher.New(client.urlDispatcherEndp, client.log)
33 return urld.DispatchURL("settings:///system/system-update")
34 }
35
36 // doLoop connects events with their handlers
37-func (client *PushClient) doLoop(connhandler func(bool), clickhandler func() error, notifhandler func(*session.Notification) error, errhandler func(error)) {
38+func (client *PushClient) doLoop(connhandler func(bool), clickhandler func(string) error, notifhandler func(*session.Notification) error, errhandler func(error)) {
39 for {
40 select {
41 case state := <-client.connCh:
42 connhandler(state)
43- case <-client.actionsCh:
44- clickhandler()
45+ case action := <-client.actionsCh:
46+ clickhandler(action.ActionId)
47 case msg := <-client.session.MsgCh:
48 notifhandler(msg)
49 case err := <-client.session.ErrCh:
50
51=== modified file 'client/client_test.go'
52--- client/client_test.go 2014-04-03 20:56:25 +0000
53+++ client/client_test.go 2014-04-11 18:18:05 +0000
54@@ -588,9 +588,15 @@
55 cli.log = cs.log
56 endp := testibus.NewTestingEndpoint(nil, condition.Work(true))
57 cli.urlDispatcherEndp = endp
58- c.Check(cli.handleClick(), IsNil)
59+ // check we don't fail on something random
60+ c.Check(cli.handleClick("something random"), IsNil)
61+ // ... but we don't send anything either
62+ args := testibus.GetCallArgs(endp)
63+ c.Assert(args, HasLen, 0)
64+ // check we worked with the right action id
65+ c.Check(cli.handleClick(ACTION_ID_SNOWFLAKE), IsNil)
66 // check we sent the notification
67- args := testibus.GetCallArgs(endp)
68+ args = testibus.GetCallArgs(endp)
69 c.Assert(args, HasLen, 1)
70 c.Check(args[0].Member, Equals, "DispatchURL")
71 c.Check(args[0].Args, DeepEquals, []interface{}{"settings:///system/system-update"})
72@@ -609,7 +615,7 @@
73 c.Assert(cli.initSession(), IsNil)
74
75 ch := make(chan bool, 1)
76- go cli.doLoop(func(bool) { ch <- true }, func() error { return nil }, func(_ *session.Notification) error { return nil }, func(error) {})
77+ go cli.doLoop(func(bool) { ch <- true }, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) {})
78 c.Check(takeNextBool(ch), Equals, true)
79 }
80
81@@ -623,7 +629,7 @@
82 cli.actionsCh = aCh
83
84 ch := make(chan bool, 1)
85- go cli.doLoop(func(bool) {}, func() error { ch <- true; return nil }, func(_ *session.Notification) error { return nil }, func(error) {})
86+ go cli.doLoop(func(bool) {}, func(_ string) error { ch <- true; return nil }, func(_ *session.Notification) error { return nil }, func(error) {})
87 c.Check(takeNextBool(ch), Equals, true)
88 }
89
90@@ -636,7 +642,7 @@
91 cli.session.MsgCh <- &session.Notification{}
92
93 ch := make(chan bool, 1)
94- go cli.doLoop(func(bool) {}, func() error { return nil }, func(_ *session.Notification) error { ch <- true; return nil }, func(error) {})
95+ go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { ch <- true; return nil }, func(error) {})
96 c.Check(takeNextBool(ch), Equals, true)
97 }
98
99@@ -649,7 +655,7 @@
100 cli.session.ErrCh <- nil
101
102 ch := make(chan bool, 1)
103- go cli.doLoop(func(bool) {}, func() error { return nil }, func(_ *session.Notification) error { return nil }, func(error) { ch <- true })
104+ go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) { ch <- true })
105 c.Check(takeNextBool(ch), Equals, true)
106 }
107
108@@ -716,7 +722,7 @@
109 c.Check(cs.log.Captured(), Matches, "(?ms).*Session connected after 42 attempts$")
110
111 // * actionsCh to the click handler/url dispatcher
112- aCh <- notifications.RawActionReply{}
113+ aCh <- notifications.RawActionReply{ActionId: ACTION_ID_SNOWFLAKE}
114 tick()
115 uargs := testibus.GetCallArgs(cli.urlDispatcherEndp)
116 c.Assert(uargs, HasLen, 1)
117
118=== modified file 'client/session/session.go'
119--- client/session/session.go 2014-04-04 12:28:40 +0000
120+++ client/session/session.go 2014-04-11 18:18:05 +0000
121@@ -49,6 +49,7 @@
122 Type string `json:"T"`
123 protocol.BroadcastMsg
124 protocol.NotificationsMsg
125+ protocol.ConnBrokenMsg
126 }
127
128 // parseServerAddrSpec recognizes whether spec is a HTTP URL to get
129@@ -176,7 +177,7 @@
130 // getHosts sets deliveryHosts possibly querying a remote endpoint
131 func (sess *ClientSession) getHosts() error {
132 if sess.getHost != nil {
133- if sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
134+ if sess.deliveryHosts != nil && sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
135 return nil
136 }
137 hosts, err := sess.getHost.Get()
138@@ -193,6 +194,10 @@
139 return nil
140 }
141
142+func (sess *ClientSession) resetHosts() {
143+ sess.deliveryHosts = nil
144+}
145+
146 // startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts
147
148 func (sess *ClientSession) startConnectionAttempt() {
149@@ -338,6 +343,19 @@
150 return nil
151 }
152
153+// handle "connbroken" messages
154+func (sess *ClientSession) handleConnBroken(connBroken *serverMsg) error {
155+ sess.setState(Error)
156+ reason := connBroken.Reason
157+ err := fmt.Errorf("server broke connection: %s", reason)
158+ sess.Log.Errorf("%s", err)
159+ switch reason {
160+ case protocol.BrokenHostMismatch:
161+ sess.resetHosts()
162+ }
163+ return err
164+}
165+
166 // loop runs the session with the server, emits a stream of events.
167 func (sess *ClientSession) loop() error {
168 var err error
169@@ -356,6 +374,8 @@
170 err = sess.handlePing()
171 case "broadcast":
172 err = sess.handleBroadcast(&recv)
173+ case "connbroken":
174+ err = sess.handleConnBroken(&recv)
175 }
176 if err != nil {
177 return err
178
179=== modified file 'client/session/session_test.go'
180--- client/session/session_test.go 2014-04-03 20:56:25 +0000
181+++ client/session/session_test.go 2014-04-11 18:18:05 +0000
182@@ -317,6 +317,29 @@
183 c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"})
184 }
185
186+func (cs *clientSessionSuite) TestGetHostsRemoteCachingReset(c *C) {
187+ hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil}
188+ sess := &ClientSession{
189+ getHost: hostGetter,
190+ ClientSessionConfig: ClientSessionConfig{
191+ HostsCachingExpiryTime: 2 * time.Hour,
192+ },
193+ timeSince: time.Since,
194+ }
195+ err := sess.getHosts()
196+ c.Assert(err, IsNil)
197+ hostGetter.hosts = []string{"baz:443"}
198+ // cached
199+ err = sess.getHosts()
200+ c.Assert(err, IsNil)
201+ c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"})
202+ // reset
203+ sess.resetHosts()
204+ err = sess.getHosts()
205+ c.Assert(err, IsNil)
206+ c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"})
207+}
208+
209 /****************************************************************
210 startConnectionAttempt()/nextHostToTry()/started tests
211 ****************************************************************/
212@@ -587,7 +610,7 @@
213 json.RawMessage("false"), // shouldn't happen but robust
214 json.RawMessage(`{"img1/m1":[102,"tubular"]}`),
215 },
216- }, protocol.NotificationsMsg{}}
217+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
218 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
219 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
220 s.upCh <- nil // ack ok
221@@ -618,7 +641,7 @@
222 ChanId: "0",
223 TopLevel: 2,
224 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
225- }, protocol.NotificationsMsg{}}
226+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
227 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
228 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
229 failure := errors.New("ACK ACK ACK")
230@@ -635,7 +658,7 @@
231 ChanId: "something awful",
232 TopLevel: 2,
233 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
234- }, protocol.NotificationsMsg{}}
235+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
236 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
237 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
238 s.upCh <- nil // ack ok
239@@ -652,7 +675,7 @@
240 ChanId: "0",
241 TopLevel: 2,
242 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
243- }, protocol.NotificationsMsg{}}
244+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
245 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
246 s.upCh <- nil // ack ok
247 // start returns with error
248@@ -665,6 +688,37 @@
249 }
250
251 /****************************************************************
252+ handleConnBroken() tests
253+****************************************************************/
254+
255+func (s *msgSuite) TestHandleConnBrokenUnkwown(c *C) {
256+ msg := serverMsg{"connbroken",
257+ protocol.BroadcastMsg{}, protocol.NotificationsMsg{},
258+ protocol.ConnBrokenMsg{
259+ Reason: "REASON",
260+ },
261+ }
262+ go func() { s.errCh <- s.sess.handleConnBroken(&msg) }()
263+ c.Check(<-s.errCh, ErrorMatches, "server broke connection: REASON")
264+ c.Check(s.sess.State(), Equals, Error)
265+}
266+
267+func (s *msgSuite) TestHandleConnBrokenHostMismatch(c *C) {
268+ msg := serverMsg{"connbroken",
269+ protocol.BroadcastMsg{}, protocol.NotificationsMsg{},
270+ protocol.ConnBrokenMsg{
271+ Reason: protocol.BrokenHostMismatch,
272+ },
273+ }
274+ s.sess.deliveryHosts = []string{"foo:443", "bar:443"}
275+ go func() { s.errCh <- s.sess.handleConnBroken(&msg) }()
276+ c.Check(<-s.errCh, ErrorMatches, "server broke connection: host-mismatch")
277+ c.Check(s.sess.State(), Equals, Error)
278+ // hosts were reset
279+ c.Check(s.sess.deliveryHosts, IsNil)
280+}
281+
282+/****************************************************************
283 loop() tests
284 ****************************************************************/
285
286@@ -728,6 +782,17 @@
287 c.Check(<-s.errCh, Equals, failure)
288 }
289
290+func (s *loopSuite) TestLoopConnBroken(c *C) {
291+ c.Check(s.sess.State(), Equals, Running)
292+ broken := protocol.ConnBrokenMsg{
293+ Type: "connbroken",
294+ Reason: "REASON",
295+ }
296+ c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
297+ s.upCh <- broken
298+ c.Check(<-s.errCh, NotNil)
299+}
300+
301 /****************************************************************
302 start() tests
303 ****************************************************************/
304
305=== modified file 'debian/changelog'
306--- debian/changelog 2014-04-04 14:52:06 +0000
307+++ debian/changelog 2014-04-11 18:18:05 +0000
308@@ -1,3 +1,4 @@
309+<<<<<<< TREE
310 ubuntu-push (0.1+14.04.20140404-0ubuntu1) trusty; urgency=low
311
312 [ Tarmac ]
313@@ -5,6 +6,14 @@
314
315 -- Ubuntu daily release <ps-jenkins@lists.canonical.com> Fri, 04 Apr 2014 14:52:06 +0000
316
317+=======
318+ubuntu-push (0.2-0ubuntu1) UNRELEASED; urgency=medium
319+
320+ * New upstream release.
321+
322+ -- John Lenton <john.lenton@canonical.com> Fri, 11 Apr 2014 11:19:42 +0100
323+
324+>>>>>>> MERGE-SOURCE
325 ubuntu-push (0.1+14.04.20140327-0ubuntu1) trusty; urgency=medium
326
327 [ John Lenton ]
328
329=== modified file 'debian/copyright'
330--- debian/copyright 2014-03-24 15:31:42 +0000
331+++ debian/copyright 2014-04-11 18:18:05 +0000
332@@ -11,7 +11,7 @@
333 License: BSD-3-clause
334
335 Files: external/murmur3/*
336-Copyright: 2013 Sébastien Paolacci
337+Copyright: 2013 Sébastien Paolacci
338 License: BSD-3-clause
339
340 License: GPL-3.0
341
342=== modified file 'protocol/messages.go'
343--- protocol/messages.go 2014-03-19 22:31:20 +0000
344+++ protocol/messages.go 2014-04-11 18:18:05 +0000
345@@ -49,6 +49,27 @@
346 PingInterval string
347 }
348
349+// SplittableMsg are messages that may require and are capable of splitting.
350+type SplittableMsg interface {
351+ Split() (done bool)
352+}
353+
354+// CONNBROKEN message, server side is breaking the connection for reason.
355+type ConnBrokenMsg struct {
356+ Type string `json:"T"`
357+ // reason
358+ Reason string
359+}
360+
361+func (m *ConnBrokenMsg) Split() bool {
362+ return true
363+}
364+
365+// CONNBROKEN reasons
366+const (
367+ BrokenHostMismatch = "host-mismatch"
368+)
369+
370 // PING/PONG messages
371 type PingPongMsg struct {
372 Type string `json:"T"`
373@@ -56,11 +77,6 @@
374
375 const maxPayloadSize = 62 * 1024
376
377-// SplittableMsg are messages that may require and are capable of splitting.
378-type SplittableMsg interface {
379- Split() (done bool)
380-}
381-
382 // BROADCAST messages
383 type BroadcastMsg struct {
384 Type string `json:"T"`
385
386=== modified file 'protocol/messages_test.go'
387--- protocol/messages_test.go 2014-02-26 16:04:57 +0000
388+++ protocol/messages_test.go 2014-04-11 18:18:05 +0000
389@@ -103,3 +103,7 @@
390 b.Reset()
391 c.Check(b.splitting, Equals, 0)
392 }
393+
394+func (s *messagesSuite) TestSplitConnBrokenMsg(c *C) {
395+ c.Check((&ConnBrokenMsg{}).Split(), Equals, true)
396+}
397
398=== modified file 'sampleconfigs/dev.json'
399--- sampleconfigs/dev.json 2014-03-31 16:48:00 +0000
400+++ sampleconfigs/dev.json 2014-04-11 18:18:05 +0000
401@@ -8,5 +8,6 @@
402 "cert_pem_file": "../server/acceptance/ssl/testing.cert",
403 "http_addr": "127.0.0.1:8080",
404 "http_read_timeout": "5s",
405- "http_write_timeout": "5s"
406+ "http_write_timeout": "5s",
407+ "delivery_domain": "localhost"
408 }
409
410=== modified file 'server/acceptance/acceptance_test.go'
411--- server/acceptance/acceptance_test.go 2014-02-26 19:50:46 +0000
412+++ server/acceptance/acceptance_test.go 2014-04-11 18:18:05 +0000
413@@ -34,6 +34,7 @@
414 cfg := make(map[string]interface{})
415 suites.FillServerConfig(cfg, addr)
416 suites.FillHTTPServerConfig(cfg, httpAddr)
417+ cfg["delivery_domain"] = "localhost"
418 return cfg
419 }
420
421
422=== modified file 'server/acceptance/acceptanceclient.go'
423--- server/acceptance/acceptanceclient.go 2014-04-03 16:47:47 +0000
424+++ server/acceptance/acceptanceclient.go 2014-04-11 18:18:05 +0000
425@@ -42,7 +42,8 @@
426 CertPEMBlock []byte
427 ReportPings bool
428 Levels map[string]int64
429- Insecure bool // don't verify certs
430+ Insecure bool // don't verify certs
431+ Prefix string // prefix for events
432 // connection
433 Connection net.Conn
434 }
435@@ -105,7 +106,7 @@
436 if err != nil {
437 return err
438 }
439- events <- fmt.Sprintf("connected %v", conn.LocalAddr())
440+ events <- fmt.Sprintf("%sconnected %v", sess.Prefix, conn.LocalAddr())
441 var recv serverMsg
442 for {
443 deadAfter := pingInterval + sess.ExchangeTimeout
444@@ -122,7 +123,7 @@
445 return err
446 }
447 if sess.ReportPings {
448- events <- "Ping"
449+ events <- sess.Prefix + "ping"
450 }
451 case "broadcast":
452 conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
453@@ -134,7 +135,7 @@
454 if err != nil {
455 return err
456 }
457- events <- fmt.Sprintf("broadcast chan:%v app:%v topLevel:%d payloads:%s", recv.ChanId, recv.AppId, recv.TopLevel, pack)
458+ events <- fmt.Sprintf("%sbroadcast chan:%v app:%v topLevel:%d payloads:%s", sess.Prefix, recv.ChanId, recv.AppId, recv.TopLevel, pack)
459 }
460 }
461 return nil
462
463=== modified file 'server/acceptance/cmd/acceptanceclient.go'
464--- server/acceptance/cmd/acceptanceclient.go 2014-04-03 16:47:47 +0000
465+++ server/acceptance/cmd/acceptanceclient.go 2014-04-11 18:18:05 +0000
466@@ -19,6 +19,7 @@
467
468 import (
469 "flag"
470+ "fmt"
471 "log"
472 "os"
473 "path/filepath"
474@@ -43,6 +44,10 @@
475 }
476
477 func main() {
478+ flag.Usage = func() {
479+ fmt.Fprintf(os.Stderr, "Usage: acceptancclient [options] <config.json> <device id>\n")
480+ flag.PrintDefaults()
481+ }
482 flag.Parse()
483 narg := flag.NArg()
484 switch {
485@@ -72,9 +77,11 @@
486 Insecure: *insecureFlag,
487 }
488 log.Printf("with: %#v", session)
489- session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName))
490- if err != nil {
491- log.Fatalf("reading CertPEMFile: %v", err)
492+ if !*insecureFlag {
493+ session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName))
494+ if err != nil {
495+ log.Fatalf("reading CertPEMFile: %v", err)
496+ }
497 }
498 err = session.Dial()
499 if err != nil {
500
501=== modified file 'server/acceptance/suites/broadcast.go'
502--- server/acceptance/suites/broadcast.go 2014-04-03 16:47:47 +0000
503+++ server/acceptance/suites/broadcast.go 2014-04-11 18:18:05 +0000
504@@ -24,6 +24,7 @@
505
506 . "launchpad.net/gocheck"
507
508+ "launchpad.net/ubuntu-push/client/gethosts"
509 "launchpad.net/ubuntu-push/protocol"
510 "launchpad.net/ubuntu-push/server/api"
511 )
512@@ -66,8 +67,6 @@
513 })
514 c.Assert(err, IsNil)
515 c.Assert(got, Matches, ".*ok.*")
516- // xxx don't send this one
517- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`)
518 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`)
519 stop()
520 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
521@@ -261,3 +260,12 @@
522 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
523 c.Check(len(errCh), Equals, 0)
524 }
525+
526+// test /delivery-hosts
527+
528+func (s *BroadcastAcceptanceSuite) TestGetHosts(c *C) {
529+ gh := gethosts.New("", s.ServerAPIURL+"/delivery-hosts", 2*time.Second)
530+ hosts, err := gh.Get()
531+ c.Assert(err, IsNil)
532+ c.Check(hosts, DeepEquals, []string{s.ServerAddr})
533+}
534
535=== modified file 'server/acceptance/suites/pingpong.go'
536--- server/acceptance/suites/pingpong.go 2014-04-03 16:47:47 +0000
537+++ server/acceptance/suites/pingpong.go 2014-04-11 18:18:05 +0000
538@@ -56,11 +56,11 @@
539 c.Assert(connectSrv, Matches, ".*session.* connected .*")
540 c.Assert(registeredSrv, Matches, ".*session.* registered DEVA")
541 c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true)
542- c.Assert(NextEvent(events, errCh), Equals, "Ping")
543+ c.Assert(NextEvent(events, errCh), Equals, "ping")
544 elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond)
545 c.Check(elapsedOfPing >= 1.0, Equals, true)
546 c.Check(elapsedOfPing < 1.05, Equals, true)
547- c.Assert(NextEvent(events, errCh), Equals, "Ping")
548+ c.Assert(NextEvent(events, errCh), Equals, "ping")
549 c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* ended with: EOF")
550 c.Check(len(errCh), Equals, 0)
551 }
552@@ -87,7 +87,7 @@
553 c.Assert(NextEvent(events, errCh), Matches, "connected .*")
554 c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* connected .*")
555 c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* registered .*")
556- c.Assert(NextEvent(events, errCh), Equals, "Ping")
557+ c.Assert(NextEvent(events, errCh), Equals, "ping")
558 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*timeout`)
559 c.Check(len(errCh), Equals, 0)
560 }
561
562=== modified file 'server/broker/broker.go'
563--- server/broker/broker.go 2014-04-03 14:31:10 +0000
564+++ server/broker/broker.go 2014-04-11 18:18:05 +0000
565@@ -19,6 +19,7 @@
566 package broker
567
568 import (
569+ "errors"
570 "fmt"
571
572 "launchpad.net/ubuntu-push/protocol"
573@@ -46,6 +47,9 @@
574 Acked(sess BrokerSession, done bool) error
575 }
576
577+// ErrNop returned by Prepare means nothing to do/send.
578+var ErrNop = errors.New("nothing to send")
579+
580 // LevelsMap is the type for holding channel levels for session.
581 type LevelsMap map[store.InternalChannelId]int64
582
583
584=== modified file 'server/broker/exchanges.go'
585--- server/broker/exchanges.go 2014-04-03 16:00:53 +0000
586+++ server/broker/exchanges.go 2014-04-11 18:18:05 +0000
587@@ -28,8 +28,9 @@
588
589 // Scratch area for exchanges, sessions should hold one of these.
590 type ExchangesScratchArea struct {
591- broadcastMsg protocol.BroadcastMsg
592- ackMsg protocol.AckMsg
593+ broadcastMsg protocol.BroadcastMsg
594+ ackMsg protocol.AckMsg
595+ connBrokenMsg protocol.ConnBrokenMsg
596 }
597
598 // BroadcastExchange leads a session through delivering a BROADCAST.
599@@ -42,7 +43,7 @@
600 }
601
602 // check interface already here
603-var _ Exchange = &BroadcastExchange{}
604+var _ Exchange = (*BroadcastExchange)(nil)
605
606 // Init ensures the BroadcastExchange is fully initialized for the sessions.
607 func (sbe *BroadcastExchange) Init() {
608@@ -88,14 +89,18 @@
609
610 // Prepare session for a BROADCAST.
611 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
612- scratchArea := sess.ExchangeScratchArea()
613- scratchArea.broadcastMsg.Reset()
614- scratchArea.broadcastMsg.Type = "broadcast"
615 clientLevel := sess.Levels()[sbe.ChanId]
616 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
617 tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())
618 payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded)
619+ if len(payloads) == 0 && sbe.TopLevel >= clientLevel {
620+ // empty and don't need to force resync => do nothing
621+ return nil, nil, ErrNop
622+ }
623
624+ scratchArea := sess.ExchangeScratchArea()
625+ scratchArea.broadcastMsg.Reset()
626+ scratchArea.broadcastMsg.Type = "broadcast"
627 // xxx need an AppId as well, later
628 scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId)
629 scratchArea.broadcastMsg.TopLevel = sbe.TopLevel
630@@ -113,3 +118,24 @@
631 sess.Levels()[sbe.ChanId] = sbe.TopLevel
632 return nil
633 }
634+
635+// ConnBrokenExchange breaks a session giving a reason.
636+type ConnBrokenExchange struct {
637+ Reason string
638+}
639+
640+// check interface already here
641+var _ Exchange = (*ConnBrokenExchange)(nil)
642+
643+// Prepare session for a CONNBROKEN.
644+func (cbe *ConnBrokenExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
645+ scratchArea := sess.ExchangeScratchArea()
646+ scratchArea.connBrokenMsg.Type = "connbroken"
647+ scratchArea.connBrokenMsg.Reason = cbe.Reason
648+ return &scratchArea.connBrokenMsg, nil, nil
649+}
650+
651+// CONNBROKEN isn't acked
652+func (cbe *ConnBrokenExchange) Acked(sess BrokerSession, done bool) error {
653+ panic("Acked should not get invoked on ConnBrokenExchange")
654+}
655
656=== modified file 'server/broker/exchanges_test.go'
657--- server/broker/exchanges_test.go 2014-04-04 09:57:02 +0000
658+++ server/broker/exchanges_test.go 2014-04-11 18:18:05 +0000
659@@ -81,6 +81,44 @@
660 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
661 }
662
663+func (s *exchangesSuite) TestBroadcastExchangeEmpty(c *C) {
664+ sess := &testing.TestBrokerSession{
665+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
666+ Model: "m1",
667+ ImageChannel: "img1",
668+ }
669+ exchg := &broker.BroadcastExchange{
670+ ChanId: store.SystemInternalChannelId,
671+ TopLevel: 3,
672+ NotificationPayloads: []json.RawMessage{},
673+ }
674+ exchg.Init()
675+ outMsg, inMsg, err := exchg.Prepare(sess)
676+ c.Assert(err, Equals, broker.ErrNop)
677+ c.Check(outMsg, IsNil)
678+ c.Check(inMsg, IsNil)
679+}
680+
681+func (s *exchangesSuite) TestBroadcastExchangeEmptyButAhead(c *C) {
682+ sess := &testing.TestBrokerSession{
683+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{
684+ store.SystemInternalChannelId: 10,
685+ }),
686+ Model: "m1",
687+ ImageChannel: "img1",
688+ }
689+ exchg := &broker.BroadcastExchange{
690+ ChanId: store.SystemInternalChannelId,
691+ TopLevel: 3,
692+ NotificationPayloads: []json.RawMessage{},
693+ }
694+ exchg.Init()
695+ outMsg, inMsg, err := exchg.Prepare(sess)
696+ c.Assert(err, IsNil)
697+ c.Check(outMsg, NotNil)
698+ c.Check(inMsg, NotNil)
699+}
700+
701 func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
702 sess := &testing.TestBrokerSession{
703 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
704@@ -210,3 +248,17 @@
705 c.Assert(err, IsNil)
706 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))
707 }
708+
709+func (s *exchangesSuite) TestConnBrokenExchange(c *C) {
710+ sess := &testing.TestBrokerSession{}
711+ cbe := &broker.ConnBrokenExchange{"REASON"}
712+ outMsg, inMsg, err := cbe.Prepare(sess)
713+ c.Assert(err, IsNil)
714+ c.Check(inMsg, IsNil) // no answer is expected
715+ // check
716+ marshalled, err := json.Marshal(outMsg)
717+ c.Assert(err, IsNil)
718+ c.Check(string(marshalled), Equals, `{"T":"connbroken","Reason":"REASON"}`)
719+
720+ c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnBrokenExchange")
721+}
722
723=== modified file 'server/broker/simple/simple.go'
724--- server/broker/simple/simple.go 2014-04-03 16:00:53 +0000
725+++ server/broker/simple/simple.go 2014-04-11 18:18:05 +0000
726@@ -222,6 +222,10 @@
727 delete(b.registry, sess.deviceId)
728 }
729 } else { // register
730+ prev := b.registry[sess.deviceId]
731+ if prev != nil { // kick it
732+ close(prev.exchanges)
733+ }
734 b.registry[sess.deviceId] = sess
735 sess.registered = true
736 sess.done <- true
737
738=== modified file 'server/broker/testsuite/suite.go'
739--- server/broker/testsuite/suite.go 2014-04-03 16:00:53 +0000
740+++ server/broker/testsuite/suite.go 2014-04-11 18:18:05 +0000
741@@ -164,6 +164,14 @@
742 c.Assert(err, IsNil)
743 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
744 c.Assert(err, IsNil)
745+ checkAndFalse := false
746+ // previous session got signaled by closing its channel
747+ select {
748+ case _, ok := <-sess1.SessionChannel():
749+ checkAndFalse = ok == false
750+ default:
751+ }
752+ c.Check(checkAndFalse, Equals, true)
753 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess2)
754 b.Unregister(sess1)
755 // just to make sure the unregister was processed
756
757=== modified file 'server/dev/server.go'
758--- server/dev/server.go 2014-03-25 19:02:18 +0000
759+++ server/dev/server.go 2014-04-11 18:18:05 +0000
760@@ -18,6 +18,7 @@
761 package main
762
763 import (
764+ "encoding/json"
765 "net"
766 "net/http"
767 "os"
768@@ -37,6 +38,8 @@
769 server.DevicesParsedConfig
770 // api http server configuration
771 server.HTTPServeParsedConfig
772+ // delivery domain
773+ DeliveryDomain string `json:"delivery_domain"`
774 }
775
776 func main() {
777@@ -60,11 +63,25 @@
778 storeForRequest := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
779 return sto, nil
780 }
781+ lst, err := net.Listen("tcp", cfg.Addr())
782+ if err != nil {
783+ server.BootLogFatalf("start device listening: %v", err)
784+ }
785 mux := api.MakeHandlersMux(storeForRequest, broker, logger)
786+ // & /delivery-hosts
787+ mux.HandleFunc("/delivery-hosts", func(w http.ResponseWriter, req *http.Request) {
788+ w.Header().Set("Cache-Control", "no-cache")
789+ w.Header().Set("Content-Type", "application/json")
790+ enc := json.NewEncoder(w)
791+ enc.Encode(map[string]interface{}{
792+ "hosts": []string{lst.Addr().String()},
793+ "domain": cfg.DeliveryDomain,
794+ })
795+ })
796 handler := api.PanicTo500Handler(mux, logger)
797 go server.HTTPServeRunner(nil, handler, &cfg.HTTPServeParsedConfig)()
798 // listen for device connections
799- server.DevicesRunner(nil, func(conn net.Conn) error {
800+ server.DevicesRunner(lst, func(conn net.Conn) error {
801 track := session.NewTracker(logger)
802 return session.Session(conn, broker, cfg, track)
803 }, logger, &cfg.DevicesParsedConfig)()
804
805=== modified file 'server/session/session.go'
806--- server/session/session.go 2014-02-10 23:19:08 +0000
807+++ server/session/session.go 2014-04-11 18:18:05 +0000
808@@ -62,6 +62,9 @@
809 if err != nil {
810 return err
811 }
812+ if inMsg == nil { // no answer expected, breaking connection
813+ return &broker.ErrAbort{"session broken for reason"}
814+ }
815 err = proto.ReadMessage(inMsg)
816 if err != nil {
817 return err
818@@ -76,6 +79,7 @@
819 pingTimer := time.NewTimer(pingInterval)
820 intervalStart := time.Now()
821 ch := sess.SessionChannel()
822+Loop:
823 for {
824 select {
825 case <-pingTimer.C:
826@@ -90,10 +94,17 @@
827 return &broker.ErrAbort{"expected PONG message"}
828 }
829 pingTimer.Reset(pingInterval)
830- case exchg := <-ch:
831- // xxx later can use ch closing for shutdown/reset
832+ case exchg, ok := <-ch:
833 pingTimer.Stop()
834+ if !ok {
835+ return &broker.ErrAbort{"terminated"}
836+ }
837 outMsg, inMsg, err := exchg.Prepare(sess)
838+ if err == broker.ErrNop { // nothing to do
839+ pingTimer.Reset(pingInterval)
840+ intervalStart = time.Now()
841+ continue Loop
842+ }
843 if err != nil {
844 return err
845 }
846
847=== modified file 'server/session/session_test.go'
848--- server/session/session_test.go 2014-03-19 23:46:18 +0000
849+++ server/session/session_test.go 2014-04-11 18:18:05 +0000
850@@ -346,6 +346,42 @@
851 c.Check(err, Equals, io.EOF)
852 }
853
854+func (s *sessionSuite) TestSessionLoopKick(c *C) {
855+ nopTrack := NewTracker(s.testlog)
856+ errCh := make(chan error, 1)
857+ up := make(chan interface{}, 5)
858+ down := make(chan interface{}, 5)
859+ tp := &testProtocol{up, down}
860+ exchanges := make(chan broker.Exchange, 1)
861+ sess := &testing.TestBrokerSession{Exchanges: exchanges}
862+ go func() {
863+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
864+ }()
865+ close(exchanges)
866+ err := <-errCh
867+ c.Check(err, DeepEquals, &broker.ErrAbort{"terminated"})
868+}
869+
870+func (s *sessionSuite) TestSessionLoopExchangeErrNop(c *C) {
871+ nopTrack := NewTracker(s.testlog)
872+ errCh := make(chan error, 1)
873+ up := make(chan interface{}, 5)
874+ down := make(chan interface{}, 5)
875+ tp := &testProtocol{up, down}
876+ exchanges := make(chan broker.Exchange, 1)
877+ exchanges <- &testExchange{prepErr: broker.ErrNop}
878+ sess := &testing.TestBrokerSession{Exchanges: exchanges}
879+ go func() {
880+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
881+ }()
882+ c.Check(takeNext(down), Equals, "deadline 2ms")
883+ c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
884+ up <- nil // no write error
885+ up <- io.EOF
886+ err := <-errCh
887+ c.Check(err, Equals, io.EOF)
888+}
889+
890 func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) {
891 nopTrack := NewTracker(s.testlog)
892 errCh := make(chan error, 1)
893@@ -434,6 +470,25 @@
894 c.Check(err, Equals, io.ErrUnexpectedEOF)
895 }
896
897+func (s *sessionSuite) TestSessionLoopConnBrokenExchange(c *C) {
898+ nopTrack := NewTracker(s.testlog)
899+ errCh := make(chan error, 1)
900+ up := make(chan interface{}, 5)
901+ down := make(chan interface{}, 5)
902+ tp := &testProtocol{up, down}
903+ exchanges := make(chan broker.Exchange, 1)
904+ exchanges <- &broker.ConnBrokenExchange{"REASON"}
905+ sess := &testing.TestBrokerSession{Exchanges: exchanges}
906+ go func() {
907+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
908+ }()
909+ c.Check(takeNext(down), Equals, "deadline 2ms")
910+ c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "REASON"})
911+ up <- nil // no write error
912+ err := <-errCh
913+ c.Check(err, DeepEquals, &broker.ErrAbort{"session broken for reason"})
914+}
915+
916 type testTracker struct {
917 SessionTracker
918 interval chan interface{}

Subscribers

People subscribed via source and target branches