Merge lp:~sergiusens/account-polld/batch_request into lp:~ubuntu-push-hackers/account-polld/trunk

Proposed by Sergio Schvezov on 2014-08-14
Status: Approved
Approved by: Manuel de la Peña on 2014-08-15
Approved revision: 58
Proposed branch: lp:~sergiusens/account-polld/batch_request
Merge into: lp:~ubuntu-push-hackers/account-polld/trunk
Diff against target: 216 lines (+155/-25)
1 file modified
plugins/gmail/gmail.go (+155/-25)
To merge this branch: bzr merge lp:~sergiusens/account-polld/batch_request
Reviewer | Review Type | Date Requested | Status
Manuel de la Peña (community) | | | Approve on 2014-08-15
PS Jenkins bot | continuous-integration | | Approve on 2014-08-14
Roberto Alsina (community) | | 2014-08-14 | Approve on 2014-08-14
Review via email: mp+230788@code.launchpad.net

Commit message

Batch request for gmail emails

Description of the change

Unmerged revisions

58. By Sergio Schvezov on 2014-08-14

Batch request messages

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'plugins/gmail/gmail.go'
2--- plugins/gmail/gmail.go 2014-08-08 20:11:14 +0000
3+++ plugins/gmail/gmail.go 2014-08-14 16:59:39 +0000
4@@ -18,13 +18,20 @@
5 package gmail
6
7 import (
8+ "bufio"
9+ "bytes"
10 "encoding/json"
11+ "errors"
12 "fmt"
13+ "io"
14+ "mime/multipart"
15 "net/http"
16 "net/mail"
17+ "net/textproto"
18 "net/url"
19 "os"
20 "sort"
21+ "strings"
22 "time"
23
24 "log"
25@@ -114,16 +121,14 @@
26 return nil, err
27 }
28
29- // TODO use the batching API defined in https://developers.google.com/gmail/api/guides/batch
30- for i := range messages {
31- resp, err := p.requestMessage(messages[i].Id, authData.AccessToken)
32- if err != nil {
33- return nil, err
34- }
35- messages[i], err = p.parseMessageResponse(resp)
36- if err != nil {
37- return nil, err
38- }
39+ if len(messages) == 0 {
40+ log.Print("gmail plugin ", p.accountId, ": no messages to fetch")
41+ return nil, nil
42+ }
43+
44+ messages, err = p.getMessageBatch(messages, authData.AccessToken)
45+ if err != nil {
46+ return nil, err
47 }
48 return p.createNotifications(messages)
49 }
50@@ -251,26 +256,151 @@
51 return msg, nil
52 }
53
54-func (p *GmailPlugin) requestMessage(id, accessToken string) (*http.Response, error) {
55- u, err := baseUrl.Parse("messages/" + id)
56- if err != nil {
57- return nil, err
58- }
59-
60- query := u.Query()
61- // only request specific fields
62- query.Add("fields", "snippet,threadId,id,payload/headers")
63- // get the full message to get From and Subject from headers
64- query.Add("format", "full")
65- u.RawQuery = query.Encode()
66-
67- req, err := http.NewRequest("GET", u.String(), nil)
68+func (p *GmailPlugin) getMessageBatch(messages []message, accessToken string) ([]message, error) {
69+ req, err := p.createBatchRequest(messages, accessToken)
70+ if err != nil {
71+ return nil, err
72+ }
73+
74+ /*
75+ dump, err := httputil.DumpRequest(req, true)
76+ if err != nil {
77+ return nil, err
78+ }
79+ fmt.Println(string(dump))
80+ */
81+
82+ resp, err := http.DefaultClient.Do(req)
83+ if err != nil {
84+ return nil, err
85+ } else if resp.StatusCode == 401 {
86+ return nil, plugins.ErrTokenExpired
87+ } else if resp.StatusCode != http.StatusOK {
88+ return nil, fmt.Errorf("recieved %d when requesting message batch", resp.StatusCode)
89+ }
90+
91+ /*
92+ respDump, err := httputil.DumpResponse(resp, true)
93+ if err != nil {
94+ return nil, err
95+ }
96+ fmt.Println(string(respDump))
97+ */
98+
99+ return p.getBatchResponse(resp)
100+}
101+
102+func getBoundary(contentType string) (boundary string, err error) {
103+ if i := strings.Index(contentType, ";"); i == -1 {
104+ return boundary, errors.New("no boundary in batch response")
105+ } else if mediaType := strings.TrimSpace(contentType[:i]); mediaType != "multipart/mixed" {
106+ return boundary, errors.New("unexpected media type in batch response")
107+ }
108+
109+ var start, end int
110+ if i := strings.Index(contentType, "boundary="); i == -1 {
111+ return boundary, errors.New("no boundary in batch response")
112+ } else {
113+ start = i + len("boundary=")
114+ }
115+ if i := strings.Index(contentType[start:], ";"); i == -1 {
116+ end = len(contentType)
117+ } else {
118+ end = start + i
119+ }
120+
121+ boundary = strings.TrimSpace(contentType[start:end])
122+ return boundary, nil
123+}
124+
125+func (p *GmailPlugin) getBatchResponse(resp *http.Response) (messages []message, err error) {
126+ /*
127+ This is the ideal solution but the boundary has '=' in it and the parsing fails to parse
128+ it correctly, e.g.; "multipart/mixed; boundary=batch_Qau-LYGqak0=_AApYZR4VvsU="
129+
130+ contentType := resp.Header.Get("Content-Type")
131+ fmt.Println(contentType)
132+ mediaType, params, err := mime.ParseMediaType(contentType)
133+ if err != nil {
134+ fmt.Println(mediaType)
135+ return nil, err
136+ }
137+ if !strings.HasPrefix(mediaType, "multipart/") {
138+ return nil, fmt.Errorf("wrong mediatype, expected multipart and received %s", mediaType)
139+ }
140+ */
141+ contentType := resp.Header.Get("Content-Type")
142+ boundary, err := getBoundary(contentType)
143+ if err != nil {
144+ return nil, err
145+ }
146+ multipartR := multipart.NewReader(resp.Body, boundary)
147+ for {
148+ part, err := multipartR.NextPart()
149+ if err == io.EOF {
150+ break
151+ }
152+ if err != nil {
153+ return nil, err
154+ }
155+ defer part.Close()
156+ r := bufio.NewReader(part)
157+ for {
158+ line, err := r.ReadString('\n')
159+ if err == io.EOF {
160+ return nil, errors.New("EOF reached before content")
161+ }
162+ if len(strings.TrimSpace(line)) == 0 {
163+ break
164+ }
165+ }
166+ var msg message
167+ decoder := json.NewDecoder(r)
168+ if err := decoder.Decode(&msg); err != nil {
169+ return nil, err
170+ }
171+ messages = append(messages, msg)
172+ }
173+ return messages, nil
174+}
175+
176+func (p *GmailPlugin) createBatchRequest(messages []message, accessToken string) (req *http.Request, err error) {
177+ buffer := new(bytes.Buffer)
178+ multipartW := multipart.NewWriter(buffer)
179+ for i := range messages {
180+ u, err := baseUrl.Parse("messages/" + messages[i].Id)
181+ if err != nil {
182+ return nil, err
183+ }
184+
185+ query := u.Query()
186+ // only request specific fields
187+ query.Add("fields", "snippet,threadId,id,payload/headers")
188+ // get the full message to get From and Subject from headers
189+ query.Add("format", "full")
190+ u.RawQuery = query.Encode()
191+
192+ mh := make(textproto.MIMEHeader)
193+ mh.Set("Content-Type", "application/http")
194+ mh.Set("Content-Id", messages[i].Id)
195+
196+ partW, err := multipartW.CreatePart(mh)
197+ if err != nil {
198+ return nil, err
199+ }
200+ content := []byte("GET " + u.String() + " HTTP/1.1\r\n")
201+ partW.Write(content)
202+ }
203+
204+ multipartW.Close()
205+ req, err = http.NewRequest("POST", "https://www.googleapis.com/batch", buffer)
206 if err != nil {
207 return nil, err
208 }
209 req.Header.Set("Authorization", "Bearer "+accessToken)
210+ req.Header.Set("Content-Type", "multipart/mixed; boundary="+multipartW.Boundary())
211
212- return http.DefaultClient.Do(req)
213+ return req, nil
214 }
215
216 func (p *GmailPlugin) requestMessageList(accessToken string) (*http.Response, error) {

Subscribers

People subscribed via source and target branches