Merge lp:~pedronis/ubuntu-push/handle-levels-that-are-off into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 46
Merged at revision: 47
Proposed branch: lp:~pedronis/ubuntu-push/handle-levels-that-are-off
Merge into: lp:ubuntu-push
Diff against target: 134 lines (+106/-0)
3 files modified
server/acceptance/acceptance_test.go (+89/-0)
server/broker/exchanges.go (+6/-0)
server/broker/exchg_impl_test.go (+11/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/handle-levels-that-are-off
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+204667@code.launchpad.net

Commit message

fix corner cases like client level too ahead etc, with acceptance tests

Description of the change

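Fix corner cases in how the broker filters pending broadcasts by client level, with acceptance tests:

- when there are no pending payloads, filterByLevel now returns nil;
- when the client's level is ahead of the server's top level (negative delta), only the last pending payload is sent.

New acceptance tests cover a client that is too far ahead, a client that is too far ahead when nothing is pending, and a client that is way behind.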

To post a comment you must log in.
46. By Samuele Pedroni

fix corner cases, client too ahead etc, with acceptance tests

Revision history for this message
John Lenton (chipaca) wrote :

Thank you!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/acceptance/acceptance_test.go'
2--- server/acceptance/acceptance_test.go 2014-01-30 08:56:57 +0000
3+++ server/acceptance/acceptance_test.go 2014-02-04 12:25:11 +0000
4@@ -484,3 +484,92 @@
5 c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
6 c.Check(len(errCh), Equals, 0)
7 }
8+
9+func (s *acceptanceSuite) TestBroadcastTooAhead(c *C) {
10+ // send broadcasts that will be pending
11+ got, err := s.postRequest("/broadcast", &api.Broadcast{
12+ Channel: "system",
13+ Data: json.RawMessage(`{"b": 1}`),
14+ })
15+ c.Check(err, IsNil)
16+ c.Check(got, Matches, ".*ok.*")
17+ got, err = s.postRequest("/broadcast", &api.Broadcast{
18+ Channel: "system",
19+ Data: json.RawMessage(`{"b": 2}`),
20+ })
21+ c.Check(err, IsNil)
22+ c.Check(got, Matches, ".*ok.*")
23+
24+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
25+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
26+ // read after ack
27+ if op == "read" && len(clientShutdown) > 0 {
28+ // exit the sess.Run() goroutine, client will close
29+ runtime.Goexit()
30+ }
31+ return false, 0, nil
32+ }
33+ events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{
34+ protocol.SystemChannelId: 10,
35+ })
36+ // gettting last one pending on connect
37+ c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
38+ clientShutdown <- true
39+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
40+ c.Check(len(errCh), Equals, 0)
41+}
42+
43+func (s *acceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) {
44+ // nothing there
45+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
46+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
47+ // read after ack
48+ if op == "read" && len(clientShutdown) > 0 {
49+ // exit the sess.Run() goroutine, client will close
50+ runtime.Goexit()
51+ }
52+ return false, 0, nil
53+ }
54+ events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{
55+ protocol.SystemChannelId: 10,
56+ })
57+ // gettting empty pending on connect
58+ c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`)
59+ clientShutdown <- true
60+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
61+ c.Check(len(errCh), Equals, 0)
62+}
63+
64+func (s *acceptanceSuite) TestBroadcastWayBehind(c *C) {
65+ // send broadcasts that will be pending
66+ got, err := s.postRequest("/broadcast", &api.Broadcast{
67+ Channel: "system",
68+ Data: json.RawMessage(`{"b": 1}`),
69+ })
70+ c.Check(err, IsNil)
71+ c.Check(got, Matches, ".*ok.*")
72+ got, err = s.postRequest("/broadcast", &api.Broadcast{
73+ Channel: "system",
74+ Data: json.RawMessage(`{"b": 2}`),
75+ })
76+ c.Check(err, IsNil)
77+ c.Check(got, Matches, ".*ok.*")
78+
79+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
80+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
81+ // read after ack
82+ if op == "read" && len(clientShutdown) > 0 {
83+ // exit the sess.Run() goroutine, client will close
84+ runtime.Goexit()
85+ }
86+ return false, 0, nil
87+ }
88+ events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{
89+ protocol.SystemChannelId: -10,
90+ })
91+ // gettting pending on connect
92+ c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)
93+ clientShutdown <- true
94+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
95+ c.Check(len(errCh), Equals, 0)
96+}
97
98=== modified file 'server/broker/exchanges.go'
99--- server/broker/exchanges.go 2014-01-31 16:36:16 +0000
100+++ server/broker/exchanges.go 2014-02-04 12:25:11 +0000
101@@ -44,7 +44,13 @@
102
103 func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
104 c := int64(len(payloads))
105+ if c == 0 {
106+ return nil
107+ }
108 delta := topLevel - clientLevel
109+ if delta < 0 { // means too ahead, send the last pending
110+ delta = 1
111+ }
112 if delta < c {
113 return payloads[c-delta:]
114 } else {
115
116=== modified file 'server/broker/exchg_impl_test.go'
117--- server/broker/exchg_impl_test.go 2014-01-23 20:13:22 +0000
118+++ server/broker/exchg_impl_test.go 2014-02-04 12:25:11 +0000
119@@ -44,4 +44,15 @@
120 c.Check(len(res), Equals, 3)
121 res = filterByLevel(1, 5, payloads)
122 c.Check(len(res), Equals, 3)
123+ // too ahead, pick only last
124+ res = filterByLevel(10, 5, payloads)
125+ c.Check(len(res), Equals, 1)
126+ c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))
127+}
128+
129+func (s *exchangesImplSuite) TestFilterByLevelEmpty(c *C) {
130+ res := filterByLevel(5, 0, nil)
131+ c.Check(len(res), Equals, 0)
132+ res = filterByLevel(5, 10, nil)
133+ c.Check(len(res), Equals, 0)
134 }

Subscribers

People subscribed via source and target branches