Merge lp:~pedronis/ubuntu-push/acceptance-flex-2 into lp:ubuntu-push
- acceptance-flex-2
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 69 |
Merged at revision: | 66 |
Proposed branch: | lp:~pedronis/ubuntu-push/acceptance-flex-2 |
Merge into: | lp:ubuntu-push |
Prerequisite: | lp:~pedronis/ubuntu-push/acceptance-flex-1 |
Diff against target: |
850 lines (+363/-319) 5 files modified
server/acceptance/acceptance_test.go (+62/-0) server/acceptance/suites/broadcast.go (+35/-316) server/acceptance/suites/helpers.go (+3/-3) server/acceptance/suites/pingpong.go (+93/-0) server/acceptance/suites/suite.go (+170/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/acceptance-flex-2 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email:
|
Commit message
restructure acceptance tests into reusable suites
Description of the change
restructure acceptance tests into reusable suites
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added file 'server/acceptance/acceptance_test.go' |
2 | --- server/acceptance/acceptance_test.go 1970-01-01 00:00:00 +0000 |
3 | +++ server/acceptance/acceptance_test.go 2014-02-11 20:40:06 +0000 |
4 | @@ -0,0 +1,62 @@ |
5 | +/* |
6 | + Copyright 2013-2014 Canonical Ltd. |
7 | + |
8 | + This program is free software: you can redistribute it and/or modify it |
9 | + under the terms of the GNU General Public License version 3, as published |
10 | + by the Free Software Foundation. |
11 | + |
12 | + This program is distributed in the hope that it will be useful, but |
13 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
14 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
15 | + PURPOSE. See the GNU General Public License for more details. |
16 | + |
17 | + You should have received a copy of the GNU General Public License along |
18 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | +*/ |
20 | + |
21 | +package acceptance_test |
22 | + |
23 | +import ( |
24 | + "flag" |
25 | + "fmt" |
26 | + "strings" |
27 | + "testing" |
28 | + |
29 | + . "launchpad.net/gocheck" |
30 | + |
31 | + "launchpad.net/ubuntu-push/server/acceptance/suites" |
32 | +) |
33 | + |
34 | +func TestAcceptance(t *testing.T) { TestingT(t) } |
35 | + |
36 | +var serverCmd = flag.String("server", "", "server to test") |
37 | +var serverAuxCfg = flag.String("auxcfg", "", "auxiliary config for the server") |
38 | + |
39 | +func testServerConfig(addr, httpAddr string) map[string]interface{} { |
40 | + cfg := make(map[string]interface{}) |
41 | + suites.FillServerConfig(cfg, addr) |
42 | + suites.FillHTTPServerConfig(cfg, httpAddr) |
43 | + return cfg |
44 | +} |
45 | + |
46 | +// Start a server. |
47 | +func StartServer(c *C) (<-chan string, func(), string, string) { |
48 | + if *serverCmd == "" { |
49 | + c.Skip("executable server not specified") |
50 | + } |
51 | + tmpDir := c.MkDir() |
52 | + cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0") |
53 | + cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg) |
54 | + cfgs := append(strings.Fields(*serverAuxCfg), cfgFilename) |
55 | + logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgs...) |
56 | + serverHTTPAddr := suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat) |
57 | + serverURL := fmt.Sprintf("http://%s", serverHTTPAddr) |
58 | + serverAddr := suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat) |
59 | + return logs, killServer, serverAddr, serverURL |
60 | +} |
61 | + |
62 | +// ping pong/connectivity |
63 | +var _ = Suite(&suites.PingPongAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}}) |
64 | + |
65 | +// broadcast |
66 | +var _ = Suite(&suites.BroadcastAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}}) |
67 | |
68 | === added directory 'server/acceptance/suites' |
69 | === renamed file 'server/acceptance/acceptance_test.go' => 'server/acceptance/suites/broadcast.go' |
70 | --- server/acceptance/acceptance_test.go 2014-02-11 20:40:06 +0000 |
71 | +++ server/acceptance/suites/broadcast.go 2014-02-11 20:40:06 +0000 |
72 | @@ -14,248 +14,30 @@ |
73 | with this program. If not, see <http://www.gnu.org/licenses/>. |
74 | */ |
75 | |
76 | -package acceptance |
77 | +package suites |
78 | |
79 | import ( |
80 | - "bytes" |
81 | "encoding/json" |
82 | - "flag" |
83 | "fmt" |
84 | - "io/ioutil" |
85 | - "net" |
86 | - "net/http" |
87 | - "runtime" |
88 | "strings" |
89 | - "testing" |
90 | "time" |
91 | |
92 | . "launchpad.net/gocheck" |
93 | |
94 | "launchpad.net/ubuntu-push/protocol" |
95 | "launchpad.net/ubuntu-push/server/api" |
96 | - helpers "launchpad.net/ubuntu-push/testing" |
97 | ) |
98 | |
99 | -func TestAcceptance(t *testing.T) { TestingT(t) } |
100 | - |
101 | -type acceptanceSuite struct { |
102 | - serverKill func() |
103 | - serverAddr string |
104 | - serverURL string |
105 | - serverEvents <-chan string |
106 | - httpClient *http.Client |
107 | -} |
108 | - |
109 | -var _ = Suite(&acceptanceSuite{}) |
110 | - |
111 | -var serverCmd = flag.String("server", "", "server to test") |
112 | -var serverAuxCfg = flag.String("auxcfg", "", "auxiliary config for the server") |
113 | - |
114 | -func testServerConfig(addr, httpAddr string) map[string]interface{} { |
115 | - cfg := make(map[string]interface{}) |
116 | - FillServerConfig(cfg, addr) |
117 | - FillHTTPServerConfig(cfg, httpAddr) |
118 | - return cfg |
119 | -} |
120 | - |
121 | -func testClientSession(addr string, deviceId string, reportPings bool) *ClientSession { |
122 | - certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("config/testing.cert")) |
123 | - if err != nil { |
124 | - panic(fmt.Sprintf("could not read config/testing.cert: %v", err)) |
125 | - } |
126 | - return &ClientSession{ |
127 | - ExchangeTimeout: 100 * time.Millisecond, |
128 | - ServerAddr: addr, |
129 | - CertPEMBlock: certPEMBlock, |
130 | - DeviceId: deviceId, |
131 | - ReportPings: reportPings, |
132 | - } |
133 | -} |
134 | - |
135 | -// start a new server for each test |
136 | -func (s *acceptanceSuite) SetUpTest(c *C) { |
137 | - if *serverCmd == "" { |
138 | - c.Skip("executable server not specified") |
139 | - } |
140 | - tmpDir := c.MkDir() |
141 | - cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0") |
142 | - cfgFilename := WriteConfig(c, tmpDir, "config.json", cfg) |
143 | - cfgs := append(strings.Fields(*serverAuxCfg), cfgFilename) |
144 | - serverEvents, killServer := RunAndObserve(c, *serverCmd, cfgs...) |
145 | - s.serverKill = killServer |
146 | - serverHTTPAddr := ExtractListeningAddr(c, serverEvents, HTTPListeningOnPat) |
147 | - s.serverURL = fmt.Sprintf("http://%s", serverHTTPAddr) |
148 | - s.serverAddr = ExtractListeningAddr(c, serverEvents, DevListeningOnPat) |
149 | - s.serverEvents = serverEvents |
150 | - s.httpClient = &http.Client{} |
151 | -} |
152 | - |
153 | -func (s *acceptanceSuite) TearDownTest(c *C) { |
154 | - if s.serverKill != nil { |
155 | - s.serverKill() |
156 | - } |
157 | -} |
158 | - |
159 | -// Tests about connection, ping-pong, disconnection scenarios |
160 | - |
161 | -// typically combined with -gocheck.vv or test selection |
162 | -var logTraffic = flag.Bool("logTraffic", false, "log traffic") |
163 | - |
164 | -type connInterceptor func(ic *interceptingConn, op string, b []byte) (bool, int, error) |
165 | - |
166 | -type interceptingConn struct { |
167 | - net.Conn |
168 | - totalRead int |
169 | - totalWritten int |
170 | - intercept connInterceptor |
171 | -} |
172 | - |
173 | -func (ic *interceptingConn) Write(b []byte) (n int, err error) { |
174 | - done := false |
175 | - before := ic.totalWritten |
176 | - if ic.intercept != nil { |
177 | - done, n, err = ic.intercept(ic, "write", b) |
178 | - } |
179 | - if !done { |
180 | - n, err = ic.Conn.Write(b) |
181 | - } |
182 | - ic.totalWritten += n |
183 | - if *logTraffic { |
184 | - fmt.Printf("W[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalWritten) |
185 | - } |
186 | - return |
187 | -} |
188 | - |
189 | -func (ic *interceptingConn) Read(b []byte) (n int, err error) { |
190 | - done := false |
191 | - before := ic.totalRead |
192 | - if ic.intercept != nil { |
193 | - done, n, err = ic.intercept(ic, "read", b) |
194 | - } |
195 | - if !done { |
196 | - n, err = ic.Conn.Read(b) |
197 | - } |
198 | - ic.totalRead += n |
199 | - if *logTraffic { |
200 | - fmt.Printf("R[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalRead) |
201 | - } |
202 | - return |
203 | -} |
204 | - |
205 | -func (s *acceptanceSuite) TestConnectPingPing(c *C) { |
206 | - errCh := make(chan error, 1) |
207 | - events := make(chan string, 10) |
208 | - sess := testClientSession(s.serverAddr, "DEVA", true) |
209 | - err := sess.Dial() |
210 | - c.Assert(err, IsNil) |
211 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
212 | - // would be 3rd ping read, based on logged traffic |
213 | - if op == "read" && ic.totalRead >= 79 { |
214 | - // exit the sess.Run() goroutine, client will close |
215 | - runtime.Goexit() |
216 | - } |
217 | - return false, 0, nil |
218 | - } |
219 | - sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
220 | - go func() { |
221 | - errCh <- sess.Run(events) |
222 | - }() |
223 | - connectCli := NextEvent(events, errCh) |
224 | - connectSrv := NextEvent(s.serverEvents, nil) |
225 | - registeredSrv := NextEvent(s.serverEvents, nil) |
226 | - tconnect := time.Now() |
227 | - c.Assert(connectSrv, Matches, ".*session.* connected .*") |
228 | - c.Assert(registeredSrv, Matches, ".*session.* registered DEVA") |
229 | - c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true) |
230 | - c.Assert(NextEvent(events, errCh), Equals, "Ping") |
231 | - elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond) |
232 | - c.Check(elapsedOfPing >= 1.0, Equals, true) |
233 | - c.Check(elapsedOfPing < 1.05, Equals, true) |
234 | - c.Assert(NextEvent(events, errCh), Equals, "Ping") |
235 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF") |
236 | - c.Check(len(errCh), Equals, 0) |
237 | -} |
238 | - |
239 | -func (s *acceptanceSuite) TestConnectPingNeverPong(c *C) { |
240 | - errCh := make(chan error, 1) |
241 | - events := make(chan string, 10) |
242 | - sess := testClientSession(s.serverAddr, "DEVB", true) |
243 | - err := sess.Dial() |
244 | - c.Assert(err, IsNil) |
245 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
246 | - // would be pong to 2nd ping, based on logged traffic |
247 | - if op == "write" && ic.totalRead >= 67 { |
248 | - time.Sleep(200 * time.Millisecond) |
249 | - // exit the sess.Run() goroutine, client will close |
250 | - runtime.Goexit() |
251 | - } |
252 | - return false, 0, nil |
253 | - } |
254 | - sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
255 | - go func() { |
256 | - errCh <- sess.Run(events) |
257 | - }() |
258 | - c.Assert(NextEvent(events, errCh), Matches, "connected .*") |
259 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
260 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*") |
261 | - c.Assert(NextEvent(events, errCh), Equals, "Ping") |
262 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`) |
263 | - c.Check(len(errCh), Equals, 0) |
264 | -} |
265 | - |
266 | -// Tests about broadcast |
267 | - |
268 | +// BroadcastAcceptanceSuite has tests about broadcast. |
269 | +type BroadcastAcceptanceSuite struct { |
270 | + AcceptanceSuite |
271 | +} |
272 | + |
273 | +// Long after the end of the tests. |
274 | var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339) |
275 | |
276 | -func (s *acceptanceSuite) postRequest(path string, message interface{}) (string, error) { |
277 | - packedMessage, err := json.Marshal(message) |
278 | - if err != nil { |
279 | - panic(err) |
280 | - } |
281 | - reader := bytes.NewReader(packedMessage) |
282 | - |
283 | - url := s.serverURL + path |
284 | - request, _ := http.NewRequest("POST", url, reader) |
285 | - request.ContentLength = int64(reader.Len()) |
286 | - request.Header.Set("Content-Type", "application/json") |
287 | - |
288 | - resp, err := s.httpClient.Do(request) |
289 | - if err != nil { |
290 | - panic(err) |
291 | - } |
292 | - defer resp.Body.Close() |
293 | - body, err := ioutil.ReadAll(resp.Body) |
294 | - return string(body), err |
295 | -} |
296 | - |
297 | -func (s *acceptanceSuite) startClient(c *C, devId string, intercept connInterceptor, levels map[string]int64) (<-chan string, <-chan error) { |
298 | - errCh := make(chan error, 1) |
299 | - events := make(chan string, 10) |
300 | - sess := testClientSession(s.serverAddr, devId, false) |
301 | - sess.Levels = levels |
302 | - err := sess.Dial() |
303 | - c.Assert(err, IsNil) |
304 | - sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
305 | - go func() { |
306 | - errCh <- sess.Run(events) |
307 | - }() |
308 | - c.Assert(NextEvent(events, errCh), Matches, "connected .*") |
309 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
310 | - c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId) |
311 | - return events, errCh |
312 | -} |
313 | - |
314 | -func (s *acceptanceSuite) TestBroadcastToConnected(c *C) { |
315 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
316 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
317 | - // read after ack |
318 | - if op == "read" && len(clientShutdown) > 0 { |
319 | - // exit the sess.Run() goroutine, client will close |
320 | - runtime.Goexit() |
321 | - } |
322 | - return false, 0, nil |
323 | - } |
324 | - events, errCh := s.startClient(c, "DEVB", intercept, nil) |
325 | +func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) { |
326 | + events, errCh, stop := s.startClient(c, "DEVB", nil) |
327 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
328 | Channel: "system", |
329 | ExpireOn: future, |
330 | @@ -264,12 +46,12 @@ |
331 | c.Assert(err, IsNil) |
332 | c.Assert(got, Matches, ".*ok.*") |
333 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
334 | - clientShutdown <- true |
335 | + stop() |
336 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
337 | c.Check(len(errCh), Equals, 0) |
338 | } |
339 | |
340 | -func (s *acceptanceSuite) TestBroadcastPending(c *C) { |
341 | +func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) { |
342 | // send broadcast that will be pending |
343 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
344 | Channel: "system", |
345 | @@ -279,24 +61,15 @@ |
346 | c.Assert(err, IsNil) |
347 | c.Assert(got, Matches, ".*ok.*") |
348 | |
349 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
350 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
351 | - // read after ack |
352 | - if op == "read" && len(clientShutdown) > 0 { |
353 | - // exit the sess.Run() goroutine, client will close |
354 | - runtime.Goexit() |
355 | - } |
356 | - return false, 0, nil |
357 | - } |
358 | - events, errCh := s.startClient(c, "DEVB", intercept, nil) |
359 | + events, errCh, stop := s.startClient(c, "DEVB", nil) |
360 | // getting pending on connect |
361 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
362 | - clientShutdown <- true |
363 | + stop() |
364 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
365 | c.Check(len(errCh), Equals, 0) |
366 | } |
367 | |
368 | -func (s *acceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) { |
369 | +func (s *BroadcastAcceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) { |
370 | // send bunch of broadcasts that will be pending |
371 | payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
372 | for i := 0; i < 32; i++ { |
373 | @@ -309,38 +82,20 @@ |
374 | c.Assert(got, Matches, ".*ok.*") |
375 | } |
376 | |
377 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
378 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
379 | - // read after ack |
380 | - if op == "read" && len(clientShutdown) > 0 { |
381 | - // exit the sess.Run() goroutine, client will close |
382 | - runtime.Goexit() |
383 | - } |
384 | - return false, 0, nil |
385 | - } |
386 | - events, errCh := s.startClient(c, "DEVC", intercept, nil) |
387 | + events, errCh, stop := s.startClient(c, "DEVC", nil) |
388 | // getting pending on connect |
389 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`) |
390 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`) |
391 | - clientShutdown <- true |
392 | + stop() |
393 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
394 | c.Check(len(errCh), Equals, 0) |
395 | } |
396 | |
397 | -func (s *acceptanceSuite) TestBroadcastDistribution2(c *C) { |
398 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
399 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
400 | - // read after ack |
401 | - if op == "read" && len(clientShutdown) > 0 { |
402 | - // exit the sess.Run() goroutine, client will close |
403 | - runtime.Goexit() |
404 | - } |
405 | - return false, 0, nil |
406 | - } |
407 | +func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) { |
408 | // start 1st client |
409 | - events1, errCh1 := s.startClient(c, "DEV1", intercept, nil) |
410 | + events1, errCh1, stop1 := s.startClient(c, "DEV1", nil) |
411 | // start 2nd client |
412 | - events2, errCh2 := s.startClient(c, "DEV2", intercept, nil) |
413 | + events2, errCh2, stop2 := s.startClient(c, "DEV2", nil) |
414 | // broadcast |
415 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
416 | Channel: "system", |
417 | @@ -351,24 +106,16 @@ |
418 | c.Assert(got, Matches, ".*ok.*") |
419 | c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
420 | c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
421 | - clientShutdown <- true |
422 | + stop1() |
423 | + stop2() |
424 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
425 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
426 | c.Check(len(errCh1), Equals, 0) |
427 | c.Check(len(errCh2), Equals, 0) |
428 | } |
429 | |
430 | -func (s *acceptanceSuite) TestBroadcastFilterByLevel(c *C) { |
431 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
432 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
433 | - // read after ack |
434 | - if op == "read" && len(clientShutdown) > 0 { |
435 | - // exit the sess.Run() goroutine, client will close |
436 | - runtime.Goexit() |
437 | - } |
438 | - return false, 0, nil |
439 | - } |
440 | - events, errCh := s.startClient(c, "DEVD", intercept, nil) |
441 | +func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) { |
442 | + events, errCh, stop := s.startClient(c, "DEVD", nil) |
443 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
444 | Channel: "system", |
445 | ExpireOn: future, |
446 | @@ -377,7 +124,7 @@ |
447 | c.Assert(err, IsNil) |
448 | c.Assert(got, Matches, ".*ok.*") |
449 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
450 | - clientShutdown <- true |
451 | + stop() |
452 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
453 | c.Check(len(errCh), Equals, 0) |
454 | // another broadcast |
455 | @@ -389,17 +136,16 @@ |
456 | c.Assert(err, IsNil) |
457 | c.Assert(got, Matches, ".*ok.*") |
458 | // reconnect, provide levels, get only later notification |
459 | - <-clientShutdown // reset |
460 | - events, errCh = s.startClient(c, "DEVD", intercept, map[string]int64{ |
461 | + events, errCh, stop = s.startClient(c, "DEVD", map[string]int64{ |
462 | protocol.SystemChannelId: 1, |
463 | }) |
464 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
465 | - clientShutdown <- true |
466 | + stop() |
467 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
468 | c.Check(len(errCh), Equals, 0) |
469 | } |
470 | |
471 | -func (s *acceptanceSuite) TestBroadcastTooAhead(c *C) { |
472 | +func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) { |
473 | // send broadcasts that will be pending |
474 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
475 | Channel: "system", |
476 | @@ -416,47 +162,29 @@ |
477 | c.Assert(err, IsNil) |
478 | c.Assert(got, Matches, ".*ok.*") |
479 | |
480 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
481 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
482 | - // read after ack |
483 | - if op == "read" && len(clientShutdown) > 0 { |
484 | - // exit the sess.Run() goroutine, client will close |
485 | - runtime.Goexit() |
486 | - } |
487 | - return false, 0, nil |
488 | - } |
489 | - events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{ |
490 | + events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ |
491 | protocol.SystemChannelId: 10, |
492 | }) |
493 | // getting last one pending on connect |
494 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
495 | - clientShutdown <- true |
496 | + stop() |
497 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
498 | c.Check(len(errCh), Equals, 0) |
499 | } |
500 | |
501 | -func (s *acceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) { |
502 | +func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) { |
503 | // nothing there |
504 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
505 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
506 | - // read after ack |
507 | - if op == "read" && len(clientShutdown) > 0 { |
508 | - // exit the sess.Run() goroutine, client will close |
509 | - runtime.Goexit() |
510 | - } |
511 | - return false, 0, nil |
512 | - } |
513 | - events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{ |
514 | + events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ |
515 | protocol.SystemChannelId: 10, |
516 | }) |
517 | // getting empty pending on connect |
518 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`) |
519 | - clientShutdown <- true |
520 | + stop() |
521 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
522 | c.Check(len(errCh), Equals, 0) |
523 | } |
524 | |
525 | -func (s *acceptanceSuite) TestBroadcastWayBehind(c *C) { |
526 | +func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) { |
527 | // send broadcasts that will be pending |
528 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
529 | Channel: "system", |
530 | @@ -473,21 +201,12 @@ |
531 | c.Assert(err, IsNil) |
532 | c.Assert(got, Matches, ".*ok.*") |
533 | |
534 | - clientShutdown := make(chan bool, 1) // abused as an atomic flag |
535 | - intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
536 | - // read after ack |
537 | - if op == "read" && len(clientShutdown) > 0 { |
538 | - // exit the sess.Run() goroutine, client will close |
539 | - runtime.Goexit() |
540 | - } |
541 | - return false, 0, nil |
542 | - } |
543 | - events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{ |
544 | + events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ |
545 | protocol.SystemChannelId: -10, |
546 | }) |
547 | // getting pending on connect |
548 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`) |
549 | - clientShutdown <- true |
550 | + stop() |
551 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
552 | c.Check(len(errCh), Equals, 0) |
553 | } |
554 | |
555 | === renamed file 'server/acceptance/acceptance_helpers.go' => 'server/acceptance/suites/helpers.go' |
556 | --- server/acceptance/acceptance_helpers.go 2014-02-11 20:40:06 +0000 |
557 | +++ server/acceptance/suites/helpers.go 2014-02-11 20:40:06 +0000 |
558 | @@ -14,7 +14,7 @@ |
559 | with this program. If not, see <http://www.gnu.org/licenses/>. |
560 | */ |
561 | |
562 | -package acceptance |
563 | +package suites |
564 | |
565 | import ( |
566 | "bufio" |
567 | @@ -48,8 +48,8 @@ |
568 | "session_queue_size": 10, |
569 | "broker_queue_size": 100, |
570 | "addr": addr, |
571 | - "key_pem_file": helpers.SourceRelative("config/testing.key"), |
572 | - "cert_pem_file": helpers.SourceRelative("config/testing.cert"), |
573 | + "key_pem_file": helpers.SourceRelative("../config/testing.key"), |
574 | + "cert_pem_file": helpers.SourceRelative("../config/testing.cert"), |
575 | }) |
576 | } |
577 | |
578 | |
579 | === added file 'server/acceptance/suites/pingpong.go' |
580 | --- server/acceptance/suites/pingpong.go 1970-01-01 00:00:00 +0000 |
581 | +++ server/acceptance/suites/pingpong.go 2014-02-11 20:40:06 +0000 |
582 | @@ -0,0 +1,93 @@ |
583 | +/* |
584 | + Copyright 2013-2014 Canonical Ltd. |
585 | + |
586 | + This program is free software: you can redistribute it and/or modify it |
587 | + under the terms of the GNU General Public License version 3, as published |
588 | + by the Free Software Foundation. |
589 | + |
590 | + This program is distributed in the hope that it will be useful, but |
591 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
592 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
593 | + PURPOSE. See the GNU General Public License for more details. |
594 | + |
595 | + You should have received a copy of the GNU General Public License along |
596 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
597 | +*/ |
598 | + |
599 | +package suites |
600 | + |
601 | +import ( |
602 | + "runtime" |
603 | + "strings" |
604 | + "time" |
605 | + |
606 | + . "launchpad.net/gocheck" |
607 | +) |
608 | + |
609 | +// PingPongAcceptanceSuite has tests about connectivity and ping-pong requests. |
610 | +type PingPongAcceptanceSuite struct { |
611 | + AcceptanceSuite |
612 | +} |
613 | + |
614 | +// Tests about connection, ping-pong, disconnection scenarios |
615 | + |
616 | +func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) { |
617 | + errCh := make(chan error, 1) |
618 | + events := make(chan string, 10) |
619 | + sess := testClientSession(s.serverAddr, "DEVA", true) |
620 | + err := sess.Dial() |
621 | + c.Assert(err, IsNil) |
622 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
623 | + // would be 3rd ping read, based on logged traffic |
624 | + if op == "read" && ic.totalRead >= 79 { |
625 | + // exit the sess.Run() goroutine, client will close |
626 | + runtime.Goexit() |
627 | + } |
628 | + return false, 0, nil |
629 | + } |
630 | + sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
631 | + go func() { |
632 | + errCh <- sess.Run(events) |
633 | + }() |
634 | + connectCli := NextEvent(events, errCh) |
635 | + connectSrv := NextEvent(s.serverEvents, nil) |
636 | + registeredSrv := NextEvent(s.serverEvents, nil) |
637 | + tconnect := time.Now() |
638 | + c.Assert(connectSrv, Matches, ".*session.* connected .*") |
639 | + c.Assert(registeredSrv, Matches, ".*session.* registered DEVA") |
640 | + c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true) |
641 | + c.Assert(NextEvent(events, errCh), Equals, "Ping") |
642 | + elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond) |
643 | + c.Check(elapsedOfPing >= 1.0, Equals, true) |
644 | + c.Check(elapsedOfPing < 1.05, Equals, true) |
645 | + c.Assert(NextEvent(events, errCh), Equals, "Ping") |
646 | + c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF") |
647 | + c.Check(len(errCh), Equals, 0) |
648 | +} |
649 | + |
650 | +func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) { |
651 | + errCh := make(chan error, 1) |
652 | + events := make(chan string, 10) |
653 | + sess := testClientSession(s.serverAddr, "DEVB", true) |
654 | + err := sess.Dial() |
655 | + c.Assert(err, IsNil) |
656 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
657 | + // would be pong to 2nd ping, based on logged traffic |
658 | + if op == "write" && ic.totalRead >= 67 { |
659 | + time.Sleep(200 * time.Millisecond) |
660 | + // exit the sess.Run() goroutine, client will close |
661 | + runtime.Goexit() |
662 | + } |
663 | + return false, 0, nil |
664 | + } |
665 | + sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
666 | + go func() { |
667 | + errCh <- sess.Run(events) |
668 | + }() |
669 | + c.Assert(NextEvent(events, errCh), Matches, "connected .*") |
670 | + c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
671 | + c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*") |
672 | + c.Assert(NextEvent(events, errCh), Equals, "Ping") |
673 | + c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`) |
674 | + c.Check(len(errCh), Equals, 0) |
675 | +} |
676 | |
677 | === added file 'server/acceptance/suites/suite.go' |
678 | --- server/acceptance/suites/suite.go 1970-01-01 00:00:00 +0000 |
679 | +++ server/acceptance/suites/suite.go 2014-02-11 20:40:06 +0000 |
680 | @@ -0,0 +1,170 @@ |
681 | +/* |
682 | + Copyright 2013-2014 Canonical Ltd. |
683 | + |
684 | + This program is free software: you can redistribute it and/or modify it |
685 | + under the terms of the GNU General Public License version 3, as published |
686 | + by the Free Software Foundation. |
687 | + |
688 | + This program is distributed in the hope that it will be useful, but |
689 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
690 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
691 | + PURPOSE. See the GNU General Public License for more details. |
692 | + |
693 | + You should have received a copy of the GNU General Public License along |
694 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
695 | +*/ |
696 | + |
697 | +// Package suites contains reusable acceptance test suites. |
698 | +package suites |
699 | + |
700 | +import ( |
701 | + "bytes" |
702 | + "encoding/json" |
703 | + "flag" |
704 | + "fmt" |
705 | + "io/ioutil" |
706 | + "net" |
707 | + "net/http" |
708 | + "runtime" |
709 | + "time" |
710 | + |
711 | + . "launchpad.net/gocheck" |
712 | + |
713 | + "launchpad.net/ubuntu-push/server/acceptance" |
714 | + helpers "launchpad.net/ubuntu-push/testing" |
715 | +) |
716 | + |
717 | +// AcceptanceSuite has the basic functionality of the acceptance suites. |
718 | +type AcceptanceSuite struct { |
719 | + // hook to start the server(s) |
720 | + StartServer func(c *C) (logs <-chan string, kill func(), serverAddr, apiURL string) |
721 | + // running bits |
722 | + serverKill func() |
723 | + serverAddr string |
724 | + serverAPIURL string |
725 | + serverEvents <-chan string |
726 | + httpClient *http.Client |
727 | +} |
728 | + |
729 | +// Start a new server for each test. |
730 | +func (s *AcceptanceSuite) SetUpTest(c *C) { |
731 | + logs, kill, addr, url := s.StartServer(c) |
732 | + s.serverEvents = logs |
733 | + s.serverKill = kill |
734 | + s.serverAddr = addr |
735 | + s.serverAPIURL = url |
736 | + s.httpClient = &http.Client{} |
737 | +} |
738 | + |
739 | +func (s *AcceptanceSuite) TearDownTest(c *C) { |
740 | + if s.serverKill != nil { |
741 | + s.serverKill() |
742 | + } |
743 | +} |
744 | + |
745 | +// Post a request. |
746 | +func (s *AcceptanceSuite) postRequest(path string, message interface{}) (string, error) { |
747 | + packedMessage, err := json.Marshal(message) |
748 | + if err != nil { |
749 | + panic(err) |
750 | + } |
751 | + reader := bytes.NewReader(packedMessage) |
752 | + |
753 | + url := s.serverAPIURL + path |
754 | + request, _ := http.NewRequest("POST", url, reader) |
755 | + request.ContentLength = int64(reader.Len()) |
756 | + request.Header.Set("Content-Type", "application/json") |
757 | + |
758 | + resp, err := s.httpClient.Do(request) |
759 | + if err != nil { |
760 | + panic(err) |
761 | + } |
762 | + defer resp.Body.Close() |
763 | + body, err := ioutil.ReadAll(resp.Body) |
764 | + return string(body), err |
765 | +} |
766 | + |
767 | +func testClientSession(addr string, deviceId string, reportPings bool) *acceptance.ClientSession { |
768 | + certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../config/testing.cert")) |
769 | + if err != nil { |
770 | + panic(fmt.Sprintf("could not read config/testing.cert: %v", err)) |
771 | + } |
772 | + return &acceptance.ClientSession{ |
773 | + ExchangeTimeout: 100 * time.Millisecond, |
774 | + ServerAddr: addr, |
775 | + CertPEMBlock: certPEMBlock, |
776 | + DeviceId: deviceId, |
777 | + ReportPings: reportPings, |
778 | + } |
779 | +} |
780 | + |
// typically combined with -gocheck.vv or test selection
var logTraffic = flag.Bool("logTraffic", false, "log traffic")

