Merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details into lp:ubuntu-push
- expose-more-acceptance-details
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Superseded |
---|---|
Proposed branch: | lp:~pedronis/ubuntu-push/expose-more-acceptance-details |
Merge into: | lp:ubuntu-push |
Diff against target: |
643 lines (+175/-103) 9 files modified
protocol/messages.go (+6/-0) protocol/messages_test.go (+3/-0) server/acceptance/acceptance_test.go (+6/-5) server/acceptance/suites/broadcast.go (+35/-35) server/acceptance/suites/helpers.go (+11/-2) server/acceptance/suites/pingpong.go (+8/-8) server/acceptance/suites/suite.go (+53/-44) server/broker/exchanges.go (+1/-0) server/broker/exchanges_test.go (+52/-9) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Push Hackers | Pending | ||
Review via email: mp+208126@code.launchpad.net |
Commit message
Description of the change
expose more bits of acceptance tests
To post a comment you must log in.
- 78. By Samuele Pedroni
-
Merged fix-split-reuse into expose-
more-acceptance -details. - 79. By Samuele Pedroni
-
log panics from servers in full in tests
- 80. By Samuele Pedroni
-
change the signature of StartServer
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'protocol/messages.go' |
2 | --- protocol/messages.go 2014-02-24 15:54:37 +0000 |
3 | +++ protocol/messages.go 2014-02-26 19:50:48 +0000 |
4 | @@ -93,6 +93,12 @@ |
5 | return true |
6 | } |
7 | |
8 | +// Reset resets the splitting state if the message storage is to be |
9 | +// reused. |
10 | +func (b *BroadcastMsg) Reset() { |
11 | + b.splitting = 0 |
12 | +} |
13 | + |
14 | // NOTIFICATIONS message |
15 | type NotificationsMsg struct { |
16 | Type string `json:"T"` |
17 | |
18 | === modified file 'protocol/messages_test.go' |
19 | --- protocol/messages_test.go 2014-02-10 22:53:00 +0000 |
20 | +++ protocol/messages_test.go 2014-02-26 19:50:48 +0000 |
21 | @@ -99,4 +99,7 @@ |
22 | n3 := len(b.Payloads) |
23 | c.Check(b.TopLevel, Equals, int64(n)) |
24 | c.Check(n1+n2+n3, Equals, n) |
25 | + // reset |
26 | + b.Reset() |
27 | + c.Check(b.splitting, Equals, 0) |
28 | } |
29 | |
30 | === modified file 'server/acceptance/acceptance_test.go' |
31 | --- server/acceptance/acceptance_test.go 2014-02-13 19:33:14 +0000 |
32 | +++ server/acceptance/acceptance_test.go 2014-02-26 19:50:48 +0000 |
33 | @@ -38,7 +38,7 @@ |
34 | } |
35 | |
36 | // Start a server. |
37 | -func StartServer(c *C) (<-chan string, func(), string, string) { |
38 | +func StartServer(c *C, s *suites.AcceptanceSuite, handle *suites.ServerHandle) { |
39 | if *serverCmd == "" { |
40 | c.Skip("executable server not specified") |
41 | } |
42 | @@ -46,10 +46,11 @@ |
43 | cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0") |
44 | cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg) |
45 | logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgFilename) |
46 | - serverHTTPAddr := suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat) |
47 | - serverURL := fmt.Sprintf("http://%s", serverHTTPAddr) |
48 | - serverAddr := suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat) |
49 | - return logs, killServer, serverAddr, serverURL |
50 | + s.KillGroup["server"] = killServer |
51 | + handle.ServerHTTPAddr = suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat) |
52 | + s.ServerAPIURL = fmt.Sprintf("http://%s", handle.ServerHTTPAddr) |
53 | + handle.ServerAddr = suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat) |
54 | + handle.ServerEvents = logs |
55 | } |
56 | |
57 | // ping pong/connectivity |
58 | |
59 | === modified file 'server/acceptance/suites/broadcast.go' |
60 | --- server/acceptance/suites/broadcast.go 2014-02-12 15:43:24 +0000 |
61 | +++ server/acceptance/suites/broadcast.go 2014-02-26 19:50:48 +0000 |
62 | @@ -37,8 +37,8 @@ |
63 | var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339) |
64 | |
65 | func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) { |
66 | - events, errCh, stop := s.startClient(c, "DEVB", nil) |
67 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
68 | + events, errCh, stop := s.StartClient(c, "DEVB", nil) |
69 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
70 | Channel: "system", |
71 | ExpireOn: future, |
72 | Data: json.RawMessage(`{"n": 42}`), |
73 | @@ -47,13 +47,13 @@ |
74 | c.Assert(got, Matches, ".*ok.*") |
75 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
76 | stop() |
77 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
78 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
79 | c.Check(len(errCh), Equals, 0) |
80 | } |
81 | |
82 | func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) { |
83 | // send broadcast that will be pending |
84 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
85 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
86 | Channel: "system", |
87 | ExpireOn: future, |
88 | Data: json.RawMessage(`{"b": 1}`), |
89 | @@ -61,11 +61,11 @@ |
90 | c.Assert(err, IsNil) |
91 | c.Assert(got, Matches, ".*ok.*") |
92 | |
93 | - events, errCh, stop := s.startClient(c, "DEVB", nil) |
94 | + events, errCh, stop := s.StartClient(c, "DEVB", nil) |
95 | // gettting pending on connect |
96 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
97 | stop() |
98 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
99 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
100 | c.Check(len(errCh), Equals, 0) |
101 | } |
102 | |
103 | @@ -73,7 +73,7 @@ |
104 | // send bunch of broadcasts that will be pending |
105 | payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
106 | for i := 0; i < 32; i++ { |
107 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
108 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
109 | Channel: "system", |
110 | ExpireOn: future, |
111 | Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)), |
112 | @@ -82,22 +82,22 @@ |
113 | c.Assert(got, Matches, ".*ok.*") |
114 | } |
115 | |
116 | - events, errCh, stop := s.startClient(c, "DEVC", nil) |
117 | + events, errCh, stop := s.StartClient(c, "DEVC", nil) |
118 | // gettting pending on connect |
119 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`) |
120 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`) |
121 | stop() |
122 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
123 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
124 | c.Check(len(errCh), Equals, 0) |
125 | } |
126 | |
127 | func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) { |
128 | - // start 1st clinet |
129 | - events1, errCh1, stop1 := s.startClient(c, "DEV1", nil) |
130 | + // start 1st client |
131 | + events1, errCh1, stop1 := s.StartClient(c, "DEV1", nil) |
132 | // start 2nd client |
133 | - events2, errCh2, stop2 := s.startClient(c, "DEV2", nil) |
134 | + events2, errCh2, stop2 := s.StartClient(c, "DEV2", nil) |
135 | // broadcast |
136 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
137 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
138 | Channel: "system", |
139 | ExpireOn: future, |
140 | Data: json.RawMessage(`{"n": 42}`), |
141 | @@ -108,15 +108,15 @@ |
142 | c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
143 | stop1() |
144 | stop2() |
145 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
146 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
147 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
148 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
149 | c.Check(len(errCh1), Equals, 0) |
150 | c.Check(len(errCh2), Equals, 0) |
151 | } |
152 | |
153 | func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) { |
154 | - events, errCh, stop := s.startClient(c, "DEVD", nil) |
155 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
156 | + events, errCh, stop := s.StartClient(c, "DEVD", nil) |
157 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
158 | Channel: "system", |
159 | ExpireOn: future, |
160 | Data: json.RawMessage(`{"b": 1}`), |
161 | @@ -125,10 +125,10 @@ |
162 | c.Assert(got, Matches, ".*ok.*") |
163 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
164 | stop() |
165 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
166 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
167 | c.Check(len(errCh), Equals, 0) |
168 | // another broadcast |
169 | - got, err = s.postRequest("/broadcast", &api.Broadcast{ |
170 | + got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
171 | Channel: "system", |
172 | ExpireOn: future, |
173 | Data: json.RawMessage(`{"b": 2}`), |
174 | @@ -136,25 +136,25 @@ |
175 | c.Assert(err, IsNil) |
176 | c.Assert(got, Matches, ".*ok.*") |
177 | // reconnect, provide levels, get only later notification |
178 | - events, errCh, stop = s.startClient(c, "DEVD", map[string]int64{ |
179 | + events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{ |
180 | protocol.SystemChannelId: 1, |
181 | }) |
182 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
183 | stop() |
184 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
185 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
186 | c.Check(len(errCh), Equals, 0) |
187 | } |
188 | |
189 | func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) { |
190 | // send broadcasts that will be pending |
191 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
192 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
193 | Channel: "system", |
194 | ExpireOn: future, |
195 | Data: json.RawMessage(`{"b": 1}`), |
196 | }) |
197 | c.Assert(err, IsNil) |
198 | c.Assert(got, Matches, ".*ok.*") |
199 | - got, err = s.postRequest("/broadcast", &api.Broadcast{ |
200 | + got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
201 | Channel: "system", |
202 | ExpireOn: future, |
203 | Data: json.RawMessage(`{"b": 2}`), |
204 | @@ -162,38 +162,38 @@ |
205 | c.Assert(err, IsNil) |
206 | c.Assert(got, Matches, ".*ok.*") |
207 | |
208 | - events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ |
209 | + events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{ |
210 | protocol.SystemChannelId: 10, |
211 | }) |
212 | // gettting last one pending on connect |
213 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
214 | stop() |
215 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
216 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
217 | c.Check(len(errCh), Equals, 0) |
218 | } |
219 | |
220 | func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) { |
221 | // nothing there |
222 | - events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ |
223 | + events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{ |
224 | protocol.SystemChannelId: 10, |
225 | }) |
226 | // gettting empty pending on connect |
227 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`) |
228 | stop() |
229 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
230 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
231 | c.Check(len(errCh), Equals, 0) |
232 | } |
233 | |
234 | func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) { |
235 | // send broadcasts that will be pending |
236 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
237 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
238 | Channel: "system", |
239 | ExpireOn: future, |
240 | Data: json.RawMessage(`{"b": 1}`), |
241 | }) |
242 | c.Assert(err, IsNil) |
243 | c.Assert(got, Matches, ".*ok.*") |
244 | - got, err = s.postRequest("/broadcast", &api.Broadcast{ |
245 | + got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
246 | Channel: "system", |
247 | ExpireOn: future, |
248 | Data: json.RawMessage(`{"b": 2}`), |
249 | @@ -201,26 +201,26 @@ |
250 | c.Assert(err, IsNil) |
251 | c.Assert(got, Matches, ".*ok.*") |
252 | |
253 | - events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ |
254 | + events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{ |
255 | protocol.SystemChannelId: -10, |
256 | }) |
257 | // gettting pending on connect |
258 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`) |
259 | stop() |
260 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
261 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
262 | c.Check(len(errCh), Equals, 0) |
263 | } |
264 | |
265 | func (s *BroadcastAcceptanceSuite) TestBroadcastExpiration(c *C) { |
266 | // send broadcast that will be pending, and one that will expire |
267 | - got, err := s.postRequest("/broadcast", &api.Broadcast{ |
268 | + got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
269 | Channel: "system", |
270 | ExpireOn: future, |
271 | Data: json.RawMessage(`{"b": 1}`), |
272 | }) |
273 | c.Assert(err, IsNil) |
274 | c.Assert(got, Matches, ".*ok.*") |
275 | - got, err = s.postRequest("/broadcast", &api.Broadcast{ |
276 | + got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
277 | Channel: "system", |
278 | ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339), |
279 | Data: json.RawMessage(`{"b": 2}`), |
280 | @@ -231,10 +231,10 @@ |
281 | time.Sleep(2 * time.Second) |
282 | // second broadcast is expired |
283 | |
284 | - events, errCh, stop := s.startClient(c, "DEVB", nil) |
285 | + events, errCh, stop := s.StartClient(c, "DEVB", nil) |
286 | // gettting pending on connect |
287 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`) |
288 | stop() |
289 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
290 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
291 | c.Check(len(errCh), Equals, 0) |
292 | } |
293 | |
294 | === modified file 'server/acceptance/suites/helpers.go' |
295 | --- server/acceptance/suites/helpers.go 2014-02-11 20:36:32 +0000 |
296 | +++ server/acceptance/suites/helpers.go 2014-02-26 19:50:48 +0000 |
297 | @@ -92,12 +92,15 @@ |
298 | c.Fatal(err) |
299 | } |
300 | bufErr := bufio.NewReaderSize(stderr, 5000) |
301 | - getLineInfo := func() (string, error) { |
302 | + getLineInfo := func(full bool) (string, error) { |
303 | for { |
304 | line, err := bufErr.ReadString('\n') |
305 | if err != nil { |
306 | return "", err |
307 | } |
308 | + if full { |
309 | + return strings.TrimRight(line, "\n"), nil |
310 | + } |
311 | extracted := rxLineInfo.FindStringSubmatch(line) |
312 | if extracted == nil { |
313 | return "", fmt.Errorf("unexpected line: %#v", line) |
314 | @@ -108,13 +111,19 @@ |
315 | } |
316 | logs := make(chan string, 10) |
317 | go func() { |
318 | + paniced := false |
319 | for { |
320 | - info, err := getLineInfo() |
321 | + info, err := getLineInfo(paniced) |
322 | if err != nil { |
323 | logs <- fmt.Sprintf("%s capture: %v", cmdName, err) |
324 | close(logs) |
325 | return |
326 | } |
327 | + if paniced || strings.HasPrefix(info, "ERROR(PANIC") { |
328 | + paniced = true |
329 | + c.Log(info) |
330 | + continue |
331 | + } |
332 | logs <- info |
333 | } |
334 | }() |
335 | |
336 | === modified file 'server/acceptance/suites/pingpong.go' |
337 | --- server/acceptance/suites/pingpong.go 2014-02-11 20:36:32 +0000 |
338 | +++ server/acceptance/suites/pingpong.go 2014-02-26 19:50:48 +0000 |
339 | @@ -34,7 +34,7 @@ |
340 | func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) { |
341 | errCh := make(chan error, 1) |
342 | events := make(chan string, 10) |
343 | - sess := testClientSession(s.serverAddr, "DEVA", true) |
344 | + sess := testClientSession(s.ServerAddr, "DEVA", true) |
345 | err := sess.Dial() |
346 | c.Assert(err, IsNil) |
347 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
348 | @@ -50,8 +50,8 @@ |
349 | errCh <- sess.Run(events) |
350 | }() |
351 | connectCli := NextEvent(events, errCh) |
352 | - connectSrv := NextEvent(s.serverEvents, nil) |
353 | - registeredSrv := NextEvent(s.serverEvents, nil) |
354 | + connectSrv := NextEvent(s.ServerEvents, nil) |
355 | + registeredSrv := NextEvent(s.ServerEvents, nil) |
356 | tconnect := time.Now() |
357 | c.Assert(connectSrv, Matches, ".*session.* connected .*") |
358 | c.Assert(registeredSrv, Matches, ".*session.* registered DEVA") |
359 | @@ -61,14 +61,14 @@ |
360 | c.Check(elapsedOfPing >= 1.0, Equals, true) |
361 | c.Check(elapsedOfPing < 1.05, Equals, true) |
362 | c.Assert(NextEvent(events, errCh), Equals, "Ping") |
363 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF") |
364 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* ended with: EOF") |
365 | c.Check(len(errCh), Equals, 0) |
366 | } |
367 | |
368 | func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) { |
369 | errCh := make(chan error, 1) |
370 | events := make(chan string, 10) |
371 | - sess := testClientSession(s.serverAddr, "DEVB", true) |
372 | + sess := testClientSession(s.ServerAddr, "DEVB", true) |
373 | err := sess.Dial() |
374 | c.Assert(err, IsNil) |
375 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
376 | @@ -85,9 +85,9 @@ |
377 | errCh <- sess.Run(events) |
378 | }() |
379 | c.Assert(NextEvent(events, errCh), Matches, "connected .*") |
380 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
381 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*") |
382 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* connected .*") |
383 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* registered .*") |
384 | c.Assert(NextEvent(events, errCh), Equals, "Ping") |
385 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`) |
386 | + c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*timeout`) |
387 | c.Check(len(errCh), Equals, 0) |
388 | } |
389 | |
390 | === modified file 'server/acceptance/suites/suite.go' |
391 | --- server/acceptance/suites/suite.go 2014-02-11 20:36:32 +0000 |
392 | +++ server/acceptance/suites/suite.go 2014-02-26 19:50:48 +0000 |
393 | @@ -34,43 +34,79 @@ |
394 | helpers "launchpad.net/ubuntu-push/testing" |
395 | ) |
396 | |
397 | +// ServerHandle holds the information to attach a client to the test server. |
398 | +type ServerHandle struct { |
399 | + ServerAddr string |
400 | + ServerHTTPAddr string |
401 | + ServerEvents <-chan string |
402 | +} |
403 | + |
404 | +// Start a client. |
405 | +func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) { |
406 | + errCh := make(chan error, 1) |
407 | + cliEvents := make(chan string, 10) |
408 | + sess := testClientSession(h.ServerAddr, devId, false) |
409 | + sess.Levels = levels |
410 | + err := sess.Dial() |
411 | + c.Assert(err, IsNil) |
412 | + clientShutdown := make(chan bool, 1) // abused as an atomic flag |
413 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
414 | + // read after ack |
415 | + if op == "read" && len(clientShutdown) > 0 { |
416 | + // exit the sess.Run() goroutine, client will close |
417 | + runtime.Goexit() |
418 | + } |
419 | + return false, 0, nil |
420 | + } |
421 | + sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
422 | + go func() { |
423 | + errCh <- sess.Run(cliEvents) |
424 | + }() |
425 | + c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*") |
426 | + c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* connected .*") |
427 | + c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* registered "+devId) |
428 | + return cliEvents, errCh, func() { clientShutdown <- true } |
429 | +} |
430 | + |
431 | // AcceptanceSuite has the basic functionality of the acceptance suites. |
432 | type AcceptanceSuite struct { |
433 | // hook to start the server(s) |
434 | - StartServer func(c *C) (logs <-chan string, kill func(), serverAddr, apiURL string) |
435 | - // running bits |
436 | - serverKill func() |
437 | - serverAddr string |
438 | - serverAPIURL string |
439 | - serverEvents <-chan string |
440 | - httpClient *http.Client |
441 | + StartServer func(c *C, s *AcceptanceSuite, handle *ServerHandle) |
442 | + // populated by StartServer |
443 | + ServerHandle |
444 | + ServerAPIURL string |
445 | + // KillGroup should be populated by StartServer with functions |
446 | + // to kill the server process |
447 | + KillGroup map[string]func() |
448 | + // other state |
449 | + httpClient *http.Client |
450 | } |
451 | |
452 | // Start a new server for each test. |
453 | func (s *AcceptanceSuite) SetUpTest(c *C) { |
454 | - logs, kill, addr, url := s.StartServer(c) |
455 | - s.serverEvents = logs |
456 | - s.serverKill = kill |
457 | - s.serverAddr = addr |
458 | - s.serverAPIURL = url |
459 | + s.KillGroup = make(map[string]func()) |
460 | + s.StartServer(c, s, &s.ServerHandle) |
461 | + c.Assert(s.ServerHandle.ServerEvents, NotNil) |
462 | + c.Assert(s.ServerHandle.ServerAddr, Not(Equals), "") |
463 | + c.Assert(s.ServerAPIURL, Not(Equals), "") |
464 | s.httpClient = &http.Client{} |
465 | } |
466 | |
467 | func (s *AcceptanceSuite) TearDownTest(c *C) { |
468 | - if s.serverKill != nil { |
469 | - s.serverKill() |
470 | + for _, f := range s.KillGroup { |
471 | + f() |
472 | } |
473 | } |
474 | |
475 | -// Post a request. |
476 | -func (s *AcceptanceSuite) postRequest(path string, message interface{}) (string, error) { |
477 | +// Post a API request. |
478 | +func (s *AcceptanceSuite) PostRequest(path string, message interface{}) (string, error) { |
479 | packedMessage, err := json.Marshal(message) |
480 | if err != nil { |
481 | panic(err) |
482 | } |
483 | reader := bytes.NewReader(packedMessage) |
484 | |
485 | - url := s.serverAPIURL + path |
486 | + url := s.ServerAPIURL + path |
487 | request, _ := http.NewRequest("POST", url, reader) |
488 | request.ContentLength = int64(reader.Len()) |
489 | request.Header.Set("Content-Type", "application/json") |
490 | @@ -141,30 +177,3 @@ |
491 | } |
492 | return |
493 | } |
494 | - |
495 | -// Start a client. |
496 | -func (s *AcceptanceSuite) startClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) { |
497 | - errCh := make(chan error, 1) |
498 | - cliEvents := make(chan string, 10) |
499 | - sess := testClientSession(s.serverAddr, devId, false) |
500 | - sess.Levels = levels |
501 | - err := sess.Dial() |
502 | - c.Assert(err, IsNil) |
503 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
504 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
505 | - // read after ack |
506 | - if op == "read" && len(clientShutdown) > 0 { |
507 | - // exit the sess.Run() goroutine, client will close |
508 | - runtime.Goexit() |
509 | - } |
510 | - return false, 0, nil |
511 | - } |
512 | - sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
513 | - go func() { |
514 | - errCh <- sess.Run(cliEvents) |
515 | - }() |
516 | - c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*") |
517 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
518 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId) |
519 | - return cliEvents, errCh, func() { clientShutdown <- true } |
520 | -} |
521 | |
522 | === modified file 'server/broker/exchanges.go' |
523 | --- server/broker/exchanges.go 2014-02-10 23:19:08 +0000 |
524 | +++ server/broker/exchanges.go 2014-02-26 19:50:48 +0000 |
525 | @@ -61,6 +61,7 @@ |
526 | // Prepare session for a BROADCAST. |
527 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
528 | scratchArea := sess.ExchangeScratchArea() |
529 | + scratchArea.broadcastMsg.Reset() |
530 | scratchArea.broadcastMsg.Type = "broadcast" |
531 | clientLevel := sess.Levels()[sbe.ChanId] |
532 | payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) |
533 | |
534 | === modified file 'server/broker/exchanges_test.go' |
535 | --- server/broker/exchanges_test.go 2014-02-10 23:19:08 +0000 |
536 | +++ server/broker/exchanges_test.go 2014-02-26 19:50:48 +0000 |
537 | @@ -18,6 +18,8 @@ |
538 | |
539 | import ( |
540 | "encoding/json" |
541 | + "fmt" |
542 | + "strings" |
543 | stdtesting "testing" |
544 | |
545 | . "launchpad.net/gocheck" |
546 | @@ -45,19 +47,60 @@ |
547 | json.RawMessage(`{"a":"y"}`), |
548 | }, |
549 | } |
550 | - inMsg, outMsg, err := exchg.Prepare(sess) |
551 | + outMsg, inMsg, err := exchg.Prepare(sess) |
552 | c.Assert(err, IsNil) |
553 | // check |
554 | - marshalled, err := json.Marshal(inMsg) |
555 | + marshalled, err := json.Marshal(outMsg) |
556 | c.Assert(err, IsNil) |
557 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) |
558 | - err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
559 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
560 | c.Assert(err, IsNil) |
561 | err = exchg.Acked(sess, true) |
562 | c.Assert(err, IsNil) |
563 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3)) |
564 | } |
565 | |
566 | +func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) { |
567 | + sess := &testing.TestBrokerSession{ |
568 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
569 | + } |
570 | + payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
571 | + needsSplitting := make([]json.RawMessage, 32) |
572 | + for i := 0; i < 32; i++ { |
573 | + needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i)) |
574 | + } |
575 | + |
576 | + topLevel := int64(len(needsSplitting)) |
577 | + exchg := &broker.BroadcastExchange{ |
578 | + ChanId: store.SystemInternalChannelId, |
579 | + TopLevel: topLevel, |
580 | + NotificationPayloads: needsSplitting, |
581 | + } |
582 | + outMsg, _, err := exchg.Prepare(sess) |
583 | + c.Assert(err, IsNil) |
584 | + parts := 0 |
585 | + for { |
586 | + done := outMsg.Split() |
587 | + parts++ |
588 | + if done { |
589 | + break |
590 | + } |
591 | + } |
592 | + c.Assert(parts, Equals, 2) |
593 | + exchg = &broker.BroadcastExchange{ |
594 | + ChanId: store.SystemInternalChannelId, |
595 | + TopLevel: topLevel + 2, |
596 | + NotificationPayloads: []json.RawMessage{ |
597 | + json.RawMessage(`{"a":"x"}`), |
598 | + json.RawMessage(`{"a":"y"}`), |
599 | + }, |
600 | + } |
601 | + outMsg, _, err = exchg.Prepare(sess) |
602 | + c.Assert(err, IsNil) |
603 | + done := outMsg.Split() // shouldn't panic |
604 | + c.Check(done, Equals, true) |
605 | +} |
606 | + |
607 | func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) { |
608 | sess := &testing.TestBrokerSession{ |
609 | LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
610 | @@ -69,13 +112,13 @@ |
611 | json.RawMessage(`{"a":"y"}`), |
612 | }, |
613 | } |
614 | - inMsg, outMsg, err := exchg.Prepare(sess) |
615 | + outMsg, inMsg, err := exchg.Prepare(sess) |
616 | c.Assert(err, IsNil) |
617 | // check |
618 | - marshalled, err := json.Marshal(inMsg) |
619 | + marshalled, err := json.Marshal(outMsg) |
620 | c.Assert(err, IsNil) |
621 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
622 | - err = json.Unmarshal([]byte(`{}`), outMsg) |
623 | + err = json.Unmarshal([]byte(`{}`), inMsg) |
624 | c.Assert(err, IsNil) |
625 | err = exchg.Acked(sess, true) |
626 | c.Assert(err, Not(IsNil)) |
627 | @@ -96,13 +139,13 @@ |
628 | json.RawMessage(`{"a":"y"}`), |
629 | }, |
630 | } |
631 | - inMsg, outMsg, err := exchg.Prepare(sess) |
632 | + outMsg, inMsg, err := exchg.Prepare(sess) |
633 | c.Assert(err, IsNil) |
634 | // check |
635 | - marshalled, err := json.Marshal(inMsg) |
636 | + marshalled, err := json.Marshal(outMsg) |
637 | c.Assert(err, IsNil) |
638 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
639 | - err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
640 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
641 | c.Assert(err, IsNil) |
642 | err = exchg.Acked(sess, true) |
643 | c.Assert(err, IsNil) |