Merge lp:~pedronis/ubuntu-push/store-flex into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Guillermo Gonzalez
Approved revision: 72
Merged at revision: 70
Proposed branch: lp:~pedronis/ubuntu-push/store-flex
Merge into: lp:ubuntu-push
Diff against target: 362 lines (+136/-38)
5 files modified
server/api/handlers.go (+56/-24)
server/api/handlers_test.go (+68/-12)
server/dev/server.go (+5/-1)
server/store/inmemory.go (+5/-1)
server/store/store.go (+2/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/store-flex
Reviewer Review Type Date Requested Status
Guillermo Gonzalez Approve
Review via email: mp+206943@code.launchpad.net

Commit message

Add some flexibility in how requests obtain the store; log unexpected store-related errors.

Description of the change

Add some flexibility in how requests obtain the store.

Log unexpected store-related errors.

To post a comment you must log in.
Revision history for this message
Guillermo Gonzalez (verterok) wrote :

looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/api/handlers.go'
2--- server/api/handlers.go 2014-02-13 18:49:31 +0000
3+++ server/api/handlers.go 2014-02-18 14:20:50 +0000
4@@ -118,6 +118,11 @@
5 internalError,
6 "Unknown error",
7 }
8+ ErrStoreUnavailable = &APIError{
9+ http.StatusServiceUnavailable,
10+ unavailable,
11+ "Message store unavailable",
12+ }
13 ErrCouldNotStoreNotification = &APIError{
14 http.StatusServiceUnavailable,
15 unavailable,
16@@ -205,21 +210,38 @@
17 return expire, nil
18 }
19
20-// state holds the interfaces to delegate to serving requests
21-type state struct {
22- store store.PendingStore
23- broker broker.BrokerSending
24- logger logger.Logger
25-}
26-
27-type BroadcastHandler state
28-
29-func (h *BroadcastHandler) doBroadcast(bcast *Broadcast) *APIError {
30+type StoreForRequest func(w http.ResponseWriter, request *http.Request) (store.PendingStore, error)
31+
32+// context holds the interfaces to delegate to serving requests
33+type context struct {
34+ storeForRequest StoreForRequest
35+ broker broker.BrokerSending
36+ logger logger.Logger
37+}
38+
39+func (ctx *context) getStore(w http.ResponseWriter, request *http.Request) (store.PendingStore, *APIError) {
40+ sto, err := ctx.storeForRequest(w, request)
41+ if err != nil {
42+ apiErr, ok := err.(*APIError)
43+ if ok {
44+ return nil, apiErr
45+ }
46+ ctx.logger.Errorf("failed to get store: %v", err)
47+ return nil, ErrUnknown
48+ }
49+ return sto, nil
50+}
51+
52+type BroadcastHandler struct {
53+ *context
54+}
55+
56+func (h *BroadcastHandler) doBroadcast(sto store.PendingStore, bcast *Broadcast) *APIError {
57 expire, apiErr := checkBroadcast(bcast)
58 if apiErr != nil {
59 return apiErr
60 }
61- chanId, err := h.store.GetInternalChannelId(bcast.Channel)
62+ chanId, err := sto.GetInternalChannelId(bcast.Channel)
63 if err != nil {
64 switch err {
65 case store.ErrUnknownChannel:
66@@ -228,9 +250,9 @@
67 return ErrUnknown
68 }
69 }
70- err = h.store.AppendToChannel(chanId, bcast.Data, expire)
71+ err = sto.AppendToChannel(chanId, bcast.Data, expire)
72 if err != nil {
73- // assume this for now
74+ h.logger.Errorf("could not store notification: %v", err)
75 return ErrCouldNotStoreNotification
76 }
77
78@@ -239,24 +261,33 @@
79 }
80
81 func (h *BroadcastHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
82+ var apiErr *APIError
83+ defer func() {
84+ if apiErr != nil {
85+ respondError(writer, apiErr)
86+ }
87+ }()
88+
89 body, apiErr := readBody(request)
90+ if apiErr != nil {
91+ return
92+ }
93
94+ sto, apiErr := h.getStore(writer, request)
95 if apiErr != nil {
96- respondError(writer, apiErr)
97 return
98 }
99+ defer sto.Close()
100
101 broadcast := &Broadcast{}
102 err := json.Unmarshal(body, broadcast)
103-
104 if err != nil {
105- respondError(writer, ErrMalformedJSONObject)
106+ apiErr = ErrMalformedJSONObject
107 return
108 }
109
110- apiErr = h.doBroadcast(broadcast)
111+ apiErr = h.doBroadcast(sto, broadcast)
112 if apiErr != nil {
113- respondError(writer, apiErr)
114 return
115 }
116
117@@ -265,12 +296,13 @@
118 }
119
120 // MakeHandlersMux makes a handler that dispatches for the various API endpoints.
121-func MakeHandlersMux(store store.PendingStore, broker broker.BrokerSending, logger logger.Logger) *http.ServeMux {
122+func MakeHandlersMux(storeForRequest StoreForRequest, broker broker.BrokerSending, logger logger.Logger) *http.ServeMux {
123+ ctx := &context{
124+ storeForRequest: storeForRequest,
125+ broker: broker,
126+ logger: logger,
127+ }
128 mux := http.NewServeMux()
129- mux.Handle("/broadcast", &BroadcastHandler{
130- store: store,
131- broker: broker,
132- logger: logger,
133- })
134+ mux.Handle("/broadcast", &BroadcastHandler{context: ctx})
135 return mux
136 }
137
138=== modified file 'server/api/handlers_test.go'
139--- server/api/handlers_test.go 2014-02-10 23:29:53 +0000
140+++ server/api/handlers_test.go 2014-02-18 14:20:50 +0000
141@@ -31,6 +31,7 @@
142 . "launchpad.net/gocheck"
143
144 "launchpad.net/ubuntu-push/server/store"
145+ helpers "launchpad.net/ubuntu-push/testing"
146 )
147
148 func TestHandlers(t *testing.T) { TestingT(t) }
149@@ -40,12 +41,14 @@
150 json string
151 client *http.Client
152 c *C
153+ testlog *helpers.TestLogger
154 }
155
156 var _ = Suite(&handlersSuite{})
157
158 func (s *handlersSuite) SetUpTest(c *C) {
159 s.client = &http.Client{}
160+ s.testlog = helpers.NewTestLogger(c, "error")
161 }
162
163 func (s *handlersSuite) TestAPIError(c *C) {
164@@ -66,6 +69,23 @@
165 c.Check(err, Equals, ErrCouldNotReadBody)
166 }
167
168+func (s *handlersSuite) TestGetStore(c *C) {
169+ ctx := &context{storeForRequest: func(w http.ResponseWriter, r *http.Request) (store.PendingStore, error) {
170+ return nil, ErrStoreUnavailable
171+ }}
172+ sto, apiErr := ctx.getStore(nil, nil)
173+ c.Check(sto, IsNil)
174+ c.Check(apiErr, Equals, ErrStoreUnavailable)
175+
176+ ctx = &context{storeForRequest: func(w http.ResponseWriter, r *http.Request) (store.PendingStore, error) {
177+ return nil, errors.New("something else")
178+ }, logger: s.testlog}
179+ sto, apiErr = ctx.getStore(nil, nil)
180+ c.Check(sto, IsNil)
181+ c.Check(apiErr, Equals, ErrUnknown)
182+ c.Check(s.testlog.Captured(), Equals, "ERROR failed to get store: something else\n")
183+}
184+
185 var future = time.Now().Add(4 * time.Hour).Format(time.RFC3339)
186
187 func (s *handlersSuite) TestCheckBroadcast(c *C) {
188@@ -122,9 +142,9 @@
189 func (s *handlersSuite) TestDoBroadcast(c *C) {
190 sto := store.NewInMemoryPendingStore()
191 bsend := &checkBrokerSending{store: sto}
192- bh := &BroadcastHandler{sto, bsend, nil}
193+ bh := &BroadcastHandler{&context{nil, bsend, nil}}
194 payload := json.RawMessage(`{"a": 1}`)
195- apiErr := bh.doBroadcast(&Broadcast{
196+ apiErr := bh.doBroadcast(sto, &Broadcast{
197 Channel: "system",
198 ExpireOn: future,
199 Data: payload,
200@@ -138,8 +158,8 @@
201
202 func (s *handlersSuite) TestDoBroadcastUnknownChannel(c *C) {
203 sto := store.NewInMemoryPendingStore()
204- bh := &BroadcastHandler{sto, nil, nil}
205- apiErr := bh.doBroadcast(&Broadcast{
206+ bh := &BroadcastHandler{}
207+ apiErr := bh.doBroadcast(sto, &Broadcast{
208 Channel: "unknown",
209 ExpireOn: future,
210 Data: json.RawMessage(`{"a": 1}`),
211@@ -169,8 +189,8 @@
212 return errors.New("other")
213 },
214 }
215- bh := &BroadcastHandler{sto, nil, nil}
216- apiErr := bh.doBroadcast(&Broadcast{
217+ bh := &BroadcastHandler{}
218+ apiErr := bh.doBroadcast(sto, &Broadcast{
219 Channel: "system",
220 ExpireOn: future,
221 Data: json.RawMessage(`{"a": 1}`),
222@@ -188,13 +208,15 @@
223 return err
224 },
225 }
226- bh := &BroadcastHandler{sto, nil, nil}
227- apiErr := bh.doBroadcast(&Broadcast{
228+ ctx := &context{logger: s.testlog}
229+ bh := &BroadcastHandler{ctx}
230+ apiErr := bh.doBroadcast(sto, &Broadcast{
231 Channel: "system",
232 ExpireOn: future,
233 Data: json.RawMessage(`{"a": 1}`),
234 })
235 c.Check(apiErr, Equals, ErrCouldNotStoreNotification)
236+ c.Check(s.testlog.Captured(), Equals, "ERROR could not store notification: fail\n")
237 }
238
239 func newPostRequest(path string, message interface{}, server *httptest.Server) *http.Request {
240@@ -238,8 +260,11 @@
241
242 func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) {
243 sto := store.NewInMemoryPendingStore()
244+ stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
245+ return sto, nil
246+ }
247 bsend := testBrokerSending{make(chan store.InternalChannelId, 1)}
248- testServer := httptest.NewServer(MakeHandlersMux(sto, bsend, nil))
249+ testServer := httptest.NewServer(MakeHandlersMux(stoForReq, bsend, nil))
250 defer testServer.Close()
251
252 payload := json.RawMessage(`{"foo":"bar"}`)
253@@ -268,9 +293,32 @@
254 c.Check(<-bsend.chanId, Equals, store.SystemInternalChannelId)
255 }
256
257+func (s *handlersSuite) TestStoreUnavailable(c *C) {
258+ stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
259+ return nil, ErrStoreUnavailable
260+ }
261+ testServer := httptest.NewServer(MakeHandlersMux(stoForReq, nil, nil))
262+ defer testServer.Close()
263+
264+ payload := json.RawMessage(`{"foo":"bar"}`)
265+
266+ request := newPostRequest("/broadcast", &Broadcast{
267+ Channel: "system",
268+ ExpireOn: future,
269+ Data: payload,
270+ }, testServer)
271+
272+ response, err := s.client.Do(request)
273+ c.Assert(err, IsNil)
274+ checkError(c, response, ErrStoreUnavailable)
275+}
276+
277 func (s *handlersSuite) TestFromBroadcastError(c *C) {
278 sto := store.NewInMemoryPendingStore()
279- testServer := httptest.NewServer(MakeHandlersMux(sto, nil, nil))
280+ stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
281+ return sto, nil
282+ }
283+ testServer := httptest.NewServer(MakeHandlersMux(stoForReq, nil, nil))
284 defer testServer.Close()
285
286 payload := json.RawMessage(`{"foo":"bar"}`)
287@@ -287,7 +335,11 @@
288 }
289
290 func (s *handlersSuite) TestMissingData(c *C) {
291- testServer := httptest.NewServer(&BroadcastHandler{})
292+ stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
293+ return store.NewInMemoryPendingStore(), nil
294+ }
295+ ctx := &context{stoForReq, nil, nil}
296+ testServer := httptest.NewServer(&BroadcastHandler{ctx})
297 defer testServer.Close()
298
299 packedMessage := []byte(`{"channel": "system"}`)
300@@ -304,7 +356,11 @@
301 }
302
303 func (s *handlersSuite) TestCannotBroadcastMalformedData(c *C) {
304- testServer := httptest.NewServer(&BroadcastHandler{})
305+ stoForReq := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
306+ return store.NewInMemoryPendingStore(), nil
307+ }
308+ ctx := &context{stoForReq, nil, nil}
309+ testServer := httptest.NewServer(&BroadcastHandler{ctx})
310 defer testServer.Close()
311
312 packedMessage := []byte("{some bogus-message: ")
313
314=== modified file 'server/dev/server.go'
315--- server/dev/server.go 2014-02-13 19:33:03 +0000
316+++ server/dev/server.go 2014-02-18 14:20:50 +0000
317@@ -19,6 +19,7 @@
318
319 import (
320 "net"
321+ "net/http"
322 "os"
323 "path/filepath"
324
325@@ -56,7 +57,10 @@
326 broker.Start()
327 defer broker.Stop()
328 // serve the http api
329- mux := api.MakeHandlersMux(sto, broker, logger)
330+ storeForRequest := func(http.ResponseWriter, *http.Request) (store.PendingStore, error) {
331+ return sto, nil
332+ }
333+ mux := api.MakeHandlersMux(storeForRequest, broker, logger)
334 handler := api.PanicTo500Handler(mux, logger)
335 go server.HTTPServeRunner(handler, &cfg.HTTPServeParsedConfig)()
336 // listen for device connections
337
338=== modified file 'server/store/inmemory.go'
339--- server/store/inmemory.go 2014-02-10 18:43:51 +0000
340+++ server/store/inmemory.go 2014-02-18 14:20:50 +0000
341@@ -90,5 +90,9 @@
342 return topLevel, res, nil
343 }
344
345+func (sto *InMemoryPendingStore) Close() {
346+ // ignored
347+}
348+
349 // sanity check we implement the interface
350-var _ PendingStore = &InMemoryPendingStore{}
351+var _ PendingStore = (*InMemoryPendingStore)(nil)
352
353=== modified file 'server/store/store.go'
354--- server/store/store.go 2014-02-10 19:53:00 +0000
355+++ server/store/store.go 2014-02-18 14:20:50 +0000
356@@ -71,4 +71,6 @@
357 // GetChannelSnapshot gets all the current notifications and
358 // current top level in the channel.
359 GetChannelSnapshot(chanId InternalChannelId) (topLevel int64, payloads []json.RawMessage, err error)
360+ // Close is to be called when done with the store.
361+ Close()
362 }

Subscribers

People subscribed via source and target branches