// connInterceptor is the hook an interceptingConn consults on every
// Read (op "read") and Write (op "write"). When its first result is
// true the underlying connection is bypassed and the returned count and
// error become the result of the operation; otherwise its other results
// are ignored and the operation is delegated to the wrapped connection.
type connInterceptor func(ic *interceptingConn, op string, b []byte) (bool, int, error)

// interceptingConn wraps a net.Conn, keeping running totals of the
// bytes read and written, optionally printing the traffic (see
// logTraffic) and giving intercept the chance to take over each
// operation.
type interceptingConn struct {
	net.Conn
	totalRead    int
	totalWritten int
	intercept    connInterceptor
}

// consult runs the interceptor, if any, for op on b. Without an
// interceptor it reports the operation as not handled.
func (ic *interceptingConn) consult(op string, b []byte) (bool, int, error) {
	if ic.intercept == nil {
		return false, 0, nil
	}
	return ic.intercept(ic, op, b)
}

// trace prints one line of traffic log when logTraffic is set. The
// count is clamped to [0, len(b)] for display only, so an interceptor
// reporting an out-of-range count cannot make the logging panic.
func (ic *interceptingConn) trace(tag string, before int, b []byte, n int, err error, after int) {
	if !*logTraffic {
		return
	}
	shown := n
	if shown < 0 {
		shown = 0
	}
	if shown > len(b) {
		shown = len(b)
	}
	fmt.Printf("%s[%v]: %d %#v %v %d\n", tag, ic.Conn.LocalAddr(), before, string(b[:shown]), err, after)
}

