Merge lp:ubuntu-push/automatic into lp:ubuntu-push
- automatic
- Merge into trunk
Proposed by
John Lenton
Status: | Merged | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Merged at revision: | 99 | ||||||||||||
Proposed branch: | lp:ubuntu-push/automatic | ||||||||||||
Merge into: | lp:ubuntu-push | ||||||||||||
Diff against target: |
918 lines (+365/-45) (has conflicts) 22 files modified
client/client.go (+10/-5) client/client_test.go (+13/-7) client/session/session.go (+21/-1) client/session/session_test.go (+69/-4) debian/changelog (+9/-0) debian/copyright (+1/-1) protocol/messages.go (+21/-5) protocol/messages_test.go (+4/-0) sampleconfigs/dev.json (+2/-1) server/acceptance/acceptance_test.go (+1/-0) server/acceptance/acceptanceclient.go (+5/-4) server/acceptance/cmd/acceptanceclient.go (+10/-3) server/acceptance/suites/broadcast.go (+10/-2) server/acceptance/suites/pingpong.go (+3/-3) server/broker/broker.go (+4/-0) server/broker/exchanges.go (+32/-6) server/broker/exchanges_test.go (+52/-0) server/broker/simple/simple.go (+4/-0) server/broker/testsuite/suite.go (+8/-0) server/dev/server.go (+18/-1) server/session/session.go (+13/-2) server/session/session_test.go (+55/-0) Text conflict in debian/changelog |
||||||||||||
To merge this branch: | bzr merge lp:ubuntu-push/automatic | ||||||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Push Hackers | Pending | ||
Review via email: mp+215479@code.launchpad.net |
Commit message
Description of the change
Merging automatic.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'client/client.go' |
2 | --- client/client.go 2014-04-03 20:56:25 +0000 |
3 | +++ client/client.go 2014-04-11 18:18:05 +0000 |
4 | @@ -81,6 +81,8 @@ |
5 | sessionConnectedCh chan uint32 |
6 | } |
7 | |
8 | +var ACTION_ID_SNOWFLAKE = "::ubuntu-push-client::" |
9 | + |
10 | // Creates a new Ubuntu Push Notifications client-side daemon that will use |
11 | // the given configuration file. |
12 | func NewPushClient(configPath string, leveldbPath string) *PushClient { |
13 | @@ -278,7 +280,7 @@ |
14 | if !client.filterNotification(msg) { |
15 | return nil |
16 | } |
17 | - action_id := "dummy_id" |
18 | + action_id := ACTION_ID_SNOWFLAKE |
19 | a := []string{action_id, "Go get it!"} // action value not visible on the phone |
20 | h := map[string]*dbus.Variant{"x-canonical-switch-to-application": &dbus.Variant{true}} |
21 | nots := notifications.Raw(client.notificationsEndp, client.log) |
22 | @@ -305,20 +307,23 @@ |
23 | } |
24 | |
25 | // handleClick deals with the user clicking a notification |
26 | -func (client *PushClient) handleClick() error { |
27 | +func (client *PushClient) handleClick(action_id string) error { |
28 | + if action_id != ACTION_ID_SNOWFLAKE { |
29 | + return nil |
30 | + } |
31 | // it doesn't get much simpler... |
32 | urld := urldispatcher.New(client.urlDispatcherEndp, client.log) |
33 | return urld.DispatchURL("settings:///system/system-update") |
34 | } |
35 | |
36 | // doLoop connects events with their handlers |
37 | -func (client *PushClient) doLoop(connhandler func(bool), clickhandler func() error, notifhandler func(*session.Notification) error, errhandler func(error)) { |
38 | +func (client *PushClient) doLoop(connhandler func(bool), clickhandler func(string) error, notifhandler func(*session.Notification) error, errhandler func(error)) { |
39 | for { |
40 | select { |
41 | case state := <-client.connCh: |
42 | connhandler(state) |
43 | - case <-client.actionsCh: |
44 | - clickhandler() |
45 | + case action := <-client.actionsCh: |
46 | + clickhandler(action.ActionId) |
47 | case msg := <-client.session.MsgCh: |
48 | notifhandler(msg) |
49 | case err := <-client.session.ErrCh: |
50 | |
51 | === modified file 'client/client_test.go' |
52 | --- client/client_test.go 2014-04-03 20:56:25 +0000 |
53 | +++ client/client_test.go 2014-04-11 18:18:05 +0000 |
54 | @@ -588,9 +588,15 @@ |
55 | cli.log = cs.log |
56 | endp := testibus.NewTestingEndpoint(nil, condition.Work(true)) |
57 | cli.urlDispatcherEndp = endp |
58 | - c.Check(cli.handleClick(), IsNil) |
59 | + // check we don't fail on something random |
60 | + c.Check(cli.handleClick("something random"), IsNil) |
61 | + // ... but we don't send anything either |
62 | + args := testibus.GetCallArgs(endp) |
63 | + c.Assert(args, HasLen, 0) |
64 | + // check we worked with the right action id |
65 | + c.Check(cli.handleClick(ACTION_ID_SNOWFLAKE), IsNil) |
66 | // check we sent the notification |
67 | - args := testibus.GetCallArgs(endp) |
68 | + args = testibus.GetCallArgs(endp) |
69 | c.Assert(args, HasLen, 1) |
70 | c.Check(args[0].Member, Equals, "DispatchURL") |
71 | c.Check(args[0].Args, DeepEquals, []interface{}{"settings:///system/system-update"}) |
72 | @@ -609,7 +615,7 @@ |
73 | c.Assert(cli.initSession(), IsNil) |
74 | |
75 | ch := make(chan bool, 1) |
76 | - go cli.doLoop(func(bool) { ch <- true }, func() error { return nil }, func(_ *session.Notification) error { return nil }, func(error) {}) |
77 | + go cli.doLoop(func(bool) { ch <- true }, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) {}) |
78 | c.Check(takeNextBool(ch), Equals, true) |
79 | } |
80 | |
81 | @@ -623,7 +629,7 @@ |
82 | cli.actionsCh = aCh |
83 | |
84 | ch := make(chan bool, 1) |
85 | - go cli.doLoop(func(bool) {}, func() error { ch <- true; return nil }, func(_ *session.Notification) error { return nil }, func(error) {}) |
86 | + go cli.doLoop(func(bool) {}, func(_ string) error { ch <- true; return nil }, func(_ *session.Notification) error { return nil }, func(error) {}) |
87 | c.Check(takeNextBool(ch), Equals, true) |
88 | } |
89 | |
90 | @@ -636,7 +642,7 @@ |
91 | cli.session.MsgCh <- &session.Notification{} |
92 | |
93 | ch := make(chan bool, 1) |
94 | - go cli.doLoop(func(bool) {}, func() error { return nil }, func(_ *session.Notification) error { ch <- true; return nil }, func(error) {}) |
95 | + go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { ch <- true; return nil }, func(error) {}) |
96 | c.Check(takeNextBool(ch), Equals, true) |
97 | } |
98 | |
99 | @@ -649,7 +655,7 @@ |
100 | cli.session.ErrCh <- nil |
101 | |
102 | ch := make(chan bool, 1) |
103 | - go cli.doLoop(func(bool) {}, func() error { return nil }, func(_ *session.Notification) error { return nil }, func(error) { ch <- true }) |
104 | + go cli.doLoop(func(bool) {}, func(_ string) error { return nil }, func(_ *session.Notification) error { return nil }, func(error) { ch <- true }) |
105 | c.Check(takeNextBool(ch), Equals, true) |
106 | } |
107 | |
108 | @@ -716,7 +722,7 @@ |
109 | c.Check(cs.log.Captured(), Matches, "(?ms).*Session connected after 42 attempts$") |
110 | |
111 | // * actionsCh to the click handler/url dispatcher |
112 | - aCh <- notifications.RawActionReply{} |
113 | + aCh <- notifications.RawActionReply{ActionId: ACTION_ID_SNOWFLAKE} |
114 | tick() |
115 | uargs := testibus.GetCallArgs(cli.urlDispatcherEndp) |
116 | c.Assert(uargs, HasLen, 1) |
117 | |
118 | === modified file 'client/session/session.go' |
119 | --- client/session/session.go 2014-04-04 12:28:40 +0000 |
120 | +++ client/session/session.go 2014-04-11 18:18:05 +0000 |
121 | @@ -49,6 +49,7 @@ |
122 | Type string `json:"T"` |
123 | protocol.BroadcastMsg |
124 | protocol.NotificationsMsg |
125 | + protocol.ConnBrokenMsg |
126 | } |
127 | |
128 | // parseServerAddrSpec recognizes whether spec is a HTTP URL to get |
129 | @@ -176,7 +177,7 @@ |
130 | // getHosts sets deliveryHosts possibly querying a remote endpoint |
131 | func (sess *ClientSession) getHosts() error { |
132 | if sess.getHost != nil { |
133 | - if sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime { |
134 | + if sess.deliveryHosts != nil && sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime { |
135 | return nil |
136 | } |
137 | hosts, err := sess.getHost.Get() |
138 | @@ -193,6 +194,10 @@ |
139 | return nil |
140 | } |
141 | |
142 | +func (sess *ClientSession) resetHosts() { |
143 | + sess.deliveryHosts = nil |
144 | +} |
145 | + |
146 | // startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts |
147 | |
148 | func (sess *ClientSession) startConnectionAttempt() { |
149 | @@ -338,6 +343,19 @@ |
150 | return nil |
151 | } |
152 | |
153 | +// handle "connbroken" messages |
154 | +func (sess *ClientSession) handleConnBroken(connBroken *serverMsg) error { |
155 | + sess.setState(Error) |
156 | + reason := connBroken.Reason |
157 | + err := fmt.Errorf("server broke connection: %s", reason) |
158 | + sess.Log.Errorf("%s", err) |
159 | + switch reason { |
160 | + case protocol.BrokenHostMismatch: |
161 | + sess.resetHosts() |
162 | + } |
163 | + return err |
164 | +} |
165 | + |
166 | // loop runs the session with the server, emits a stream of events. |
167 | func (sess *ClientSession) loop() error { |
168 | var err error |
169 | @@ -356,6 +374,8 @@ |
170 | err = sess.handlePing() |
171 | case "broadcast": |
172 | err = sess.handleBroadcast(&recv) |
173 | + case "connbroken": |
174 | + err = sess.handleConnBroken(&recv) |
175 | } |
176 | if err != nil { |
177 | return err |
178 | |
179 | === modified file 'client/session/session_test.go' |
180 | --- client/session/session_test.go 2014-04-03 20:56:25 +0000 |
181 | +++ client/session/session_test.go 2014-04-11 18:18:05 +0000 |
182 | @@ -317,6 +317,29 @@ |
183 | c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"}) |
184 | } |
185 | |
186 | +func (cs *clientSessionSuite) TestGetHostsRemoteCachingReset(c *C) { |
187 | + hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil} |
188 | + sess := &ClientSession{ |
189 | + getHost: hostGetter, |
190 | + ClientSessionConfig: ClientSessionConfig{ |
191 | + HostsCachingExpiryTime: 2 * time.Hour, |
192 | + }, |
193 | + timeSince: time.Since, |
194 | + } |
195 | + err := sess.getHosts() |
196 | + c.Assert(err, IsNil) |
197 | + hostGetter.hosts = []string{"baz:443"} |
198 | + // cached |
199 | + err = sess.getHosts() |
200 | + c.Assert(err, IsNil) |
201 | + c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"}) |
202 | + // reset |
203 | + sess.resetHosts() |
204 | + err = sess.getHosts() |
205 | + c.Assert(err, IsNil) |
206 | + c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"}) |
207 | +} |
208 | + |
209 | /**************************************************************** |
210 | startConnectionAttempt()/nextHostToTry()/started tests |
211 | ****************************************************************/ |
212 | @@ -587,7 +610,7 @@ |
213 | json.RawMessage("false"), // shouldn't happen but robust |
214 | json.RawMessage(`{"img1/m1":[102,"tubular"]}`), |
215 | }, |
216 | - }, protocol.NotificationsMsg{}} |
217 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
218 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
219 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
220 | s.upCh <- nil // ack ok |
221 | @@ -618,7 +641,7 @@ |
222 | ChanId: "0", |
223 | TopLevel: 2, |
224 | Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
225 | - }, protocol.NotificationsMsg{}} |
226 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
227 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
228 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
229 | failure := errors.New("ACK ACK ACK") |
230 | @@ -635,7 +658,7 @@ |
231 | ChanId: "something awful", |
232 | TopLevel: 2, |
233 | Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
234 | - }, protocol.NotificationsMsg{}} |
235 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
236 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
237 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
238 | s.upCh <- nil // ack ok |
239 | @@ -652,7 +675,7 @@ |
240 | ChanId: "0", |
241 | TopLevel: 2, |
242 | Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
243 | - }, protocol.NotificationsMsg{}} |
244 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
245 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
246 | s.upCh <- nil // ack ok |
247 | // start returns with error |
248 | @@ -665,6 +688,37 @@ |
249 | } |
250 | |
251 | /**************************************************************** |
252 | + handleConnBroken() tests |
253 | +****************************************************************/ |
254 | + |
255 | +func (s *msgSuite) TestHandleConnBrokenUnkwown(c *C) { |
256 | + msg := serverMsg{"connbroken", |
257 | + protocol.BroadcastMsg{}, protocol.NotificationsMsg{}, |
258 | + protocol.ConnBrokenMsg{ |
259 | + Reason: "REASON", |
260 | + }, |
261 | + } |
262 | + go func() { s.errCh <- s.sess.handleConnBroken(&msg) }() |
263 | + c.Check(<-s.errCh, ErrorMatches, "server broke connection: REASON") |
264 | + c.Check(s.sess.State(), Equals, Error) |
265 | +} |
266 | + |
267 | +func (s *msgSuite) TestHandleConnBrokenHostMismatch(c *C) { |
268 | + msg := serverMsg{"connbroken", |
269 | + protocol.BroadcastMsg{}, protocol.NotificationsMsg{}, |
270 | + protocol.ConnBrokenMsg{ |
271 | + Reason: protocol.BrokenHostMismatch, |
272 | + }, |
273 | + } |
274 | + s.sess.deliveryHosts = []string{"foo:443", "bar:443"} |
275 | + go func() { s.errCh <- s.sess.handleConnBroken(&msg) }() |
276 | + c.Check(<-s.errCh, ErrorMatches, "server broke connection: host-mismatch") |
277 | + c.Check(s.sess.State(), Equals, Error) |
278 | + // hosts were reset |
279 | + c.Check(s.sess.deliveryHosts, IsNil) |
280 | +} |
281 | + |
282 | +/**************************************************************** |
283 | loop() tests |
284 | ****************************************************************/ |
285 | |
286 | @@ -728,6 +782,17 @@ |
287 | c.Check(<-s.errCh, Equals, failure) |
288 | } |
289 | |
290 | +func (s *loopSuite) TestLoopConnBroken(c *C) { |
291 | + c.Check(s.sess.State(), Equals, Running) |
292 | + broken := protocol.ConnBrokenMsg{ |
293 | + Type: "connbroken", |
294 | + Reason: "REASON", |
295 | + } |
296 | + c.Check(takeNext(s.downCh), Equals, "deadline 1ms") |
297 | + s.upCh <- broken |
298 | + c.Check(<-s.errCh, NotNil) |
299 | +} |
300 | + |
301 | /**************************************************************** |
302 | start() tests |
303 | ****************************************************************/ |
304 | |
305 | === modified file 'debian/changelog' |
306 | --- debian/changelog 2014-04-04 14:52:06 +0000 |
307 | +++ debian/changelog 2014-04-11 18:18:05 +0000 |
308 | @@ -1,3 +1,4 @@ |
309 | +<<<<<<< TREE |
310 | ubuntu-push (0.1+14.04.20140404-0ubuntu1) trusty; urgency=low |
311 | |
312 | [ Tarmac ] |
313 | @@ -5,6 +6,14 @@ |
314 | |
315 | -- Ubuntu daily release <ps-jenkins@lists.canonical.com> Fri, 04 Apr 2014 14:52:06 +0000 |
316 | |
317 | +======= |
318 | +ubuntu-push (0.2-0ubuntu1) UNRELEASED; urgency=medium |
319 | + |
320 | + * New upstream release. |
321 | + |
322 | + -- John Lenton <john.lenton@canonical.com> Fri, 11 Apr 2014 11:19:42 +0100 |
323 | + |
324 | +>>>>>>> MERGE-SOURCE |
325 | ubuntu-push (0.1+14.04.20140327-0ubuntu1) trusty; urgency=medium |
326 | |
327 | [ John Lenton ] |
328 | |
329 | === modified file 'debian/copyright' |
330 | --- debian/copyright 2014-03-24 15:31:42 +0000 |
331 | +++ debian/copyright 2014-04-11 18:18:05 +0000 |
332 | @@ -11,7 +11,7 @@ |
333 | License: BSD-3-clause |
334 | |
335 | Files: external/murmur3/* |
336 | -Copyright: 2013 Sébastien Paolacci |
337 | +Copyright: 2013 Sébastien Paolacci |
338 | License: BSD-3-clause |
339 | |
340 | License: GPL-3.0 |
341 | |
342 | === modified file 'protocol/messages.go' |
343 | --- protocol/messages.go 2014-03-19 22:31:20 +0000 |
344 | +++ protocol/messages.go 2014-04-11 18:18:05 +0000 |
345 | @@ -49,6 +49,27 @@ |
346 | PingInterval string |
347 | } |
348 | |
349 | +// SplittableMsg are messages that may require and are capable of splitting. |
350 | +type SplittableMsg interface { |
351 | + Split() (done bool) |
352 | +} |
353 | + |
354 | +// CONNBROKEN message, server side is breaking the connection for reason. |
355 | +type ConnBrokenMsg struct { |
356 | + Type string `json:"T"` |
357 | + // reason |
358 | + Reason string |
359 | +} |
360 | + |
361 | +func (m *ConnBrokenMsg) Split() bool { |
362 | + return true |
363 | +} |
364 | + |
365 | +// CONNBROKEN reasons |
366 | +const ( |
367 | + BrokenHostMismatch = "host-mismatch" |
368 | +) |
369 | + |
370 | // PING/PONG messages |
371 | type PingPongMsg struct { |
372 | Type string `json:"T"` |
373 | @@ -56,11 +77,6 @@ |
374 | |
375 | const maxPayloadSize = 62 * 1024 |
376 | |
377 | -// SplittableMsg are messages that may require and are capable of splitting. |
378 | -type SplittableMsg interface { |
379 | - Split() (done bool) |
380 | -} |
381 | - |
382 | // BROADCAST messages |
383 | type BroadcastMsg struct { |
384 | Type string `json:"T"` |
385 | |
386 | === modified file 'protocol/messages_test.go' |
387 | --- protocol/messages_test.go 2014-02-26 16:04:57 +0000 |
388 | +++ protocol/messages_test.go 2014-04-11 18:18:05 +0000 |
389 | @@ -103,3 +103,7 @@ |
390 | b.Reset() |
391 | c.Check(b.splitting, Equals, 0) |
392 | } |
393 | + |
394 | +func (s *messagesSuite) TestSplitConnBrokenMsg(c *C) { |
395 | + c.Check((&ConnBrokenMsg{}).Split(), Equals, true) |
396 | +} |
397 | |
398 | === modified file 'sampleconfigs/dev.json' |
399 | --- sampleconfigs/dev.json 2014-03-31 16:48:00 +0000 |
400 | +++ sampleconfigs/dev.json 2014-04-11 18:18:05 +0000 |
401 | @@ -8,5 +8,6 @@ |
402 | "cert_pem_file": "../server/acceptance/ssl/testing.cert", |
403 | "http_addr": "127.0.0.1:8080", |
404 | "http_read_timeout": "5s", |
405 | - "http_write_timeout": "5s" |
406 | + "http_write_timeout": "5s", |
407 | + "delivery_domain": "localhost" |
408 | } |
409 | |
410 | === modified file 'server/acceptance/acceptance_test.go' |
411 | --- server/acceptance/acceptance_test.go 2014-02-26 19:50:46 +0000 |
412 | +++ server/acceptance/acceptance_test.go 2014-04-11 18:18:05 +0000 |
413 | @@ -34,6 +34,7 @@ |
414 | cfg := make(map[string]interface{}) |
415 | suites.FillServerConfig(cfg, addr) |
416 | suites.FillHTTPServerConfig(cfg, httpAddr) |
417 | + cfg["delivery_domain"] = "localhost" |
418 | return cfg |
419 | } |
420 | |
421 | |
422 | === modified file 'server/acceptance/acceptanceclient.go' |
423 | --- server/acceptance/acceptanceclient.go 2014-04-03 16:47:47 +0000 |
424 | +++ server/acceptance/acceptanceclient.go 2014-04-11 18:18:05 +0000 |
425 | @@ -42,7 +42,8 @@ |
426 | CertPEMBlock []byte |
427 | ReportPings bool |
428 | Levels map[string]int64 |
429 | - Insecure bool // don't verify certs |
430 | + Insecure bool // don't verify certs |
431 | + Prefix string // prefix for events |
432 | // connection |
433 | Connection net.Conn |
434 | } |
435 | @@ -105,7 +106,7 @@ |
436 | if err != nil { |
437 | return err |
438 | } |
439 | - events <- fmt.Sprintf("connected %v", conn.LocalAddr()) |
440 | + events <- fmt.Sprintf("%sconnected %v", sess.Prefix, conn.LocalAddr()) |
441 | var recv serverMsg |
442 | for { |
443 | deadAfter := pingInterval + sess.ExchangeTimeout |
444 | @@ -122,7 +123,7 @@ |
445 | return err |
446 | } |
447 | if sess.ReportPings { |
448 | - events <- "Ping" |
449 | + events <- sess.Prefix + "ping" |
450 | } |
451 | case "broadcast": |
452 | conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
453 | @@ -134,7 +135,7 @@ |
454 | if err != nil { |
455 | return err |
456 | } |
457 | - events <- fmt.Sprintf("broadcast chan:%v app:%v topLevel:%d payloads:%s", recv.ChanId, recv.AppId, recv.TopLevel, pack) |
458 | + events <- fmt.Sprintf("%sbroadcast chan:%v app:%v topLevel:%d payloads:%s", sess.Prefix, recv.ChanId, recv.AppId, recv.TopLevel, pack) |
459 | } |
460 | } |
461 | return nil |
462 | |
463 | === modified file 'server/acceptance/cmd/acceptanceclient.go' |
464 | --- server/acceptance/cmd/acceptanceclient.go 2014-04-03 16:47:47 +0000 |
465 | +++ server/acceptance/cmd/acceptanceclient.go 2014-04-11 18:18:05 +0000 |
466 | @@ -19,6 +19,7 @@ |
467 | |
468 | import ( |
469 | "flag" |
470 | + "fmt" |
471 | "log" |
472 | "os" |
473 | "path/filepath" |
474 | @@ -43,6 +44,10 @@ |
475 | } |
476 | |
477 | func main() { |
478 | + flag.Usage = func() { |
479 | + fmt.Fprintf(os.Stderr, "Usage: acceptancclient [options] <config.json> <device id>\n") |
480 | + flag.PrintDefaults() |
481 | + } |
482 | flag.Parse() |
483 | narg := flag.NArg() |
484 | switch { |
485 | @@ -72,9 +77,11 @@ |
486 | Insecure: *insecureFlag, |
487 | } |
488 | log.Printf("with: %#v", session) |
489 | - session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName)) |
490 | - if err != nil { |
491 | - log.Fatalf("reading CertPEMFile: %v", err) |
492 | + if !*insecureFlag { |
493 | + session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName)) |
494 | + if err != nil { |
495 | + log.Fatalf("reading CertPEMFile: %v", err) |
496 | + } |
497 | } |
498 | err = session.Dial() |
499 | if err != nil { |
500 | |
501 | === modified file 'server/acceptance/suites/broadcast.go' |
502 | --- server/acceptance/suites/broadcast.go 2014-04-03 16:47:47 +0000 |
503 | +++ server/acceptance/suites/broadcast.go 2014-04-11 18:18:05 +0000 |
504 | @@ -24,6 +24,7 @@ |
505 | |
506 | . "launchpad.net/gocheck" |
507 | |
508 | + "launchpad.net/ubuntu-push/client/gethosts" |
509 | "launchpad.net/ubuntu-push/protocol" |
510 | "launchpad.net/ubuntu-push/server/api" |
511 | ) |
512 | @@ -66,8 +67,6 @@ |
513 | }) |
514 | c.Assert(err, IsNil) |
515 | c.Assert(got, Matches, ".*ok.*") |
516 | - // xxx don't send this one |
517 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`) |
518 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`) |
519 | stop() |
520 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
521 | @@ -261,3 +260,12 @@ |
522 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
523 | c.Check(len(errCh), Equals, 0) |
524 | } |
525 | + |
526 | +// test /delivery-hosts |
527 | + |
528 | +func (s *BroadcastAcceptanceSuite) TestGetHosts(c *C) { |
529 | + gh := gethosts.New("", s.ServerAPIURL+"/delivery-hosts", 2*time.Second) |
530 | + hosts, err := gh.Get() |
531 | + c.Assert(err, IsNil) |
532 | + c.Check(hosts, DeepEquals, []string{s.ServerAddr}) |
533 | +} |
534 | |
535 | === modified file 'server/acceptance/suites/pingpong.go' |
536 | --- server/acceptance/suites/pingpong.go 2014-04-03 16:47:47 +0000 |
537 | +++ server/acceptance/suites/pingpong.go 2014-04-11 18:18:05 +0000 |
538 | @@ -56,11 +56,11 @@ |
539 | c.Assert(connectSrv, Matches, ".*session.* connected .*") |
540 | c.Assert(registeredSrv, Matches, ".*session.* registered DEVA") |
541 | c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true) |
542 | - c.Assert(NextEvent(events, errCh), Equals, "Ping") |
543 | + c.Assert(NextEvent(events, errCh), Equals, "ping") |
544 | elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond) |
545 | c.Check(elapsedOfPing >= 1.0, Equals, true) |
546 | c.Check(elapsedOfPing < 1.05, Equals, true) |
547 | - c.Assert(NextEvent(events, errCh), Equals, "Ping") |
548 | + c.Assert(NextEvent(events, errCh), Equals, "ping") |
549 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* ended with: EOF") |
550 | c.Check(len(errCh), Equals, 0) |
551 | } |
552 | @@ -87,7 +87,7 @@ |
553 | c.Assert(NextEvent(events, errCh), Matches, "connected .*") |
554 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* connected .*") |
555 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* registered .*") |
556 | - c.Assert(NextEvent(events, errCh), Equals, "Ping") |
557 | + c.Assert(NextEvent(events, errCh), Equals, "ping") |
558 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*timeout`) |
559 | c.Check(len(errCh), Equals, 0) |
560 | } |
561 | |
562 | === modified file 'server/broker/broker.go' |
563 | --- server/broker/broker.go 2014-04-03 14:31:10 +0000 |
564 | +++ server/broker/broker.go 2014-04-11 18:18:05 +0000 |
565 | @@ -19,6 +19,7 @@ |
566 | package broker |
567 | |
568 | import ( |
569 | + "errors" |
570 | "fmt" |
571 | |
572 | "launchpad.net/ubuntu-push/protocol" |
573 | @@ -46,6 +47,9 @@ |
574 | Acked(sess BrokerSession, done bool) error |
575 | } |
576 | |
577 | +// ErrNop returned by Prepare means nothing to do/send. |
578 | +var ErrNop = errors.New("nothing to send") |
579 | + |
580 | // LevelsMap is the type for holding channel levels for session. |
581 | type LevelsMap map[store.InternalChannelId]int64 |
582 | |
583 | |
584 | === modified file 'server/broker/exchanges.go' |
585 | --- server/broker/exchanges.go 2014-04-03 16:00:53 +0000 |
586 | +++ server/broker/exchanges.go 2014-04-11 18:18:05 +0000 |
587 | @@ -28,8 +28,9 @@ |
588 | |
589 | // Scratch area for exchanges, sessions should hold one of these. |
590 | type ExchangesScratchArea struct { |
591 | - broadcastMsg protocol.BroadcastMsg |
592 | - ackMsg protocol.AckMsg |
593 | + broadcastMsg protocol.BroadcastMsg |
594 | + ackMsg protocol.AckMsg |
595 | + connBrokenMsg protocol.ConnBrokenMsg |
596 | } |
597 | |
598 | // BroadcastExchange leads a session through delivering a BROADCAST. |
599 | @@ -42,7 +43,7 @@ |
600 | } |
601 | |
602 | // check interface already here |
603 | -var _ Exchange = &BroadcastExchange{} |
604 | +var _ Exchange = (*BroadcastExchange)(nil) |
605 | |
606 | // Init ensures the BroadcastExchange is fully initialized for the sessions. |
607 | func (sbe *BroadcastExchange) Init() { |
608 | @@ -88,14 +89,18 @@ |
609 | |
610 | // Prepare session for a BROADCAST. |
611 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
612 | - scratchArea := sess.ExchangeScratchArea() |
613 | - scratchArea.broadcastMsg.Reset() |
614 | - scratchArea.broadcastMsg.Type = "broadcast" |
615 | clientLevel := sess.Levels()[sbe.ChanId] |
616 | payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) |
617 | tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel()) |
618 | payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded) |
619 | + if len(payloads) == 0 && sbe.TopLevel >= clientLevel { |
620 | + // empty and don't need to force resync => do nothing |
621 | + return nil, nil, ErrNop |
622 | + } |
623 | |
624 | + scratchArea := sess.ExchangeScratchArea() |
625 | + scratchArea.broadcastMsg.Reset() |
626 | + scratchArea.broadcastMsg.Type = "broadcast" |
627 | // xxx need an AppId as well, later |
628 | scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId) |
629 | scratchArea.broadcastMsg.TopLevel = sbe.TopLevel |
630 | @@ -113,3 +118,24 @@ |
631 | sess.Levels()[sbe.ChanId] = sbe.TopLevel |
632 | return nil |
633 | } |
634 | + |
635 | +// ConnBrokenExchange breaks a session giving a reason. |
636 | +type ConnBrokenExchange struct { |
637 | + Reason string |
638 | +} |
639 | + |
640 | +// check interface already here |
641 | +var _ Exchange = (*ConnBrokenExchange)(nil) |
642 | + |
643 | +// Prepare session for a CONNBROKEN. |
644 | +func (cbe *ConnBrokenExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
645 | + scratchArea := sess.ExchangeScratchArea() |
646 | + scratchArea.connBrokenMsg.Type = "connbroken" |
647 | + scratchArea.connBrokenMsg.Reason = cbe.Reason |
648 | + return &scratchArea.connBrokenMsg, nil, nil |
649 | +} |
650 | + |
651 | +// CONNBROKEN isn't acked |
652 | +func (cbe *ConnBrokenExchange) Acked(sess BrokerSession, done bool) error { |
653 | + panic("Acked should not get invoked on ConnBrokenExchange") |
654 | +} |
655 | |
656 | === modified file 'server/broker/exchanges_test.go' |
657 | --- server/broker/exchanges_test.go 2014-04-04 09:57:02 +0000 |
658 | +++ server/broker/exchanges_test.go 2014-04-11 18:18:05 +0000 |
659 | @@ -81,6 +81,44 @@ |
660 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3)) |
661 | } |
662 | |
663 | +func (s *exchangesSuite) TestBroadcastExchangeEmpty(c *C) { |
664 | + sess := &testing.TestBrokerSession{ |
665 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
666 | + Model: "m1", |
667 | + ImageChannel: "img1", |
668 | + } |
669 | + exchg := &broker.BroadcastExchange{ |
670 | + ChanId: store.SystemInternalChannelId, |
671 | + TopLevel: 3, |
672 | + NotificationPayloads: []json.RawMessage{}, |
673 | + } |
674 | + exchg.Init() |
675 | + outMsg, inMsg, err := exchg.Prepare(sess) |
676 | + c.Assert(err, Equals, broker.ErrNop) |
677 | + c.Check(outMsg, IsNil) |
678 | + c.Check(inMsg, IsNil) |
679 | +} |
680 | + |
681 | +func (s *exchangesSuite) TestBroadcastExchangeEmptyButAhead(c *C) { |
682 | + sess := &testing.TestBrokerSession{ |
683 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{ |
684 | + store.SystemInternalChannelId: 10, |
685 | + }), |
686 | + Model: "m1", |
687 | + ImageChannel: "img1", |
688 | + } |
689 | + exchg := &broker.BroadcastExchange{ |
690 | + ChanId: store.SystemInternalChannelId, |
691 | + TopLevel: 3, |
692 | + NotificationPayloads: []json.RawMessage{}, |
693 | + } |
694 | + exchg.Init() |
695 | + outMsg, inMsg, err := exchg.Prepare(sess) |
696 | + c.Assert(err, IsNil) |
697 | + c.Check(outMsg, NotNil) |
698 | + c.Check(inMsg, NotNil) |
699 | +} |
700 | + |
701 | func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) { |
702 | sess := &testing.TestBrokerSession{ |
703 | LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
704 | @@ -210,3 +248,17 @@ |
705 | c.Assert(err, IsNil) |
706 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5)) |
707 | } |
708 | + |
709 | +func (s *exchangesSuite) TestConnBrokenExchange(c *C) { |
710 | + sess := &testing.TestBrokerSession{} |
711 | + cbe := &broker.ConnBrokenExchange{"REASON"} |
712 | + outMsg, inMsg, err := cbe.Prepare(sess) |
713 | + c.Assert(err, IsNil) |
714 | + c.Check(inMsg, IsNil) // no answer is expected |
715 | + // check |
716 | + marshalled, err := json.Marshal(outMsg) |
717 | + c.Assert(err, IsNil) |
718 | + c.Check(string(marshalled), Equals, `{"T":"connbroken","Reason":"REASON"}`) |
719 | + |
720 | + c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnBrokenExchange") |
721 | +} |
722 | |
723 | === modified file 'server/broker/simple/simple.go' |
724 | --- server/broker/simple/simple.go 2014-04-03 16:00:53 +0000 |
725 | +++ server/broker/simple/simple.go 2014-04-11 18:18:05 +0000 |
726 | @@ -222,6 +222,10 @@ |
727 | delete(b.registry, sess.deviceId) |
728 | } |
729 | } else { // register |
730 | + prev := b.registry[sess.deviceId] |
731 | + if prev != nil { // kick it |
732 | + close(prev.exchanges) |
733 | + } |
734 | b.registry[sess.deviceId] = sess |
735 | sess.registered = true |
736 | sess.done <- true |
737 | |
738 | === modified file 'server/broker/testsuite/suite.go' |
739 | --- server/broker/testsuite/suite.go 2014-04-03 16:00:53 +0000 |
740 | +++ server/broker/testsuite/suite.go 2014-04-11 18:18:05 +0000 |
741 | @@ -164,6 +164,14 @@ |
742 | c.Assert(err, IsNil) |
743 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
744 | c.Assert(err, IsNil) |
745 | + checkAndFalse := false |
746 | + // previous session got signaled by closing its channel |
747 | + select { |
748 | + case _, ok := <-sess1.SessionChannel(): |
749 | + checkAndFalse = ok == false |
750 | + default: |
751 | + } |
752 | + c.Check(checkAndFalse, Equals, true) |
753 | c.Assert(s.RevealSession(b, "dev-1"), Equals, sess2) |
754 | b.Unregister(sess1) |
755 | // just to make sure the unregister was processed |
756 | |
757 | === modified file 'server/dev/server.go' |
758 | --- server/dev/server.go 2014-03-25 19:02:18 +0000 |
759 | +++ server/dev/server.go 2014-04-11 18:18:05 +0000 |
760 | @@ -18,6 +18,7 @@ |
761 | package main |
762 | |
763 | import ( |
764 | + "encoding/json" |
765 | "net" |
766 | "net/http" |
767 | "os" |
768 | @@ -37,6 +38,8 @@ |
769 | server.DevicesParsedConfig |
770 | // api http server configuration |
771 | server.HTTPServeParsedConfig |
772 | + // delivery domain |
773 | + DeliveryDomain string `json:"delivery_domain"` |
774 | } |
775 | |
776 | func main() { |
777 | @@ -60,11 +63,25 @@ |
778 | storeForRequest := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) { |
779 | return sto, nil |
780 | } |
781 | + lst, err := net.Listen("tcp", cfg.Addr()) |
782 | + if err != nil { |
783 | + server.BootLogFatalf("start device listening: %v", err) |
784 | + } |
785 | mux := api.MakeHandlersMux(storeForRequest, broker, logger) |
786 | + // & /delivery-hosts |
787 | + mux.HandleFunc("/delivery-hosts", func(w http.ResponseWriter, req *http.Request) { |
788 | + w.Header().Set("Cache-Control", "no-cache") |
789 | + w.Header().Set("Content-Type", "application/json") |
790 | + enc := json.NewEncoder(w) |
791 | + enc.Encode(map[string]interface{}{ |
792 | + "hosts": []string{lst.Addr().String()}, |
793 | + "domain": cfg.DeliveryDomain, |
794 | + }) |
795 | + }) |
796 | handler := api.PanicTo500Handler(mux, logger) |
797 | go server.HTTPServeRunner(nil, handler, &cfg.HTTPServeParsedConfig)() |
798 | // listen for device connections |
799 | - server.DevicesRunner(nil, func(conn net.Conn) error { |
800 | + server.DevicesRunner(lst, func(conn net.Conn) error { |
801 | track := session.NewTracker(logger) |
802 | return session.Session(conn, broker, cfg, track) |
803 | }, logger, &cfg.DevicesParsedConfig)() |
804 | |
805 | === modified file 'server/session/session.go' |
806 | --- server/session/session.go 2014-02-10 23:19:08 +0000 |
807 | +++ server/session/session.go 2014-04-11 18:18:05 +0000 |
808 | @@ -62,6 +62,9 @@ |
809 | if err != nil { |
810 | return err |
811 | } |
812 | + if inMsg == nil { // no answer expected, breaking connection |
813 | + return &broker.ErrAbort{"session broken for reason"} |
814 | + } |
815 | err = proto.ReadMessage(inMsg) |
816 | if err != nil { |
817 | return err |
818 | @@ -76,6 +79,7 @@ |
819 | pingTimer := time.NewTimer(pingInterval) |
820 | intervalStart := time.Now() |
821 | ch := sess.SessionChannel() |
822 | +Loop: |
823 | for { |
824 | select { |
825 | case <-pingTimer.C: |
826 | @@ -90,10 +94,17 @@ |
827 | return &broker.ErrAbort{"expected PONG message"} |
828 | } |
829 | pingTimer.Reset(pingInterval) |
830 | - case exchg := <-ch: |
831 | - // xxx later can use ch closing for shutdown/reset |
832 | + case exchg, ok := <-ch: |
833 | pingTimer.Stop() |
834 | + if !ok { |
835 | + return &broker.ErrAbort{"terminated"} |
836 | + } |
837 | outMsg, inMsg, err := exchg.Prepare(sess) |
838 | + if err == broker.ErrNop { // nothing to do |
839 | + pingTimer.Reset(pingInterval) |
840 | + intervalStart = time.Now() |
841 | + continue Loop |
842 | + } |
843 | if err != nil { |
844 | return err |
845 | } |
846 | |
847 | === modified file 'server/session/session_test.go' |
848 | --- server/session/session_test.go 2014-03-19 23:46:18 +0000 |
849 | +++ server/session/session_test.go 2014-04-11 18:18:05 +0000 |
850 | @@ -346,6 +346,42 @@ |
851 | c.Check(err, Equals, io.EOF) |
852 | } |
853 | |
854 | +func (s *sessionSuite) TestSessionLoopKick(c *C) { |
855 | + nopTrack := NewTracker(s.testlog) |
856 | + errCh := make(chan error, 1) |
857 | + up := make(chan interface{}, 5) |
858 | + down := make(chan interface{}, 5) |
859 | + tp := &testProtocol{up, down} |
860 | + exchanges := make(chan broker.Exchange, 1) |
861 | + sess := &testing.TestBrokerSession{Exchanges: exchanges} |
862 | + go func() { |
863 | + errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
864 | + }() |
865 | + close(exchanges) |
866 | + err := <-errCh |
867 | + c.Check(err, DeepEquals, &broker.ErrAbort{"terminated"}) |
868 | +} |
869 | + |
870 | +func (s *sessionSuite) TestSessionLoopExchangeErrNop(c *C) { |
871 | + nopTrack := NewTracker(s.testlog) |
872 | + errCh := make(chan error, 1) |
873 | + up := make(chan interface{}, 5) |
874 | + down := make(chan interface{}, 5) |
875 | + tp := &testProtocol{up, down} |
876 | + exchanges := make(chan broker.Exchange, 1) |
877 | + exchanges <- &testExchange{prepErr: broker.ErrNop} |
878 | + sess := &testing.TestBrokerSession{Exchanges: exchanges} |
879 | + go func() { |
880 | + errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
881 | + }() |
882 | + c.Check(takeNext(down), Equals, "deadline 2ms") |
883 | + c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) |
884 | + up <- nil // no write error |
885 | + up <- io.EOF |
886 | + err := <-errCh |
887 | + c.Check(err, Equals, io.EOF) |
888 | +} |
889 | + |
890 | func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) { |
891 | nopTrack := NewTracker(s.testlog) |
892 | errCh := make(chan error, 1) |
893 | @@ -434,6 +470,25 @@ |
894 | c.Check(err, Equals, io.ErrUnexpectedEOF) |
895 | } |
896 | |
897 | +func (s *sessionSuite) TestSessionLoopConnBrokenExchange(c *C) { |
898 | + nopTrack := NewTracker(s.testlog) |
899 | + errCh := make(chan error, 1) |
900 | + up := make(chan interface{}, 5) |
901 | + down := make(chan interface{}, 5) |
902 | + tp := &testProtocol{up, down} |
903 | + exchanges := make(chan broker.Exchange, 1) |
904 | + exchanges <- &broker.ConnBrokenExchange{"REASON"} |
905 | + sess := &testing.TestBrokerSession{Exchanges: exchanges} |
906 | + go func() { |
907 | + errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
908 | + }() |
909 | + c.Check(takeNext(down), Equals, "deadline 2ms") |
910 | + c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "REASON"}) |
911 | + up <- nil // no write error |
912 | + err := <-errCh |
913 | + c.Check(err, DeepEquals, &broker.ErrAbort{"session broken for reason"}) |
914 | +} |
915 | + |
916 | type testTracker struct { |
917 | SessionTracker |
918 | interval chan interface{} |