Merge lp:~pedronis/ubuntu-push/broadcast-expiration into lp:ubuntu-push
- broadcast-expiration
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 66 |
Merged at revision: | 61 |
Proposed branch: | lp:~pedronis/ubuntu-push/broadcast-expiration |
Merge into: | lp:ubuntu-push |
Diff against target: |
620 lines (+205/-76) 8 files modified
server/acceptance/acceptance_test.go (+32/-20) server/api/handlers.go (+29/-10) server/api/handlers_test.go (+70/-28) server/broker/simple/simple_test.go (+5/-2) server/broker/testsuite/suite.go (+4/-2) server/store/inmemory.go (+37/-9) server/store/inmemory_test.go (+25/-3) server/store/store.go (+3/-2) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/broadcast-expiration |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email:
|
Commit message
add support for expiration of broadcasts with trivial implementation in the in-memory pending store
Description of the change
add support for expiration of broadcasts with trivial implementation in the in-memory pending store,
acceptance test coming in a later branch
To post a comment you must log in.
- 61. By Samuele Pedroni
-
fix comment
- 62. By Samuele Pedroni
-
formatting
- 63. By Samuele Pedroni
-
fix acceptance test
- 64. By Samuele Pedroni
-
merge reorg some imports
- 65. By Samuele Pedroni
-
Merged reorg-imports into expiration-flex-acceptance.
- 66. By Samuele Pedroni
-
Merged fix-tweak-tests into expiration-flex-acceptance.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
John Lenton (chipaca) wrote : | # |
Stupid question, it's right there. Sorry!
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'server/acceptance/acceptance_test.go' |
2 | --- server/acceptance/acceptance_test.go 2014-02-10 23:38:19 +0000 |
3 | +++ server/acceptance/acceptance_test.go 2014-02-10 23:46:49 +0000 |
4 | @@ -301,6 +301,8 @@ |
5 | |
6 | // Tests about broadcast |
7 | |
8 | +var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339) |
9 | + |
10 | func (s *acceptanceSuite) postRequest(path string, message interface{}) (string, error) { |
11 | packedMessage, err := json.Marshal(message) |
12 | if err != nil { |
13 | @@ -351,8 +353,9 @@ |
14 | } |
15 | events, errCh := s.startClient(c, "DEVB", intercept, nil) |
16 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
17 | - Channel: "system", |
18 | - Data: json.RawMessage(`{"n": 42}`), |
19 | + Channel: "system", |
20 | + ExpireOn: future, |
21 | + Data: json.RawMessage(`{"n": 42}`), |
22 | }) |
23 | c.Assert(err, IsNil) |
24 | c.Assert(got, Matches, ".*ok.*") |
25 | @@ -365,8 +368,9 @@ |
26 | func (s *acceptanceSuite) TestBroadcastPending(c *C) { |
27 | // send broadcast that will be pending |
28 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
29 | - Channel: "system", |
30 | - Data: json.RawMessage(`{"b": 1}`), |
31 | + Channel: "system", |
32 | + ExpireOn: future, |
33 | + Data: json.RawMessage(`{"b": 1}`), |
34 | }) |
35 | c.Assert(err, IsNil) |
36 | c.Assert(got, Matches, ".*ok.*") |
37 | @@ -393,8 +397,9 @@ |
38 | payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
39 | for i := 0; i < 32; i++ { |
40 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
41 | - Channel: "system", |
42 | - Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)), |
43 | + Channel: "system", |
44 | + ExpireOn: future, |
45 | + Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)), |
46 | }) |
47 | c.Assert(err, IsNil) |
48 | c.Assert(got, Matches, ".*ok.*") |
49 | @@ -434,8 +439,9 @@ |
50 | events2, errCh2 := s.startClient(c, "DEV2", intercept, nil) |
51 | // broadcast |
52 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
53 | - Channel: "system", |
54 | - Data: json.RawMessage(`{"n": 42}`), |
55 | + Channel: "system", |
56 | + ExpireOn: future, |
57 | + Data: json.RawMessage(`{"n": 42}`), |
58 | }) |
59 | c.Assert(err, IsNil) |
60 | c.Assert(got, Matches, ".*ok.*") |
61 | @@ -460,8 +466,9 @@ |
62 | } |
63 | events, errCh := s.startClient(c, "DEVD", intercept, nil) |
64 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
65 | - Channel: "system", |
66 | - Data: json.RawMessage(`{"b": 1}`), |
67 | + Channel: "system", |
68 | + ExpireOn: future, |
69 | + Data: json.RawMessage(`{"b": 1}`), |
70 | }) |
71 | c.Assert(err, IsNil) |
72 | c.Assert(got, Matches, ".*ok.*") |
73 | @@ -471,8 +478,9 @@ |
74 | c.Check(len(errCh), Equals, 0) |
75 | // another broadcast |
76 | got, err = s.postRequest("/broadcast", &api.Broadcast{ |
77 | - Channel: "system", |
78 | - Data: json.RawMessage(`{"b": 2}`), |
79 | + Channel: "system", |
80 | + ExpireOn: future, |
81 | + Data: json.RawMessage(`{"b": 2}`), |
82 | }) |
83 | c.Assert(err, IsNil) |
84 | c.Assert(got, Matches, ".*ok.*") |
85 | @@ -490,14 +498,16 @@ |
86 | func (s *acceptanceSuite) TestBroadcastTooAhead(c *C) { |
87 | // send broadcasts that will be pending |
88 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
89 | - Channel: "system", |
90 | - Data: json.RawMessage(`{"b": 1}`), |
91 | + Channel: "system", |
92 | + ExpireOn: future, |
93 | + Data: json.RawMessage(`{"b": 1}`), |
94 | }) |
95 | c.Assert(err, IsNil) |
96 | c.Assert(got, Matches, ".*ok.*") |
97 | got, err = s.postRequest("/broadcast", &api.Broadcast{ |
98 | - Channel: "system", |
99 | - Data: json.RawMessage(`{"b": 2}`), |
100 | + Channel: "system", |
101 | + ExpireOn: future, |
102 | + Data: json.RawMessage(`{"b": 2}`), |
103 | }) |
104 | c.Assert(err, IsNil) |
105 | c.Assert(got, Matches, ".*ok.*") |
106 | @@ -545,14 +555,16 @@ |
107 | func (s *acceptanceSuite) TestBroadcastWayBehind(c *C) { |
108 | // send broadcasts that will be pending |
109 | got, err := s.postRequest("/broadcast", &api.Broadcast{ |
110 | - Channel: "system", |
111 | - Data: json.RawMessage(`{"b": 1}`), |
112 | + Channel: "system", |
113 | + ExpireOn: future, |
114 | + Data: json.RawMessage(`{"b": 1}`), |
115 | }) |
116 | c.Assert(err, IsNil) |
117 | c.Assert(got, Matches, ".*ok.*") |
118 | got, err = s.postRequest("/broadcast", &api.Broadcast{ |
119 | - Channel: "system", |
120 | - Data: json.RawMessage(`{"b": 2}`), |
121 | + Channel: "system", |
122 | + ExpireOn: future, |
123 | + Data: json.RawMessage(`{"b": 2}`), |
124 | }) |
125 | c.Assert(err, IsNil) |
126 | c.Assert(got, Matches, ".*ok.*") |
127 | |
128 | === modified file 'server/api/handlers.go' |
129 | --- server/api/handlers.go 2014-02-10 23:19:08 +0000 |
130 | +++ server/api/handlers.go 2014-02-10 23:46:49 +0000 |
131 | @@ -23,6 +23,7 @@ |
132 | "fmt" |
133 | "io" |
134 | "net/http" |
135 | + "time" |
136 | |
137 | "launchpad.net/ubuntu-push/logger" |
138 | "launchpad.net/ubuntu-push/server/broker" |
139 | @@ -97,6 +98,16 @@ |
140 | invalidRequest, |
141 | "Missing data field", |
142 | } |
143 | + ErrInvalidExpiration = &APIError{ |
144 | + http.StatusBadRequest, |
145 | + invalidRequest, |
146 | + "Invalid expiration date", |
147 | + } |
148 | + ErrPastExpiration = &APIError{ |
149 | + http.StatusBadRequest, |
150 | + invalidRequest, |
151 | + "Past expiration date", |
152 | + } |
153 | ErrUnknownChannel = &APIError{ |
154 | http.StatusBadRequest, |
155 | unknownChannel, |
156 | @@ -122,9 +133,9 @@ |
157 | |
158 | // Broadcast request JSON object. |
159 | type Broadcast struct { |
160 | - Channel string `json:"channel"` |
161 | - ExpireAfter uint8 `json:"expire_after"` |
162 | - Data json.RawMessage `json:"data"` |
163 | + Channel string `json:"channel"` |
164 | + ExpireOn string `json:"expire_on"` |
165 | + Data json.RawMessage `json:"data"` |
166 | } |
167 | |
168 | func respondError(writer http.ResponseWriter, apiErr *APIError) { |
169 | @@ -178,11 +189,20 @@ |
170 | return body, nil |
171 | } |
172 | |
173 | -func checkBroadcast(bcast *Broadcast) *APIError { |
174 | +var zeroTime = time.Time{} |
175 | + |
176 | +func checkBroadcast(bcast *Broadcast) (time.Time, *APIError) { |
177 | if len(bcast.Data) == 0 { |
178 | - return ErrMissingData |
179 | - } |
180 | - return nil |
181 | + return zeroTime, ErrMissingData |
182 | + } |
183 | + expire, err := time.Parse(time.RFC3339, bcast.ExpireOn) |
184 | + if err != nil { |
185 | + return zeroTime, ErrInvalidExpiration |
186 | + } |
187 | + if expire.Before(time.Now()) { |
188 | + return zeroTime, ErrPastExpiration |
189 | + } |
190 | + return expire, nil |
191 | } |
192 | |
193 | // state holds the interfaces to delegate to serving requests |
194 | @@ -195,7 +215,7 @@ |
195 | type BroadcastHandler state |
196 | |
197 | func (h *BroadcastHandler) doBroadcast(bcast *Broadcast) *APIError { |
198 | - apiErr := checkBroadcast(bcast) |
199 | + expire, apiErr := checkBroadcast(bcast) |
200 | if apiErr != nil { |
201 | return apiErr |
202 | } |
203 | @@ -208,8 +228,7 @@ |
204 | return ErrUnknown |
205 | } |
206 | } |
207 | - // xxx ignoring expiration for now |
208 | - err = h.store.AppendToChannel(chanId, bcast.Data) |
209 | + err = h.store.AppendToChannel(chanId, bcast.Data, expire) |
210 | if err != nil { |
211 | // assume this for now |
212 | return ErrCouldNotStoreNotification |
213 | |
214 | === modified file 'server/api/handlers_test.go' |
215 | --- server/api/handlers_test.go 2014-02-10 23:19:08 +0000 |
216 | +++ server/api/handlers_test.go 2014-02-10 23:46:49 +0000 |
217 | @@ -26,6 +26,7 @@ |
218 | "net/http/httptest" |
219 | "strings" |
220 | "testing" |
221 | + "time" |
222 | |
223 | . "launchpad.net/gocheck" |
224 | |
225 | @@ -65,6 +66,43 @@ |
226 | c.Check(err, Equals, ErrCouldNotReadBody) |
227 | } |
228 | |
229 | +var future = time.Now().Add(4 * time.Hour).Format(time.RFC3339) |
230 | + |
231 | +func (s *handlersSuite) TestCheckBroadcast(c *C) { |
232 | + payload := json.RawMessage(`{"foo":"bar"}`) |
233 | + broadcast := &Broadcast{ |
234 | + Channel: "system", |
235 | + ExpireOn: future, |
236 | + Data: payload, |
237 | + } |
238 | + expire, err := checkBroadcast(broadcast) |
239 | + c.Check(err, IsNil) |
240 | + c.Check(expire.Format(time.RFC3339), Equals, future) |
241 | + |
242 | + broadcast = &Broadcast{ |
243 | + Channel: "system", |
244 | + ExpireOn: future, |
245 | + } |
246 | + _, err = checkBroadcast(broadcast) |
247 | + c.Check(err, Equals, ErrMissingData) |
248 | + |
249 | + broadcast = &Broadcast{ |
250 | + Channel: "system", |
251 | + ExpireOn: "12:00", |
252 | + Data: payload, |
253 | + } |
254 | + _, err = checkBroadcast(broadcast) |
255 | + c.Check(err, Equals, ErrInvalidExpiration) |
256 | + |
257 | + broadcast = &Broadcast{ |
258 | + Channel: "system", |
259 | + ExpireOn: time.Now().Add(-10 * time.Hour).Format(time.RFC3339), |
260 | + Data: payload, |
261 | + } |
262 | + _, err = checkBroadcast(broadcast) |
263 | + c.Check(err, Equals, ErrPastExpiration) |
264 | +} |
265 | + |
266 | type checkBrokerSending struct { |
267 | store store.PendingStore |
268 | chanId store.InternalChannelId |
269 | @@ -87,8 +125,9 @@ |
270 | bh := &BroadcastHandler{sto, bsend, nil} |
271 | payload := json.RawMessage(`{"a": 1}`) |
272 | apiErr := bh.doBroadcast(&Broadcast{ |
273 | - Channel: "system", |
274 | - Data: payload, |
275 | + Channel: "system", |
276 | + ExpireOn: future, |
277 | + Data: payload, |
278 | }) |
279 | c.Check(apiErr, IsNil) |
280 | c.Check(bsend.err, IsNil) |
281 | @@ -101,8 +140,9 @@ |
282 | sto := store.NewInMemoryPendingStore() |
283 | bh := &BroadcastHandler{sto, nil, nil} |
284 | apiErr := bh.doBroadcast(&Broadcast{ |
285 | - Channel: "unknown", |
286 | - Data: json.RawMessage(`{"a": 1}`), |
287 | + Channel: "unknown", |
288 | + ExpireOn: future, |
289 | + Data: json.RawMessage(`{"a": 1}`), |
290 | }) |
291 | c.Check(apiErr, Equals, ErrUnknownChannel) |
292 | } |
293 | @@ -117,8 +157,8 @@ |
294 | return chanId, isto.intercept("GetInternalChannelId", err) |
295 | } |
296 | |
297 | -func (isto *interceptInMemoryPendingStore) AppendToChannel(chanId store.InternalChannelId, payload json.RawMessage) error { |
298 | - err := isto.InMemoryPendingStore.AppendToChannel(chanId, payload) |
299 | +func (isto *interceptInMemoryPendingStore) AppendToChannel(chanId store.InternalChannelId, payload json.RawMessage, expiration time.Time) error { |
300 | + err := isto.InMemoryPendingStore.AppendToChannel(chanId, payload, expiration) |
301 | return isto.intercept("AppendToChannel", err) |
302 | } |
303 | |
304 | @@ -131,8 +171,9 @@ |
305 | } |
306 | bh := &BroadcastHandler{sto, nil, nil} |
307 | apiErr := bh.doBroadcast(&Broadcast{ |
308 | - Channel: "system", |
309 | - Data: json.RawMessage(`{"a": 1}`), |
310 | + Channel: "system", |
311 | + ExpireOn: future, |
312 | + Data: json.RawMessage(`{"a": 1}`), |
313 | }) |
314 | c.Check(apiErr, Equals, ErrUnknown) |
315 | } |
316 | @@ -149,8 +190,9 @@ |
317 | } |
318 | bh := &BroadcastHandler{sto, nil, nil} |
319 | apiErr := bh.doBroadcast(&Broadcast{ |
320 | - Channel: "system", |
321 | - Data: json.RawMessage(`{"a": 1}`), |
322 | + Channel: "system", |
323 | + ExpireOn: future, |
324 | + Data: json.RawMessage(`{"a": 1}`), |
325 | }) |
326 | c.Check(apiErr, Equals, ErrCouldNotStoreNotification) |
327 | } |
328 | @@ -203,9 +245,9 @@ |
329 | payload := json.RawMessage(`{"foo":"bar"}`) |
330 | |
331 | request := newPostRequest("/broadcast", &Broadcast{ |
332 | - Channel: "system", |
333 | - ExpireAfter: 60, |
334 | - Data: payload, |
335 | + Channel: "system", |
336 | + ExpireOn: future, |
337 | + Data: payload, |
338 | }, testServer) |
339 | |
340 | response, err := s.client.Do(request) |
341 | @@ -234,9 +276,9 @@ |
342 | payload := json.RawMessage(`{"foo":"bar"}`) |
343 | |
344 | request := newPostRequest("/broadcast", &Broadcast{ |
345 | - Channel: "unknown", |
346 | - ExpireAfter: 60, |
347 | - Data: payload, |
348 | + Channel: "unknown", |
349 | + ExpireOn: future, |
350 | + Data: payload, |
351 | }, testServer) |
352 | |
353 | response, err := s.client.Do(request) |
354 | @@ -286,9 +328,9 @@ |
355 | dataString := fmt.Sprintf(`"%v"`, bigString) |
356 | |
357 | request := newPostRequest("/", &Broadcast{ |
358 | - Channel: "some-channel", |
359 | - ExpireAfter: 60, |
360 | - Data: json.RawMessage([]byte(dataString)), |
361 | + Channel: "some-channel", |
362 | + ExpireOn: future, |
363 | + Data: json.RawMessage([]byte(dataString)), |
364 | }, testServer) |
365 | |
366 | response, err := s.client.Do(request) |
367 | @@ -303,9 +345,9 @@ |
368 | dataString := `{"foo":"bar"}` |
369 | |
370 | request := newPostRequest("/", &Broadcast{ |
371 | - Channel: "some-channel", |
372 | - ExpireAfter: 60, |
373 | - Data: json.RawMessage([]byte(dataString)), |
374 | + Channel: "some-channel", |
375 | + ExpireOn: future, |
376 | + Data: json.RawMessage([]byte(dataString)), |
377 | }, testServer) |
378 | request.ContentLength = -1 |
379 | |
380 | @@ -338,9 +380,9 @@ |
381 | dataString := `{"foo":"bar"}` |
382 | |
383 | request := newPostRequest("/", &Broadcast{ |
384 | - Channel: "some-channel", |
385 | - ExpireAfter: 60, |
386 | - Data: json.RawMessage([]byte(dataString)), |
387 | + Channel: "some-channel", |
388 | + ExpireOn: future, |
389 | + Data: json.RawMessage([]byte(dataString)), |
390 | }, testServer) |
391 | request.Header.Set("Content-Type", "text/plain") |
392 | |
393 | @@ -355,9 +397,9 @@ |
394 | |
395 | dataString := `{"foo":"bar"}` |
396 | packedMessage, err := json.Marshal(&Broadcast{ |
397 | - Channel: "some-channel", |
398 | - ExpireAfter: 60, |
399 | - Data: json.RawMessage([]byte(dataString)), |
400 | + Channel: "some-channel", |
401 | + ExpireOn: future, |
402 | + Data: json.RawMessage([]byte(dataString)), |
403 | }) |
404 | s.c.Assert(err, IsNil) |
405 | reader := bytes.NewReader(packedMessage) |
406 | |
407 | === modified file 'server/broker/simple/simple_test.go' |
408 | --- server/broker/simple/simple_test.go 2014-02-10 23:19:08 +0000 |
409 | +++ server/broker/simple/simple_test.go 2014-02-10 23:46:49 +0000 |
410 | @@ -19,6 +19,7 @@ |
411 | import ( |
412 | "encoding/json" |
413 | stdtesting "testing" |
414 | + "time" |
415 | |
416 | . "launchpad.net/gocheck" |
417 | |
418 | @@ -45,8 +46,9 @@ |
419 | |
420 | func (s *simpleSuite) TestFeedPending(c *C) { |
421 | sto := store.NewInMemoryPendingStore() |
422 | + muchLater := time.Now().Add(10 * time.Minute) |
423 | notification1 := json.RawMessage(`{"m": "M"}`) |
424 | - sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
425 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
426 | b := NewSimpleBroker(sto, testBrokerConfig, nil) |
427 | sess := &simpleBrokerSession{ |
428 | exchanges: make(chan broker.Exchange, 1), |
429 | @@ -63,8 +65,9 @@ |
430 | |
431 | func (s *simpleSuite) TestFeedPendingNop(c *C) { |
432 | sto := store.NewInMemoryPendingStore() |
433 | + muchLater := time.Now().Add(10 * time.Minute) |
434 | notification1 := json.RawMessage(`{"m": "M"}`) |
435 | - sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
436 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
437 | b := NewSimpleBroker(sto, testBrokerConfig, nil) |
438 | sess := &simpleBrokerSession{ |
439 | exchanges: make(chan broker.Exchange, 1), |
440 | |
441 | === modified file 'server/broker/testsuite/suite.go' |
442 | --- server/broker/testsuite/suite.go 2014-02-10 23:19:08 +0000 |
443 | +++ server/broker/testsuite/suite.go 2014-02-10 23:46:49 +0000 |
444 | @@ -108,7 +108,8 @@ |
445 | func (s *CommonBrokerSuite) TestRegistrationFeedPending(c *C) { |
446 | sto := store.NewInMemoryPendingStore() |
447 | notification1 := json.RawMessage(`{"m": "M"}`) |
448 | - sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
449 | + muchLater := time.Now().Add(10 * time.Minute) |
450 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
451 | b := s.MakeBroker(sto, testBrokerConfig, nil) |
452 | b.Start() |
453 | defer b.Stop() |
454 | @@ -156,7 +157,8 @@ |
455 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}) |
456 | c.Assert(err, IsNil) |
457 | // add notification to channel *after* the registrations |
458 | - sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
459 | + muchLater := time.Now().Add(10 * time.Minute) |
460 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater) |
461 | b.Broadcast(store.SystemInternalChannelId) |
462 | select { |
463 | case <-time.After(5 * time.Second): |
464 | |
465 | === modified file 'server/store/inmemory.go' |
466 | --- server/store/inmemory.go 2014-01-14 15:35:20 +0000 |
467 | +++ server/store/inmemory.go 2014-02-10 23:46:49 +0000 |
468 | @@ -19,18 +19,31 @@ |
469 | import ( |
470 | "encoding/json" |
471 | "sync" |
472 | + "time" |
473 | ) |
474 | |
475 | +// one stored notification |
476 | +type notification struct { |
477 | + payload json.RawMessage |
478 | + expiration time.Time |
479 | +} |
480 | + |
481 | +// one stored channel |
482 | +type channel struct { |
483 | + topLevel int64 |
484 | + notifications []notification |
485 | +} |
486 | + |
487 | // InMemoryPendingStore is a basic in-memory pending notification store. |
488 | type InMemoryPendingStore struct { |
489 | lock sync.Mutex |
490 | - store map[InternalChannelId][]json.RawMessage |
491 | + store map[InternalChannelId]*channel |
492 | } |
493 | |
494 | // NewInMemoryPendingStore returns a new InMemoryStore. |
495 | func NewInMemoryPendingStore() *InMemoryPendingStore { |
496 | return &InMemoryPendingStore{ |
497 | - store: make(map[InternalChannelId][]json.RawMessage), |
498 | + store: make(map[InternalChannelId]*channel), |
499 | } |
500 | } |
501 | |
502 | @@ -41,25 +54,40 @@ |
503 | return InternalChannelId(""), ErrUnknownChannel |
504 | } |
505 | |
506 | -func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notification json.RawMessage) error { |
507 | +func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error { |
508 | sto.lock.Lock() |
509 | defer sto.lock.Unlock() |
510 | prev := sto.store[chanId] |
511 | - sto.store[chanId] = append(prev, notification) |
512 | + if prev == nil { |
513 | + prev = &channel{} |
514 | + } |
515 | + prev.topLevel++ |
516 | + prev.notifications = append(prev.notifications, notification{ |
517 | + payload: notificationPayload, |
518 | + expiration: expiration, |
519 | + }) |
520 | + sto.store[chanId] = prev |
521 | return nil |
522 | } |
523 | |
524 | func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []json.RawMessage, error) { |
525 | sto.lock.Lock() |
526 | defer sto.lock.Unlock() |
527 | - notifications, ok := sto.store[chanId] |
528 | + channel, ok := sto.store[chanId] |
529 | if !ok { |
530 | return 0, nil, nil |
531 | } |
532 | - n := len(notifications) |
533 | - res := make([]json.RawMessage, n) |
534 | - copy(res, notifications) |
535 | - return int64(n), res, nil |
536 | + topLevel := channel.topLevel |
537 | + n := len(channel.notifications) |
538 | + res := make([]json.RawMessage, 0, n) |
539 | + now := time.Now() |
540 | + for _, notification := range channel.notifications { |
541 | + if notification.expiration.Before(now) { |
542 | + continue |
543 | + } |
544 | + res = append(res, notification.payload) |
545 | + } |
546 | + return topLevel, res, nil |
547 | } |
548 | |
549 | // sanity check we implement the interface |
550 | |
551 | === modified file 'server/store/inmemory_test.go' |
552 | --- server/store/inmemory_test.go 2014-02-10 23:19:08 +0000 |
553 | +++ server/store/inmemory_test.go 2014-02-10 23:46:49 +0000 |
554 | @@ -18,7 +18,7 @@ |
555 | |
556 | import ( |
557 | "encoding/json" |
558 | - // "fmt" |
559 | + "time" |
560 | |
561 | . "launchpad.net/gocheck" |
562 | ) |
563 | @@ -54,10 +54,32 @@ |
564 | notification1 := json.RawMessage(`{"a":1}`) |
565 | notification2 := json.RawMessage(`{"a":2}`) |
566 | |
567 | - sto.AppendToChannel(SystemInternalChannelId, notification1) |
568 | - sto.AppendToChannel(SystemInternalChannelId, notification2) |
569 | + muchLater := time.Now().Add(time.Minute) |
570 | + |
571 | + sto.AppendToChannel(SystemInternalChannelId, notification1, muchLater) |
572 | + sto.AppendToChannel(SystemInternalChannelId, notification2, muchLater) |
573 | top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId) |
574 | c.Assert(err, IsNil) |
575 | c.Check(top, Equals, int64(2)) |
576 | c.Check(res, DeepEquals, []json.RawMessage{notification1, notification2}) |
577 | } |
578 | + |
579 | +func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshortWithExpiration(c *C) { |
580 | + sto := NewInMemoryPendingStore() |
581 | + |
582 | + notification1 := json.RawMessage(`{"a":1}`) |
583 | + notification2 := json.RawMessage(`{"a":2}`) |
584 | + |
585 | + verySoon := time.Now().Add(100 * time.Millisecond) |
586 | + muchLater := time.Now().Add(time.Minute) |
587 | + |
588 | + sto.AppendToChannel(SystemInternalChannelId, notification1, muchLater) |
589 | + sto.AppendToChannel(SystemInternalChannelId, notification2, verySoon) |
590 | + |
591 | + time.Sleep(200 * time.Millisecond) |
592 | + |
593 | + top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId) |
594 | + c.Assert(err, IsNil) |
595 | + c.Check(top, Equals, int64(2)) |
596 | + c.Check(res, DeepEquals, []json.RawMessage{notification1}) |
597 | +} |
598 | |
599 | === modified file 'server/store/store.go' |
600 | --- server/store/store.go 2014-01-14 15:35:20 +0000 |
601 | +++ server/store/store.go 2014-02-10 23:46:49 +0000 |
602 | @@ -21,6 +21,7 @@ |
603 | "encoding/hex" |
604 | "encoding/json" |
605 | "errors" |
606 | + "time" |
607 | ) |
608 | |
609 | type InternalChannelId string |
610 | @@ -65,8 +66,8 @@ |
611 | // GetInternalChannelId returns the internal store id for a channel |
612 | // given the name. |
613 | GetInternalChannelId(name string) (InternalChannelId, error) |
614 | - // AppendToChannel appends a notification to the channel. xxx expiration |
615 | - AppendToChannel(chanId InternalChannelId, notification json.RawMessage) error |
616 | + // AppendToChannel appends a notification to the channel. |
617 | + AppendToChannel(chanId InternalChannelId, notification json.RawMessage, expiration time.Time) error |
618 | // GetChannelSnapshot gets all the current notifications and |
619 | // current top level in the channel. |
620 | GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, payloads []json.RawMessage, err error) |
What's the plan for exposing this via REST?