Merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details into lp:ubuntu-push
- expose-more-acceptance-details
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Superseded |
---|---|
Proposed branch: | lp:~pedronis/ubuntu-push/expose-more-acceptance-details |
Merge into: | lp:ubuntu-push |
Diff against target: |
643 lines (+175/-103) 9 files modified
protocol/messages.go (+6/-0) protocol/messages_test.go (+3/-0) server/acceptance/acceptance_test.go (+6/-5) server/acceptance/suites/broadcast.go (+35/-35) server/acceptance/suites/helpers.go (+11/-2) server/acceptance/suites/pingpong.go (+8/-8) server/acceptance/suites/suite.go (+53/-44) server/broker/exchanges.go (+1/-0) server/broker/exchanges_test.go (+52/-9) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/expose-more-acceptance-details |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Push Hackers | Pending | ||
Review via email: mp+208126@code.launchpad.net |
Commit message
Description of the change
expose more bits of acceptance tests
To post a comment you must log in.
- 78. By Samuele Pedroni
-
Merged fix-split-reuse into expose-more-acceptance-details.
- 79. By Samuele Pedroni
-
log panics from servers in full in tests
- 80. By Samuele Pedroni
-
change the signature of StartServer
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'protocol/messages.go' | |||
2 | --- protocol/messages.go 2014-02-24 15:54:37 +0000 | |||
3 | +++ protocol/messages.go 2014-02-26 19:50:48 +0000 | |||
4 | @@ -93,6 +93,12 @@ | |||
5 | 93 | return true | 93 | return true |
6 | 94 | } | 94 | } |
7 | 95 | 95 | ||
8 | 96 | // Reset resets the splitting state if the message storage is to be | ||
9 | 97 | // reused. | ||
10 | 98 | func (b *BroadcastMsg) Reset() { | ||
11 | 99 | b.splitting = 0 | ||
12 | 100 | } | ||
13 | 101 | |||
14 | 96 | // NOTIFICATIONS message | 102 | // NOTIFICATIONS message |
15 | 97 | type NotificationsMsg struct { | 103 | type NotificationsMsg struct { |
16 | 98 | Type string `json:"T"` | 104 | Type string `json:"T"` |
17 | 99 | 105 | ||
18 | === modified file 'protocol/messages_test.go' | |||
19 | --- protocol/messages_test.go 2014-02-10 22:53:00 +0000 | |||
20 | +++ protocol/messages_test.go 2014-02-26 19:50:48 +0000 | |||
21 | @@ -99,4 +99,7 @@ | |||
22 | 99 | n3 := len(b.Payloads) | 99 | n3 := len(b.Payloads) |
23 | 100 | c.Check(b.TopLevel, Equals, int64(n)) | 100 | c.Check(b.TopLevel, Equals, int64(n)) |
24 | 101 | c.Check(n1+n2+n3, Equals, n) | 101 | c.Check(n1+n2+n3, Equals, n) |
25 | 102 | // reset | ||
26 | 103 | b.Reset() | ||
27 | 104 | c.Check(b.splitting, Equals, 0) | ||
28 | 102 | } | 105 | } |
29 | 103 | 106 | ||
30 | === modified file 'server/acceptance/acceptance_test.go' | |||
31 | --- server/acceptance/acceptance_test.go 2014-02-13 19:33:14 +0000 | |||
32 | +++ server/acceptance/acceptance_test.go 2014-02-26 19:50:48 +0000 | |||
33 | @@ -38,7 +38,7 @@ | |||
34 | 38 | } | 38 | } |
35 | 39 | 39 | ||
36 | 40 | // Start a server. | 40 | // Start a server. |
38 | 41 | func StartServer(c *C) (<-chan string, func(), string, string) { | 41 | func StartServer(c *C, s *suites.AcceptanceSuite, handle *suites.ServerHandle) { |
39 | 42 | if *serverCmd == "" { | 42 | if *serverCmd == "" { |
40 | 43 | c.Skip("executable server not specified") | 43 | c.Skip("executable server not specified") |
41 | 44 | } | 44 | } |
42 | @@ -46,10 +46,11 @@ | |||
43 | 46 | cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0") | 46 | cfg := testServerConfig("127.0.0.1:0", "127.0.0.1:0") |
44 | 47 | cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg) | 47 | cfgFilename := suites.WriteConfig(c, tmpDir, "config.json", cfg) |
45 | 48 | logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgFilename) | 48 | logs, killServer := suites.RunAndObserve(c, *serverCmd, cfgFilename) |
50 | 49 | serverHTTPAddr := suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat) | 49 | s.KillGroup["server"] = killServer |
51 | 50 | serverURL := fmt.Sprintf("http://%s", serverHTTPAddr) | 50 | handle.ServerHTTPAddr = suites.ExtractListeningAddr(c, logs, suites.HTTPListeningOnPat) |
52 | 51 | serverAddr := suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat) | 51 | s.ServerAPIURL = fmt.Sprintf("http://%s", handle.ServerHTTPAddr) |
53 | 52 | return logs, killServer, serverAddr, serverURL | 52 | handle.ServerAddr = suites.ExtractListeningAddr(c, logs, suites.DevListeningOnPat) |
54 | 53 | handle.ServerEvents = logs | ||
55 | 53 | } | 54 | } |
56 | 54 | 55 | ||
57 | 55 | // ping pong/connectivity | 56 | // ping pong/connectivity |
58 | 56 | 57 | ||
59 | === modified file 'server/acceptance/suites/broadcast.go' | |||
60 | --- server/acceptance/suites/broadcast.go 2014-02-12 15:43:24 +0000 | |||
61 | +++ server/acceptance/suites/broadcast.go 2014-02-26 19:50:48 +0000 | |||
62 | @@ -37,8 +37,8 @@ | |||
63 | 37 | var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339) | 37 | var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339) |
64 | 38 | 38 | ||
65 | 39 | func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) { | 39 | func (s *BroadcastAcceptanceSuite) TestBroadcastToConnected(c *C) { |
68 | 40 | events, errCh, stop := s.startClient(c, "DEVB", nil) | 40 | events, errCh, stop := s.StartClient(c, "DEVB", nil) |
69 | 41 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 41 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
70 | 42 | Channel: "system", | 42 | Channel: "system", |
71 | 43 | ExpireOn: future, | 43 | ExpireOn: future, |
72 | 44 | Data: json.RawMessage(`{"n": 42}`), | 44 | Data: json.RawMessage(`{"n": 42}`), |
73 | @@ -47,13 +47,13 @@ | |||
74 | 47 | c.Assert(got, Matches, ".*ok.*") | 47 | c.Assert(got, Matches, ".*ok.*") |
75 | 48 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) | 48 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
76 | 49 | stop() | 49 | stop() |
78 | 50 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 50 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
79 | 51 | c.Check(len(errCh), Equals, 0) | 51 | c.Check(len(errCh), Equals, 0) |
80 | 52 | } | 52 | } |
81 | 53 | 53 | ||
82 | 54 | func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) { | 54 | func (s *BroadcastAcceptanceSuite) TestBroadcastPending(c *C) { |
83 | 55 | // send broadcast that will be pending | 55 | // send broadcast that will be pending |
85 | 56 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 56 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
86 | 57 | Channel: "system", | 57 | Channel: "system", |
87 | 58 | ExpireOn: future, | 58 | ExpireOn: future, |
88 | 59 | Data: json.RawMessage(`{"b": 1}`), | 59 | Data: json.RawMessage(`{"b": 1}`), |
89 | @@ -61,11 +61,11 @@ | |||
90 | 61 | c.Assert(err, IsNil) | 61 | c.Assert(err, IsNil) |
91 | 62 | c.Assert(got, Matches, ".*ok.*") | 62 | c.Assert(got, Matches, ".*ok.*") |
92 | 63 | 63 | ||
94 | 64 | events, errCh, stop := s.startClient(c, "DEVB", nil) | 64 | events, errCh, stop := s.StartClient(c, "DEVB", nil) |
95 | 65 | // gettting pending on connect | 65 | // gettting pending on connect |
96 | 66 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) | 66 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
97 | 67 | stop() | 67 | stop() |
99 | 68 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 68 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
100 | 69 | c.Check(len(errCh), Equals, 0) | 69 | c.Check(len(errCh), Equals, 0) |
101 | 70 | } | 70 | } |
102 | 71 | 71 | ||
103 | @@ -73,7 +73,7 @@ | |||
104 | 73 | // send bunch of broadcasts that will be pending | 73 | // send bunch of broadcasts that will be pending |
105 | 74 | payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) | 74 | payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
106 | 75 | for i := 0; i < 32; i++ { | 75 | for i := 0; i < 32; i++ { |
108 | 76 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 76 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
109 | 77 | Channel: "system", | 77 | Channel: "system", |
110 | 78 | ExpireOn: future, | 78 | ExpireOn: future, |
111 | 79 | Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)), | 79 | Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)), |
112 | @@ -82,22 +82,22 @@ | |||
113 | 82 | c.Assert(got, Matches, ".*ok.*") | 82 | c.Assert(got, Matches, ".*ok.*") |
114 | 83 | } | 83 | } |
115 | 84 | 84 | ||
117 | 85 | events, errCh, stop := s.startClient(c, "DEVC", nil) | 85 | events, errCh, stop := s.StartClient(c, "DEVC", nil) |
118 | 86 | // gettting pending on connect | 86 | // gettting pending on connect |
119 | 87 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`) | 87 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`) |
120 | 88 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`) | 88 | c.Check(NextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`) |
121 | 89 | stop() | 89 | stop() |
123 | 90 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 90 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
124 | 91 | c.Check(len(errCh), Equals, 0) | 91 | c.Check(len(errCh), Equals, 0) |
125 | 92 | } | 92 | } |
126 | 93 | 93 | ||
127 | 94 | func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) { | 94 | func (s *BroadcastAcceptanceSuite) TestBroadcastDistribution2(c *C) { |
130 | 95 | // start 1st clinet | 95 | // start 1st client |
131 | 96 | events1, errCh1, stop1 := s.startClient(c, "DEV1", nil) | 96 | events1, errCh1, stop1 := s.StartClient(c, "DEV1", nil) |
132 | 97 | // start 2nd client | 97 | // start 2nd client |
134 | 98 | events2, errCh2, stop2 := s.startClient(c, "DEV2", nil) | 98 | events2, errCh2, stop2 := s.StartClient(c, "DEV2", nil) |
135 | 99 | // broadcast | 99 | // broadcast |
137 | 100 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 100 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
138 | 101 | Channel: "system", | 101 | Channel: "system", |
139 | 102 | ExpireOn: future, | 102 | ExpireOn: future, |
140 | 103 | Data: json.RawMessage(`{"n": 42}`), | 103 | Data: json.RawMessage(`{"n": 42}`), |
141 | @@ -108,15 +108,15 @@ | |||
142 | 108 | c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) | 108 | c.Check(NextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
143 | 109 | stop1() | 109 | stop1() |
144 | 110 | stop2() | 110 | stop2() |
147 | 111 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 111 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
148 | 112 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 112 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
149 | 113 | c.Check(len(errCh1), Equals, 0) | 113 | c.Check(len(errCh1), Equals, 0) |
150 | 114 | c.Check(len(errCh2), Equals, 0) | 114 | c.Check(len(errCh2), Equals, 0) |
151 | 115 | } | 115 | } |
152 | 116 | 116 | ||
153 | 117 | func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) { | 117 | func (s *BroadcastAcceptanceSuite) TestBroadcastFilterByLevel(c *C) { |
156 | 118 | events, errCh, stop := s.startClient(c, "DEVD", nil) | 118 | events, errCh, stop := s.StartClient(c, "DEVD", nil) |
157 | 119 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 119 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
158 | 120 | Channel: "system", | 120 | Channel: "system", |
159 | 121 | ExpireOn: future, | 121 | ExpireOn: future, |
160 | 122 | Data: json.RawMessage(`{"b": 1}`), | 122 | Data: json.RawMessage(`{"b": 1}`), |
161 | @@ -125,10 +125,10 @@ | |||
162 | 125 | c.Assert(got, Matches, ".*ok.*") | 125 | c.Assert(got, Matches, ".*ok.*") |
163 | 126 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) | 126 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
164 | 127 | stop() | 127 | stop() |
166 | 128 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 128 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
167 | 129 | c.Check(len(errCh), Equals, 0) | 129 | c.Check(len(errCh), Equals, 0) |
168 | 130 | // another broadcast | 130 | // another broadcast |
170 | 131 | got, err = s.postRequest("/broadcast", &api.Broadcast{ | 131 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
171 | 132 | Channel: "system", | 132 | Channel: "system", |
172 | 133 | ExpireOn: future, | 133 | ExpireOn: future, |
173 | 134 | Data: json.RawMessage(`{"b": 2}`), | 134 | Data: json.RawMessage(`{"b": 2}`), |
174 | @@ -136,25 +136,25 @@ | |||
175 | 136 | c.Assert(err, IsNil) | 136 | c.Assert(err, IsNil) |
176 | 137 | c.Assert(got, Matches, ".*ok.*") | 137 | c.Assert(got, Matches, ".*ok.*") |
177 | 138 | // reconnect, provide levels, get only later notification | 138 | // reconnect, provide levels, get only later notification |
179 | 139 | events, errCh, stop = s.startClient(c, "DEVD", map[string]int64{ | 139 | events, errCh, stop = s.StartClient(c, "DEVD", map[string]int64{ |
180 | 140 | protocol.SystemChannelId: 1, | 140 | protocol.SystemChannelId: 1, |
181 | 141 | }) | 141 | }) |
182 | 142 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) | 142 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
183 | 143 | stop() | 143 | stop() |
185 | 144 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 144 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
186 | 145 | c.Check(len(errCh), Equals, 0) | 145 | c.Check(len(errCh), Equals, 0) |
187 | 146 | } | 146 | } |
188 | 147 | 147 | ||
189 | 148 | func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) { | 148 | func (s *BroadcastAcceptanceSuite) TestBroadcastTooAhead(c *C) { |
190 | 149 | // send broadcasts that will be pending | 149 | // send broadcasts that will be pending |
192 | 150 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 150 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
193 | 151 | Channel: "system", | 151 | Channel: "system", |
194 | 152 | ExpireOn: future, | 152 | ExpireOn: future, |
195 | 153 | Data: json.RawMessage(`{"b": 1}`), | 153 | Data: json.RawMessage(`{"b": 1}`), |
196 | 154 | }) | 154 | }) |
197 | 155 | c.Assert(err, IsNil) | 155 | c.Assert(err, IsNil) |
198 | 156 | c.Assert(got, Matches, ".*ok.*") | 156 | c.Assert(got, Matches, ".*ok.*") |
200 | 157 | got, err = s.postRequest("/broadcast", &api.Broadcast{ | 157 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
201 | 158 | Channel: "system", | 158 | Channel: "system", |
202 | 159 | ExpireOn: future, | 159 | ExpireOn: future, |
203 | 160 | Data: json.RawMessage(`{"b": 2}`), | 160 | Data: json.RawMessage(`{"b": 2}`), |
204 | @@ -162,38 +162,38 @@ | |||
205 | 162 | c.Assert(err, IsNil) | 162 | c.Assert(err, IsNil) |
206 | 163 | c.Assert(got, Matches, ".*ok.*") | 163 | c.Assert(got, Matches, ".*ok.*") |
207 | 164 | 164 | ||
209 | 165 | events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ | 165 | events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{ |
210 | 166 | protocol.SystemChannelId: 10, | 166 | protocol.SystemChannelId: 10, |
211 | 167 | }) | 167 | }) |
212 | 168 | // gettting last one pending on connect | 168 | // gettting last one pending on connect |
213 | 169 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) | 169 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
214 | 170 | stop() | 170 | stop() |
216 | 171 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 171 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
217 | 172 | c.Check(len(errCh), Equals, 0) | 172 | c.Check(len(errCh), Equals, 0) |
218 | 173 | } | 173 | } |
219 | 174 | 174 | ||
220 | 175 | func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) { | 175 | func (s *BroadcastAcceptanceSuite) TestBroadcastTooAheadOnEmpty(c *C) { |
221 | 176 | // nothing there | 176 | // nothing there |
223 | 177 | events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ | 177 | events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{ |
224 | 178 | protocol.SystemChannelId: 10, | 178 | protocol.SystemChannelId: 10, |
225 | 179 | }) | 179 | }) |
226 | 180 | // gettting empty pending on connect | 180 | // gettting empty pending on connect |
227 | 181 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`) | 181 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:0 payloads:null`) |
228 | 182 | stop() | 182 | stop() |
230 | 183 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 183 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
231 | 184 | c.Check(len(errCh), Equals, 0) | 184 | c.Check(len(errCh), Equals, 0) |
232 | 185 | } | 185 | } |
233 | 186 | 186 | ||
234 | 187 | func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) { | 187 | func (s *BroadcastAcceptanceSuite) TestBroadcastWayBehind(c *C) { |
235 | 188 | // send broadcasts that will be pending | 188 | // send broadcasts that will be pending |
237 | 189 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 189 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
238 | 190 | Channel: "system", | 190 | Channel: "system", |
239 | 191 | ExpireOn: future, | 191 | ExpireOn: future, |
240 | 192 | Data: json.RawMessage(`{"b": 1}`), | 192 | Data: json.RawMessage(`{"b": 1}`), |
241 | 193 | }) | 193 | }) |
242 | 194 | c.Assert(err, IsNil) | 194 | c.Assert(err, IsNil) |
243 | 195 | c.Assert(got, Matches, ".*ok.*") | 195 | c.Assert(got, Matches, ".*ok.*") |
245 | 196 | got, err = s.postRequest("/broadcast", &api.Broadcast{ | 196 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
246 | 197 | Channel: "system", | 197 | Channel: "system", |
247 | 198 | ExpireOn: future, | 198 | ExpireOn: future, |
248 | 199 | Data: json.RawMessage(`{"b": 2}`), | 199 | Data: json.RawMessage(`{"b": 2}`), |
249 | @@ -201,26 +201,26 @@ | |||
250 | 201 | c.Assert(err, IsNil) | 201 | c.Assert(err, IsNil) |
251 | 202 | c.Assert(got, Matches, ".*ok.*") | 202 | c.Assert(got, Matches, ".*ok.*") |
252 | 203 | 203 | ||
254 | 204 | events, errCh, stop := s.startClient(c, "DEVB", map[string]int64{ | 204 | events, errCh, stop := s.StartClient(c, "DEVB", map[string]int64{ |
255 | 205 | protocol.SystemChannelId: -10, | 205 | protocol.SystemChannelId: -10, |
256 | 206 | }) | 206 | }) |
257 | 207 | // gettting pending on connect | 207 | // gettting pending on connect |
258 | 208 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`) | 208 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1},{"b":2}]`) |
259 | 209 | stop() | 209 | stop() |
261 | 210 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 210 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
262 | 211 | c.Check(len(errCh), Equals, 0) | 211 | c.Check(len(errCh), Equals, 0) |
263 | 212 | } | 212 | } |
264 | 213 | 213 | ||
265 | 214 | func (s *BroadcastAcceptanceSuite) TestBroadcastExpiration(c *C) { | 214 | func (s *BroadcastAcceptanceSuite) TestBroadcastExpiration(c *C) { |
266 | 215 | // send broadcast that will be pending, and one that will expire | 215 | // send broadcast that will be pending, and one that will expire |
268 | 216 | got, err := s.postRequest("/broadcast", &api.Broadcast{ | 216 | got, err := s.PostRequest("/broadcast", &api.Broadcast{ |
269 | 217 | Channel: "system", | 217 | Channel: "system", |
270 | 218 | ExpireOn: future, | 218 | ExpireOn: future, |
271 | 219 | Data: json.RawMessage(`{"b": 1}`), | 219 | Data: json.RawMessage(`{"b": 1}`), |
272 | 220 | }) | 220 | }) |
273 | 221 | c.Assert(err, IsNil) | 221 | c.Assert(err, IsNil) |
274 | 222 | c.Assert(got, Matches, ".*ok.*") | 222 | c.Assert(got, Matches, ".*ok.*") |
276 | 223 | got, err = s.postRequest("/broadcast", &api.Broadcast{ | 223 | got, err = s.PostRequest("/broadcast", &api.Broadcast{ |
277 | 224 | Channel: "system", | 224 | Channel: "system", |
278 | 225 | ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339), | 225 | ExpireOn: time.Now().Add(1 * time.Second).Format(time.RFC3339), |
279 | 226 | Data: json.RawMessage(`{"b": 2}`), | 226 | Data: json.RawMessage(`{"b": 2}`), |
280 | @@ -231,10 +231,10 @@ | |||
281 | 231 | time.Sleep(2 * time.Second) | 231 | time.Sleep(2 * time.Second) |
282 | 232 | // second broadcast is expired | 232 | // second broadcast is expired |
283 | 233 | 233 | ||
285 | 234 | events, errCh, stop := s.startClient(c, "DEVB", nil) | 234 | events, errCh, stop := s.StartClient(c, "DEVB", nil) |
286 | 235 | // gettting pending on connect | 235 | // gettting pending on connect |
287 | 236 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`) | 236 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":1}]`) |
288 | 237 | stop() | 237 | stop() |
290 | 238 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) | 238 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
291 | 239 | c.Check(len(errCh), Equals, 0) | 239 | c.Check(len(errCh), Equals, 0) |
292 | 240 | } | 240 | } |
293 | 241 | 241 | ||
294 | === modified file 'server/acceptance/suites/helpers.go' | |||
295 | --- server/acceptance/suites/helpers.go 2014-02-11 20:36:32 +0000 | |||
296 | +++ server/acceptance/suites/helpers.go 2014-02-26 19:50:48 +0000 | |||
297 | @@ -92,12 +92,15 @@ | |||
298 | 92 | c.Fatal(err) | 92 | c.Fatal(err) |
299 | 93 | } | 93 | } |
300 | 94 | bufErr := bufio.NewReaderSize(stderr, 5000) | 94 | bufErr := bufio.NewReaderSize(stderr, 5000) |
302 | 95 | getLineInfo := func() (string, error) { | 95 | getLineInfo := func(full bool) (string, error) { |
303 | 96 | for { | 96 | for { |
304 | 97 | line, err := bufErr.ReadString('\n') | 97 | line, err := bufErr.ReadString('\n') |
305 | 98 | if err != nil { | 98 | if err != nil { |
306 | 99 | return "", err | 99 | return "", err |
307 | 100 | } | 100 | } |
308 | 101 | if full { | ||
309 | 102 | return strings.TrimRight(line, "\n"), nil | ||
310 | 103 | } | ||
311 | 101 | extracted := rxLineInfo.FindStringSubmatch(line) | 104 | extracted := rxLineInfo.FindStringSubmatch(line) |
312 | 102 | if extracted == nil { | 105 | if extracted == nil { |
313 | 103 | return "", fmt.Errorf("unexpected line: %#v", line) | 106 | return "", fmt.Errorf("unexpected line: %#v", line) |
314 | @@ -108,13 +111,19 @@ | |||
315 | 108 | } | 111 | } |
316 | 109 | logs := make(chan string, 10) | 112 | logs := make(chan string, 10) |
317 | 110 | go func() { | 113 | go func() { |
318 | 114 | paniced := false | ||
319 | 111 | for { | 115 | for { |
321 | 112 | info, err := getLineInfo() | 116 | info, err := getLineInfo(paniced) |
322 | 113 | if err != nil { | 117 | if err != nil { |
323 | 114 | logs <- fmt.Sprintf("%s capture: %v", cmdName, err) | 118 | logs <- fmt.Sprintf("%s capture: %v", cmdName, err) |
324 | 115 | close(logs) | 119 | close(logs) |
325 | 116 | return | 120 | return |
326 | 117 | } | 121 | } |
327 | 122 | if paniced || strings.HasPrefix(info, "ERROR(PANIC") { | ||
328 | 123 | paniced = true | ||
329 | 124 | c.Log(info) | ||
330 | 125 | continue | ||
331 | 126 | } | ||
332 | 118 | logs <- info | 127 | logs <- info |
333 | 119 | } | 128 | } |
334 | 120 | }() | 129 | }() |
335 | 121 | 130 | ||
336 | === modified file 'server/acceptance/suites/pingpong.go' | |||
337 | --- server/acceptance/suites/pingpong.go 2014-02-11 20:36:32 +0000 | |||
338 | +++ server/acceptance/suites/pingpong.go 2014-02-26 19:50:48 +0000 | |||
339 | @@ -34,7 +34,7 @@ | |||
340 | 34 | func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) { | 34 | func (s *PingPongAcceptanceSuite) TestConnectPingPing(c *C) { |
341 | 35 | errCh := make(chan error, 1) | 35 | errCh := make(chan error, 1) |
342 | 36 | events := make(chan string, 10) | 36 | events := make(chan string, 10) |
344 | 37 | sess := testClientSession(s.serverAddr, "DEVA", true) | 37 | sess := testClientSession(s.ServerAddr, "DEVA", true) |
345 | 38 | err := sess.Dial() | 38 | err := sess.Dial() |
346 | 39 | c.Assert(err, IsNil) | 39 | c.Assert(err, IsNil) |
347 | 40 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { | 40 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
348 | @@ -50,8 +50,8 @@ | |||
349 | 50 | errCh <- sess.Run(events) | 50 | errCh <- sess.Run(events) |
350 | 51 | }() | 51 | }() |
351 | 52 | connectCli := NextEvent(events, errCh) | 52 | connectCli := NextEvent(events, errCh) |
354 | 53 | connectSrv := NextEvent(s.serverEvents, nil) | 53 | connectSrv := NextEvent(s.ServerEvents, nil) |
355 | 54 | registeredSrv := NextEvent(s.serverEvents, nil) | 54 | registeredSrv := NextEvent(s.ServerEvents, nil) |
356 | 55 | tconnect := time.Now() | 55 | tconnect := time.Now() |
357 | 56 | c.Assert(connectSrv, Matches, ".*session.* connected .*") | 56 | c.Assert(connectSrv, Matches, ".*session.* connected .*") |
358 | 57 | c.Assert(registeredSrv, Matches, ".*session.* registered DEVA") | 57 | c.Assert(registeredSrv, Matches, ".*session.* registered DEVA") |
359 | @@ -61,14 +61,14 @@ | |||
360 | 61 | c.Check(elapsedOfPing >= 1.0, Equals, true) | 61 | c.Check(elapsedOfPing >= 1.0, Equals, true) |
361 | 62 | c.Check(elapsedOfPing < 1.05, Equals, true) | 62 | c.Check(elapsedOfPing < 1.05, Equals, true) |
362 | 63 | c.Assert(NextEvent(events, errCh), Equals, "Ping") | 63 | c.Assert(NextEvent(events, errCh), Equals, "Ping") |
364 | 64 | c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF") | 64 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* ended with: EOF") |
365 | 65 | c.Check(len(errCh), Equals, 0) | 65 | c.Check(len(errCh), Equals, 0) |
366 | 66 | } | 66 | } |
367 | 67 | 67 | ||
368 | 68 | func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) { | 68 | func (s *PingPongAcceptanceSuite) TestConnectPingNeverPong(c *C) { |
369 | 69 | errCh := make(chan error, 1) | 69 | errCh := make(chan error, 1) |
370 | 70 | events := make(chan string, 10) | 70 | events := make(chan string, 10) |
372 | 71 | sess := testClientSession(s.serverAddr, "DEVB", true) | 71 | sess := testClientSession(s.ServerAddr, "DEVB", true) |
373 | 72 | err := sess.Dial() | 72 | err := sess.Dial() |
374 | 73 | c.Assert(err, IsNil) | 73 | c.Assert(err, IsNil) |
375 | 74 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { | 74 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
376 | @@ -85,9 +85,9 @@ | |||
377 | 85 | errCh <- sess.Run(events) | 85 | errCh <- sess.Run(events) |
378 | 86 | }() | 86 | }() |
379 | 87 | c.Assert(NextEvent(events, errCh), Matches, "connected .*") | 87 | c.Assert(NextEvent(events, errCh), Matches, "connected .*") |
382 | 88 | c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") | 88 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* connected .*") |
383 | 89 | c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*") | 89 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, ".*session.* registered .*") |
384 | 90 | c.Assert(NextEvent(events, errCh), Equals, "Ping") | 90 | c.Assert(NextEvent(events, errCh), Equals, "Ping") |
386 | 91 | c.Assert(NextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`) | 91 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*timeout`) |
387 | 92 | c.Check(len(errCh), Equals, 0) | 92 | c.Check(len(errCh), Equals, 0) |
388 | 93 | } | 93 | } |
389 | 94 | 94 | ||
390 | === modified file 'server/acceptance/suites/suite.go' | |||
391 | --- server/acceptance/suites/suite.go 2014-02-11 20:36:32 +0000 | |||
392 | +++ server/acceptance/suites/suite.go 2014-02-26 19:50:48 +0000 | |||
393 | @@ -34,43 +34,79 @@ | |||
394 | 34 | helpers "launchpad.net/ubuntu-push/testing" | 34 | helpers "launchpad.net/ubuntu-push/testing" |
395 | 35 | ) | 35 | ) |
396 | 36 | 36 | ||
397 | 37 | // ServerHandle holds the information to attach a client to the test server. | ||
398 | 38 | type ServerHandle struct { | ||
399 | 39 | ServerAddr string | ||
400 | 40 | ServerHTTPAddr string | ||
401 | 41 | ServerEvents <-chan string | ||
402 | 42 | } | ||
403 | 43 | |||
404 | 44 | // Start a client. | ||
405 | 45 | func (h *ServerHandle) StartClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) { | ||
406 | 46 | errCh := make(chan error, 1) | ||
407 | 47 | cliEvents := make(chan string, 10) | ||
408 | 48 | sess := testClientSession(h.ServerAddr, devId, false) | ||
409 | 49 | sess.Levels = levels | ||
410 | 50 | err := sess.Dial() | ||
411 | 51 | c.Assert(err, IsNil) | ||
412 | 52 | clientShutdown := make(chan bool, 1) // abused as an atomic flag | ||
413 | 53 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { | ||
414 | 54 | // read after ack | ||
415 | 55 | if op == "read" && len(clientShutdown) > 0 { | ||
416 | 56 | // exit the sess.Run() goroutine, client will close | ||
417 | 57 | runtime.Goexit() | ||
418 | 58 | } | ||
419 | 59 | return false, 0, nil | ||
420 | 60 | } | ||
421 | 61 | sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} | ||
422 | 62 | go func() { | ||
423 | 63 | errCh <- sess.Run(cliEvents) | ||
424 | 64 | }() | ||
425 | 65 | c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*") | ||
426 | 66 | c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* connected .*") | ||
427 | 67 | c.Assert(NextEvent(h.ServerEvents, nil), Matches, ".*session.* registered "+devId) | ||
428 | 68 | return cliEvents, errCh, func() { clientShutdown <- true } | ||
429 | 69 | } | ||
430 | 70 | |||
431 | 37 | // AcceptanceSuite has the basic functionality of the acceptance suites. | 71 | // AcceptanceSuite has the basic functionality of the acceptance suites. |
432 | 38 | type AcceptanceSuite struct { | 72 | type AcceptanceSuite struct { |
433 | 39 | // hook to start the server(s) | 73 | // hook to start the server(s) |
441 | 40 | StartServer func(c *C) (logs <-chan string, kill func(), serverAddr, apiURL string) | 74 | StartServer func(c *C, s *AcceptanceSuite, handle *ServerHandle) |
442 | 41 | // running bits | 75 | // populated by StartServer |
443 | 42 | serverKill func() | 76 | ServerHandle |
444 | 43 | serverAddr string | 77 | ServerAPIURL string |
445 | 44 | serverAPIURL string | 78 | // KillGroup should be populated by StartServer with functions |
446 | 45 | serverEvents <-chan string | 79 | // to kill the server process |
447 | 46 | httpClient *http.Client | 80 | KillGroup map[string]func() |
448 | 81 | // other state | ||
449 | 82 | httpClient *http.Client | ||
450 | 47 | } | 83 | } |
451 | 48 | 84 | ||
452 | 49 | // Start a new server for each test. | 85 | // Start a new server for each test. |
453 | 50 | func (s *AcceptanceSuite) SetUpTest(c *C) { | 86 | func (s *AcceptanceSuite) SetUpTest(c *C) { |
459 | 51 | logs, kill, addr, url := s.StartServer(c) | 87 | s.KillGroup = make(map[string]func()) |
460 | 52 | s.serverEvents = logs | 88 | s.StartServer(c, s, &s.ServerHandle) |
461 | 53 | s.serverKill = kill | 89 | c.Assert(s.ServerHandle.ServerEvents, NotNil) |
462 | 54 | s.serverAddr = addr | 90 | c.Assert(s.ServerHandle.ServerAddr, Not(Equals), "") |
463 | 55 | s.serverAPIURL = url | 91 | c.Assert(s.ServerAPIURL, Not(Equals), "") |
464 | 56 | s.httpClient = &http.Client{} | 92 | s.httpClient = &http.Client{} |
465 | 57 | } | 93 | } |
466 | 58 | 94 | ||
467 | 59 | func (s *AcceptanceSuite) TearDownTest(c *C) { | 95 | func (s *AcceptanceSuite) TearDownTest(c *C) { |
470 | 60 | if s.serverKill != nil { | 96 | for _, f := range s.KillGroup { |
471 | 61 | s.serverKill() | 97 | f() |
472 | 62 | } | 98 | } |
473 | 63 | } | 99 | } |
474 | 64 | 100 | ||
477 | 65 | // Post a request. | 101 | // Post an API request. |
478 | 66 | func (s *AcceptanceSuite) postRequest(path string, message interface{}) (string, error) { | 102 | func (s *AcceptanceSuite) PostRequest(path string, message interface{}) (string, error) { |
479 | 67 | packedMessage, err := json.Marshal(message) | 103 | packedMessage, err := json.Marshal(message) |
480 | 68 | if err != nil { | 104 | if err != nil { |
481 | 69 | panic(err) | 105 | panic(err) |
482 | 70 | } | 106 | } |
483 | 71 | reader := bytes.NewReader(packedMessage) | 107 | reader := bytes.NewReader(packedMessage) |
484 | 72 | 108 | ||
486 | 73 | url := s.serverAPIURL + path | 109 | url := s.ServerAPIURL + path |
487 | 74 | request, _ := http.NewRequest("POST", url, reader) | 110 | request, _ := http.NewRequest("POST", url, reader) |
488 | 75 | request.ContentLength = int64(reader.Len()) | 111 | request.ContentLength = int64(reader.Len()) |
489 | 76 | request.Header.Set("Content-Type", "application/json") | 112 | request.Header.Set("Content-Type", "application/json") |
490 | @@ -141,30 +177,3 @@ | |||
491 | 141 | } | 177 | } |
492 | 142 | return | 178 | return |
493 | 143 | } | 179 | } |
494 | 144 | |||
495 | 145 | // Start a client. | ||
496 | 146 | func (s *AcceptanceSuite) startClient(c *C, devId string, levels map[string]int64) (events <-chan string, errorCh <-chan error, stop func()) { | ||
497 | 147 | errCh := make(chan error, 1) | ||
498 | 148 | cliEvents := make(chan string, 10) | ||
499 | 149 | sess := testClientSession(s.serverAddr, devId, false) | ||
500 | 150 | sess.Levels = levels | ||
501 | 151 | err := sess.Dial() | ||
502 | 152 | c.Assert(err, IsNil) | ||
503 | 153 | clientShutdown := make(chan bool, 1) // abused as an atomic flag | ||
504 | 154 | intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { | ||
505 | 155 | // read after ack | ||
506 | 156 | if op == "read" && len(clientShutdown) > 0 { | ||
507 | 157 | // exit the sess.Run() goroutine, client will close | ||
508 | 158 | runtime.Goexit() | ||
509 | 159 | } | ||
510 | 160 | return false, 0, nil | ||
511 | 161 | } | ||
512 | 162 | sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} | ||
513 | 163 | go func() { | ||
514 | 164 | errCh <- sess.Run(cliEvents) | ||
515 | 165 | }() | ||
516 | 166 | c.Assert(NextEvent(cliEvents, errCh), Matches, "connected .*") | ||
517 | 167 | c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") | ||
518 | 168 | c.Assert(NextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId) | ||
519 | 169 | return cliEvents, errCh, func() { clientShutdown <- true } | ||
520 | 170 | } | ||
521 | 171 | 180 | ||
522 | === modified file 'server/broker/exchanges.go' | |||
523 | --- server/broker/exchanges.go 2014-02-10 23:19:08 +0000 | |||
524 | +++ server/broker/exchanges.go 2014-02-26 19:50:48 +0000 | |||
525 | @@ -61,6 +61,7 @@ | |||
526 | 61 | // Prepare session for a BROADCAST. | 61 | // Prepare session for a BROADCAST. |
527 | 62 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { | 62 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
528 | 63 | scratchArea := sess.ExchangeScratchArea() | 63 | scratchArea := sess.ExchangeScratchArea() |
529 | 64 | scratchArea.broadcastMsg.Reset() | ||
530 | 64 | scratchArea.broadcastMsg.Type = "broadcast" | 65 | scratchArea.broadcastMsg.Type = "broadcast" |
531 | 65 | clientLevel := sess.Levels()[sbe.ChanId] | 66 | clientLevel := sess.Levels()[sbe.ChanId] |
532 | 66 | payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) | 67 | payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) |
533 | 67 | 68 | ||
534 | === modified file 'server/broker/exchanges_test.go' | |||
535 | --- server/broker/exchanges_test.go 2014-02-10 23:19:08 +0000 | |||
536 | +++ server/broker/exchanges_test.go 2014-02-26 19:50:48 +0000 | |||
537 | @@ -18,6 +18,8 @@ | |||
538 | 18 | 18 | ||
539 | 19 | import ( | 19 | import ( |
540 | 20 | "encoding/json" | 20 | "encoding/json" |
541 | 21 | "fmt" | ||
542 | 22 | "strings" | ||
543 | 21 | stdtesting "testing" | 23 | stdtesting "testing" |
544 | 22 | 24 | ||
545 | 23 | . "launchpad.net/gocheck" | 25 | . "launchpad.net/gocheck" |
546 | @@ -45,19 +47,60 @@ | |||
547 | 45 | json.RawMessage(`{"a":"y"}`), | 47 | json.RawMessage(`{"a":"y"}`), |
548 | 46 | }, | 48 | }, |
549 | 47 | } | 49 | } |
551 | 48 | inMsg, outMsg, err := exchg.Prepare(sess) | 50 | outMsg, inMsg, err := exchg.Prepare(sess) |
552 | 49 | c.Assert(err, IsNil) | 51 | c.Assert(err, IsNil) |
553 | 50 | // check | 52 | // check |
555 | 51 | marshalled, err := json.Marshal(inMsg) | 53 | marshalled, err := json.Marshal(outMsg) |
556 | 52 | c.Assert(err, IsNil) | 54 | c.Assert(err, IsNil) |
557 | 53 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) | 55 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) |
559 | 54 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | 56 | err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
560 | 55 | c.Assert(err, IsNil) | 57 | c.Assert(err, IsNil) |
561 | 56 | err = exchg.Acked(sess, true) | 58 | err = exchg.Acked(sess, true) |
562 | 57 | c.Assert(err, IsNil) | 59 | c.Assert(err, IsNil) |
563 | 58 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3)) | 60 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3)) |
564 | 59 | } | 61 | } |
565 | 60 | 62 | ||
566 | 63 | func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) { | ||
567 | 64 | sess := &testing.TestBrokerSession{ | ||
568 | 65 | LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), | ||
569 | 66 | } | ||
570 | 67 | payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) | ||
571 | 68 | needsSplitting := make([]json.RawMessage, 32) | ||
572 | 69 | for i := 0; i < 32; i++ { | ||
573 | 70 | needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i)) | ||
574 | 71 | } | ||
575 | 72 | |||
576 | 73 | topLevel := int64(len(needsSplitting)) | ||
577 | 74 | exchg := &broker.BroadcastExchange{ | ||
578 | 75 | ChanId: store.SystemInternalChannelId, | ||
579 | 76 | TopLevel: topLevel, | ||
580 | 77 | NotificationPayloads: needsSplitting, | ||
581 | 78 | } | ||
582 | 79 | outMsg, _, err := exchg.Prepare(sess) | ||
583 | 80 | c.Assert(err, IsNil) | ||
584 | 81 | parts := 0 | ||
585 | 82 | for { | ||
586 | 83 | done := outMsg.Split() | ||
587 | 84 | parts++ | ||
588 | 85 | if done { | ||
589 | 86 | break | ||
590 | 87 | } | ||
591 | 88 | } | ||
592 | 89 | c.Assert(parts, Equals, 2) | ||
593 | 90 | exchg = &broker.BroadcastExchange{ | ||
594 | 91 | ChanId: store.SystemInternalChannelId, | ||
595 | 92 | TopLevel: topLevel + 2, | ||
596 | 93 | NotificationPayloads: []json.RawMessage{ | ||
597 | 94 | json.RawMessage(`{"a":"x"}`), | ||
598 | 95 | json.RawMessage(`{"a":"y"}`), | ||
599 | 96 | }, | ||
600 | 97 | } | ||
601 | 98 | outMsg, _, err = exchg.Prepare(sess) | ||
602 | 99 | c.Assert(err, IsNil) | ||
603 | 100 | done := outMsg.Split() // shouldn't panic | ||
604 | 101 | c.Check(done, Equals, true) | ||
605 | 102 | } | ||
606 | 103 | |||
607 | 61 | func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) { | 104 | func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) { |
608 | 62 | sess := &testing.TestBrokerSession{ | 105 | sess := &testing.TestBrokerSession{ |
609 | 63 | LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), | 106 | LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
610 | @@ -69,13 +112,13 @@ | |||
611 | 69 | json.RawMessage(`{"a":"y"}`), | 112 | json.RawMessage(`{"a":"y"}`), |
612 | 70 | }, | 113 | }, |
613 | 71 | } | 114 | } |
615 | 72 | inMsg, outMsg, err := exchg.Prepare(sess) | 115 | outMsg, inMsg, err := exchg.Prepare(sess) |
616 | 73 | c.Assert(err, IsNil) | 116 | c.Assert(err, IsNil) |
617 | 74 | // check | 117 | // check |
619 | 75 | marshalled, err := json.Marshal(inMsg) | 118 | marshalled, err := json.Marshal(outMsg) |
620 | 76 | c.Assert(err, IsNil) | 119 | c.Assert(err, IsNil) |
621 | 77 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | 120 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
623 | 78 | err = json.Unmarshal([]byte(`{}`), outMsg) | 121 | err = json.Unmarshal([]byte(`{}`), inMsg) |
624 | 79 | c.Assert(err, IsNil) | 122 | c.Assert(err, IsNil) |
625 | 80 | err = exchg.Acked(sess, true) | 123 | err = exchg.Acked(sess, true) |
626 | 81 | c.Assert(err, Not(IsNil)) | 124 | c.Assert(err, Not(IsNil)) |
627 | @@ -96,13 +139,13 @@ | |||
628 | 96 | json.RawMessage(`{"a":"y"}`), | 139 | json.RawMessage(`{"a":"y"}`), |
629 | 97 | }, | 140 | }, |
630 | 98 | } | 141 | } |
632 | 99 | inMsg, outMsg, err := exchg.Prepare(sess) | 142 | outMsg, inMsg, err := exchg.Prepare(sess) |
633 | 100 | c.Assert(err, IsNil) | 143 | c.Assert(err, IsNil) |
634 | 101 | // check | 144 | // check |
636 | 102 | marshalled, err := json.Marshal(inMsg) | 145 | marshalled, err := json.Marshal(outMsg) |
637 | 103 | c.Assert(err, IsNil) | 146 | c.Assert(err, IsNil) |
638 | 104 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | 147 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
640 | 105 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | 148 | err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg) |
641 | 106 | c.Assert(err, IsNil) | 149 | c.Assert(err, IsNil) |
642 | 107 | err = exchg.Acked(sess, true) | 150 | err = exchg.Acked(sess, true) |
643 | 108 | c.Assert(err, IsNil) | 151 | c.Assert(err, IsNil) |