Merge lp:~pedronis/ubuntu-push/broadcast-expiration into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 66
Merged at revision: 61
Proposed branch: lp:~pedronis/ubuntu-push/broadcast-expiration
Merge into: lp:ubuntu-push
Diff against target: 620 lines (+205/-76)
8 files modified
server/acceptance/acceptance_test.go (+32/-20)
server/api/handlers.go (+29/-10)
server/api/handlers_test.go (+70/-28)
server/broker/simple/simple_test.go (+5/-2)
server/broker/testsuite/suite.go (+4/-2)
server/store/inmemory.go (+37/-9)
server/store/inmemory_test.go (+25/-3)
server/store/store.go (+3/-2)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/broadcast-expiration
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+205644@code.launchpad.net

Commit message

add support for expiration of broadcasts with trivial implementation in the in-memory pending store

Description of the change

Add support for expiration of broadcasts, with a trivial implementation in the in-memory pending store.

The acceptance test is coming in a later branch.

To post a comment you must log in.
61. By Samuele Pedroni

fix comment

62. By Samuele Pedroni

formatting

63. By Samuele Pedroni

fix acceptance test

64. By Samuele Pedroni

merge reorg some imports

65. By Samuele Pedroni

Merged reorg-imports into expiration-flex-acceptance.

66. By Samuele Pedroni

Merged fix-tweak-tests into expiration-flex-acceptance.

Revision history for this message
John Lenton (chipaca) wrote :

What's the plan for exposing this via REST?

review: Approve
Revision history for this message
John Lenton (chipaca) wrote :

