Merge lp:~sergiusens/account-polld/batch_request into lp:~ubuntu-push-hackers/account-polld/trunk

Proposed by Sergio Schvezov
Status: Approved
Approved by: Manuel de la Peña
Approved revision: 58
Proposed branch: lp:~sergiusens/account-polld/batch_request
Merge into: lp:~ubuntu-push-hackers/account-polld/trunk
Diff against target: 216 lines (+155/-25)
1 file modified
plugins/gmail/gmail.go (+155/-25)
To merge this branch: bzr merge lp:~sergiusens/account-polld/batch_request
Reviewer Review Type Date Requested Status
Manuel de la Peña (community) Approve
PS Jenkins bot continuous-integration Approve
Roberto Alsina (community) Approve
Review via email: mp+230788@code.launchpad.net

Commit message

Batch request for gmail emails

Description of the change

To post a comment you must log in.
Revision history for this message
Roberto Alsina (ralsina) :
review: Approve
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Manuel de la Peña (mandel) :
review: Approve

Unmerged revisions

58. By Sergio Schvezov

Batch request messages

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'plugins/gmail/gmail.go'
--- plugins/gmail/gmail.go 2014-08-08 20:11:14 +0000
+++ plugins/gmail/gmail.go 2014-08-14 16:59:39 +0000
@@ -18,13 +18,20 @@
18package gmail18package gmail
1919
20import (20import (
21 "bufio"
22 "bytes"
21 "encoding/json"23 "encoding/json"
24 "errors"
22 "fmt"25 "fmt"
26 "io"
27 "mime/multipart"
23 "net/http"28 "net/http"
24 "net/mail"29 "net/mail"
30 "net/textproto"
25 "net/url"31 "net/url"
26 "os"32 "os"
27 "sort"33 "sort"
34 "strings"
28 "time"35 "time"
2936
30 "log"37 "log"
@@ -114,16 +121,14 @@
114 return nil, err121 return nil, err
115 }122 }
116123
117 // TODO use the batching API defined in https://developers.google.com/gmail/api/guides/batch124 if len(messages) == 0 {
118 for i := range messages {125 log.Print("gmail plugin ", p.accountId, ": no messages to fetch")
119 resp, err := p.requestMessage(messages[i].Id, authData.AccessToken)126 return nil, nil
120 if err != nil {127 }
121 return nil, err128
122 }129 messages, err = p.getMessageBatch(messages, authData.AccessToken)
123 messages[i], err = p.parseMessageResponse(resp)130 if err != nil {
124 if err != nil {131 return nil, err
125 return nil, err
126 }
127 }132 }
128 return p.createNotifications(messages)133 return p.createNotifications(messages)
129}134}
@@ -251,26 +256,151 @@
251 return msg, nil256 return msg, nil
252}257}
253258
254func (p *GmailPlugin) requestMessage(id, accessToken string) (*http.Response, error) {259func (p *GmailPlugin) getMessageBatch(messages []message, accessToken string) ([]message, error) {
255 u, err := baseUrl.Parse("messages/" + id)260 req, err := p.createBatchRequest(messages, accessToken)
256 if err != nil {261 if err != nil {
257 return nil, err262 return nil, err
258 }263 }
259264
260 query := u.Query()265 /*
261 // only request specific fields266 dump, err := httputil.DumpRequest(req, true)
262 query.Add("fields", "snippet,threadId,id,payload/headers")267 if err != nil {
263 // get the full message to get From and Subject from headers268 return nil, err
264 query.Add("format", "full")269 }
265 u.RawQuery = query.Encode()270 fmt.Println(string(dump))
266271 */
267 req, err := http.NewRequest("GET", u.String(), nil)272
273 resp, err := http.DefaultClient.Do(req)
274 if err != nil {
275 return nil, err
276 } else if resp.StatusCode == 401 {
277 return nil, plugins.ErrTokenExpired
278 } else if resp.StatusCode != http.StatusOK {
279 return nil, fmt.Errorf("recieved %d when requesting message batch", resp.StatusCode)
280 }
281
282 /*
283 respDump, err := httputil.DumpResponse(resp, true)
284 if err != nil {
285 return nil, err
286 }
287 fmt.Println(string(respDump))
288 */
289
290 return p.getBatchResponse(resp)
291}
292
293func getBoundary(contentType string) (boundary string, err error) {
294 if i := strings.Index(contentType, ";"); i == -1 {
295 return boundary, errors.New("no boundary in batch response")
296 } else if mediaType := strings.TrimSpace(contentType[:i]); mediaType != "multipart/mixed" {
297 return boundary, errors.New("unexpected media type in batch response")
298 }
299
300 var start, end int
301 if i := strings.Index(contentType, "boundary="); i == -1 {
302 return boundary, errors.New("no boundary in batch response")
303 } else {
304 start = i + len("boundary=")
305 }
306 if i := strings.Index(contentType[start:], ";"); i == -1 {
307 end = len(contentType)
308 } else {
309 end = start + i
310 }
311
312 boundary = strings.TrimSpace(contentType[start:end])
313 return boundary, nil
314}
315
316func (p *GmailPlugin) getBatchResponse(resp *http.Response) (messages []message, err error) {
317 /*
318 This is the ideal solution but the boundary has '=' in it and the parsing fails to parse
319 it correctly, e.g.; "multipart/mixed; boundary=batch_Qau-LYGqak0=_AApYZR4VvsU="
320
321 contentType := resp.Header.Get("Content-Type")
322 fmt.Println(contentType)
323 mediaType, params, err := mime.ParseMediaType(contentType)
324 if err != nil {
325 fmt.Println(mediaType)
326 return nil, err
327 }
328 if !strings.HasPrefix(mediaType, "multipart/") {
329 return nil, fmt.Errorf("wrong mediatype, expected multipart and received %s", mediaType)
330 }
331 */
332 contentType := resp.Header.Get("Content-Type")
333 boundary, err := getBoundary(contentType)
334 if err != nil {
335 return nil, err
336 }
337 multipartR := multipart.NewReader(resp.Body, boundary)
338 for {
339 part, err := multipartR.NextPart()
340 if err == io.EOF {
341 break
342 }
343 if err != nil {
344 return nil, err
345 }
346 defer part.Close()
347 r := bufio.NewReader(part)
348 for {
349 line, err := r.ReadString('\n')
350 if err == io.EOF {
351 return nil, errors.New("EOF reached before content")
352 }
353 if len(strings.TrimSpace(line)) == 0 {
354 break
355 }
356 }
357 var msg message
358 decoder := json.NewDecoder(r)
359 if err := decoder.Decode(&msg); err != nil {
360 return nil, err
361 }
362 messages = append(messages, msg)
363 }
364 return messages, nil
365}
366
367func (p *GmailPlugin) createBatchRequest(messages []message, accessToken string) (req *http.Request, err error) {
368 buffer := new(bytes.Buffer)
369 multipartW := multipart.NewWriter(buffer)
370 for i := range messages {
371 u, err := baseUrl.Parse("messages/" + messages[i].Id)
372 if err != nil {
373 return nil, err
374 }
375
376 query := u.Query()
377 // only request specific fields
378 query.Add("fields", "snippet,threadId,id,payload/headers")
379 // get the full message to get From and Subject from headers
380 query.Add("format", "full")
381 u.RawQuery = query.Encode()
382
383 mh := make(textproto.MIMEHeader)
384 mh.Set("Content-Type", "application/http")
385 mh.Set("Content-Id", messages[i].Id)
386
387 partW, err := multipartW.CreatePart(mh)
388 if err != nil {
389 return nil, err
390 }
391 content := []byte("GET " + u.String() + " HTTP/1.1\r\n")
392 partW.Write(content)
393 }
394
395 multipartW.Close()
396 req, err = http.NewRequest("POST", "https://www.googleapis.com/batch", buffer)
268 if err != nil {397 if err != nil {
269 return nil, err398 return nil, err
270 }399 }
271 req.Header.Set("Authorization", "Bearer "+accessToken)400 req.Header.Set("Authorization", "Bearer "+accessToken)
401 req.Header.Set("Content-Type", "multipart/mixed; boundary="+multipartW.Boundary())
272402
273 return http.DefaultClient.Do(req)403 return req, nil
274}404}
275405
276func (p *GmailPlugin) requestMessageList(accessToken string) (*http.Response, error) {406func (p *GmailPlugin) requestMessageList(accessToken string) (*http.Response, error) {

Subscribers

People subscribed via source and target branches