Merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Superseded
Proposed branch: lp:~pedronis/ubuntu-push/expose-more-acceptance-details
Merge into: lp:ubuntu-push
Diff against target: 643 lines (+175/-103)
9 files modified
protocol/messages.go (+6/-0)
protocol/messages_test.go (+3/-0)
server/acceptance/acceptance_test.go (+6/-5)
server/acceptance/suites/broadcast.go (+35/-35)
server/acceptance/suites/helpers.go (+11/-2)
server/acceptance/suites/pingpong.go (+8/-8)
server/acceptance/suites/suite.go (+53/-44)
server/broker/exchanges.go (+1/-0)
server/broker/exchanges_test.go (+52/-9)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details
Reviewer Review Type Date Requested Status
Ubuntu Push Hackers Pending
Review via email: mp+208126@code.launchpad.net

Description of the change

expose more bits of acceptance tests

To post a comment you must log in.
78. By Samuele Pedroni

Merged fix-split-reuse into expose-more-acceptance-details.

79. By Samuele Pedroni

log panics from servers in full in tests

80. By Samuele Pedroni

change the signature of StartServer

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'protocol/messages.go'
2--- protocol/messages.go 2014-02-24 15:54:37 +0000
3+++ protocol/messages.go 2014-02-26 19:50:48 +0000
4@@ -93,6 +93,12 @@
5 return true
6 }
7
8+// Reset resets the splitting state if the message storage is to be
9+// reused.
10+func (b *BroadcastMsg) Reset() {
11+ b.splitting = 0
12+}
13+
14 // NOTIFICATIONS message
15 type NotificationsMsg struct {
16 Type string `json:"T"`
17
18=== modified file 'protocol/messages_test.go'
19--- protocol/messages_test.go 2014-02-10 22:53:00 +0000
20+++ protocol/messages_test.go 2014-02-26 19:50:48 +0000
21@@ -99,4 +99,7 @@
22 n3 := len(b.Payloads)
23 c.Check(b.TopLevel, Equals, int64(n))
24 c.Check(n1+n2+n3, Equals, n)
25+ // reset
26+ b.Reset()
27+ c.Check(b.splitting, Equals, 0)
28 }
29
30=== modified file 'server/acceptance/acceptance_test.go'
31--- server/acceptance/acceptance_test.go 2014-02-13 19:33:14 +0000
32+++ server/acceptance/acceptance_test.go 2014-02-26 19:50:48 +0000
33@@ -38,7 +38,7 @@
34 }
35
36 // Start a server.
37-func StartServer(c *C) (<-chan string, func(), string, string) {
38+func StartServer(c *C, s *suites.AcceptanceSuite, handle *suites.ServerHandle) {
39 if *serverCmd == "" {
40 c.Skip("executable server not specified")
41 }
42@@ -46,10 +46,11 @@
43 cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0")
44 cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg)
45 logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgFilename)
46- serverHTTPAddr := suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat)
47- serverURL := fmt.Sprintf("http://%s", serverHTTPAddr)
48- serverAddr := suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat)
49- return logs, killServer, serverAddr, serverURL
50+ s.KillGroup["server"] = killServer
51+ handle.ServerHTTPAddr = suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat)
52+ s.ServerAPIURL = fmt.Sprintf("http://%s", handle.ServerHTTPAddr)
53+ handle.ServerAddr = suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat)
54+ handle.ServerEvents = logs
55 }
56
57 // ping pong/connectivity
58
59=== modified file 'server/acceptance/suites/broadcast.go'
60--- server/acceptance/suites/broadcast.go 2014-02-12 15:43:24 +0000
61+++ server/acceptance/suites/broadcast.go 2014-02-26 19:50:48 +0000
62@@ -37,8 +37,8 @@
63 var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
64
65 func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) {
66- events, errCh, stop := s.startClient(c, "DEVB", nil)
67- got, err := s.postRequest("/broadcast", &api.Broadcast{
68+ events, errCh, stop := s.StartClient(c, "DEVB", nil)
69+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
70 Channel: "system",
71 ExpireOn: future,
72 Data: json.RawMessage(`{"n": 42}`),
73@@ -47,13 +47,13 @@
74 c.Assert(got, Matches, ".*ok.*")
75 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
76 stop()
77- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
78+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
79 c.Check(len(errCh), Equals, 0)
80 }
81
82 func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) {
83 // send broadcast that will be pending
84- got, err := s.postRequest("/broadcast", &api.Broadcast{
85+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
86 Channel: "system",
87 ExpireOn: future,
88 Data: json.RawMessage(`{"b": 1}`),
89@@ -61,11 +61,11 @@
90 c.Assert(err, IsNil)
91 c.Assert(got, Matches, ".*ok.*")
92
93- events, errCh, stop := s.startClient(c, "DEVB", nil)
94+ events, errCh, stop := s.StartClient(c, "DEVB", nil)
95 // gettting pending on connect
96 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
97 stop()
98- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
99+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
100 c.Check(len(errCh), Equals, 0)
101 }
102
103@@ -73,7 +73,7 @@
104 // send bunch of broadcasts that will be pending
105 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
106 for i := 0; i < 32; i++ {
107- got, err := s.postRequest("/broadcast", &api.Broadcast{
108+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
109 Channel: "system",
110 ExpireOn: future,
111 Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)),
112@@ -82,22 +82,22 @@
113 c.Assert(got, Matches, ".*ok.*")
114 }
115
116- events, errCh, stop := s.startClient(c, "DEVC", nil)
117+ events, errCh, stop := s.StartClient(c, "DEVC", nil)
118 // gettting pending on connect
119 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)
120 c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)
121 stop()
122- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
123+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
124 c.Check(len(errCh), Equals, 0)
125 }
126
127 func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) {
128- // start 1st clinet
129- events1, errCh1, stop1 := s.startClient(c, "DEV1", nil)
130+ // start 1st client
131+ events1, errCh1, stop1 := s.StartClient(c, "DEV1", nil)
132 // start 2nd client
133- events2, errCh2, stop2 := s.startClient(c, "DEV2", nil)
134+ events2, errCh2, stop2 := s.StartClient(c, "DEV2", nil)
135 // broadcast
136- got, err := s.postRequest("/broadcast", &api.Broadcast{
137+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
138 Channel: "system",
139 ExpireOn: future,
140 Data: json.RawMessage(`{"n": 42}`),
141@@ -108,15 +108,15 @@
142 c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
143 stop1()
144 stop2()
145- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
146- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
147+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
148+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
149 c.Check(len(errCh1), Equals, 0)
150 c.Check(len(errCh2), Equals, 0)
151 }
152
153 func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) {
154- events, errCh, stop := s.startClient(c, "DEVD", nil)
155- got, err := s.postRequest("/broadcast", &api.Broadcast{
156+ events, errCh, stop := s.StartClient(c, "DEVD", nil)
157+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
158 Channel: "system",
159 ExpireOn: future,
160 Data: json.RawMessage(`{"b": 1}`),
161@@ -125,10 +125,10 @@
162 c.Assert(got, Matches, ".*ok.*")
163 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
164 stop()
165- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
166+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
167 c.Check(len(errCh), Equals, 0)
168 // another broadcast
169- got, err = s.postRequest("/broadcast", &api.Broadcast{
170+ got, err = s.PostRequest("/broadcast", &api.Broadcast{
171 Channel: "system",
172 ExpireOn: future,
173 Data: json.RawMessage(`{"b": 2}`),
174@@ -136,25 +136,25 @@
175 c.Assert(err, IsNil)
176 c.Assert(got, Matches, ".*ok.*")
177 // reconnect, provide levels, get only later notification
178- events, errCh, stop = s.startClient(c, "DEVD", map[string]int64{
179+ events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{
180 protocol.SystemChannelId: 1,
181 })
182 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
183 stop()
184- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
185+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
186 c.Check(len(errCh), Equals, 0)
187 }
188
189 func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) {
190 // send broadcasts that will be pending
191- got, err := s.postRequest("/broadcast", &api.Broadcast{
192+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
193 Channel: "system",
194 ExpireOn: future,
195 Data: json.RawMessage(`{"b": 1}`),
196 })
197 c.Assert(err, IsNil)
198 c.Assert(got, Matches, ".*ok.*")
199- got, err = s.postRequest("/broadcast", &api.Broadcast{
200+ got, err = s.PostRequest("/broadcast", &api.Broadcast{
201 Channel: "system",
202 ExpireOn: future,
203 Data: json.RawMessage(`{"b": 2}`),
204@@ -162,38 +162,38 @@
205 c.Assert(err, IsNil)
206 c.Assert(got, Matches, ".*ok.*")
207
208- events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{
209+ events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{
210 protocol.SystemChannelId: 10,
211 })
212 // gettting last one pending on connect
213 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
214 stop()
215- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
216+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
217 c.Check(len(errCh), Equals, 0)
218 }
219
220 func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) {
221 // nothing there
222- events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{
223+ events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{
224 protocol.SystemChannelId: 10,
225 })
226 // gettting empty pending on connect
227 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`)
228 stop()
229- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
230+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
231 c.Check(len(errCh), Equals, 0)
232 }
233
234 func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) {
235 // send broadcasts that will be pending
236- got, err := s.postRequest("/broadcast", &api.Broadcast{
237+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
238 Channel: "system",
239 ExpireOn: future,
240 Data: json.RawMessage(`{"b": 1}`),
241 })
242 c.Assert(err, IsNil)
243 c.Assert(got, Matches, ".*ok.*")
244- got, err = s.postRequest("/broadcast", &api.Broadcast{
245+ got, err = s.PostRequest("/broadcast", &api.Broadcast{
246 Channel: "system",
247 ExpireOn: future,
248 Data: json.RawMessage(`{"b": 2}`),
249@@ -201,26 +201,26 @@
250 c.Assert(err, IsNil)
251 c.Assert(got, Matches, ".*ok.*")
252
253- events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{
254+ events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{
255 protocol.SystemChannelId: -10,
256 })
257 // gettting pending on connect
258 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`)
259 stop()
260- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
261+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
262 c.Check(len(errCh), Equals, 0)
263 }
264
265 func (s *BroadcastAcceptanceSuite) TestBroadcastExpiration(c *C) {
266 // send broadcast that will be pending, and one that will expire
267- got, err := s.postRequest("/broadcast", &api.Broadcast{
268+ got, err := s.PostRequest("/broadcast", &api.Broadcast{
269 Channel: "system",
270 ExpireOn: future,
271 Data: json.RawMessage(`{"b": 1}`),
272 })
273 c.Assert(err, IsNil)
274 c.Assert(got, Matches, ".*ok.*")
275- got, err = s.postRequest("/broadcast", &api.Broadcast{
276+ got, err = s.PostRequest("/broadcast", &api.Broadcast{
277 Channel: "system",
278 ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339),
279 Data: json.RawMessage(`{"b": 2}`),
280@@ -231,10 +231,10 @@
281 time.Sleep(2 * time.Second)
282 // second broadcast is expired
283
284- events, errCh, stop := s.startClient(c, "DEVB", nil)
285+ events, errCh, stop := s.StartClient(c, "DEVB", nil)
286 // gettting pending on connect
287 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`)
288 stop()
289- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
290+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
291 c.Check(len(errCh), Equals, 0)
292 }
293
294=== modified file 'server/acceptance/suites/helpers.go'
295--- server/acceptance/suites/helpers.go 2014-02-11 20:36:32 +0000
296+++ server/acceptance/suites/helpers.go 2014-02-26 19:50:48 +0000
297@@ -92,12 +92,15 @@
298 c.Fatal(err)
299 }
300 bufErr := bufio.NewReaderSize(stderr, 5000)
301- getLineInfo := func() (string, error) {
302+ getLineInfo := func(full bool) (string, error) {
303 for {
304 line, err := bufErr.ReadString('\n')
305 if err != nil {
306 return "", err
307 }
308+ if full {
309+ return strings.TrimRight(line, "\n"), nil
310+ }
311 extracted := rxLineInfo.FindStringSubmatch(line)
312 if extracted == nil {
313 return "", fmt.Errorf("unexpected line: %#v", line)
314@@ -108,13 +111,19 @@
315 }
316 logs := make(chan string, 10)
317 go func() {
318+ paniced := false
319 for {
320- info, err := getLineInfo()
321+ info, err := getLineInfo(paniced)
322 if err != nil {
323 logs <- fmt.Sprintf("%s capture: %v", cmdName, err)
324 close(logs)
325 return
326 }
327+ if paniced || strings.HasPrefix(info, "ERROR(PANIC") {
328+ paniced = true
329+ c.Log(info)
330+ continue
331+ }
332 logs <- info
333 }
334 }()
335
336=== modified file 'server/acceptance/suites/pingpong.go'
337--- server/acceptance/suites/pingpong.go 2014-02-11 20:36:32 +0000
338+++ server/acceptance/suites/pingpong.go 2014-02-26 19:50:48 +0000
339@@ -34,7 +34,7 @@
340 func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) {
341 errCh := make(chan error, 1)
342 events := make(chan string, 10)
343- sess := testClientSession(s.serverAddr, "DEVA", true)
344+ sess := testClientSession(s.ServerAddr, "DEVA", true)
345 err := sess.Dial()
346 c.Assert(err, IsNil)
347 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
348@@ -50,8 +50,8 @@
349 errCh <- sess.Run(events)
350 }()
351 connectCli := NextEvent(events, errCh)
352- connectSrv := NextEvent(s.serverEvents, nil)
353- registeredSrv := NextEvent(s.serverEvents, nil)
354+ connectSrv := NextEvent(s.ServerEvents, nil)
355+ registeredSrv := NextEvent(s.ServerEvents, nil)
356 tconnect := time.Now()
357 c.Assert(connectSrv, Matches, ".*session.* connected .*")
358 c.Assert(registeredSrv, Matches, ".*session.* registered DEVA")
359@@ -61,14 +61,14 @@
360 c.Check(elapsedOfPing >= 1.0, Equals, true)
361 c.Check(elapsedOfPing < 1.05, Equals, true)
362 c.Assert(NextEvent(events, errCh), Equals, "Ping")
363- c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF")
364+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* ended with: EOF")
365 c.Check(len(errCh), Equals, 0)
366 }
367
368 func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) {
369 errCh := make(chan error, 1)
370 events := make(chan string, 10)
371- sess := testClientSession(s.serverAddr, "DEVB", true)
372+ sess := testClientSession(s.ServerAddr, "DEVB", true)
373 err := sess.Dial()
374 c.Assert(err, IsNil)
375 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
376@@ -85,9 +85,9 @@
377 errCh <- sess.Run(events)
378 }()
379 c.Assert(NextEvent(events, errCh), Matches, "connected .*")
380- c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
381- c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*")
382+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* connected .*")
383+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* registered .*")
384 c.Assert(NextEvent(events, errCh), Equals, "Ping")
385- c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`)
386+ c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*timeout`)
387 c.Check(len(errCh), Equals, 0)
388 }
389
390=== modified file 'server/acceptance/suites/suite.go'
391--- server/acceptance/suites/suite.go 2014-02-11 20:36:32 +0000
392+++ server/acceptance/suites/suite.go 2014-02-26 19:50:48 +0000
393@@ -34,43 +34,79 @@
394 helpers "launchpad.net/ubuntu-push/testing"
395 )
396
397+// ServerHandle holds the information to attach a client to the test server.
398+type ServerHandle struct {
399+ ServerAddr string
400+ ServerHTTPAddr string
401+ ServerEvents <-chan string
402+}
403+
404+// Start a client.
405+func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
406+ errCh := make(chan error, 1)
407+ cliEvents := make(chan string, 10)
408+ sess := testClientSession(h.ServerAddr, devId, false)
409+ sess.Levels = levels
410+ err := sess.Dial()
411+ c.Assert(err, IsNil)
412+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
413+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
414+ // read after ack
415+ if op == "read" && len(clientShutdown) > 0 {
416+ // exit the sess.Run() goroutine, client will close
417+ runtime.Goexit()
418+ }
419+ return false, 0, nil
420+ }
421+ sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
422+ go func() {
423+ errCh <- sess.Run(cliEvents)
424+ }()
425+ c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*")
426+ c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* connected .*")
427+ c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* registered "+devId)
428+ return cliEvents, errCh, func() { clientShutdown <- true }
429+}
430+
431 // AcceptanceSuite has the basic functionality of the acceptance suites.
432 type AcceptanceSuite struct {
433 // hook to start the server(s)
434- StartServer func(c *C) (logs <-chan string, kill func(), serverAddr, apiURL string)
435- // running bits
436- serverKill func()
437- serverAddr string
438- serverAPIURL string
439- serverEvents <-chan string
440- httpClient *http.Client
441+ StartServer func(c *C, s *AcceptanceSuite, handle *ServerHandle)
442+ // populated by StartServer
443+ ServerHandle
444+ ServerAPIURL string
445+ // KillGroup should be populated by StartServer with functions
446+ // to kill the server process
447+ KillGroup map[string]func()
448+ // other state
449+ httpClient *http.Client
450 }
451
452 // Start a new server for each test.
453 func (s *AcceptanceSuite) SetUpTest(c *C) {
454- logs, kill, addr, url := s.StartServer(c)
455- s.serverEvents = logs
456- s.serverKill = kill
457- s.serverAddr = addr
458- s.serverAPIURL = url
459+ s.KillGroup = make(map[string]func())
460+ s.StartServer(c, s, &s.ServerHandle)
461+ c.Assert(s.ServerHandle.ServerEvents, NotNil)
462+ c.Assert(s.ServerHandle.ServerAddr, Not(Equals), "")
463+ c.Assert(s.ServerAPIURL, Not(Equals), "")
464 s.httpClient = &http.Client{}
465 }
466
467 func (s *AcceptanceSuite) TearDownTest(c *C) {
468- if s.serverKill != nil {
469- s.serverKill()
470+ for _, f := range s.KillGroup {
471+ f()
472 }
473 }
474
475-// Post a request.
476-func (s *AcceptanceSuite) postRequest(path string, message interface{}) (string, error) {
477+// Post an API request.
478+func (s *AcceptanceSuite) PostRequest(path string, message interface{}) (string, error) {
479 packedMessage, err := json.Marshal(message)
480 if err != nil {
481 panic(err)
482 }
483 reader := bytes.NewReader(packedMessage)
484
485- url := s.serverAPIURL + path
486+ url := s.ServerAPIURL + path
487 request, _ := http.NewRequest("POST", url, reader)
488 request.ContentLength = int64(reader.Len())
489 request.Header.Set("Content-Type", "application/json")
490@@ -141,30 +177,3 @@
491 }
492 return
493 }
494-
495-// Start a client.
496-func (s *AcceptanceSuite) startClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) {
497- errCh := make(chan error, 1)
498- cliEvents := make(chan string, 10)
499- sess := testClientSession(s.serverAddr, devId, false)
500- sess.Levels = levels
501- err := sess.Dial()
502- c.Assert(err, IsNil)
503- clientShutdown := make(chan bool, 1) // abused as an atomic flag
504- intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
505- // read after ack
506- if op == "read" && len(clientShutdown) > 0 {
507- // exit the sess.Run() goroutine, client will close
508- runtime.Goexit()
509- }
510- return false, 0, nil
511- }
512- sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
513- go func() {
514- errCh <- sess.Run(cliEvents)
515- }()
516- c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*")
517- c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
518- c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId)
519- return cliEvents, errCh, func() { clientShutdown <- true }
520-}
521
522=== modified file 'server/broker/exchanges.go'
523--- server/broker/exchanges.go 2014-02-10 23:19:08 +0000
524+++ server/broker/exchanges.go 2014-02-26 19:50:48 +0000
525@@ -61,6 +61,7 @@
526 // Prepare session for a BROADCAST.
527 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
528 scratchArea := sess.ExchangeScratchArea()
529+ scratchArea.broadcastMsg.Reset()
530 scratchArea.broadcastMsg.Type = "broadcast"
531 clientLevel := sess.Levels()[sbe.ChanId]
532 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
533
534=== modified file 'server/broker/exchanges_test.go'
535--- server/broker/exchanges_test.go 2014-02-10 23:19:08 +0000
536+++ server/broker/exchanges_test.go 2014-02-26 19:50:48 +0000
537@@ -18,6 +18,8 @@
538
539 import (
540 "encoding/json"
541+ "fmt"
542+ "strings"
543 stdtesting "testing"
544
545 . "launchpad.net/gocheck"
546@@ -45,19 +47,60 @@
547 json.RawMessage(`{"a":"y"}`),
548 },
549 }
550- inMsg, outMsg, err := exchg.Prepare(sess)
551+ outMsg, inMsg, err := exchg.Prepare(sess)
552 c.Assert(err, IsNil)
553 // check
554- marshalled, err := json.Marshal(inMsg)
555+ marshalled, err := json.Marshal(outMsg)
556 c.Assert(err, IsNil)
557 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
558- err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
559+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
560 c.Assert(err, IsNil)
561 err = exchg.Acked(sess, true)
562 c.Assert(err, IsNil)
563 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
564 }
565
566+func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
567+ sess := &testing.TestBrokerSession{
568+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
569+ }
570+ payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
571+ needsSplitting := make([]json.RawMessage, 32)
572+ for i := 0; i < 32; i++ {
573+ needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))
574+ }
575+
576+ topLevel := int64(len(needsSplitting))
577+ exchg := &broker.BroadcastExchange{
578+ ChanId: store.SystemInternalChannelId,
579+ TopLevel: topLevel,
580+ NotificationPayloads: needsSplitting,
581+ }
582+ outMsg, _, err := exchg.Prepare(sess)
583+ c.Assert(err, IsNil)
584+ parts := 0
585+ for {
586+ done := outMsg.Split()
587+ parts++
588+ if done {
589+ break
590+ }
591+ }
592+ c.Assert(parts, Equals, 2)
593+ exchg = &broker.BroadcastExchange{
594+ ChanId: store.SystemInternalChannelId,
595+ TopLevel: topLevel + 2,
596+ NotificationPayloads: []json.RawMessage{
597+ json.RawMessage(`{"a":"x"}`),
598+ json.RawMessage(`{"a":"y"}`),
599+ },
600+ }
601+ outMsg, _, err = exchg.Prepare(sess)
602+ c.Assert(err, IsNil)
603+ done := outMsg.Split() // shouldn't panic
604+ c.Check(done, Equals, true)
605+}
606+
607 func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
608 sess := &testing.TestBrokerSession{
609 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
610@@ -69,13 +112,13 @@
611 json.RawMessage(`{"a":"y"}`),
612 },
613 }
614- inMsg, outMsg, err := exchg.Prepare(sess)
615+ outMsg, inMsg, err := exchg.Prepare(sess)
616 c.Assert(err, IsNil)
617 // check
618- marshalled, err := json.Marshal(inMsg)
619+ marshalled, err := json.Marshal(outMsg)
620 c.Assert(err, IsNil)
621 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
622- err = json.Unmarshal([]byte(`{}`), outMsg)
623+ err = json.Unmarshal([]byte(`{}`), inMsg)
624 c.Assert(err, IsNil)
625 err = exchg.Acked(sess, true)
626 c.Assert(err, Not(IsNil))
627@@ -96,13 +139,13 @@
628 json.RawMessage(`{"a":"y"}`),
629 },
630 }
631- inMsg, outMsg, err := exchg.Prepare(sess)
632+ outMsg, inMsg, err := exchg.Prepare(sess)
633 c.Assert(err, IsNil)
634 // check
635- marshalled, err := json.Marshal(inMsg)
636+ marshalled, err := json.Marshal(outMsg)
637 c.Assert(err, IsNil)
638 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
639- err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
640+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
641 c.Assert(err, IsNil)
642 err = exchg.Acked(sess, true)
643 c.Assert(err, IsNil)

Subscribers

People subscribed via source and target branches