Stupid question, it's right there. Sorry!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/acceptance/acceptance_test.go'
2--- server/acceptance/acceptance_test.go 2014-02-10 23:38:19 +0000
3+++ server/acceptance/acceptance_test.go 2014-02-10 23:46:49 +0000
4@@ -301,6 +301,8 @@
5
6 // Tests about broadcast
7
8+var future = time.Now().Add(9 * time.Hour).Format(time.RFC3339)
9+
10 func (s *acceptanceSuite) postRequest(path string, message interface{}) (string, error) {
11 packedMessage, err := json.Marshal(message)
12 if err != nil {
13@@ -351,8 +353,9 @@
14 }
15 events, errCh := s.startClient(c, "DEVB", intercept, nil)
16 got, err := s.postRequest("/broadcast", &api.Broadcast{
17- Channel: "system",
18- Data: json.RawMessage(`{"n": 42}`),
19+ Channel: "system",
20+ ExpireOn: future,
21+ Data: json.RawMessage(`{"n": 42}`),
22 })
23 c.Assert(err, IsNil)
24 c.Assert(got, Matches, ".*ok.*")
25@@ -365,8 +368,9 @@
26 func (s *acceptanceSuite) TestBroadcastPending(c *C) {
27 // send broadcast that will be pending
28 got, err := s.postRequest("/broadcast", &api.Broadcast{
29- Channel: "system",
30- Data: json.RawMessage(`{"b": 1}`),
31+ Channel: "system",
32+ ExpireOn: future,
33+ Data: json.RawMessage(`{"b": 1}`),
34 })
35 c.Assert(err, IsNil)
36 c.Assert(got, Matches, ".*ok.*")
37@@ -393,8 +397,9 @@
38 payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
39 for i := 0; i < 32; i++ {
40 got, err := s.postRequest("/broadcast", &api.Broadcast{
41- Channel: "system",
42- Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)),
43+ Channel: "system",
44+ ExpireOn: future,
45+ Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)),
46 })
47 c.Assert(err, IsNil)
48 c.Assert(got, Matches, ".*ok.*")
49@@ -434,8 +439,9 @@
50 events2, errCh2 := s.startClient(c, "DEV2", intercept, nil)
51 // broadcast
52 got, err := s.postRequest("/broadcast", &api.Broadcast{
53- Channel: "system",
54- Data: json.RawMessage(`{"n": 42}`),
55+ Channel: "system",
56+ ExpireOn: future,
57+ Data: json.RawMessage(`{"n": 42}`),
58 })
59 c.Assert(err, IsNil)
60 c.Assert(got, Matches, ".*ok.*")
61@@ -460,8 +466,9 @@
62 }
63 events, errCh := s.startClient(c, "DEVD", intercept, nil)
64 got, err := s.postRequest("/broadcast", &api.Broadcast{
65- Channel: "system",
66- Data: json.RawMessage(`{"b": 1}`),
67+ Channel: "system",
68+ ExpireOn: future,
69+ Data: json.RawMessage(`{"b": 1}`),
70 })
71 c.Assert(err, IsNil)
72 c.Assert(got, Matches, ".*ok.*")
73@@ -471,8 +478,9 @@
74 c.Check(len(errCh), Equals, 0)
75 // another broadcast
76 got, err = s.postRequest("/broadcast", &api.Broadcast{
77- Channel: "system",
78- Data: json.RawMessage(`{"b": 2}`),
79+ Channel: "system",
80+ ExpireOn: future,
81+ Data: json.RawMessage(`{"b": 2}`),
82 })
83 c.Assert(err, IsNil)
84 c.Assert(got, Matches, ".*ok.*")
85@@ -490,14 +498,16 @@
86 func (s *acceptanceSuite) TestBroadcastTooAhead(c *C) {
87 // send broadcasts that will be pending
88 got, err := s.postRequest("/broadcast", &api.Broadcast{
89- Channel: "system",
90- Data: json.RawMessage(`{"b": 1}`),
91+ Channel: "system",
92+ ExpireOn: future,
93+ Data: json.RawMessage(`{"b": 1}`),
94 })
95 c.Assert(err, IsNil)
96 c.Assert(got, Matches, ".*ok.*")
97 got, err = s.postRequest("/broadcast", &api.Broadcast{
98- Channel: "system",
99- Data: json.RawMessage(`{"b": 2}`),
100+ Channel: "system",
101+ ExpireOn: future,
102+ Data: json.RawMessage(`{"b": 2}`),
103 })
104 c.Assert(err, IsNil)
105 c.Assert(got, Matches, ".*ok.*")
106@@ -545,14 +555,16 @@
107 func (s *acceptanceSuite) TestBroadcastWayBehind(c *C) {
108 // send broadcasts that will be pending
109 got, err := s.postRequest("/broadcast", &api.Broadcast{
110- Channel: "system",
111- Data: json.RawMessage(`{"b": 1}`),
112+ Channel: "system",
113+ ExpireOn: future,
114+ Data: json.RawMessage(`{"b": 1}`),
115 })
116 c.Assert(err, IsNil)
117 c.Assert(got, Matches, ".*ok.*")
118 got, err = s.postRequest("/broadcast", &api.Broadcast{
119- Channel: "system",
120- Data: json.RawMessage(`{"b": 2}`),
121+ Channel: "system",
122+ ExpireOn: future,
123+ Data: json.RawMessage(`{"b": 2}`),
124 })
125 c.Assert(err, IsNil)
126 c.Assert(got, Matches, ".*ok.*")
127
128=== modified file 'server/api/handlers.go'
129--- server/api/handlers.go 2014-02-10 23:19:08 +0000
130+++ server/api/handlers.go 2014-02-10 23:46:49 +0000
131@@ -23,6 +23,7 @@
132 "fmt"
133 "io"
134 "net/http"
135+ "time"
136
137 "launchpad.net/ubuntu-push/logger"
138 "launchpad.net/ubuntu-push/server/broker"
139@@ -97,6 +98,16 @@
140 invalidRequest,
141 "Missing data field",
142 }
143+ ErrInvalidExpiration = &APIError{
144+ http.StatusBadRequest,
145+ invalidRequest,
146+ "Invalid expiration date",
147+ }
148+ ErrPastExpiration = &APIError{
149+ http.StatusBadRequest,
150+ invalidRequest,
151+ "Past expiration date",
152+ }
153 ErrUnknownChannel = &APIError{
154 http.StatusBadRequest,
155 unknownChannel,
156@@ -122,9 +133,9 @@
157
158 // Broadcast request JSON object.
159 type Broadcast struct {
160- Channel string `json:"channel"`
161- ExpireAfter uint8 `json:"expire_after"`
162- Data json.RawMessage `json:"data"`
163+ Channel string `json:"channel"`
164+ ExpireOn string `json:"expire_on"`
165+ Data json.RawMessage `json:"data"`
166 }
167
168 func respondError(writer http.ResponseWriter, apiErr *APIError) {
169@@ -178,11 +189,20 @@
170 return body, nil
171 }
172
173-func checkBroadcast(bcast *Broadcast) *APIError {
174+var zeroTime = time.Time{}
175+
176+func checkBroadcast(bcast *Broadcast) (time.Time, *APIError) {
177 if len(bcast.Data) == 0 {
178- return ErrMissingData
179- }
180- return nil
181+ return zeroTime, ErrMissingData
182+ }
183+ expire, err := time.Parse(time.RFC3339, bcast.ExpireOn)
184+ if err != nil {
185+ return zeroTime, ErrInvalidExpiration
186+ }
187+ if expire.Before(time.Now()) {
188+ return zeroTime, ErrPastExpiration
189+ }
190+ return expire, nil
191 }
192
193 // state holds the interfaces to delegate to serving requests
194@@ -195,7 +215,7 @@
195 type BroadcastHandler state
196
197 func (h *BroadcastHandler) doBroadcast(bcast *Broadcast) *APIError {
198- apiErr := checkBroadcast(bcast)
199+ expire, apiErr := checkBroadcast(bcast)
200 if apiErr != nil {
201 return apiErr
202 }
203@@ -208,8 +228,7 @@
204 return ErrUnknown
205 }
206 }
207- // xxx ignoring expiration for now
208- err = h.store.AppendToChannel(chanId, bcast.Data)
209+ err = h.store.AppendToChannel(chanId, bcast.Data, expire)
210 if err != nil {
211 // assume this for now
212 return ErrCouldNotStoreNotification
213
214=== modified file 'server/api/handlers_test.go'
215--- server/api/handlers_test.go 2014-02-10 23:19:08 +0000
216+++ server/api/handlers_test.go 2014-02-10 23:46:49 +0000
217@@ -26,6 +26,7 @@
218 "net/http/httptest"
219 "strings"
220 "testing"
221+ "time"
222
223 . "launchpad.net/gocheck"
224
225@@ -65,6 +66,43 @@
226 c.Check(err, Equals, ErrCouldNotReadBody)
227 }
228
229+var future = time.Now().Add(4 * time.Hour).Format(time.RFC3339)
230+
231+func (s *handlersSuite) TestCheckBroadcast(c *C) {
232+ payload := json.RawMessage(`{"foo":"bar"}`)
233+ broadcast := &Broadcast{
234+ Channel: "system",
235+ ExpireOn: future,
236+ Data: payload,
237+ }
238+ expire, err := checkBroadcast(broadcast)
239+ c.Check(err, IsNil)
240+ c.Check(expire.Format(time.RFC3339), Equals, future)
241+
242+ broadcast = &Broadcast{
243+ Channel: "system",
244+ ExpireOn: future,
245+ }
246+ _, err = checkBroadcast(broadcast)
247+ c.Check(err, Equals, ErrMissingData)
248+
249+ broadcast = &Broadcast{
250+ Channel: "system",
251+ ExpireOn: "12:00",
252+ Data: payload,
253+ }
254+ _, err = checkBroadcast(broadcast)
255+ c.Check(err, Equals, ErrInvalidExpiration)
256+
257+ broadcast = &Broadcast{
258+ Channel: "system",
259+ ExpireOn: time.Now().Add(-10 * time.Hour).Format(time.RFC3339),
260+ Data: payload,
261+ }
262+ _, err = checkBroadcast(broadcast)
263+ c.Check(err, Equals, ErrPastExpiration)
264+}
265+
266 type checkBrokerSending struct {
267 store store.PendingStore
268 chanId store.InternalChannelId
269@@ -87,8 +125,9 @@
270 bh := &BroadcastHandler{sto, bsend, nil}
271 payload := json.RawMessage(`{"a": 1}`)
272 apiErr := bh.doBroadcast(&Broadcast{
273- Channel: "system",
274- Data: payload,
275+ Channel: "system",
276+ ExpireOn: future,
277+ Data: payload,
278 })
279 c.Check(apiErr, IsNil)
280 c.Check(bsend.err, IsNil)
281@@ -101,8 +140,9 @@
282 sto := store.NewInMemoryPendingStore()
283 bh := &BroadcastHandler{sto, nil, nil}
284 apiErr := bh.doBroadcast(&Broadcast{
285- Channel: "unknown",
286- Data: json.RawMessage(`{"a": 1}`),
287+ Channel: "unknown",
288+ ExpireOn: future,
289+ Data: json.RawMessage(`{"a": 1}`),
290 })
291 c.Check(apiErr, Equals, ErrUnknownChannel)
292 }
293@@ -117,8 +157,8 @@
294 return chanId, isto.intercept("GetInternalChannelId", err)
295 }
296
297-func (isto *interceptInMemoryPendingStore) AppendToChannel(chanId store.InternalChannelId, payload json.RawMessage) error {
298- err := isto.InMemoryPendingStore.AppendToChannel(chanId, payload)
299+func (isto *interceptInMemoryPendingStore) AppendToChannel(chanId store.InternalChannelId, payload json.RawMessage, expiration time.Time) error {
300+ err := isto.InMemoryPendingStore.AppendToChannel(chanId, payload, expiration)
301 return isto.intercept("AppendToChannel", err)
302 }
303
304@@ -131,8 +171,9 @@
305 }
306 bh := &BroadcastHandler{sto, nil, nil}
307 apiErr := bh.doBroadcast(&Broadcast{
308- Channel: "system",
309- Data: json.RawMessage(`{"a": 1}`),
310+ Channel: "system",
311+ ExpireOn: future,
312+ Data: json.RawMessage(`{"a": 1}`),
313 })
314 c.Check(apiErr, Equals, ErrUnknown)
315 }
316@@ -149,8 +190,9 @@
317 }
318 bh := &BroadcastHandler{sto, nil, nil}
319 apiErr := bh.doBroadcast(&Broadcast{
320- Channel: "system",
321- Data: json.RawMessage(`{"a": 1}`),
322+ Channel: "system",
323+ ExpireOn: future,
324+ Data: json.RawMessage(`{"a": 1}`),
325 })
326 c.Check(apiErr, Equals, ErrCouldNotStoreNotification)
327 }
328@@ -203,9 +245,9 @@
329 payload := json.RawMessage(`{"foo":"bar"}`)
330
331 request := newPostRequest("/broadcast", &Broadcast{
332- Channel: "system",
333- ExpireAfter: 60,
334- Data: payload,
335+ Channel: "system",
336+ ExpireOn: future,
337+ Data: payload,
338 }, testServer)
339
340 response, err := s.client.Do(request)
341@@ -234,9 +276,9 @@
342 payload := json.RawMessage(`{"foo":"bar"}`)
343
344 request := newPostRequest("/broadcast", &Broadcast{
345- Channel: "unknown",
346- ExpireAfter: 60,
347- Data: payload,
348+ Channel: "unknown",
349+ ExpireOn: future,
350+ Data: payload,
351 }, testServer)
352
353 response, err := s.client.Do(request)
354@@ -286,9 +328,9 @@
355 dataString := fmt.Sprintf(`"%v"`, bigString)
356
357 request := newPostRequest("/", &Broadcast{
358- Channel: "some-channel",
359- ExpireAfter: 60,
360- Data: json.RawMessage([]byte(dataString)),
361+ Channel: "some-channel",
362+ ExpireOn: future,
363+ Data: json.RawMessage([]byte(dataString)),
364 }, testServer)
365
366 response, err := s.client.Do(request)
367@@ -303,9 +345,9 @@
368 dataString := `{"foo":"bar"}`
369
370 request := newPostRequest("/", &Broadcast{
371- Channel: "some-channel",
372- ExpireAfter: 60,
373- Data: json.RawMessage([]byte(dataString)),
374+ Channel: "some-channel",
375+ ExpireOn: future,
376+ Data: json.RawMessage([]byte(dataString)),
377 }, testServer)
378 request.ContentLength = -1
379
380@@ -338,9 +380,9 @@
381 dataString := `{"foo":"bar"}`
382
383 request := newPostRequest("/", &Broadcast{
384- Channel: "some-channel",
385- ExpireAfter: 60,
386- Data: json.RawMessage([]byte(dataString)),
387+ Channel: "some-channel",
388+ ExpireOn: future,
389+ Data: json.RawMessage([]byte(dataString)),
390 }, testServer)
391 request.Header.Set("Content-Type", "text/plain")
392
393@@ -355,9 +397,9 @@
394
395 dataString := `{"foo":"bar"}`
396 packedMessage, err := json.Marshal(&Broadcast{
397- Channel: "some-channel",
398- ExpireAfter: 60,
399- Data: json.RawMessage([]byte(dataString)),
400+ Channel: "some-channel",
401+ ExpireOn: future,
402+ Data: json.RawMessage([]byte(dataString)),
403 })
404 s.c.Assert(err, IsNil)
405 reader := bytes.NewReader(packedMessage)
406
407=== modified file 'server/broker/simple/simple_test.go'
408--- server/broker/simple/simple_test.go 2014-02-10 23:19:08 +0000
409+++ server/broker/simple/simple_test.go 2014-02-10 23:46:49 +0000
410@@ -19,6 +19,7 @@
411 import (
412 "encoding/json"
413 stdtesting "testing"
414+ "time"
415
416 . "launchpad.net/gocheck"
417
418@@ -45,8 +46,9 @@
419
420 func (s *simpleSuite) TestFeedPending(c *C) {
421 sto := store.NewInMemoryPendingStore()
422+ muchLater := time.Now().Add(10 * time.Minute)
423 notification1 := json.RawMessage(`{"m": "M"}`)
424- sto.AppendToChannel(store.SystemInternalChannelId, notification1)
425+ sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
426 b := NewSimpleBroker(sto, testBrokerConfig, nil)
427 sess := &simpleBrokerSession{
428 exchanges: make(chan broker.Exchange, 1),
429@@ -63,8 +65,9 @@
430
431 func (s *simpleSuite) TestFeedPendingNop(c *C) {
432 sto := store.NewInMemoryPendingStore()
433+ muchLater := time.Now().Add(10 * time.Minute)
434 notification1 := json.RawMessage(`{"m": "M"}`)
435- sto.AppendToChannel(store.SystemInternalChannelId, notification1)
436+ sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
437 b := NewSimpleBroker(sto, testBrokerConfig, nil)
438 sess := &simpleBrokerSession{
439 exchanges: make(chan broker.Exchange, 1),
440
441=== modified file 'server/broker/testsuite/suite.go'
442--- server/broker/testsuite/suite.go 2014-02-10 23:19:08 +0000
443+++ server/broker/testsuite/suite.go 2014-02-10 23:46:49 +0000
444@@ -108,7 +108,8 @@
445 func (s *CommonBrokerSuite) TestRegistrationFeedPending(c *C) {
446 sto := store.NewInMemoryPendingStore()
447 notification1 := json.RawMessage(`{"m": "M"}`)
448- sto.AppendToChannel(store.SystemInternalChannelId, notification1)
449+ muchLater := time.Now().Add(10 * time.Minute)
450+ sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
451 b := s.MakeBroker(sto, testBrokerConfig, nil)
452 b.Start()
453 defer b.Stop()
454@@ -156,7 +157,8 @@
455 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"})
456 c.Assert(err, IsNil)
457 // add notification to channel *after* the registrations
458- sto.AppendToChannel(store.SystemInternalChannelId, notification1)
459+ muchLater := time.Now().Add(10 * time.Minute)
460+ sto.AppendToChannel(store.SystemInternalChannelId, notification1, muchLater)
461 b.Broadcast(store.SystemInternalChannelId)
462 select {
463 case <-time.After(5 * time.Second):
464
465=== modified file 'server/store/inmemory.go'
466--- server/store/inmemory.go 2014-01-14 15:35:20 +0000
467+++ server/store/inmemory.go 2014-02-10 23:46:49 +0000
468@@ -19,18 +19,31 @@
469 import (
470 "encoding/json"
471 "sync"
472+ "time"
473 )
474
475+// one stored notification
476+type notification struct {
477+ payload json.RawMessage
478+ expiration time.Time
479+}
480+
481+// one stored channel
482+type channel struct {
483+ topLevel int64
484+ notifications []notification
485+}
486+
487 // InMemoryPendingStore is a basic in-memory pending notification store.
488 type InMemoryPendingStore struct {
489 lock sync.Mutex
490- store map[InternalChannelId][]json.RawMessage
491+ store map[InternalChannelId]*channel
492 }
493
494 // NewInMemoryPendingStore returns a new InMemoryStore.
495 func NewInMemoryPendingStore() *InMemoryPendingStore {
496 return &InMemoryPendingStore{
497- store: make(map[InternalChannelId][]json.RawMessage),
498+ store: make(map[InternalChannelId]*channel),
499 }
500 }
501
502@@ -41,25 +54,40 @@
503 return InternalChannelId(""), ErrUnknownChannel
504 }
505
506-func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notification json.RawMessage) error {
507+func (sto *InMemoryPendingStore) AppendToChannel(chanId InternalChannelId, notificationPayload json.RawMessage, expiration time.Time) error {
508 sto.lock.Lock()
509 defer sto.lock.Unlock()
510 prev := sto.store[chanId]
511- sto.store[chanId] = append(prev, notification)
512+ if prev == nil {
513+ prev = &channel{}
514+ }
515+ prev.topLevel++
516+ prev.notifications = append(prev.notifications, notification{
517+ payload: notificationPayload,
518+ expiration: expiration,
519+ })
520+ sto.store[chanId] = prev
521 return nil
522 }
523
524 func (sto *InMemoryPendingStore) GetChannelSnapshot(chanId InternalChannelId) (int64, []json.RawMessage, error) {
525 sto.lock.Lock()
526 defer sto.lock.Unlock()
527- notifications, ok := sto.store[chanId]
528+ channel, ok := sto.store[chanId]
529 if !ok {
530 return 0, nil, nil
531 }
532- n := len(notifications)
533- res := make([]json.RawMessage, n)
534- copy(res, notifications)
535- return int64(n), res, nil
536+ topLevel := channel.topLevel
537+ n := len(channel.notifications)
538+ res := make([]json.RawMessage, 0, n)
539+ now := time.Now()
540+ for _, notification := range channel.notifications {
541+ if notification.expiration.Before(now) {
542+ continue
543+ }
544+ res = append(res, notification.payload)
545+ }
546+ return topLevel, res, nil
547 }
548
549 // sanity check we implement the interface
550
551=== modified file 'server/store/inmemory_test.go'
552--- server/store/inmemory_test.go 2014-02-10 23:19:08 +0000
553+++ server/store/inmemory_test.go 2014-02-10 23:46:49 +0000
554@@ -18,7 +18,7 @@
555
556 import (
557 "encoding/json"
558- // "fmt"
559+ "time"
560
561 . "launchpad.net/gocheck"
562 )
563@@ -54,10 +54,32 @@
564 notification1 := json.RawMessage(`{"a":1}`)
565 notification2 := json.RawMessage(`{"a":2}`)
566
567- sto.AppendToChannel(SystemInternalChannelId, notification1)
568- sto.AppendToChannel(SystemInternalChannelId, notification2)
569+ muchLater := time.Now().Add(time.Minute)
570+
571+ sto.AppendToChannel(SystemInternalChannelId, notification1, muchLater)
572+ sto.AppendToChannel(SystemInternalChannelId, notification2, muchLater)
573 top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
574 c.Assert(err, IsNil)
575 c.Check(top, Equals, int64(2))
576 c.Check(res, DeepEquals, []json.RawMessage{notification1, notification2})
577 }
578+
579+func (s *inMemorySuite) TestAppendToChannelAndGetChannelSnapshortWithExpiration(c *C) {
580+ sto := NewInMemoryPendingStore()
581+
582+ notification1 := json.RawMessage(`{"a":1}`)
583+ notification2 := json.RawMessage(`{"a":2}`)
584+
585+ verySoon := time.Now().Add(100 * time.Millisecond)
586+ muchLater := time.Now().Add(time.Minute)
587+
588+ sto.AppendToChannel(SystemInternalChannelId, notification1, muchLater)
589+ sto.AppendToChannel(SystemInternalChannelId, notification2, verySoon)
590+
591+ time.Sleep(200 * time.Millisecond)
592+
593+ top, res, err := sto.GetChannelSnapshot(SystemInternalChannelId)
594+ c.Assert(err, IsNil)
595+ c.Check(top, Equals, int64(2))
596+ c.Check(res, DeepEquals, []json.RawMessage{notification1})
597+}
598
599=== modified file 'server/store/store.go'
600--- server/store/store.go 2014-01-14 15:35:20 +0000
601+++ server/store/store.go 2014-02-10 23:46:49 +0000
602@@ -21,6 +21,7 @@
603 "encoding/hex"
604 "encoding/json"
605 "errors"
606+ "time"
607 )
608
609 type InternalChannelId string
610@@ -65,8 +66,8 @@
611 // GetInternalChannelId returns the internal store id for a channel
612 // given the name.
613 GetInternalChannelId(name string) (InternalChannelId, error)
614- // AppendToChannel appends a notification to the channel. xxx expiration
615- AppendToChannel(chanId InternalChannelId, notification json.RawMessage) error
616+ // AppendToChannel appends a notification to the channel.
617+ AppendToChannel(chanId InternalChannelId, notification json.RawMessage, expiration time.Time) error
618 // GetChannelSnapshot gets all the current notifications and
619 // current top level in the channel.
620 GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, payloads []json.RawMessage, err error)

Subscribers

People subscribed via source and target branches