// Write offers b to the interceptor and, unless the interceptor handled
// it, writes b to the wrapped connection. The resulting count is added
// to totalWritten.
func (ic *interceptingConn) Write(b []byte) (int, error) {
	before := ic.totalWritten
	handled, n, err := ic.consult("write", b)
	if !handled {
		n, err = ic.Conn.Write(b)
	}
	ic.totalWritten += n
	ic.trace("W", before, b, n, err, ic.totalWritten)
	return n, err
}

// Read offers b to the interceptor and, unless the interceptor handled
// it, reads into b from the wrapped connection. The resulting count is
// added to totalRead.
func (ic *interceptingConn) Read(b []byte) (int, error) {
	before := ic.totalRead
	handled, n, err := ic.consult("read", b)
	if !handled {
		n, err = ic.Conn.Read(b)
	}
	ic.totalRead += n
	ic.trace("R", before, b, n, err, ic.totalRead)
	return n, err
}
824 | + |
825 | +// Start a client. |
826 | +func (s *AcceptanceSuite) startClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) { |
827 | + errCh := make(chan error, 1) |
828 | + cliEvents := make(chan string, 10) |
829 | + sess := testClientSession(s.serverAddr, devId, false) |
830 | + sess.Levels = levels |
831 | + err := sess.Dial() |
832 | + c.Assert(err, IsNil) |
833 | + clientShutdown := make(chan bool, 1) // abused as an atomic flag |
834 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
835 | + // read after ack |
836 | + if op == "read" && len(clientShutdown) > 0 { |
837 | + // exit the sess.Run() goroutine, client will close |
838 | + runtime.Goexit() |
839 | + } |
840 | + return false, 0, nil |
841 | + } |
842 | + sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
843 | + go func() { |
844 | + errCh <- sess.Run(cliEvents) |
845 | + }() |
846 | + c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*") |
847 | + c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
848 | + c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId) |
849 | + return cliEvents, errCh, func() { clientShutdown <- true } |
850 | +} |
Thank you!