Merge lp:~pedronis/ubuntu-push/acceptance-flex-2 into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 69
Merged at revision: 66
Proposed branch: lp:~pedronis/ubuntu-push/acceptance-flex-2
Merge into: lp:ubuntu-push
Prerequisite: lp:~pedronis/ubuntu-push/acceptance-flex-1
Diff against target: 850 lines (+363/-319)
5 files modified
server/acceptance/acceptance_test.go (+62/-0)
server/acceptance/suites/broadcast.go (+35/-316)
server/acceptance/suites/helpers.go (+3/-3)
server/acceptance/suites/pingpong.go (+93/-0)
server/acceptance/suites/suite.go (+170/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/acceptance-flex-2
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+205847@code.launchpad.net

Commit message

restructure acceptance tests into reusable suites

Description of the change

restructure acceptance tests into reusable suites

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

Thank you!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'server/acceptance/acceptance_test.go'
--- server/acceptance/acceptance_test.go 1970-01-01 00:00:00 +0000
+++ server/acceptance/acceptance_test.go 2014-02-11 20:40:06 +0000
@@ -0,0 +1,62 @@
1/*
2 Copyright 2013-2014 Canonical Ltd.
3
4 This program is free software: you can redistribute it and/or modify it
5 under the terms of the GNU General Public License version 3, as published
6 by the Free Software Foundation.
7
8 This program is distributed in the hope that it will be useful, but
9 WITHOUT ANY WARRANTY; without even the implied warranties of
10 MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along
14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/
16
17package acceptance_test
18
19import (
20 "flag"
21 "fmt"
22 "strings"
23 "testing"
24
25 . "launchpad.net/gocheck"
26
27 "launchpad.net/ubuntu-push/server/acceptance/suites"
28)
29
30func TestAcceptance(t *testing.T) { TestingT(t) }
31
32var serverCmd = flag.String("server", "", "server to test")
33var serverAuxCfg = flag.String("auxcfg", "", "auxiliary config for the server")
34
35func testServerConfig(addr, httpAddr string) map[string]interface{} {
36 cfg := make(map[string]interface{})
37 suites.FillServerConfig(cfg, addr)
38 suites.FillHTTPServerConfig(cfg, httpAddr)
39 return cfg
40}
41
42// Start a server.
43func StartServer(c *C) (<-chan string, func(), string, string) {
44 if *serverCmd == "" {
45 c.Skip("executable server not specified")
46 }
47 tmpDir := c.MkDir()
48 cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0")
49 cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg)
50 cfgs := append(strings.Fields(*serverAuxCfg), cfgFilename)
51 logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgs...)
52 serverHTTPAddr := suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat)
53 serverURL := fmt.Sprintf("http://%s", serverHTTPAddr)
54 serverAddr := suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat)
55 return logs, killServer, serverAddr, serverURL
56}
57
58// ping pong/connectivity
59var _ = Suite(&suites.PingPongAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}})
60
61// broadcast
62var _ = Suite(&suites.BroadcastAcceptanceSuite{suites.AcceptanceSuite{StartServer: StartServer}})
063
=== added directory 'server/acceptance/suites'
=== renamed file 'server/acceptance/acceptance_test.go' => 'server/acceptance/suites/broadcast.go'
--- server/acceptance/acceptance_test.go 2014-02-11 20:40:06 +0000
+++ server/acceptance/suites/broadcast.go 2014-02-11 20:40:06 +0000
@@ -14,248 +14,30 @@
14 with this program. If not, see <http://www.gnu.org/licenses/>.14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/15*/
1616
17package acceptance17package suites
1818
19import (19import (
20 "bytes"
21 "encoding/json"20 "encoding/json"
22 "flag"
23 "fmt"21 "fmt"
24 "io/ioutil"
25 "net"
26 "net/http"
27 "runtime"
28 "strings"22 "strings"
29 "testing"
30 "time"23 "time"
3124
32 . "launchpad.net/gocheck"25 . "launchpad.net/gocheck"
3326
34 "launchpad.net/ubuntu-push/protocol"27 "launchpad.net/ubuntu-push/protocol"
35 "launchpad.net/ubuntu-push/server/api"28 "launchpad.net/ubuntu-push/server/api"
36 helpers "launchpad.net/ubuntu-push/testing"
37)29)
3830
39func TestAcceptance(t *testing.T) { TestingT(t) }31// BroadcastAcceptanceSuite has tests about broadcast.
4032type BroadcastAcceptanceSuite struct {
41type acceptanceSuite struct {33 AcceptanceSuite
42 serverKill func()34}
43 serverAddr string35
44 serverURL string36// Long after the end of the tests.
45 serverEvents <-chan string
46 httpClient *http.Client
47}
48
49var _ = Suite(&acceptanceSuite{})
50
51var serverCmd = flag.String("server", "", "server to test")
52var serverAuxCfg = flag.String("auxcfg", "", "auxiliary config for the server")
53
54func testServerConfig(addr, httpAddr string) map[string]interface{} {
55 cfg := make(map[string]interface{})
56 FillServerConfig(cfg, addr)
57 FillHTTPServerConfig(cfg, httpAddr)
58 return cfg
59}
60
61func testClientSession(addr string, deviceId string, reportPings bool) *ClientSession {
62 certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("config/testing.cert"))
63 if err != nil {
64 panic(fmt.Sprintf("could not read config/testing.cert: %v", err))
65 }
66 return &ClientSession{
67 ExchangeTimeout: 100 * time.Millisecond,
68 ServerAddr: addr,
69 CertPEMBlock: certPEMBlock,
70 DeviceId: deviceId,
71 ReportPings: reportPings,
72 }
73}
74
75// start a new server for each test
76func (s *acceptanceSuite) SetUpTest(c *C) {
77 if *serverCmd == "" {
78 c.Skip("executable server not specified")
79 }
80 tmpDir := c.MkDir()
81 cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0")
82 cfgFilename := WriteConfig(c, tmpDir, "config.json", cfg)
83 cfgs := append(strings.Fields(*serverAuxCfg), cfgFilename)
84 serverEvents, killServer := RunAndObserve(c, *serverCmd, cfgs...)
85 s.serverKill = killServer
86 serverHTTPAddr := ExtractListeningAddr(c, serverEvents, HTTPListeningOnPat)
87 s.serverURL = fmt.Sprintf("http://%s", serverHTTPAddr)
88 s.serverAddr = ExtractListeningAddr(c, serverEvents, DevListeningOnPat)
89 s.serverEvents = serverEvents
90 s.httpClient = &http.Client{}
91}
92
93func (s *acceptanceSuite) TearDownTest(c *C) {
94 if s.serverKill != nil {
95 s.serverKill()
96 }
97}
98
99// Tests about connection, ping-pong, disconnection scenarios
100
101// typically combined with -gocheck.vv or test selection
102var logTraffic = flag.Bool("logTraffic", false, "log traffic")
103
104type connInterceptor func(ic *interceptingConn, op string, b []byte) (bool, int, error)
105
106type interceptingConn struct {
107 net.Conn
108 totalRead int
109 totalWritten int
110 intercept connInterceptor
111}
112
113func (ic *interceptingConn) Write(b []byte) (n int, err error) {
114 done := false
115 before := ic.totalWritten
116 if ic.intercept != nil {
117 done, n, err = ic.intercept(ic, "write", b)
118 }
119 if !done {
120 n, err = ic.Conn.Write(b)
121 }
122 ic.totalWritten += n
123 if *logTraffic {
124 fmt.Printf("W[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalWritten)
125 }
126 return
127}
128
129func (ic *interceptingConn) Read(b []byte) (n int, err error) {
130 done := false
131 before := ic.totalRead
132 if ic.intercept != nil {
133 done, n, err = ic.intercept(ic, "read", b)
134 }
135 if !done {
136 n, err = ic.Conn.Read(b)
137 }
138 ic.totalRead += n
139 if *logTraffic {
140 fmt.Printf("R[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalRead)
141 }
142 return
143}
144
145func (s *acceptanceSuite) TestConnectPingPing(c *C) {
146 errCh := make(chan error, 1)
147 events := make(chan string, 10)
148 sess := testClientSession(s.serverAddr, "DEVA", true)
149 err := sess.Dial()
150 c.Assert(err, IsNil)
151 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
152 // would be 3rd ping read, based on logged traffic
153 if op == "read" && ic.totalRead >= 79 {
154 // exit the sess.Run() goroutine, client will close
155 runtime.Goexit()
156 }
157 return false, 0, nil
158 }
159 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
160 go func() {
161 errCh <- sess.Run(events)
162 }()
163 connectCli := NextEvent(events, errCh)
164 connectSrv := NextEvent(s.serverEvents, nil)
165 registeredSrv := NextEvent(s.serverEvents, nil)
166 tconnect := time.Now()
167 c.Assert(connectSrv, Matches, ".*session.* connected .*")
168 c.Assert(registeredSrv, Matches, ".*session.* registered DEVA")
169 c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true)
170 c.Assert(NextEvent(events, errCh), Equals, "Ping")
171 elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond)
172 c.Check(elapsedOfPing >= 1.0, Equals, true)
173 c.Check(elapsedOfPing < 1.05, Equals, true)
174 c.Assert(NextEvent(events, errCh), Equals, "Ping")
175 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF")
176 c.Check(len(errCh), Equals, 0)
177}
178
179func (s *acceptanceSuite) TestConnectPingNeverPong(c *C) {
180 errCh := make(chan error, 1)
181 events := make(chan string, 10)
182 sess := testClientSession(s.serverAddr, "DEVB", true)
183 err := sess.Dial()
184 c.Assert(err, IsNil)
185 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
186 // would be pong to 2nd ping, based on logged traffic
187 if op == "write" && ic.totalRead >= 67 {
188 time.Sleep(200 * time.Millisecond)
189 // exit the sess.Run() goroutine, client will close
190 runtime.Goexit()
191 }
192 return false, 0, nil
193 }
194 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
195 go func() {
196 errCh <- sess.Run(events)
197 }()
198 c.Assert(NextEvent(events, errCh), Matches, "connected .*")
199 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
200 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*")
201 c.Assert(NextEvent(events, errCh), Equals, "Ping")
202 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`)
203 c.Check(len(errCh), Equals, 0)
204}
205
206// Tests about broadcast
207
208var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)37var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
20938
210func (s *acceptanceSuite) postRequest(path string, message interface{}) (string, error) {39func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) {
211 packedMessage, err := json.Marshal(message)40 events, errCh, stop := s.startClient(c, "DEVB", nil)
212 if err != nil {
213 panic(err)
214 }
215 reader := bytes.NewReader(packedMessage)
216
217 url := s.serverURL + path
218 request, _ := http.NewRequest("POST", url, reader)
219 request.ContentLength = int64(reader.Len())
220 request.Header.Set("Content-Type", "application/json")
221
222 resp, err := s.httpClient.Do(request)
223 if err != nil {
224 panic(err)
225 }
226 defer resp.Body.Close()
227 body, err := ioutil.ReadAll(resp.Body)
228 return string(body), err
229}
230
231func (s *acceptanceSuite) startClient(c *C, devId string, intercept connInterceptor, levels map[string]int64) (<-chan string, <-chan error) {
232 errCh := make(chan error, 1)
233 events := make(chan string, 10)
234 sess := testClientSession(s.serverAddr, devId, false)
235 sess.Levels = levels
236 err := sess.Dial()
237 c.Assert(err, IsNil)
238 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
239 go func() {
240 errCh <- sess.Run(events)
241 }()
242 c.Assert(NextEvent(events, errCh), Matches, "connected .*")
243 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
244 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId)
245 return events, errCh
246}
247
248func (s *acceptanceSuite) TestBroadcastToConnected(c *C) {
249 clientShutdown := make(chan bool, 1) // abused as an atomic flag
250 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
251 // read after ack
252 if op == "read" && len(clientShutdown) > 0 {
253 // exit the sess.Run() goroutine, client will close
254 runtime.Goexit()
255 }
256 return false, 0, nil
257 }
258 events, errCh := s.startClient(c, "DEVB", intercept, nil)
259 got, err := s.postRequest("/broadcast", &api.Broadcast{41 got, err := s.postRequest("/broadcast", &api.Broadcast{
260 Channel: "system",42 Channel: "system",
261 ExpireOn: future,43 ExpireOn: future,
@@ -264,12 +46,12 @@
264 c.Assert(err, IsNil)46 c.Assert(err, IsNil)
265 c.Assert(got, Matches, ".*ok.*")47 c.Assert(got, Matches, ".*ok.*")
266 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)48 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
267 clientShutdown <- true49 stop()
268 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)50 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
269 c.Check(len(errCh), Equals, 0)51 c.Check(len(errCh), Equals, 0)
270}52}
27153
272func (s *acceptanceSuite) TestBroadcastPending(c *C) {54func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) {
273 // send broadcast that will be pending55 // send broadcast that will be pending
274 got, err := s.postRequest("/broadcast", &api.Broadcast{56 got, err := s.postRequest("/broadcast", &api.Broadcast{
275 Channel: "system",57 Channel: "system",
@@ -279,24 +61,15 @@
279 c.Assert(err, IsNil)61 c.Assert(err, IsNil)
280 c.Assert(got, Matches, ".*ok.*")62 c.Assert(got, Matches, ".*ok.*")
28163
282 clientShutdown := make(chan bool, 1) // abused as an atomic flag64 events, errCh, stop := s.startClient(c, "DEVB", nil)
283 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
284 // read after ack
285 if op == "read" && len(clientShutdown) > 0 {
286 // exit the sess.Run() goroutine, client will close
287 runtime.Goexit()
288 }
289 return false, 0, nil
290 }
291 events, errCh := s.startClient(c, "DEVB", intercept, nil)
292 // getting pending on connect65 // getting pending on connect
293 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)66 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
294 clientShutdown <- true67 stop()
295 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)68 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
296 c.Check(len(errCh), Equals, 0)69 c.Check(len(errCh), Equals, 0)
297}70}
29871
299func (s *acceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) {72func (s *BroadcastAcceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) {
300 // send bunch of broadcasts that will be pending73 // send bunch of broadcasts that will be pending
301 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))74 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
302 for i := 0; i < 32; i++ {75 for i := 0; i < 32; i++ {
@@ -309,38 +82,20 @@
309 c.Assert(got, Matches, ".*ok.*")82 c.Assert(got, Matches, ".*ok.*")
310 }83 }
31184
312 clientShutdown := make(chan bool, 1) // abused as an atomic flag85 events, errCh, stop := s.startClient(c, "DEVC", nil)
313 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
314 // read after ack
315 if op == "read" && len(clientShutdown) > 0 {
316 // exit the sess.Run() goroutine, client will close
317 runtime.Goexit()
318 }
319 return false, 0, nil
320 }
321 events, errCh := s.startClient(c, "DEVC", intercept, nil)
322 // getting pending on connect86 // getting pending on connect
323 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)87 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)
324 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)88 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)
325 clientShutdown <- true89 stop()
326 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)90 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
327 c.Check(len(errCh), Equals, 0)91 c.Check(len(errCh), Equals, 0)
328}92}
32993
330func (s *acceptanceSuite) TestBroadcastDistribution2(c *C) {94func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) {
331 clientShutdown := make(chan bool, 1) // abused as an atomic flag
332 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
333 // read after ack
334 if op == "read" && len(clientShutdown) > 0 {
335 // exit the sess.Run() goroutine, client will close
336 runtime.Goexit()
337 }
338 return false, 0, nil
339 }
340 // start 1st client95 // start 1st client
341 events1, errCh1 := s.startClient(c, "DEV1", intercept, nil)96 events1, errCh1, stop1 := s.startClient(c, "DEV1", nil)
342 // start 2nd client97 // start 2nd client
343 events2, errCh2 := s.startClient(c, "DEV2", intercept, nil)98 events2, errCh2, stop2 := s.startClient(c, "DEV2", nil)
344 // broadcast99 // broadcast
345 got, err := s.postRequest("/broadcast", &api.Broadcast{100 got, err := s.postRequest("/broadcast", &api.Broadcast{
346 Channel: "system",101 Channel: "system",
@@ -351,24 +106,16 @@
351 c.Assert(got, Matches, ".*ok.*")106 c.Assert(got, Matches, ".*ok.*")
352 c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)107 c.Check(NextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
353 c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)108 c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
354 clientShutdown <- true109 stop1()
110 stop2()
355 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)111 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
356 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)112 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
357 c.Check(len(errCh1), Equals, 0)113 c.Check(len(errCh1), Equals, 0)
358 c.Check(len(errCh2), Equals, 0)114 c.Check(len(errCh2), Equals, 0)
359}115}
360116
361func (s *acceptanceSuite) TestBroadcastFilterByLevel(c *C) {117func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) {
362 clientShutdown := make(chan bool, 1) // abused as an atomic flag118 events, errCh, stop := s.startClient(c, "DEVD", nil)
363 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
364 // read after ack
365 if op == "read" && len(clientShutdown) > 0 {
366 // exit the sess.Run() goroutine, client will close
367 runtime.Goexit()
368 }
369 return false, 0, nil
370 }
371 events, errCh := s.startClient(c, "DEVD", intercept, nil)
372 got, err := s.postRequest("/broadcast", &api.Broadcast{119 got, err := s.postRequest("/broadcast", &api.Broadcast{
373 Channel: "system",120 Channel: "system",
374 ExpireOn: future,121 ExpireOn: future,
@@ -377,7 +124,7 @@
377 c.Assert(err, IsNil)124 c.Assert(err, IsNil)
378 c.Assert(got, Matches, ".*ok.*")125 c.Assert(got, Matches, ".*ok.*")
379 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)126 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
380 clientShutdown <- true127 stop()
381 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)128 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
382 c.Check(len(errCh), Equals, 0)129 c.Check(len(errCh), Equals, 0)
383 // another broadcast130 // another broadcast
@@ -389,17 +136,16 @@
389 c.Assert(err, IsNil)136 c.Assert(err, IsNil)
390 c.Assert(got, Matches, ".*ok.*")137 c.Assert(got, Matches, ".*ok.*")
391 // reconnect, provide levels, get only later notification138 // reconnect, provide levels, get only later notification
392 <-clientShutdown // reset139 events, errCh, stop = s.startClient(c, "DEVD", map[string]int64{
393 events, errCh = s.startClient(c, "DEVD", intercept, map[string]int64{
394 protocol.SystemChannelId: 1,140 protocol.SystemChannelId: 1,
395 })141 })
396 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)142 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
397 clientShutdown <- true143 stop()
398 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)144 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
399 c.Check(len(errCh), Equals, 0)145 c.Check(len(errCh), Equals, 0)
400}146}
401147
402func (s *acceptanceSuite) TestBroadcastTooAhead(c *C) {148func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) {
403 // send broadcasts that will be pending149 // send broadcasts that will be pending
404 got, err := s.postRequest("/broadcast", &api.Broadcast{150 got, err := s.postRequest("/broadcast", &api.Broadcast{
405 Channel: "system",151 Channel: "system",
@@ -416,47 +162,29 @@
416 c.Assert(err, IsNil)162 c.Assert(err, IsNil)
417 c.Assert(got, Matches, ".*ok.*")163 c.Assert(got, Matches, ".*ok.*")
418164
419 clientShutdown := make(chan bool, 1) // abused as an atomic flag165 events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{
420 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
421 // read after ack
422 if op == "read" && len(clientShutdown) > 0 {
423 // exit the sess.Run() goroutine, client will close
424 runtime.Goexit()
425 }
426 return false, 0, nil
427 }
428 events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{
429 protocol.SystemChannelId: 10,166 protocol.SystemChannelId: 10,
430 })167 })
431 // gettting last one pending on connect168 // gettting last one pending on connect
432 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)169 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
433 clientShutdown <- true170 stop()
434 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)171 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
435 c.Check(len(errCh), Equals, 0)172 c.Check(len(errCh), Equals, 0)
436}173}
437174
438func (s *acceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) {175func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) {
439 // nothing there176 // nothing there
440 clientShutdown := make(chan bool, 1) // abused as an atomic flag177 events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{
441 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
442 // read after ack
443 if op == "read" && len(clientShutdown) > 0 {
444 // exit the sess.Run() goroutine, client will close
445 runtime.Goexit()
446 }
447 return false, 0, nil
448 }
449 events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{
450 protocol.SystemChannelId: 10,178 protocol.SystemChannelId: 10,
451 })179 })
452 // getting empty pending on connect180 // getting empty pending on connect
453 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`)181 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`)
454 clientShutdown <- true182 stop()
455 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)183 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
456 c.Check(len(errCh), Equals, 0)184 c.Check(len(errCh), Equals, 0)
457}185}
458186
459func (s *acceptanceSuite) TestBroadcastWayBehind(c *C) {187func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) {
460 // send broadcasts that will be pending188 // send broadcasts that will be pending
461 got, err := s.postRequest("/broadcast", &api.Broadcast{189 got, err := s.postRequest("/broadcast", &api.Broadcast{
462 Channel: "system",190 Channel: "system",
@@ -473,21 +201,12 @@
473 c.Assert(err, IsNil)201 c.Assert(err, IsNil)
474 c.Assert(got, Matches, ".*ok.*")202 c.Assert(got, Matches, ".*ok.*")
475203
476 clientShutdown := make(chan bool, 1) // abused as an atomic flag204 events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{
477 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
478 // read after ack
479 if op == "read" && len(clientShutdown) > 0 {
480 // exit the sess.Run() goroutine, client will close
481 runtime.Goexit()
482 }
483 return false, 0, nil
484 }
485 events, errCh := s.startClient(c, "DEVB", intercept, map[string]int64{
486 protocol.SystemChannelId: -10,205 protocol.SystemChannelId: -10,
487 })206 })
488 // getting pending on connect207 // getting pending on connect
489 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)208 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)
490 clientShutdown <- true209 stop()
491 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)210 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
492 c.Check(len(errCh), Equals, 0)211 c.Check(len(errCh), Equals, 0)
493}212}
494213
=== renamed file 'server/acceptance/acceptance_helpers.go' => 'server/acceptance/suites/helpers.go'
--- server/acceptance/acceptance_helpers.go 2014-02-11 20:40:06 +0000
+++ server/acceptance/suites/helpers.go 2014-02-11 20:40:06 +0000
@@ -14,7 +14,7 @@
14 with this program. If not, see <http://www.gnu.org/licenses/>.14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/15*/
1616
17package acceptance17package suites
1818
19import (19import (
20 "bufio"20 "bufio"
@@ -48,8 +48,8 @@
48 "session_queue_size": 10,48 "session_queue_size": 10,
49 "broker_queue_size": 100,49 "broker_queue_size": 100,
50 "addr": addr,50 "addr": addr,
51 "key_pem_file": helpers.SourceRelative("config/testing.key"),51 "key_pem_file": helpers.SourceRelative("../config/testing.key"),
52 "cert_pem_file": helpers.SourceRelative("config/testing.cert"),52 "cert_pem_file": helpers.SourceRelative("../config/testing.cert"),
53 })53 })
54}54}
5555
5656
=== added file 'server/acceptance/suites/pingpong.go'
--- server/acceptance/suites/pingpong.go 1970-01-01 00:00:00 +0000
+++ server/acceptance/suites/pingpong.go 2014-02-11 20:40:06 +0000
@@ -0,0 +1,93 @@
1/*
2 Copyright 2013-2014 Canonical Ltd.
3
4 This program is free software: you can redistribute it and/or modify it
5 under the terms of the GNU General Public License version 3, as published
6 by the Free Software Foundation.
7
8 This program is distributed in the hope that it will be useful, but
9 WITHOUT ANY WARRANTY; without even the implied warranties of
10 MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along
14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/
16
17package suites
18
19import (
20 "runtime"
21 "strings"
22 "time"
23
24 . "launchpad.net/gocheck"
25)
26
27// PingPongAcceptanceSuite has tests about connectivity and ping-pong requests.
28type PingPongAcceptanceSuite struct {
29 AcceptanceSuite
30}
31
32// Tests about connection, ping-pong, disconnection scenarios
33
34func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {
35 errCh := make(chan error, 1)
36 events := make(chan string, 10)
37 sess := testClientSession(s.serverAddr, "DEVA", true)
38 err := sess.Dial()
39 c.Assert(err, IsNil)
40 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
41 // would be 3rd ping read, based on logged traffic
42 if op == "read" && ic.totalRead >= 79 {
43 // exit the sess.Run() goroutine, client will close
44 runtime.Goexit()
45 }
46 return false, 0, nil
47 }
48 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
49 go func() {
50 errCh <- sess.Run(events)
51 }()
52 connectCli := NextEvent(events, errCh)
53 connectSrv := NextEvent(s.serverEvents, nil)
54 registeredSrv := NextEvent(s.serverEvents, nil)
55 tconnect := time.Now()
56 c.Assert(connectSrv, Matches, ".*session.* connected .*")
57 c.Assert(registeredSrv, Matches, ".*session.* registered DEVA")
58 c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true)
59 c.Assert(NextEvent(events, errCh), Equals, "Ping")
60 elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond)
61 c.Check(elapsedOfPing >= 1.0, Equals, true)
62 c.Check(elapsedOfPing < 1.05, Equals, true)
63 c.Assert(NextEvent(events, errCh), Equals, "Ping")
64 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF")
65 c.Check(len(errCh), Equals, 0)
66}
67
68func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {
69 errCh := make(chan error, 1)
70 events := make(chan string, 10)
71 sess := testClientSession(s.serverAddr, "DEVB", true)
72 err := sess.Dial()
73 c.Assert(err, IsNil)
74 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
75 // would be pong to 2nd ping, based on logged traffic
76 if op == "write" && ic.totalRead >= 67 {
77 time.Sleep(200 * time.Millisecond)
78 // exit the sess.Run() goroutine, client will close
79 runtime.Goexit()
80 }
81 return false, 0, nil
82 }
83 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
84 go func() {
85 errCh <- sess.Run(events)
86 }()
87 c.Assert(NextEvent(events, errCh), Matches, "connected .*")
88 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
89 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*")
90 c.Assert(NextEvent(events, errCh), Equals, "Ping")
91 c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`)
92 c.Check(len(errCh), Equals, 0)
93}
094
=== added file 'server/acceptance/suites/suite.go'
--- server/acceptance/suites/suite.go 1970-01-01 00:00:00 +0000
+++ server/acceptance/suites/suite.go 2014-02-11 20:40:06 +0000
@@ -0,0 +1,170 @@
1/*
2 Copyright 2013-2014 Canonical Ltd.
3
4 This program is free software: you can redistribute it and/or modify it
5 under the terms of the GNU General Public License version 3, as published
6 by the Free Software Foundation.
7
8 This program is distributed in the hope that it will be useful, but
9 WITHOUT ANY WARRANTY; without even the implied warranties of
10 MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along
14 with this program. If not, see <http://www.gnu.org/licenses/>.
15*/
16
17// Package suites contains reusable acceptance test suites.
18package suites
19
20import (
21 "bytes"
22 "encoding/json"
23 "flag"
24 "fmt"
25 "io/ioutil"
26 "net"
27 "net/http"
28 "runtime"
29 "time"
30
31 . "launchpad.net/gocheck"
32
33 "launchpad.net/ubuntu-push/server/acceptance"
34 helpers "launchpad.net/ubuntu-push/testing"
35)
36
37// AcceptanceSuite has the basic functionality of the acceptance suites.
38type AcceptanceSuite struct {
39 // hook to start the server(s)
40 StartServer func(c *C) (logs <-chan string, kill func(), serverAddr, apiURL string)
41 // running bits
42 serverKill func()
43 serverAddr string
44 serverAPIURL string
45 serverEvents <-chan string
46 httpClient *http.Client
47}
48
49// Start a new server for each test.
50func (s *AcceptanceSuite) SetUpTest(c *C) {
51 logs, kill, addr, url := s.StartServer(c)
52 s.serverEvents = logs
53 s.serverKill = kill
54 s.serverAddr = addr
55 s.serverAPIURL = url
56 s.httpClient = &http.Client{}
57}
58
59func (s *AcceptanceSuite) TearDownTest(c *C) {
60 if s.serverKill != nil {
61 s.serverKill()
62 }
63}
64
65// Post a request.
66func (s *AcceptanceSuite) postRequest(path string, message interface{}) (string, error) {
67 packedMessage, err := json.Marshal(message)
68 if err != nil {
69 panic(err)
70 }
71 reader := bytes.NewReader(packedMessage)
72
73 url := s.serverAPIURL + path
74 request, _ := http.NewRequest("POST", url, reader)
75 request.ContentLength = int64(reader.Len())
76 request.Header.Set("Content-Type", "application/json")
77
78 resp, err := s.httpClient.Do(request)
79 if err != nil {
80 panic(err)
81 }
82 defer resp.Body.Close()
83 body, err := ioutil.ReadAll(resp.Body)
84 return string(body), err
85}
86
87func testClientSession(addr string, deviceId string, reportPings bool) *acceptance.ClientSession {
88 certPEMBlock, err := ioutil.ReadFile(helpers.SourceRelative("../config/testing.cert"))
89 if err != nil {
90 panic(fmt.Sprintf("could not read config/testing.cert: %v", err))
91 }
92 return &acceptance.ClientSession{
93 ExchangeTimeout: 100 * time.Millisecond,
94 ServerAddr: addr,
95 CertPEMBlock: certPEMBlock,
96 DeviceId: deviceId,
97 ReportPings: reportPings,
98 }
99}
100
101// typically combined with -gocheck.vv or test selection
102var logTraffic = flag.Bool("logTraffic", false, "log traffic")
103
104type connInterceptor func(ic *interceptingConn, op string, b []byte) (bool, int, error)
105
106type interceptingConn struct {
107 net.Conn
108 totalRead int
109 totalWritten int
110 intercept connInterceptor
111}
112
113func (ic *interceptingConn) Write(b []byte) (n int, err error) {
114 done := false
115 before := ic.totalWritten
116 if ic.intercept != nil {
117 done, n, err = ic.intercept(ic, "write", b)
118 }
119 if !done {
120 n, err = ic.Conn.Write(b)
121 }
122 ic.totalWritten += n
123 if *logTraffic {
124 fmt.Printf("W[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalWritten)
125 }
126 return
127}
128
129func (ic *interceptingConn) Read(b []byte) (n int, err error) {
130 done := false
131 before := ic.totalRead
132 if ic.intercept != nil {
133 done, n, err = ic.intercept(ic, "read", b)
134 }
135 if !done {
136 n, err = ic.Conn.Read(b)
137 }
138 ic.totalRead += n
139 if *logTraffic {
140 fmt.Printf("R[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalRead)
141 }
142 return
143}
144
145// Start a client.
146func (s *AcceptanceSuite) startClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
147 errCh := make(chan error, 1)
148 cliEvents := make(chan string, 10)
149 sess := testClientSession(s.serverAddr, devId, false)
150 sess.Levels = levels
151 err := sess.Dial()
152 c.Assert(err, IsNil)
153 clientShutdown := make(chan bool, 1) // abused as an atomic flag
154 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
155 // read after ack
156 if op == "read" && len(clientShutdown) > 0 {
157 // exit the sess.Run() goroutine, client will close
158 runtime.Goexit()
159 }
160 return false, 0, nil
161 }
162 sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
163 go func() {
164 errCh <- sess.Run(cliEvents)
165 }()
166 c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*")
167 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
168 c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId)
169 return cliEvents, errCh, func() { clientShutdown <- true }
170}

Subscribers

People subscribed via source and target branches