Merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Superseded
Proposed branch: lp:~pedronis/ubuntu-push/expose-more-acceptance-details
Merge into: lp:ubuntu-push
Diff against target: 643 lines (+175/-103)
9 files modified
protocol/messages.go (+6/-0)
protocol/messages_test.go (+3/-0)
server/acceptance/acceptance_test.go (+6/-5)
server/acceptance/suites/broadcast.go (+35/-35)
server/acceptance/suites/helpers.go (+11/-2)
server/acceptance/suites/pingpong.go (+8/-8)
server/acceptance/suites/suite.go (+53/-44)
server/broker/exchanges.go (+1/-0)
server/broker/exchanges_test.go (+52/-9)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details
Reviewer Review Type Date Requested Status
Ubuntu Push Hackers Pending
Review via email: mp+208126@code.launchpad.net

Description of the change

expose more bits of acceptance tests

To post a comment you must log in.
78. By Samuele Pedroni

Merged fix-split-reuse into expose-more-acceptance-details.

79. By Samuele Pedroni

log panics from servers in full in tests

80. By Samuele Pedroni

change the signature of StartServer

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'protocol/messages.go'
--- protocol/messages.go 2014-02-24 15:54:37 +0000
+++ protocol/messages.go 2014-02-26 19:50:48 +0000
@@ -93,6 +93,12 @@
93 return true93 return true
94}94}
9595
96// Reset resets the splitting state if the message storage is to be
97// reused.
98func (b *BroadcastMsg) Reset() {
99 b.splitting = 0
100}
101
96// NOTIFICATIONS message102// NOTIFICATIONS message
97type NotificationsMsg struct {103type NotificationsMsg struct {
98 Type string `json:"T"`104 Type string `json:"T"`
99105
=== modified file 'protocol/messages_test.go'
--- protocol/messages_test.go 2014-02-10 22:53:00 +0000
+++ protocol/messages_test.go 2014-02-26 19:50:48 +0000
@@ -99,4 +99,7 @@
99 n3 := len(b.Payloads)99 n3 := len(b.Payloads)
100 c.Check(b.TopLevel, Equals, int64(n))100 c.Check(b.TopLevel, Equals, int64(n))
101 c.Check(n1+n2+n3, Equals, n)101 c.Check(n1+n2+n3, Equals, n)
102 // reset
103 b.Reset()
104 c.Check(b.splitting, Equals, 0)
102}105}
103106
=== modified file 'server/acceptance/acceptance_test.go'
--- server/acceptance/acceptance_test.go 2014-02-13 19:33:14 +0000
+++ server/acceptance/acceptance_test.go 2014-02-26 19:50:48 +0000
@@ -38,7 +38,7 @@
38}38}
3939
40// Start a server.40// Start a server.
41func StartServer(c *C) (<-chan string, func(), string, string) {41func StartServer(c *C, s *suites.AcceptanceSuite, handle *suites.ServerHandle) {
42 if *serverCmd == "" {42 if *serverCmd == "" {
43 c.Skip("executable server not specified")43 c.Skip("executable server not specified")
44 }44 }
@@ -46,10 +46,11 @@
46 cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0")46 cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0")
47 cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg)47 cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg)
48 logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgFilename)48 logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgFilename)
49 serverHTTPAddr := suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat)49 s.KillGroup["server"] = killServer
50 serverURL := fmt.Sprintf("http://%s", serverHTTPAddr)50 handle.ServerHTTPAddr = suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat)
51 serverAddr := suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat)51 s.ServerAPIURL = fmt.Sprintf("http://%s", handle.ServerHTTPAddr)
52 return logs, killServer, serverAddr, serverURL52 handle.ServerAddr = suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat)
53 handle.ServerEvents = logs
53}54}
5455
55// ping pong/connectivity56// ping pong/connectivity
5657
=== modified file 'server/acceptance/suites/broadcast.go'
--- server/acceptance/suites/broadcast.go 2014-02-12 15:43:24 +0000
+++ server/acceptance/suites/broadcast.go 2014-02-26 19:50:48 +0000
@@ -37,8 +37,8 @@
37var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)37var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
3838
39func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) {39func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) {
40 events, errCh, stop := s.startClient(c, "DEVB", nil)40 events, errCh, stop := s.StartClient(c, "DEVB", nil)
41 got, err := s.postRequest("/broadcast", &api.Broadcast{41 got, err := s.PostRequest("/broadcast", &api.Broadcast{
42 Channel: "system",42 Channel: "system",
43 ExpireOn: future,43 ExpireOn: future,
44 Data: json.RawMessage(`{"n": 42}`),44 Data: json.RawMessage(`{"n": 42}`),
@@ -47,13 +47,13 @@
47 c.Assert(got, Matches, ".*ok.*")47 c.Assert(got, Matches, ".*ok.*")
48 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)48 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
49 stop()49 stop()
50 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)50 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
51 c.Check(len(errCh), Equals, 0)51 c.Check(len(errCh), Equals, 0)
52}52}
5353
54func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) {54func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) {
55 // send broadcast that will be pending55 // send broadcast that will be pending
56 got, err := s.postRequest("/broadcast", &api.Broadcast{56 got, err := s.PostRequest("/broadcast", &api.Broadcast{
57 Channel: "system",57 Channel: "system",
58 ExpireOn: future,58 ExpireOn: future,
59 Data: json.RawMessage(`{"b": 1}`),59 Data: json.RawMessage(`{"b": 1}`),
@@ -61,11 +61,11 @@
61 c.Assert(err, IsNil)61 c.Assert(err, IsNil)
62 c.Assert(got, Matches, ".*ok.*")62 c.Assert(got, Matches, ".*ok.*")
6363
64 events, errCh, stop := s.startClient(c, "DEVB", nil)64 events, errCh, stop := s.StartClient(c, "DEVB", nil)
65 // gettting pending on connect65 // gettting pending on connect
66 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)66 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
67 stop()67 stop()
68 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)68 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
69 c.Check(len(errCh), Equals, 0)69 c.Check(len(errCh), Equals, 0)
70}70}
7171
@@ -73,7 +73,7 @@
73 // send bunch of broadcasts that will be pending73 // send bunch of broadcasts that will be pending
74 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))74 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
75 for i := 0; i < 32; i++ {75 for i := 0; i < 32; i++ {
76 got, err := s.postRequest("/broadcast", &api.Broadcast{76 got, err := s.PostRequest("/broadcast", &api.Broadcast{
77 Channel: "system",77 Channel: "system",
78 ExpireOn: future,78 ExpireOn: future,
79 Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)),79 Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)),
@@ -82,22 +82,22 @@
82 c.Assert(got, Matches, ".*ok.*")82 c.Assert(got, Matches, ".*ok.*")
83 }83 }
8484
85 events, errCh, stop := s.startClient(c, "DEVC", nil)85 events, errCh, stop := s.StartClient(c, "DEVC", nil)
86 // gettting pending on connect86 // gettting pending on connect
87 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)87 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)
88 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)88 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)
89 stop()89 stop()
90 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)90 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
91 c.Check(len(errCh), Equals, 0)91 c.Check(len(errCh), Equals, 0)
92}92}
9393
94func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) {94func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) {
95 // start 1st clinet95 // start 1st client
96 events1, errCh1, stop1 := s.startClient(c, "DEV1", nil)96 events1, errCh1, stop1 := s.StartClient(c, "DEV1", nil)
97 // start 2nd client97 // start 2nd client
98 events2, errCh2, stop2 := s.startClient(c, "DEV2", nil)98 events2, errCh2, stop2 := s.StartClient(c, "DEV2", nil)
99 // broadcast99 // broadcast
100 got, err := s.postRequest("/broadcast", &api.Broadcast{100 got, err := s.PostRequest("/broadcast", &api.Broadcast{
101 Channel: "system",101 Channel: "system",
102 ExpireOn: future,102 ExpireOn: future,
103 Data: json.RawMessage(`{"n": 42}`),103 Data: json.RawMessage(`{"n": 42}`),
@@ -108,15 +108,15 @@
108 c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)108 c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
109 stop1()109 stop1()
110 stop2()110 stop2()
111 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)111 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
112 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)112 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
113 c.Check(len(errCh1), Equals, 0)113 c.Check(len(errCh1), Equals, 0)
114 c.Check(len(errCh2), Equals, 0)114 c.Check(len(errCh2), Equals, 0)
115}115}
116116
117func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) {117func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) {
118 events, errCh, stop := s.startClient(c, "DEVD", nil)118 events, errCh, stop := s.StartClient(c, "DEVD", nil)
119 got, err := s.postRequest("/broadcast", &api.Broadcast{119 got, err := s.PostRequest("/broadcast", &api.Broadcast{
120 Channel: "system",120 Channel: "system",
121 ExpireOn: future,121 ExpireOn: future,
122 Data: json.RawMessage(`{"b": 1}`),122 Data: json.RawMessage(`{"b": 1}`),
@@ -125,10 +125,10 @@
125 c.Assert(got, Matches, ".*ok.*")125 c.Assert(got, Matches, ".*ok.*")
126 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)126 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
127 stop()127 stop()
128 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)128 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
129 c.Check(len(errCh), Equals, 0)129 c.Check(len(errCh), Equals, 0)
130 // another broadcast130 // another broadcast
131 got, err = s.postRequest("/broadcast", &api.Broadcast{131 got, err = s.PostRequest("/broadcast", &api.Broadcast{
132 Channel: "system",132 Channel: "system",
133 ExpireOn: future,133 ExpireOn: future,
134 Data: json.RawMessage(`{"b": 2}`),134 Data: json.RawMessage(`{"b": 2}`),
@@ -136,25 +136,25 @@
136 c.Assert(err, IsNil)136 c.Assert(err, IsNil)
137 c.Assert(got, Matches, ".*ok.*")137 c.Assert(got, Matches, ".*ok.*")
138 // reconnect, provide levels, get only later notification138 // reconnect, provide levels, get only later notification
139 events, errCh, stop = s.startClient(c, "DEVD", map[string]int64{139 events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{
140 protocol.SystemChannelId: 1,140 protocol.SystemChannelId: 1,
141 })141 })
142 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)142 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
143 stop()143 stop()
144 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)144 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
145 c.Check(len(errCh), Equals, 0)145 c.Check(len(errCh), Equals, 0)
146}146}
147147
148func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) {148func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) {
149 // send broadcasts that will be pending149 // send broadcasts that will be pending
150 got, err := s.postRequest("/broadcast", &api.Broadcast{150 got, err := s.PostRequest("/broadcast", &api.Broadcast{
151 Channel: "system",151 Channel: "system",
152 ExpireOn: future,152 ExpireOn: future,
153 Data: json.RawMessage(`{"b": 1}`),153 Data: json.RawMessage(`{"b": 1}`),
154 })154 })
155 c.Assert(err, IsNil)155 c.Assert(err, IsNil)
156 c.Assert(got, Matches, ".*ok.*")156 c.Assert(got, Matches, ".*ok.*")
157 got, err = s.postRequest("/broadcast", &api.Broadcast{157 got, err = s.PostRequest("/broadcast", &api.Broadcast{
158 Channel: "system",158 Channel: "system",
159 ExpireOn: future,159 ExpireOn: future,
160 Data: json.RawMessage(`{"b": 2}`),160 Data: json.RawMessage(`{"b": 2}`),
@@ -162,38 +162,38 @@
162 c.Assert(err, IsNil)162 c.Assert(err, IsNil)
163 c.Assert(got, Matches, ".*ok.*")163 c.Assert(got, Matches, ".*ok.*")
164164
165 events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{165 events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{
166 protocol.SystemChannelId: 10,166 protocol.SystemChannelId: 10,
167 })167 })
168 // gettting last one pending on connect168 // gettting last one pending on connect
169 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)169 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
170 stop()170 stop()
171 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)171 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
172 c.Check(len(errCh), Equals, 0)172 c.Check(len(errCh), Equals, 0)
173}173}
174174
175func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) {175func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) {
176 // nothing there176 // nothing there
177 events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{177 events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{
178 protocol.SystemChannelId: 10,178 protocol.SystemChannelId: 10,
179 })179 })
180 // gettting empty pending on connect180 // gettting empty pending on connect
181 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`)181 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`)
182 stop()182 stop()
183 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)183 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
184 c.Check(len(errCh), Equals, 0)184 c.Check(len(errCh), Equals, 0)
185}185}
186186
187func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) {187func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) {
188 // send broadcasts that will be pending188 // send broadcasts that will be pending
189 got, err := s.postRequest("/broadcast", &api.Broadcast{189 got, err := s.PostRequest("/broadcast", &api.Broadcast{
190 Channel: "system",190 Channel: "system",
191 ExpireOn: future,191 ExpireOn: future,
192 Data: json.RawMessage(`{"b": 1}`),192 Data: json.RawMessage(`{"b": 1}`),
193 })193 })
194 c.Assert(err, IsNil)194 c.Assert(err, IsNil)
195 c.Assert(got, Matches, ".*ok.*")195 c.Assert(got, Matches, ".*ok.*")
196 got, err = s.postRequest("/broadcast", &api.Broadcast{196 got, err = s.PostRequest("/broadcast", &api.Broadcast{
197 Channel: "system",197 Channel: "system",
198 ExpireOn: future,198 ExpireOn: future,
199 Data: json.RawMessage(`{"b": 2}`),199 Data: json.RawMessage(`{"b": 2}`),
@@ -201,26 +201,26 @@
201 c.Assert(err, IsNil)201 c.Assert(err, IsNil)
202 c.Assert(got, Matches, ".*ok.*")202 c.Assert(got, Matches, ".*ok.*")
203203
204 events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{204 events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{
205 protocol.SystemChannelId: -10,205 protocol.SystemChannelId: -10,
206 })206 })
207 // gettting pending on connect207 // gettting pending on connect
208 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)208 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)
209 stop()209 stop()
210 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)210 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
211 c.Check(len(errCh), Equals, 0)211 c.Check(len(errCh), Equals, 0)
212}212}
213213
214func (s *BroadcastAcceptanceSuite) TestBroadcastExpiration(c *C) {214func (s *BroadcastAcceptanceSuite) TestBroadcastExpiration(c *C) {
215 // send broadcast that will be pending, and one that will expire215 // send broadcast that will be pending, and one that will expire
216 got, err := s.postRequest("/broadcast", &api.Broadcast{216 got, err := s.PostRequest("/broadcast", &api.Broadcast{
217 Channel: "system",217 Channel: "system",
218 ExpireOn: future,218 ExpireOn: future,
219 Data: json.RawMessage(`{"b": 1}`),219 Data: json.RawMessage(`{"b": 1}`),
220 })220 })
221 c.Assert(err, IsNil)221 c.Assert(err, IsNil)
222 c.Assert(got, Matches, ".*ok.*")222 c.Assert(got, Matches, ".*ok.*")
223 got, err = s.postRequest("/broadcast", &api.Broadcast{223 got, err = s.PostRequest("/broadcast", &api.Broadcast{
224 Channel: "system",224 Channel: "system",
225 ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339),225 ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339),
226 Data: json.RawMessage(`{"b": 2}`),226 Data: json.RawMessage(`{"b": 2}`),
@@ -231,10 +231,10 @@
231 time.Sleep(2 * time.Second)231 time.Sleep(2 * time.Second)
232 // second broadcast is expired232 // second broadcast is expired
233233
234 events, errCh, stop := s.startClient(c, "DEVB", nil)234 events, errCh, stop := s.StartClient(c, "DEVB", nil)
235 // gettting pending on connect235 // gettting pending on connect
236 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`)236 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`)
237 stop()237 stop()
238 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)238 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
239 c.Check(len(errCh), Equals, 0)239 c.Check(len(errCh), Equals, 0)
240}240}
241241
=== modified file 'server/acceptance/suites/helpers.go'
--- server/acceptance/suites/helpers.go 2014-02-11 20:36:32 +0000
+++ server/acceptance/suites/helpers.go 2014-02-26 19:50:48 +0000
@@ -92,12 +92,15 @@
92 c.Fatal(err)92 c.Fatal(err)
93 }93 }
94 bufErr := bufio.NewReaderSize(stderr, 5000)94 bufErr := bufio.NewReaderSize(stderr, 5000)
95 getLineInfo := func() (string, error) {95 getLineInfo := func(full bool) (string, error) {
96 for {96 for {
97 line, err := bufErr.ReadString('\n')97 line, err := bufErr.ReadString('\n')
98 if err != nil {98 if err != nil {
99 return "", err99 return "", err
100 }100 }
101 if full {
102 return strings.TrimRight(line, "\n"), nil
103 }
101 extracted := rxLineInfo.FindStringSubmatch(line)104 extracted := rxLineInfo.FindStringSubmatch(line)
102 if extracted == nil {105 if extracted == nil {
103 return "", fmt.Errorf("unexpected line: %#v", line)106 return "", fmt.Errorf("unexpected line: %#v", line)
@@ -108,13 +111,19 @@
108 }111 }
109 logs := make(chan string, 10)112 logs := make(chan string, 10)
110 go func() {113 go func() {
114 paniced := false
111 for {115 for {
112 info, err := getLineInfo()116 info, err := getLineInfo(paniced)
113 if err != nil {117 if err != nil {
114 logs <- fmt.Sprintf("%s capture: %v", cmdName, err)118 logs <- fmt.Sprintf("%s capture: %v", cmdName, err)
115 close(logs)119 close(logs)
116 return120 return
117 }121 }
122 if paniced || strings.HasPrefix(info, "ERROR(PANIC") {
123 paniced = true
124 c.Log(info)
125 continue
126 }
118 logs <- info127 logs <- info
119 }128 }
120 }()129 }()
121130
=== modified file 'server/acceptance/suites/pingpong.go'
--- server/acceptance/suites/pingpong.go 2014-02-11 20:36:32 +0000
+++ server/acceptance/suites/pingpong.go 2014-02-26 19:50:48 +0000
@@ -34,7 +34,7 @@
34func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {34func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {
35 errCh := make(chan error, 1)35 errCh := make(chan error, 1)
36 events := make(chan string, 10)36 events := make(chan string, 10)
37 sess := testClientSession(s.serverAddr, "DEVA", true)37 sess := testClientSession(s.ServerAddr, "DEVA", true)
38 err := sess.Dial()38 err := sess.Dial()
39 c.Assert(err, IsNil)39 c.Assert(err, IsNil)
40 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {40 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
@@ -50,8 +50,8 @@
50 errCh <- sess.Run(events)50 errCh <- sess.Run(events)
51 }()51 }()
52 connectCli := NextEvent(events, errCh)52 connectCli := NextEvent(events, errCh)
53 connectSrv := NextEvent(s.serverEvents, nil)53 connectSrv := NextEvent(s.ServerEvents, nil)
54 registeredSrv := NextEvent(s.serverEvents, nil)54 registeredSrv := NextEvent(s.ServerEvents, nil)
55 tconnect := time.Now()55 tconnect := time.Now()
56 c.Assert(connectSrv, Matches, ".*session.* connected .*")56 c.Assert(connectSrv, Matches, ".*session.* connected .*")
57 c.Assert(registeredSrv, Matches, ".*session.* registered DEVA")57 c.Assert(registeredSrv, Matches, ".*session.* registered DEVA")
@@ -61,14 +61,14 @@
61 c.Check(elapsedOfPing >= 1.0, Equals, true)61 c.Check(elapsedOfPing >= 1.0, Equals, true)
62 c.Check(elapsedOfPing < 1.05, Equals, true)62 c.Check(elapsedOfPing < 1.05, Equals, true)
63 c.Assert(NextEvent(events, errCh), Equals, "Ping")63 c.Assert(NextEvent(events, errCh), Equals, "Ping")
64 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF")64 c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* ended with: EOF")
65 c.Check(len(errCh), Equals, 0)65 c.Check(len(errCh), Equals, 0)
66}66}
6767
68func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {68func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {
69 errCh := make(chan error, 1)69 errCh := make(chan error, 1)
70 events := make(chan string, 10)70 events := make(chan string, 10)
71 sess := testClientSession(s.serverAddr, "DEVB", true)71 sess := testClientSession(s.ServerAddr, "DEVB", true)
72 err := sess.Dial()72 err := sess.Dial()
73 c.Assert(err, IsNil)73 c.Assert(err, IsNil)
74 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {74 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
@@ -85,9 +85,9 @@
85 errCh <- sess.Run(events)85 errCh <- sess.Run(events)
86 }()86 }()
87 c.Assert(NextEvent(events, errCh), Matches, "connected .*")87 c.Assert(NextEvent(events, errCh), Matches, "connected .*")
88 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")88 c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* connected .*")
89 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*")89 c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* registered .*")
90 c.Assert(NextEvent(events, errCh), Equals, "Ping")90 c.Assert(NextEvent(events, errCh), Equals, "Ping")
91 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`)91 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*timeout`)
92 c.Check(len(errCh), Equals, 0)92 c.Check(len(errCh), Equals, 0)
93}93}
9494
=== modified file 'server/acceptance/suites/suite.go'
--- server/acceptance/suites/suite.go 2014-02-11 20:36:32 +0000
+++ server/acceptance/suites/suite.go 2014-02-26 19:50:48 +0000
@@ -34,43 +34,79 @@
34 helpers "launchpad.net/ubuntu-push/testing"34 helpers "launchpad.net/ubuntu-push/testing"
35)35)
3636
37// ServerHandle holds the information to attach a client to the test server.
38type ServerHandle struct {
39 ServerAddr string
40 ServerHTTPAddr string
41 ServerEvents <-chan string
42}
43
44// Start a client.
45func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
46 errCh := make(chan error, 1)
47 cliEvents := make(chan string, 10)
48 sess := testClientSession(h.ServerAddr, devId, false)
49 sess.Levels = levels
50 err := sess.Dial()
51 c.Assert(err, IsNil)
52 clientShutdown := make(chan bool, 1) // abused as an atomic flag
53 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
54 // read after ack
55 if op == "read" && len(clientShutdown) > 0 {
56 // exit the sess.Run() goroutine, client will close
57 runtime.Goexit()
58 }
59 return false, 0, nil
60 }
61 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
62 go func() {
63 errCh <- sess.Run(cliEvents)
64 }()
65 c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*")
66 c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* connected .*")
67 c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* registered "+devId)
68 return cliEvents, errCh, func() { clientShutdown <- true }
69}
70
37// AcceptanceSuite has the basic functionality of the acceptance suites.71// AcceptanceSuite has the basic functionality of the acceptance suites.
38type AcceptanceSuite struct {72type AcceptanceSuite struct {
39 // hook to start the server(s)73 // hook to start the server(s)
40 StartServer func(c *C) (logs <-chan string, kill func(), serverAddr, apiURL string)74 StartServer func(c *C, s *AcceptanceSuite, handle *ServerHandle)
41 // running bits75 // populated by StartServer
42 serverKill func()76 ServerHandle
43 serverAddr string77 ServerAPIURL string
44 serverAPIURL string78 // KillGroup should be populated by StartServer with functions
45 serverEvents <-chan string79 // to kill the server process
46 httpClient *http.Client80 KillGroup map[string]func()
81 // other state
82 httpClient *http.Client
47}83}
4884
49// Start a new server for each test.85// Start a new server for each test.
50func (s *AcceptanceSuite) SetUpTest(c *C) {86func (s *AcceptanceSuite) SetUpTest(c *C) {
51 logs, kill, addr, url := s.StartServer(c)87 s.KillGroup = make(map[string]func())
52 s.serverEvents = logs88 s.StartServer(c, s, &s.ServerHandle)
53 s.serverKill = kill89 c.Assert(s.ServerHandle.ServerEvents, NotNil)
54 s.serverAddr = addr90 c.Assert(s.ServerHandle.ServerAddr, Not(Equals), "")
55 s.serverAPIURL = url91 c.Assert(s.ServerAPIURL, Not(Equals), "")
56 s.httpClient = &http.Client{}92 s.httpClient = &http.Client{}
57}93}
5894
59func (s *AcceptanceSuite) TearDownTest(c *C) {95func (s *AcceptanceSuite) TearDownTest(c *C) {
60 if s.serverKill != nil {96 for _, f := range s.KillGroup {
61 s.serverKill()97 f()
62 }98 }
63}99}
64100
65// Post a request.101// Post a API request.
66func (s *AcceptanceSuite) postRequest(path string, message interface{}) (string, error) {102func (s *AcceptanceSuite) PostRequest(path string, message interface{}) (string, error) {
67 packedMessage, err := json.Marshal(message)103 packedMessage, err := json.Marshal(message)
68 if err != nil {104 if err != nil {
69 panic(err)105 panic(err)
70 }106 }
71 reader := bytes.NewReader(packedMessage)107 reader := bytes.NewReader(packedMessage)
72108
73 url := s.serverAPIURL + path109 url := s.ServerAPIURL + path
74 request, _ := http.NewRequest("POST", url, reader)110 request, _ := http.NewRequest("POST", url, reader)
75 request.ContentLength = int64(reader.Len())111 request.ContentLength = int64(reader.Len())
76 request.Header.Set("Content-Type", "application/json")112 request.Header.Set("Content-Type", "application/json")
@@ -141,30 +177,3 @@
141 }177 }
142 return178 return
143}179}
144
145// Start a client.
146func (s *AcceptanceSuite) startClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
147 errCh := make(chan error, 1)
148 cliEvents := make(chan string, 10)
149 sess := testClientSession(s.serverAddr, devId, false)
150 sess.Levels = levels
151 err := sess.Dial()
152 c.Assert(err, IsNil)
153 clientShutdown := make(chan bool, 1) // abused as an atomic flag
154 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
155 // read after ack
156 if op == "read" && len(clientShutdown) > 0 {
157 // exit the sess.Run() goroutine, client will close
158 runtime.Goexit()
159 }
160 return false, 0, nil
161 }
162 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
163 go func() {
164 errCh <- sess.Run(cliEvents)
165 }()
166 c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*")
167 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
168 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId)
169 return cliEvents, errCh, func() { clientShutdown <- true }
170}
171180
=== modified file 'server/broker/exchanges.go'
--- server/broker/exchanges.go 2014-02-10 23:19:08 +0000
+++ server/broker/exchanges.go 2014-02-26 19:50:48 +0000
@@ -61,6 +61,7 @@
61// Prepare session for a BROADCAST.61// Prepare session for a BROADCAST.
62func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {62func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
63 scratchArea := sess.ExchangeScratchArea()63 scratchArea := sess.ExchangeScratchArea()
64 scratchArea.broadcastMsg.Reset()
64 scratchArea.broadcastMsg.Type = "broadcast"65 scratchArea.broadcastMsg.Type = "broadcast"
65 clientLevel := sess.Levels()[sbe.ChanId]66 clientLevel := sess.Levels()[sbe.ChanId]
66 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)67 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
6768
=== modified file 'server/broker/exchanges_test.go'
--- server/broker/exchanges_test.go 2014-02-10 23:19:08 +0000
+++ server/broker/exchanges_test.go 2014-02-26 19:50:48 +0000
@@ -18,6 +18,8 @@
1818
19import (19import (
20 "encoding/json"20 "encoding/json"
21 "fmt"
22 "strings"
21 stdtesting "testing"23 stdtesting "testing"
2224
23 . "launchpad.net/gocheck"25 . "launchpad.net/gocheck"
@@ -45,19 +47,60 @@
45 json.RawMessage(`{"a":"y"}`),47 json.RawMessage(`{"a":"y"}`),
46 },48 },
47 }49 }
48 inMsg, outMsg, err := exchg.Prepare(sess)50 outMsg, inMsg, err := exchg.Prepare(sess)
49 c.Assert(err, IsNil)51 c.Assert(err, IsNil)
50 // check52 // check
51 marshalled, err := json.Marshal(inMsg)53 marshalled, err := json.Marshal(outMsg)
52 c.Assert(err, IsNil)54 c.Assert(err, IsNil)
53 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)55 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
54 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)56 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
55 c.Assert(err, IsNil)57 c.Assert(err, IsNil)
56 err = exchg.Acked(sess, true)58 err = exchg.Acked(sess, true)
57 c.Assert(err, IsNil)59 c.Assert(err, IsNil)
58 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))60 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
59}61}
6062
63func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
64 sess := &testing.TestBrokerSession{
65 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
66 }
67 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
68 needsSplitting := make([]json.RawMessage, 32)
69 for i := 0; i < 32; i++ {
70 needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))
71 }
72
73 topLevel := int64(len(needsSplitting))
74 exchg := &broker.BroadcastExchange{
75 ChanId: store.SystemInternalChannelId,
76 TopLevel: topLevel,
77 NotificationPayloads: needsSplitting,
78 }
79 outMsg, _, err := exchg.Prepare(sess)
80 c.Assert(err, IsNil)
81 parts := 0
82 for {
83 done := outMsg.Split()
84 parts++
85 if done {
86 break
87 }
88 }
89 c.Assert(parts, Equals, 2)
90 exchg = &broker.BroadcastExchange{
91 ChanId: store.SystemInternalChannelId,
92 TopLevel: topLevel + 2,
93 NotificationPayloads: []json.RawMessage{
94 json.RawMessage(`{"a":"x"}`),
95 json.RawMessage(`{"a":"y"}`),
96 },
97 }
98 outMsg, _, err = exchg.Prepare(sess)
99 c.Assert(err, IsNil)
100 done := outMsg.Split() // shouldn't panic
101 c.Check(done, Equals, true)
102}
103
61func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {104func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
62 sess := &testing.TestBrokerSession{105 sess := &testing.TestBrokerSession{
63 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),106 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
@@ -69,13 +112,13 @@
69 json.RawMessage(`{"a":"y"}`),112 json.RawMessage(`{"a":"y"}`),
70 },113 },
71 }114 }
72 inMsg, outMsg, err := exchg.Prepare(sess)115 outMsg, inMsg, err := exchg.Prepare(sess)
73 c.Assert(err, IsNil)116 c.Assert(err, IsNil)
74 // check117 // check
75 marshalled, err := json.Marshal(inMsg)118 marshalled, err := json.Marshal(outMsg)
76 c.Assert(err, IsNil)119 c.Assert(err, IsNil)
77 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)120 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
78 err = json.Unmarshal([]byte(`{}`), outMsg)121 err = json.Unmarshal([]byte(`{}`), inMsg)
79 c.Assert(err, IsNil)122 c.Assert(err, IsNil)
80 err = exchg.Acked(sess, true)123 err = exchg.Acked(sess, true)
81 c.Assert(err, Not(IsNil))124 c.Assert(err, Not(IsNil))
@@ -96,13 +139,13 @@
96 json.RawMessage(`{"a":"y"}`),139 json.RawMessage(`{"a":"y"}`),
97 },140 },
98 }141 }
99 inMsg, outMsg, err := exchg.Prepare(sess)142 outMsg, inMsg, err := exchg.Prepare(sess)
100 c.Assert(err, IsNil)143 c.Assert(err, IsNil)
101 // check144 // check
102 marshalled, err := json.Marshal(inMsg)145 marshalled, err := json.Marshal(outMsg)
103 c.Assert(err, IsNil)146 c.Assert(err, IsNil)
104 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)147 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
105 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)148 err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
106 c.Assert(err, IsNil)149 c.Assert(err, IsNil)
107 err = exchg.Acked(sess, true)150 err = exchg.Acked(sess, true)
108 c.Assert(err, IsNil)151 c.Assert(err, IsNil)

Subscribers

People subscribed via source and target branches