Merge lp:~pedronis/ubuntu-push/so-it-begins into lp:ubuntu-push
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 41 |
Merged at revision: | 41 |
Proposed branch: | lp:~pedronis/ubuntu-push/so-it-begins |
Merge into: | lp:ubuntu-push |
Diff against target: |
5845 lines (+5616/-0) 43 files modified
.bzrignore (+3/-0)
COPYING (+674/-0)
LICENSE (+16/-0)
Makefile (+48/-0)
README (+37/-0)
config/config.go (+149/-0)
config/config_test.go (+135/-0)
dependencies.tsv (+1/-0)
logger/logger.go (+100/-0)
logger/logger_test.go (+117/-0)
protocol/messages.go (+103/-0)
protocol/messages_test.go (+101/-0)
protocol/protocol.go (+103/-0)
protocol/protocol_test.go (+227/-0)
scripts/check_fmt (+14/-0)
server/acceptance/acceptance.sh (+6/-0)
server/acceptance/acceptance_test.go (+487/-0)
server/acceptance/acceptanceclient.go (+123/-0)
server/acceptance/cmd/acceptanceclient.go (+79/-0)
server/acceptance/config/README (+7/-0)
server/acceptance/config/config.json (+12/-0)
server/acceptance/config/testing.cert (+10/-0)
server/acceptance/config/testing.key (+9/-0)
server/api/handlers.go (+244/-0)
server/api/handlers_test.go (+345/-0)
server/broker/broker.go (+71/-0)
server/broker/simple.go (+260/-0)
server/broker/simple_test.go (+327/-0)
server/dev/config.go (+110/-0)
server/dev/http.go (+40/-0)
server/dev/http_test.go (+70/-0)
server/dev/server.go (+77/-0)
server/dev/server_test.go (+105/-0)
server/listener/listener.go (+86/-0)
server/listener/listener_test.go (+239/-0)
server/session/session.go (+157/-0)
server/session/session_test.go (+619/-0)
server/store/inmemory.go (+66/-0)
server/store/inmemory_test.go (+62/-0)
server/store/store.go (+73/-0)
server/store/store_test.go (+52/-0)
tarmac_tests.sh (+5/-0)
testing/helpers.go (+47/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/so-it-begins |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Commit message
and as a start we get some protocol support and a development server and infrastructure
Description of the change
and as a start we get some protocol support and a development server and infrastructure
Preview Diff
1 | === added file '.bzrignore' |
2 | --- .bzrignore 1970-01-01 00:00:00 +0000 |
3 | +++ .bzrignore 2014-01-14 15:09:46 +0000 |
4 | @@ -0,0 +1,3 @@ |
5 | +acceptanceclient |
6 | +testserver |
7 | +coverhtml |
8 | |
9 | === added file 'COPYING' |
10 | --- COPYING 1970-01-01 00:00:00 +0000 |
11 | +++ COPYING 2014-01-14 15:09:46 +0000 |
12 | @@ -0,0 +1,674 @@ |
13 | + GNU GENERAL PUBLIC LICENSE |
14 | + Version 3, 29 June 2007 |
15 | + |
16 | + Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/> |
17 | + Everyone is permitted to copy and distribute verbatim copies |
18 | + of this license document, but changing it is not allowed. |
19 | + |
20 | + Preamble |
21 | + |
22 | + The GNU General Public License is a free, copyleft license for |
23 | +software and other kinds of works. |
24 | + |
25 | + The licenses for most software and other practical works are designed |
26 | +to take away your freedom to share and change the works. By contrast, |
27 | +the GNU General Public License is intended to guarantee your freedom to |
28 | +share and change all versions of a program--to make sure it remains free |
29 | +software for all its users. We, the Free Software Foundation, use the |
30 | +GNU General Public License for most of our software; it applies also to |
31 | +any other work released this way by its authors. You can apply it to |
32 | +your programs, too. |
33 | + |
34 | + When we speak of free software, we are referring to freedom, not |
35 | +price. Our General Public Licenses are designed to make sure that you |
36 | +have the freedom to distribute copies of free software (and charge for |
37 | +them if you wish), that you receive source code or can get it if you |
38 | +want it, that you can change the software or use pieces of it in new |
39 | +free programs, and that you know you can do these things. |
40 | + |
41 | + To protect your rights, we need to prevent others from denying you |
42 | +these rights or asking you to surrender the rights. Therefore, you have |
43 | +certain responsibilities if you distribute copies of the software, or if |
44 | +you modify it: responsibilities to respect the freedom of others. |
45 | + |
46 | + For example, if you distribute copies of such a program, whether |
47 | +gratis or for a fee, you must pass on to the recipients the same |
48 | +freedoms that you received. You must make sure that they, too, receive |
49 | +or can get the source code. And you must show them these terms so they |
50 | +know their rights. |
51 | + |
52 | + Developers that use the GNU GPL protect your rights with two steps: |
53 | +(1) assert copyright on the software, and (2) offer you this License |
54 | +giving you legal permission to copy, distribute and/or modify it. |
55 | + |
56 | + For the developers' and authors' protection, the GPL clearly explains |
57 | +that there is no warranty for this free software. For both users' and |
58 | +authors' sake, the GPL requires that modified versions be marked as |
59 | +changed, so that their problems will not be attributed erroneously to |
60 | +authors of previous versions. |
61 | + |
62 | + Some devices are designed to deny users access to install or run |
63 | +modified versions of the software inside them, although the manufacturer |
64 | +can do so. This is fundamentally incompatible with the aim of |
65 | +protecting users' freedom to change the software. The systematic |
66 | +pattern of such abuse occurs in the area of products for individuals to |
67 | +use, which is precisely where it is most unacceptable. Therefore, we |
68 | +have designed this version of the GPL to prohibit the practice for those |
69 | +products. If such problems arise substantially in other domains, we |
70 | +stand ready to extend this provision to those domains in future versions |
71 | +of the GPL, as needed to protect the freedom of users. |
72 | + |
73 | + Finally, every program is threatened constantly by software patents. |
74 | +States should not allow patents to restrict development and use of |
75 | +software on general-purpose computers, but in those that do, we wish to |
76 | +avoid the special danger that patents applied to a free program could |
77 | +make it effectively proprietary. To prevent this, the GPL assures that |
78 | +patents cannot be used to render the program non-free. |
79 | + |
80 | + The precise terms and conditions for copying, distribution and |
81 | +modification follow. |
82 | + |
83 | + TERMS AND CONDITIONS |
84 | + |
85 | + 0. Definitions. |
86 | + |
87 | + "This License" refers to version 3 of the GNU General Public License. |
88 | + |
89 | + "Copyright" also means copyright-like laws that apply to other kinds of |
90 | +works, such as semiconductor masks. |
91 | + |
92 | + "The Program" refers to any copyrightable work licensed under this |
93 | +License. Each licensee is addressed as "you". "Licensees" and |
94 | +"recipients" may be individuals or organizations. |
95 | + |
96 | + To "modify" a work means to copy from or adapt all or part of the work |
97 | +in a fashion requiring copyright permission, other than the making of an |
98 | +exact copy. The resulting work is called a "modified version" of the |
99 | +earlier work or a work "based on" the earlier work. |
100 | + |
101 | + A "covered work" means either the unmodified Program or a work based |
102 | +on the Program. |
103 | + |
104 | + To "propagate" a work means to do anything with it that, without |
105 | +permission, would make you directly or secondarily liable for |
106 | +infringement under applicable copyright law, except executing it on a |
107 | +computer or modifying a private copy. Propagation includes copying, |
108 | +distribution (with or without modification), making available to the |
109 | +public, and in some countries other activities as well. |
110 | + |
111 | + To "convey" a work means any kind of propagation that enables other |
112 | +parties to make or receive copies. Mere interaction with a user through |
113 | +a computer network, with no transfer of a copy, is not conveying. |
114 | + |
115 | + An interactive user interface displays "Appropriate Legal Notices" |
116 | +to the extent that it includes a convenient and prominently visible |
117 | +feature that (1) displays an appropriate copyright notice, and (2) |
118 | +tells the user that there is no warranty for the work (except to the |
119 | +extent that warranties are provided), that licensees may convey the |
120 | +work under this License, and how to view a copy of this License. If |
121 | +the interface presents a list of user commands or options, such as a |
122 | +menu, a prominent item in the list meets this criterion. |
123 | + |
124 | + 1. Source Code. |
125 | + |
126 | + The "source code" for a work means the preferred form of the work |
127 | +for making modifications to it. "Object code" means any non-source |
128 | +form of a work. |
129 | + |
130 | + A "Standard Interface" means an interface that either is an official |
131 | +standard defined by a recognized standards body, or, in the case of |
132 | +interfaces specified for a particular programming language, one that |
133 | +is widely used among developers working in that language. |
134 | + |
135 | + The "System Libraries" of an executable work include anything, other |
136 | +than the work as a whole, that (a) is included in the normal form of |
137 | +packaging a Major Component, but which is not part of that Major |
138 | +Component, and (b) serves only to enable use of the work with that |
139 | +Major Component, or to implement a Standard Interface for which an |
140 | +implementation is available to the public in source code form. A |
141 | +"Major Component", in this context, means a major essential component |
142 | +(kernel, window system, and so on) of the specific operating system |
143 | +(if any) on which the executable work runs, or a compiler used to |
144 | +produce the work, or an object code interpreter used to run it. |
145 | + |
146 | + The "Corresponding Source" for a work in object code form means all |
147 | +the source code needed to generate, install, and (for an executable |
148 | +work) run the object code and to modify the work, including scripts to |
149 | +control those activities. However, it does not include the work's |
150 | +System Libraries, or general-purpose tools or generally available free |
151 | +programs which are used unmodified in performing those activities but |
152 | +which are not part of the work. For example, Corresponding Source |
153 | +includes interface definition files associated with source files for |
154 | +the work, and the source code for shared libraries and dynamically |
155 | +linked subprograms that the work is specifically designed to require, |
156 | +such as by intimate data communication or control flow between those |
157 | +subprograms and other parts of the work. |
158 | + |
159 | + The Corresponding Source need not include anything that users |
160 | +can regenerate automatically from other parts of the Corresponding |
161 | +Source. |
162 | + |
163 | + The Corresponding Source for a work in source code form is that |
164 | +same work. |
165 | + |
166 | + 2. Basic Permissions. |
167 | + |
168 | + All rights granted under this License are granted for the term of |
169 | +copyright on the Program, and are irrevocable provided the stated |
170 | +conditions are met. This License explicitly affirms your unlimited |
171 | +permission to run the unmodified Program. The output from running a |
172 | +covered work is covered by this License only if the output, given its |
173 | +content, constitutes a covered work. This License acknowledges your |
174 | +rights of fair use or other equivalent, as provided by copyright law. |
175 | + |
176 | + You may make, run and propagate covered works that you do not |
177 | +convey, without conditions so long as your license otherwise remains |
178 | +in force. You may convey covered works to others for the sole purpose |
179 | +of having them make modifications exclusively for you, or provide you |
180 | +with facilities for running those works, provided that you comply with |
181 | +the terms of this License in conveying all material for which you do |
182 | +not control copyright. Those thus making or running the covered works |
183 | +for you must do so exclusively on your behalf, under your direction |
184 | +and control, on terms that prohibit them from making any copies of |
185 | +your copyrighted material outside their relationship with you. |
186 | + |
187 | + Conveying under any other circumstances is permitted solely under |
188 | +the conditions stated below. Sublicensing is not allowed; section 10 |
189 | +makes it unnecessary. |
190 | + |
191 | + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. |
192 | + |
193 | + No covered work shall be deemed part of an effective technological |
194 | +measure under any applicable law fulfilling obligations under article |
195 | +11 of the WIPO copyright treaty adopted on 20 December 1996, or |
196 | +similar laws prohibiting or restricting circumvention of such |
197 | +measures. |
198 | + |
199 | + When you convey a covered work, you waive any legal power to forbid |
200 | +circumvention of technological measures to the extent such circumvention |
201 | +is effected by exercising rights under this License with respect to |
202 | +the covered work, and you disclaim any intention to limit operation or |
203 | +modification of the work as a means of enforcing, against the work's |
204 | +users, your or third parties' legal rights to forbid circumvention of |
205 | +technological measures. |
206 | + |
207 | + 4. Conveying Verbatim Copies. |
208 | + |
209 | + You may convey verbatim copies of the Program's source code as you |
210 | +receive it, in any medium, provided that you conspicuously and |
211 | +appropriately publish on each copy an appropriate copyright notice; |
212 | +keep intact all notices stating that this License and any |
213 | +non-permissive terms added in accord with section 7 apply to the code; |
214 | +keep intact all notices of the absence of any warranty; and give all |
215 | +recipients a copy of this License along with the Program. |
216 | + |
217 | + You may charge any price or no price for each copy that you convey, |
218 | +and you may offer support or warranty protection for a fee. |
219 | + |
220 | + 5. Conveying Modified Source Versions. |
221 | + |
222 | + You may convey a work based on the Program, or the modifications to |
223 | +produce it from the Program, in the form of source code under the |
224 | +terms of section 4, provided that you also meet all of these conditions: |
225 | + |
226 | + a) The work must carry prominent notices stating that you modified |
227 | + it, and giving a relevant date. |
228 | + |
229 | + b) The work must carry prominent notices stating that it is |
230 | + released under this License and any conditions added under section |
231 | + 7. This requirement modifies the requirement in section 4 to |
232 | + "keep intact all notices". |
233 | + |
234 | + c) You must license the entire work, as a whole, under this |
235 | + License to anyone who comes into possession of a copy. This |
236 | + License will therefore apply, along with any applicable section 7 |
237 | + additional terms, to the whole of the work, and all its parts, |
238 | + regardless of how they are packaged. This License gives no |
239 | + permission to license the work in any other way, but it does not |
240 | + invalidate such permission if you have separately received it. |
241 | + |
242 | + d) If the work has interactive user interfaces, each must display |
243 | + Appropriate Legal Notices; however, if the Program has interactive |
244 | + interfaces that do not display Appropriate Legal Notices, your |
245 | + work need not make them do so. |
246 | + |
247 | + A compilation of a covered work with other separate and independent |
248 | +works, which are not by their nature extensions of the covered work, |
249 | +and which are not combined with it such as to form a larger program, |
250 | +in or on a volume of a storage or distribution medium, is called an |
251 | +"aggregate" if the compilation and its resulting copyright are not |
252 | +used to limit the access or legal rights of the compilation's users |
253 | +beyond what the individual works permit. Inclusion of a covered work |
254 | +in an aggregate does not cause this License to apply to the other |
255 | +parts of the aggregate. |
256 | + |
257 | + 6. Conveying Non-Source Forms. |
258 | + |
259 | + You may convey a covered work in object code form under the terms |
260 | +of sections 4 and 5, provided that you also convey the |
261 | +machine-readable Corresponding Source under the terms of this License, |
262 | +in one of these ways: |
263 | + |
264 | + a) Convey the object code in, or embodied in, a physical product |
265 | + (including a physical distribution medium), accompanied by the |
266 | + Corresponding Source fixed on a durable physical medium |
267 | + customarily used for software interchange. |
268 | + |
269 | + b) Convey the object code in, or embodied in, a physical product |
270 | + (including a physical distribution medium), accompanied by a |
271 | + written offer, valid for at least three years and valid for as |
272 | + long as you offer spare parts or customer support for that product |
273 | + model, to give anyone who possesses the object code either (1) a |
274 | + copy of the Corresponding Source for all the software in the |
275 | + product that is covered by this License, on a durable physical |
276 | + medium customarily used for software interchange, for a price no |
277 | + more than your reasonable cost of physically performing this |
278 | + conveying of source, or (2) access to copy the |
279 | + Corresponding Source from a network server at no charge. |
280 | + |
281 | + c) Convey individual copies of the object code with a copy of the |
282 | + written offer to provide the Corresponding Source. This |
283 | + alternative is allowed only occasionally and noncommercially, and |
284 | + only if you received the object code with such an offer, in accord |
285 | + with subsection 6b. |
286 | + |
287 | + d) Convey the object code by offering access from a designated |
288 | + place (gratis or for a charge), and offer equivalent access to the |
289 | + Corresponding Source in the same way through the same place at no |
290 | + further charge. You need not require recipients to copy the |
291 | + Corresponding Source along with the object code. If the place to |
292 | + copy the object code is a network server, the Corresponding Source |
293 | + may be on a different server (operated by you or a third party) |
294 | + that supports equivalent copying facilities, provided you maintain |
295 | + clear directions next to the object code saying where to find the |
296 | + Corresponding Source. Regardless of what server hosts the |
297 | + Corresponding Source, you remain obligated to ensure that it is |
298 | + available for as long as needed to satisfy these requirements. |
299 | + |
300 | + e) Convey the object code using peer-to-peer transmission, provided |
301 | + you inform other peers where the object code and Corresponding |
302 | + Source of the work are being offered to the general public at no |
303 | + charge under subsection 6d. |
304 | + |
305 | + A separable portion of the object code, whose source code is excluded |
306 | +from the Corresponding Source as a System Library, need not be |
307 | +included in conveying the object code work. |
308 | + |
309 | + A "User Product" is either (1) a "consumer product", which means any |
310 | +tangible personal property which is normally used for personal, family, |
311 | +or household purposes, or (2) anything designed or sold for incorporation |
312 | +into a dwelling. In determining whether a product is a consumer product, |
313 | +doubtful cases shall be resolved in favor of coverage. For a particular |
314 | +product received by a particular user, "normally used" refers to a |
315 | +typical or common use of that class of product, regardless of the status |
316 | +of the particular user or of the way in which the particular user |
317 | +actually uses, or expects or is expected to use, the product. A product |
318 | +is a consumer product regardless of whether the product has substantial |
319 | +commercial, industrial or non-consumer uses, unless such uses represent |
320 | +the only significant mode of use of the product. |
321 | + |
322 | + "Installation Information" for a User Product means any methods, |
323 | +procedures, authorization keys, or other information required to install |
324 | +and execute modified versions of a covered work in that User Product from |
325 | +a modified version of its Corresponding Source. The information must |
326 | +suffice to ensure that the continued functioning of the modified object |
327 | +code is in no case prevented or interfered with solely because |
328 | +modification has been made. |
329 | + |
330 | + If you convey an object code work under this section in, or with, or |
331 | +specifically for use in, a User Product, and the conveying occurs as |
332 | +part of a transaction in which the right of possession and use of the |
333 | +User Product is transferred to the recipient in perpetuity or for a |
334 | +fixed term (regardless of how the transaction is characterized), the |
335 | +Corresponding Source conveyed under this section must be accompanied |
336 | +by the Installation Information. But this requirement does not apply |
337 | +if neither you nor any third party retains the ability to install |
338 | +modified object code on the User Product (for example, the work has |
339 | +been installed in ROM). |
340 | + |
341 | + The requirement to provide Installation Information does not include a |
342 | +requirement to continue to provide support service, warranty, or updates |
343 | +for a work that has been modified or installed by the recipient, or for |
344 | +the User Product in which it has been modified or installed. Access to a |
345 | +network may be denied when the modification itself materially and |
346 | +adversely affects the operation of the network or violates the rules and |
347 | +protocols for communication across the network. |
348 | + |
349 | + Corresponding Source conveyed, and Installation Information provided, |
350 | +in accord with this section must be in a format that is publicly |
351 | +documented (and with an implementation available to the public in |
352 | +source code form), and must require no special password or key for |
353 | +unpacking, reading or copying. |
354 | + |
355 | + 7. Additional Terms. |
356 | + |
357 | + "Additional permissions" are terms that supplement the terms of this |
358 | +License by making exceptions from one or more of its conditions. |
359 | +Additional permissions that are applicable to the entire Program shall |
360 | +be treated as though they were included in this License, to the extent |
361 | +that they are valid under applicable law. If additional permissions |
362 | +apply only to part of the Program, that part may be used separately |
363 | +under those permissions, but the entire Program remains governed by |
364 | +this License without regard to the additional permissions. |
365 | + |
366 | + When you convey a copy of a covered work, you may at your option |
367 | +remove any additional permissions from that copy, or from any part of |
368 | +it. (Additional permissions may be written to require their own |
369 | +removal in certain cases when you modify the work.) You may place |
370 | +additional permissions on material, added by you to a covered work, |
371 | +for which you have or can give appropriate copyright permission. |
372 | + |
373 | + Notwithstanding any other provision of this License, for material you |
374 | +add to a covered work, you may (if authorized by the copyright holders of |
375 | +that material) supplement the terms of this License with terms: |
376 | + |
377 | + a) Disclaiming warranty or limiting liability differently from the |
378 | + terms of sections 15 and 16 of this License; or |
379 | + |
380 | + b) Requiring preservation of specified reasonable legal notices or |
381 | + author attributions in that material or in the Appropriate Legal |
382 | + Notices displayed by works containing it; or |
383 | + |
384 | + c) Prohibiting misrepresentation of the origin of that material, or |
385 | + requiring that modified versions of such material be marked in |
386 | + reasonable ways as different from the original version; or |
387 | + |
388 | + d) Limiting the use for publicity purposes of names of licensors or |
389 | + authors of the material; or |
390 | + |
391 | + e) Declining to grant rights under trademark law for use of some |
392 | + trade names, trademarks, or service marks; or |
393 | + |
394 | + f) Requiring indemnification of licensors and authors of that |
395 | + material by anyone who conveys the material (or modified versions of |
396 | + it) with contractual assumptions of liability to the recipient, for |
397 | + any liability that these contractual assumptions directly impose on |
398 | + those licensors and authors. |
399 | + |
400 | + All other non-permissive additional terms are considered "further |
401 | +restrictions" within the meaning of section 10. If the Program as you |
402 | +received it, or any part of it, contains a notice stating that it is |
403 | +governed by this License along with a term that is a further |
404 | +restriction, you may remove that term. If a license document contains |
405 | +a further restriction but permits relicensing or conveying under this |
406 | +License, you may add to a covered work material governed by the terms |
407 | +of that license document, provided that the further restriction does |
408 | +not survive such relicensing or conveying. |
409 | + |
410 | + If you add terms to a covered work in accord with this section, you |
411 | +must place, in the relevant source files, a statement of the |
412 | +additional terms that apply to those files, or a notice indicating |
413 | +where to find the applicable terms. |
414 | + |
415 | + Additional terms, permissive or non-permissive, may be stated in the |
416 | +form of a separately written license, or stated as exceptions; |
417 | +the above requirements apply either way. |
418 | + |
419 | + 8. Termination. |
420 | + |
421 | + You may not propagate or modify a covered work except as expressly |
422 | +provided under this License. Any attempt otherwise to propagate or |
423 | +modify it is void, and will automatically terminate your rights under |
424 | +this License (including any patent licenses granted under the third |
425 | +paragraph of section 11). |
426 | + |
427 | + However, if you cease all violation of this License, then your |
428 | +license from a particular copyright holder is reinstated (a) |
429 | +provisionally, unless and until the copyright holder explicitly and |
430 | +finally terminates your license, and (b) permanently, if the copyright |
431 | +holder fails to notify you of the violation by some reasonable means |
432 | +prior to 60 days after the cessation. |
433 | + |
434 | + Moreover, your license from a particular copyright holder is |
435 | +reinstated permanently if the copyright holder notifies you of the |
436 | +violation by some reasonable means, this is the first time you have |
437 | +received notice of violation of this License (for any work) from that |
438 | +copyright holder, and you cure the violation prior to 30 days after |
439 | +your receipt of the notice. |
440 | + |
441 | + Termination of your rights under this section does not terminate the |
442 | +licenses of parties who have received copies or rights from you under |
443 | +this License. If your rights have been terminated and not permanently |
444 | +reinstated, you do not qualify to receive new licenses for the same |
445 | +material under section 10. |
446 | + |
447 | + 9. Acceptance Not Required for Having Copies. |
448 | + |
449 | + You are not required to accept this License in order to receive or |
450 | +run a copy of the Program. Ancillary propagation of a covered work |
451 | +occurring solely as a consequence of using peer-to-peer transmission |
452 | +to receive a copy likewise does not require acceptance. However, |
453 | +nothing other than this License grants you permission to propagate or |
454 | +modify any covered work. These actions infringe copyright if you do |
455 | +not accept this License. Therefore, by modifying or propagating a |
456 | +covered work, you indicate your acceptance of this License to do so. |
457 | + |
458 | + 10. Automatic Licensing of Downstream Recipients. |
459 | + |
460 | + Each time you convey a covered work, the recipient automatically |
461 | +receives a license from the original licensors, to run, modify and |
462 | +propagate that work, subject to this License. You are not responsible |
463 | +for enforcing compliance by third parties with this License. |
464 | + |
465 | + An "entity transaction" is a transaction transferring control of an |
466 | +organization, or substantially all assets of one, or subdividing an |
467 | +organization, or merging organizations. If propagation of a covered |
468 | +work results from an entity transaction, each party to that |
469 | +transaction who receives a copy of the work also receives whatever |
470 | +licenses to the work the party's predecessor in interest had or could |
471 | +give under the previous paragraph, plus a right to possession of the |
472 | +Corresponding Source of the work from the predecessor in interest, if |
473 | +the predecessor has it or can get it with reasonable efforts. |
474 | + |
475 | + You may not impose any further restrictions on the exercise of the |
476 | +rights granted or affirmed under this License. For example, you may |
477 | +not impose a license fee, royalty, or other charge for exercise of |
478 | +rights granted under this License, and you may not initiate litigation |
479 | +(including a cross-claim or counterclaim in a lawsuit) alleging that |
480 | +any patent claim is infringed by making, using, selling, offering for |
481 | +sale, or importing the Program or any portion of it. |
482 | + |
483 | + 11. Patents. |
484 | + |
485 | + A "contributor" is a copyright holder who authorizes use under this |
486 | +License of the Program or a work on which the Program is based. The |
487 | +work thus licensed is called the contributor's "contributor version". |
488 | + |
489 | + A contributor's "essential patent claims" are all patent claims |
490 | +owned or controlled by the contributor, whether already acquired or |
491 | +hereafter acquired, that would be infringed by some manner, permitted |
492 | +by this License, of making, using, or selling its contributor version, |
493 | +but do not include claims that would be infringed only as a |
494 | +consequence of further modification of the contributor version. For |
495 | +purposes of this definition, "control" includes the right to grant |
496 | +patent sublicenses in a manner consistent with the requirements of |
497 | +this License. |
498 | + |
499 | + Each contributor grants you a non-exclusive, worldwide, royalty-free |
500 | +patent license under the contributor's essential patent claims, to |
501 | +make, use, sell, offer for sale, import and otherwise run, modify and |
502 | +propagate the contents of its contributor version. |
503 | + |
504 | + In the following three paragraphs, a "patent license" is any express |
505 | +agreement or commitment, however denominated, not to enforce a patent |
506 | +(such as an express permission to practice a patent or covenant not to |
507 | +sue for patent infringement). To "grant" such a patent license to a |
508 | +party means to make such an agreement or commitment not to enforce a |
509 | +patent against the party. |
510 | + |
511 | + If you convey a covered work, knowingly relying on a patent license, |
512 | +and the Corresponding Source of the work is not available for anyone |
513 | +to copy, free of charge and under the terms of this License, through a |
514 | +publicly available network server or other readily accessible means, |
515 | +then you must either (1) cause the Corresponding Source to be so |
516 | +available, or (2) arrange to deprive yourself of the benefit of the |
517 | +patent license for this particular work, or (3) arrange, in a manner |
518 | +consistent with the requirements of this License, to extend the patent |
519 | +license to downstream recipients. "Knowingly relying" means you have |
520 | +actual knowledge that, but for the patent license, your conveying the |
521 | +covered work in a country, or your recipient's use of the covered work |
522 | +in a country, would infringe one or more identifiable patents in that |
523 | +country that you have reason to believe are valid. |
524 | + |
525 | + If, pursuant to or in connection with a single transaction or |
526 | +arrangement, you convey, or propagate by procuring conveyance of, a |
527 | +covered work, and grant a patent license to some of the parties |
528 | +receiving the covered work authorizing them to use, propagate, modify |
529 | +or convey a specific copy of the covered work, then the patent license |
530 | +you grant is automatically extended to all recipients of the covered |
531 | +work and works based on it. |
532 | + |
533 | + A patent license is "discriminatory" if it does not include within |
534 | +the scope of its coverage, prohibits the exercise of, or is |
535 | +conditioned on the non-exercise of one or more of the rights that are |
536 | +specifically granted under this License. You may not convey a covered |
537 | +work if you are a party to an arrangement with a third party that is |
538 | +in the business of distributing software, under which you make payment |
539 | +to the third party based on the extent of your activity of conveying |
540 | +the work, and under which the third party grants, to any of the |
541 | +parties who would receive the covered work from you, a discriminatory |
542 | +patent license (a) in connection with copies of the covered work |
543 | +conveyed by you (or copies made from those copies), or (b) primarily |
544 | +for and in connection with specific products or compilations that |
545 | +contain the covered work, unless you entered into that arrangement, |
546 | +or that patent license was granted, prior to 28 March 2007. |
547 | + |
548 | + Nothing in this License shall be construed as excluding or limiting |
549 | +any implied license or other defenses to infringement that may |
550 | +otherwise be available to you under applicable patent law. |
551 | + |
552 | + 12. No Surrender of Others' Freedom. |
553 | + |
554 | + If conditions are imposed on you (whether by court order, agreement or |
555 | +otherwise) that contradict the conditions of this License, they do not |
556 | +excuse you from the conditions of this License. If you cannot convey a |
557 | +covered work so as to satisfy simultaneously your obligations under this |
558 | +License and any other pertinent obligations, then as a consequence you may |
559 | +not convey it at all. For example, if you agree to terms that obligate you |
560 | +to collect a royalty for further conveying from those to whom you convey |
561 | +the Program, the only way you could satisfy both those terms and this |
562 | +License would be to refrain entirely from conveying the Program. |
563 | + |
564 | + 13. Use with the GNU Affero General Public License. |
565 | + |
566 | + Notwithstanding any other provision of this License, you have |
567 | +permission to link or combine any covered work with a work licensed |
568 | +under version 3 of the GNU Affero General Public License into a single |
569 | +combined work, and to convey the resulting work. The terms of this |
570 | +License will continue to apply to the part which is the covered work, |
571 | +but the special requirements of the GNU Affero General Public License, |
572 | +section 13, concerning interaction through a network will apply to the |
573 | +combination as such. |
574 | + |
575 | + 14. Revised Versions of this License. |
576 | + |
577 | + The Free Software Foundation may publish revised and/or new versions of |
578 | +the GNU General Public License from time to time. Such new versions will |
579 | +be similar in spirit to the present version, but may differ in detail to |
580 | +address new problems or concerns. |
581 | + |
582 | + Each version is given a distinguishing version number. If the |
583 | +Program specifies that a certain numbered version of the GNU General |
584 | +Public License "or any later version" applies to it, you have the |
585 | +option of following the terms and conditions either of that numbered |
586 | +version or of any later version published by the Free Software |
587 | +Foundation. If the Program does not specify a version number of the |
588 | +GNU General Public License, you may choose any version ever published |
589 | +by the Free Software Foundation. |
590 | + |
591 | + If the Program specifies that a proxy can decide which future |
592 | +versions of the GNU General Public License can be used, that proxy's |
593 | +public statement of acceptance of a version permanently authorizes you |
594 | +to choose that version for the Program. |
595 | + |
596 | + Later license versions may give you additional or different |
597 | +permissions. However, no additional obligations are imposed on any |
598 | +author or copyright holder as a result of your choosing to follow a |
599 | +later version. |
600 | + |
601 | + 15. Disclaimer of Warranty. |
602 | + |
603 | + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY |
604 | +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT |
605 | +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY |
606 | +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, |
607 | +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
608 | +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM |
609 | +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF |
610 | +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. |
611 | + |
612 | + 16. Limitation of Liability. |
613 | + |
614 | + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING |
615 | +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS |
616 | +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY |
617 | +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE |
618 | +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF |
619 | +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD |
620 | +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), |
621 | +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF |
622 | +SUCH DAMAGES. |
623 | + |
624 | + 17. Interpretation of Sections 15 and 16. |
625 | + |
626 | + If the disclaimer of warranty and limitation of liability provided |
627 | +above cannot be given local legal effect according to their terms, |
628 | +reviewing courts shall apply local law that most closely approximates |
629 | +an absolute waiver of all civil liability in connection with the |
630 | +Program, unless a warranty or assumption of liability accompanies a |
631 | +copy of the Program in return for a fee. |
632 | + |
633 | + END OF TERMS AND CONDITIONS |
634 | + |
635 | + How to Apply These Terms to Your New Programs |
636 | + |
637 | + If you develop a new program, and you want it to be of the greatest |
638 | +possible use to the public, the best way to achieve this is to make it |
639 | +free software which everyone can redistribute and change under these terms. |
640 | + |
641 | + To do so, attach the following notices to the program. It is safest |
642 | +to attach them to the start of each source file to most effectively |
643 | +state the exclusion of warranty; and each file should have at least |
644 | +the "copyright" line and a pointer to where the full notice is found. |
645 | + |
646 | + <one line to give the program's name and a brief idea of what it does.> |
647 | + Copyright (C) <year> <name of author> |
648 | + |
649 | + This program is free software: you can redistribute it and/or modify |
650 | + it under the terms of the GNU General Public License as published by |
651 | + the Free Software Foundation, either version 3 of the License, or |
652 | + (at your option) any later version. |
653 | + |
654 | + This program is distributed in the hope that it will be useful, |
655 | + but WITHOUT ANY WARRANTY; without even the implied warranty of |
656 | + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
657 | + GNU General Public License for more details. |
658 | + |
659 | + You should have received a copy of the GNU General Public License |
660 | + along with this program. If not, see <http://www.gnu.org/licenses/>. |
661 | + |
662 | +Also add information on how to contact you by electronic and paper mail. |
663 | + |
664 | + If the program does terminal interaction, make it output a short |
665 | +notice like this when it starts in an interactive mode: |
666 | + |
667 | + <program> Copyright (C) <year> <name of author> |
668 | + This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'. |
669 | + This is free software, and you are welcome to redistribute it |
670 | + under certain conditions; type `show c' for details. |
671 | + |
672 | +The hypothetical commands `show w' and `show c' should show the appropriate |
673 | +parts of the General Public License. Of course, your program's commands |
674 | +might be different; for a GUI interface, you would use an "about box". |
675 | + |
676 | + You should also get your employer (if you work as a programmer) or school, |
677 | +if any, to sign a "copyright disclaimer" for the program, if necessary. |
678 | +For more information on this, and how to apply and follow the GNU GPL, see |
679 | +<http://www.gnu.org/licenses/>. |
680 | + |
681 | + The GNU General Public License does not permit incorporating your program |
682 | +into proprietary programs. If your program is a subroutine library, you |
683 | +may consider it more useful to permit linking proprietary applications with |
684 | +the library. If this is what you want to do, use the GNU Lesser General |
685 | +Public License instead of this License. But first, please read |
686 | +<http://www.gnu.org/philosophy/why-not-lgpl.html>. |
687 | |
688 | === added file 'LICENSE' |
689 | --- LICENSE 1970-01-01 00:00:00 +0000 |
690 | +++ LICENSE 2014-01-14 15:09:46 +0000 |
691 | @@ -0,0 +1,16 @@ |
692 | +/* |
693 | + Copyright 2013-2014 Canonical Ltd. |
694 | + |
695 | + This program is free software: you can redistribute it and/or modify it |
696 | + under the terms of the GNU General Public License version 3, as published |
697 | + by the Free Software Foundation. |
698 | + |
699 | + This program is distributed in the hope that it will be useful, but |
700 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
701 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
702 | + PURPOSE. See the GNU General Public License for more details. |
703 | + |
704 | + You should have received a copy of the GNU General Public License along |
705 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
706 | +*/ |
707 | + |
708 | |
709 | === added file 'Makefile' |
710 | --- Makefile 1970-01-01 00:00:00 +0000 |
711 | +++ Makefile 2014-01-14 15:09:46 +0000 |
712 | @@ -0,0 +1,48 @@ |
713 | +GOPATH := $(shell cd ../../..; pwd) |
714 | +export GOPATH |
715 | + |
716 | +PROJECT = launchpad.net/ubuntu-push |
717 | + |
718 | +ifneq ($(CURDIR),$(GOPATH)/src/launchpad.net/ubuntu-push) |
719 | +$(error unexpected curdir and/or layout) |
720 | +endif |
721 | + |
722 | +GODEPS = launchpad.net/gocheck |
723 | + |
724 | +bootstrap: |
725 | + mkdir -p $(GOPATH)/bin |
726 | + mkdir -p $(GOPATH)/pkg |
727 | + go get -u launchpad.net/godeps |
728 | + go get -d -u $(GODEPS) |
729 | + $(GOPATH)/bin/godeps -u dependencies.tsv |
730 | + go install $(GODEPS) |
731 | + |
732 | +check: |
733 | + go test $(PROJECT)/... |
734 | + |
735 | +check-race: |
736 | + go test -race $(PROJECT)/... |
737 | + |
738 | +coverage-summary: |
739 | + go test -a -cover $(PROJECT)/... |
740 | + |
741 | +coverage-html: |
742 | + mkdir -p coverhtml |
743 | + for pkg in $$(go list $(PROJECT)/...|grep -v acceptance ); do \ |
744 | + relname="$${pkg#$(PROJECT)/}" ; \ |
745 | + mkdir -p coverhtml/$$(dirname $${relname}) ; \ |
746 | + go test -a -coverprofile=coverhtml/$${relname}.out $$pkg ; \ |
747 | + if [ -f coverhtml/$${relname}.out ] ; then \ |
748 | + go tool cover -html=coverhtml/$${relname}.out -o coverhtml/$${relname}.html ; \ |
749 | + go tool cover -func=coverhtml/$${relname}.out -o coverhtml/$${relname}.txt ; \ |
750 | + fi \ |
751 | + done |
752 | + |
753 | +format: |
754 | + go fmt $(PROJECT)/... |
755 | + |
756 | +check-format: |
757 | + scripts/check_fmt $(PROJECT) |
758 | + |
759 | +.PHONY: bootstrap check check-race format check-format coverage-summary \ |
760 | + coverage-html |
761 | |
762 | === added file 'README' |
763 | --- README 1970-01-01 00:00:00 +0000 |
764 | +++ README 2014-01-14 15:09:46 +0000 |
765 | @@ -0,0 +1,37 @@ |
766 | +Ubuntu Push Notifications |
767 | +-------------------------- |
768 | + |
769 | +Protocol, client, and development code for Ubuntu Push Notifications. |
770 | + |
771 | +The code expects to be checked out as launchpad.net/ubuntu-push in a Go |
772 | +workspace; see "go help gopath". |
773 | + |
774 | +To set up the Go dependencies, run: |
775 | + |
776 | + make bootstrap |
777 | + |
778 | +To run tests: |
779 | + |
780 | + make check |
781 | + |
782 | +To produce coverage reports, Go 1.2 (the default on trusty) and the |
783 | +cover tool are needed (the latter can currently be obtained with: |
784 | +sudo GOPATH=<go-workspace> go get code.google.com/p/go.tools/cmd/cover |
785 | +) |
786 | + |
787 | +then run: |
788 | + |
789 | + make coverage-summary |
790 | + |
791 | +for a summary report, or: |
792 | + |
793 | + make coverage-html |
794 | + |
795 | +for per-package HTML with annotated code in coverhtml/<package-name>.html |
796 | + |
797 | +(it also produces textual coverhtml/<package-name>.txt reports). |
798 | + |
799 | +To run the acceptance tests, go into the server/acceptance subdir and run: |
800 | + |
801 | + ./acceptance.sh |
802 | + |
803 | |
804 | === added directory 'config' |
805 | === added file 'config/config.go' |
806 | --- config/config.go 1970-01-01 00:00:00 +0000 |
807 | +++ config/config.go 2014-01-14 15:09:46 +0000 |
808 | @@ -0,0 +1,149 @@ |
809 | +/* |
810 | + Copyright 2013-2014 Canonical Ltd. |
811 | + |
812 | + This program is free software: you can redistribute it and/or modify it |
813 | + under the terms of the GNU General Public License version 3, as published |
814 | + by the Free Software Foundation. |
815 | + |
816 | + This program is distributed in the hope that it will be useful, but |
817 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
818 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
819 | + PURPOSE. See the GNU General Public License for more details. |
820 | + |
821 | + You should have received a copy of the GNU General Public License along |
822 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
823 | +*/ |
824 | + |
825 | +// Package config has helpers to parse and use JSON based configuration. |
826 | +package config |
827 | + |
828 | +import ( |
829 | + "encoding/json" |
830 | + "errors" |
831 | + "fmt" |
832 | + "io" |
833 | + "io/ioutil" |
834 | + "net" |
835 | + "path/filepath" |
836 | + "reflect" |
837 | + "strings" |
838 | + "time" |
839 | +) |
840 | + |
841 | +// ReadConfig reads a JSON configuration into destConfig, which should |
842 | +// be a pointer to a structure. It does more configuration-specific |
843 | +// error checking than plain JSON decoding and mentions the offending |
844 | +// field in errors. Configuration fields are expected to start with |
845 | +// lower case in the JSON object. |
846 | +func ReadConfig(r io.Reader, destConfig interface{}) error { |
847 | + destValue := reflect.ValueOf(destConfig) |
848 | + if destValue.Kind() != reflect.Ptr || destValue.Elem().Kind() != reflect.Struct { |
849 | + return errors.New("destConfig not *struct") |
850 | + } |
851 | + // do the parsing in two phases for better error handling |
852 | + var p1 map[string]json.RawMessage |
853 | + err := json.NewDecoder(r).Decode(&p1) |
854 | + if err != nil { |
855 | + return err |
856 | + } |
857 | + destStruct := destValue.Elem() |
858 | + structType := destStruct.Type() |
859 | + n := structType.NumField() |
860 | + for i := 0; i < n; i++ { |
861 | + fld := structType.Field(i) |
862 | + configName := strings.Split(fld.Tag.Get("json"), ",")[0] |
863 | + if configName == "" { |
864 | + configName = strings.ToLower(fld.Name[:1]) + fld.Name[1:] |
865 | + } |
866 | + raw, found := p1[configName] |
867 | + if !found { // assume all fields are mandatory for now |
868 | + return fmt.Errorf("missing %s", configName) |
869 | + } |
870 | + dest := destStruct.Field(i).Addr().Interface() |
871 | + err = json.Unmarshal([]byte(raw), dest) |
872 | + if err != nil { |
873 | + return fmt.Errorf("%s: %v", configName, err) |
874 | + } |
875 | + } |
876 | + return nil |
877 | +} |
878 | + |
879 | +// ConfigTimeDuration can hold a time.Duration in a configuration struct, |
880 | +// that is parsed from a string as supported by time.ParseDuration. |
881 | +type ConfigTimeDuration struct { |
882 | + time.Duration |
883 | +} |
884 | + |
885 | +func (ctd *ConfigTimeDuration) UnmarshalJSON(b []byte) error { |
886 | + var enc string |
887 | + var v time.Duration |
888 | + err := json.Unmarshal(b, &enc) |
889 | + if err != nil { |
890 | + return err |
891 | + } |
892 | + v, err = time.ParseDuration(enc) |
893 | + if err != nil { |
894 | + return err |
895 | + } |
896 | + *ctd = ConfigTimeDuration{v} |
897 | + return nil |
898 | +} |
899 | + |
900 | +// TimeDuration returns the time.Duration held in ctd. |
901 | +func (ctd ConfigTimeDuration) TimeDuration() time.Duration { |
902 | + return ctd.Duration |
903 | +} |
904 | + |
905 | +// ConfigHostPort can hold a host:port string in a configuration struct. |
906 | +type ConfigHostPort string |
907 | + |
908 | +func (chp *ConfigHostPort) UnmarshalJSON(b []byte) error { |
909 | + var enc string |
910 | + err := json.Unmarshal(b, &enc) |
911 | + if err != nil { |
912 | + return err |
913 | + } |
914 | + _, _, err = net.SplitHostPort(enc) |
915 | + if err != nil { |
916 | + return err |
917 | + } |
918 | + *chp = ConfigHostPort(enc) |
919 | + return nil |
920 | +} |
921 | + |
922 | +// HostPort returns the host:port string held in chp. |
923 | +func (chp ConfigHostPort) HostPort() string { |
924 | + return string(chp) |
925 | +} |
926 | + |
927 | +// ConfigQueueSize can hold a queue size in a configuration struct. |
928 | +type ConfigQueueSize uint |
929 | + |
930 | +func (cqs *ConfigQueueSize) UnmarshalJSON(b []byte) error { |
931 | + var enc uint |
932 | + err := json.Unmarshal(b, &enc) |
933 | + if err != nil { |
934 | + return err |
935 | + } |
936 | + if enc == 0 { |
937 | + return errors.New("queue size should be > 0") |
938 | + } |
939 | + *cqs = ConfigQueueSize(enc) |
940 | + return nil |
941 | +} |
942 | + |
943 | +// QueueSize returns the queue size held in cqs. |
944 | +func (cqs ConfigQueueSize) QueueSize() uint { |
945 | + return uint(cqs) |
946 | +} |
947 | + |
948 | +// LoadFile reads a file possibly relative to a base dir. |
949 | +func LoadFile(p, baseDir string) ([]byte, error) { |
950 | + if p == "" { |
951 | + return nil, nil |
952 | + } |
953 | + if !filepath.IsAbs(p) { |
954 | + p = filepath.Join(baseDir, p) |
955 | + } |
956 | + return ioutil.ReadFile(p) |
957 | +} |
958 | |
959 | === added file 'config/config_test.go' |
960 | --- config/config_test.go 1970-01-01 00:00:00 +0000 |
961 | +++ config/config_test.go 2014-01-14 15:09:46 +0000 |
962 | @@ -0,0 +1,135 @@ |
963 | +/* |
964 | + Copyright 2013-2014 Canonical Ltd. |
965 | + |
966 | + This program is free software: you can redistribute it and/or modify it |
967 | + under the terms of the GNU General Public License version 3, as published |
968 | + by the Free Software Foundation. |
969 | + |
970 | + This program is distributed in the hope that it will be useful, but |
971 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
972 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
973 | + PURPOSE. See the GNU General Public License for more details. |
974 | + |
975 | + You should have received a copy of the GNU General Public License along |
976 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
977 | +*/ |
978 | + |
979 | +package config |
980 | + |
981 | +import ( |
982 | + "bytes" |
983 | + "io/ioutil" |
984 | + . "launchpad.net/gocheck" |
985 | + "os" |
986 | + "path/filepath" |
987 | + "testing" |
988 | + "time" |
989 | +) |
990 | + |
991 | +func TestConfig(t *testing.T) { TestingT(t) } |
992 | + |
993 | +type configSuite struct{} |
994 | + |
995 | +var _ = Suite(&configSuite{}) |
996 | + |
997 | +type testConfig1 struct { |
998 | + A int |
999 | + B string |
1000 | + C []string `json:"c_list"` |
1001 | +} |
1002 | + |
1003 | +func (s *configSuite) TestReadConfig(c *C) { |
1004 | + buf := bytes.NewBufferString(`{"a": 1, "b": "foo", "c_list": ["c", "d", "e"]}`) |
1005 | + var cfg testConfig1 |
1006 | + err := ReadConfig(buf, &cfg) |
1007 | + c.Check(err, IsNil) |
1008 | + c.Check(cfg, DeepEquals, testConfig1{A: 1, B: "foo", C: []string{"c", "d", "e"}}) |
1009 | +} |
1010 | + |
1011 | +func checkError(c *C, config string, dest interface{}, expectedError string) { |
1012 | + buf := bytes.NewBufferString(config) |
1013 | + err := ReadConfig(buf, dest) |
1014 | + c.Check(err, ErrorMatches, expectedError) |
1015 | +} |
1016 | + |
1017 | +func (s *configSuite) TestReadConfigErrors(c *C) { |
1018 | + var cfg testConfig1 |
1019 | + checkError(c, "", cfg, `destConfig not \*struct`) |
1020 | + var i int |
1021 | + checkError(c, "", &i, `destConfig not \*struct`) |
1022 | + checkError(c, "", &cfg, `EOF`) |
1023 | + checkError(c, `{"a": "1"}`, &cfg, `a: .*type int`) |
1024 | + checkError(c, `{"b": "1"}`, &cfg, `missing a`) |
1025 | + checkError(c, `{"A": "1"}`, &cfg, `missing a`) |
1026 | + checkError(c, `{"a": 1, "b": "foo"}`, &cfg, `missing c_list`) |
1027 | +} |
1028 | + |
1029 | +type testTimeDurationConfig struct { |
1030 | + D ConfigTimeDuration |
1031 | +} |
1032 | + |
1033 | +func (s *configSuite) TestReadConfigTimeDuration(c *C) { |
1034 | + buf := bytes.NewBufferString(`{"d": "2s"}`) |
1035 | + var cfg testTimeDurationConfig |
1036 | + err := ReadConfig(buf, &cfg) |
1037 | + c.Assert(err, IsNil) |
1038 | + c.Check(cfg.D.TimeDuration(), Equals, 2*time.Second) |
1039 | +} |
1040 | + |
1041 | +func (s *configSuite) TestReadConfigTimeDurationErrors(c *C) { |
1042 | + var cfg testTimeDurationConfig |
1043 | + checkError(c, `{"d": 1}`, &cfg, "d:.*type string") |
1044 | + checkError(c, `{"d": "2"}`, &cfg, "d:.*missing unit.*") |
1045 | +} |
1046 | + |
1047 | +type testHostPortConfig struct { |
1048 | + H ConfigHostPort |
1049 | +} |
1050 | + |
1051 | +func (s *configSuite) TestReadConfigHostPort(c *C) { |
1052 | + buf := bytes.NewBufferString(`{"h": "127.0.0.1:9999"}`) |
1053 | + var cfg testHostPortConfig |
1054 | + err := ReadConfig(buf, &cfg) |
1055 | + c.Assert(err, IsNil) |
1056 | + c.Check(cfg.H.HostPort(), Equals, "127.0.0.1:9999") |
1057 | +} |
1058 | + |
1059 | +func (s *configSuite) TestReadConfigHostPortErrors(c *C) { |
1060 | + var cfg testHostPortConfig |
1061 | + checkError(c, `{"h": 1}`, &cfg, "h:.*type string") |
1062 | + checkError(c, `{"h": ""}`, &cfg, "h: missing port in address") |
1063 | +} |
1064 | + |
1065 | +type testQueueSizeConfig struct { |
1066 | + QS ConfigQueueSize |
1067 | +} |
1068 | + |
1069 | +func (s *configSuite) TestReadConfigQueueSize(c *C) { |
1070 | + buf := bytes.NewBufferString(`{"qS": 1}`) |
1071 | + var cfg testQueueSizeConfig |
1072 | + err := ReadConfig(buf, &cfg) |
1073 | + c.Assert(err, IsNil) |
1074 | + c.Check(cfg.QS.QueueSize(), Equals, uint(1)) |
1075 | +} |
1076 | + |
1077 | +func (s *configSuite) TestReadConfigQueueSizeErrors(c *C) { |
1078 | + var cfg testQueueSizeConfig |
1079 | + checkError(c, `{"qS": "x"}`, &cfg, "qS: .*type uint") |
1080 | + checkError(c, `{"qS": 0}`, &cfg, "qS: queue size should be > 0") |
1081 | +} |
1082 | + |
1083 | +func (s *configSuite) TestLoadFile(c *C) { |
1084 | + tmpDir := c.MkDir() |
1085 | + d, err := LoadFile("", tmpDir) |
1086 | + c.Check(err, IsNil) |
1087 | + c.Check(d, IsNil) |
1088 | + fullPath := filepath.Join(tmpDir, "example.file") |
1089 | + err = ioutil.WriteFile(fullPath, []byte("Example"), os.ModePerm) |
1090 | + c.Assert(err, IsNil) |
1091 | + d, err = LoadFile("example.file", tmpDir) |
1092 | + c.Check(err, IsNil) |
1093 | + c.Check(string(d), Equals, "Example") |
1094 | + d, err = LoadFile(fullPath, tmpDir) |
1095 | + c.Check(err, IsNil) |
1096 | + c.Check(string(d), Equals, "Example") |
1097 | +} |
1098 | |
1099 | === added file 'dependencies.tsv' |
1100 | --- dependencies.tsv 1970-01-01 00:00:00 +0000 |
1101 | +++ dependencies.tsv 2014-01-14 15:09:46 +0000 |
1102 | @@ -0,0 +1,1 @@ |
1103 | +launchpad.net/gocheck bzr gustavo@niemeyer.net-20130302024745-6ikofwq2c03h7giu 85 |
1104 | |
1105 | === added directory 'logger' |
1106 | === added file 'logger/logger.go' |
1107 | --- logger/logger.go 1970-01-01 00:00:00 +0000 |
1108 | +++ logger/logger.go 2014-01-14 15:09:46 +0000 |
1109 | @@ -0,0 +1,100 @@ |
1110 | +/* |
1111 | + Copyright 2013-2014 Canonical Ltd. |
1112 | + |
1113 | + This program is free software: you can redistribute it and/or modify it |
1114 | + under the terms of the GNU General Public License version 3, as published |
1115 | + by the Free Software Foundation. |
1116 | + |
1117 | + This program is distributed in the hope that it will be useful, but |
1118 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
1119 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1120 | + PURPOSE. See the GNU General Public License for more details. |
1121 | + |
1122 | + You should have received a copy of the GNU General Public License along |
1123 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
1124 | +*/ |
1125 | + |
1126 | +// Package logger defines a simple logger API with control over the logging level. |
1127 | +package logger |
1128 | + |
1129 | +import ( |
1130 | + "fmt" |
1131 | + "io" |
1132 | + "log" |
1133 | + "os" |
1134 | + "runtime" |
1135 | +) |
1136 | + |
1137 | +// Logger is a simple logger interface with logging at levels. |
1138 | +type Logger interface { |
1139 | + // Errorf logs an error. |
1140 | + Errorf(format string, v ...interface{}) |
1141 | + // Fatalf logs an error and exits the program with os.Exit(1). |
1142 | + Fatalf(format string, v ...interface{}) |
1143 | + // Recoverf recovers from a possible panic and logs it. |
1144 | + Recoverf(format string, v ...interface{}) |
1145 | + // Infof logs an info message. |
1146 | + Infof(format string, v ...interface{}) |
1147 | + // Debugf logs a debug message. |
1148 | + Debugf(format string, v ...interface{}) |
1149 | +} |
1150 | + |
1151 | +type simpleLogger struct { |
1152 | + *log.Logger |
1153 | + nlevel int |
1154 | +} |
1155 | + |
1156 | +const ( |
1157 | + lError = iota |
1158 | + lInfo |
1159 | + lDebug |
1160 | +) |
1161 | + |
1162 | +var levelToNLevel = map[string]int{ |
1163 | + "error": lError, |
1164 | + "info": lInfo, |
1165 | + "debug": lDebug, |
1166 | +} |
1167 | + |
1168 | +// NewSimpleLogger creates a logger logging only up to the given level. |
1169 | +// level can be, in order of increasing verbosity: "error", "info", "debug". |
1170 | +func NewSimpleLogger(w io.Writer, level string) Logger { |
1171 | + nlevel := levelToNLevel[level] |
1172 | + return &simpleLogger{ |
1173 | + log.New(w, "", log.Ldate|log.Ltime|log.Lmicroseconds), |
1174 | + nlevel, |
1175 | + } |
1176 | +} |
1177 | + |
1178 | +func (lg *simpleLogger) Errorf(format string, v ...interface{}) { |
1179 | + lg.Printf("ERROR "+format, v...) |
1180 | +} |
1181 | + |
1182 | +var osExit = os.Exit // for testing |
1183 | + |
1184 | +func (lg *simpleLogger) Fatalf(format string, v ...interface{}) { |
1185 | + lg.Printf("ERROR "+format, v...) |
1186 | + osExit(1) |
1187 | +} |
1188 | + |
1189 | +func (lg *simpleLogger) Recoverf(format string, v ...interface{}) { |
1190 | + if err := recover(); err != nil { |
1191 | + msg := fmt.Sprintf(format, v...) |
1192 | + stack := make([]byte, 8*1024) // Stack writes less but doesn't fail |
1193 | + stackWritten := runtime.Stack(stack, false) |
1194 | + stack = stack[:stackWritten] |
1195 | + lg.Printf("ERROR panic %v!! %s:\n%s", err, msg, stack) |
1196 | + } |
1197 | +} |
1198 | + |
1199 | +func (lg *simpleLogger) Infof(format string, v ...interface{}) { |
1200 | + if lg.nlevel >= lInfo { |
1201 | + lg.Printf("INFO "+format, v...) |
1202 | + } |
1203 | +} |
1204 | + |
1205 | +func (lg *simpleLogger) Debugf(format string, v ...interface{}) { |
1206 | + if lg.nlevel >= lDebug { |
1207 | + lg.Printf("DEBUG "+format, v...) |
1208 | + } |
1209 | +} |
1210 | |
1211 | === added file 'logger/logger_test.go' |
1212 | --- logger/logger_test.go 1970-01-01 00:00:00 +0000 |
1213 | +++ logger/logger_test.go 2014-01-14 15:09:46 +0000 |
1214 | @@ -0,0 +1,117 @@ |
1215 | +/* |
1216 | + Copyright 2013-2014 Canonical Ltd. |
1217 | + |
1218 | + This program is free software: you can redistribute it and/or modify it |
1219 | + under the terms of the GNU General Public License version 3, as published |
1220 | + by the Free Software Foundation. |
1221 | + |
1222 | + This program is distributed in the hope that it will be useful, but |
1223 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
1224 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1225 | + PURPOSE. See the GNU General Public License for more details. |
1226 | + |
1227 | + You should have received a copy of the GNU General Public License along |
1228 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
1229 | +*/ |
1230 | + |
1231 | +package logger |
1232 | + |
1233 | +import ( |
1234 | + "bytes" |
1235 | + . "launchpad.net/gocheck" |
1236 | + "os" |
1237 | + "testing" |
1238 | +) |
1239 | + |
1240 | +func TestLogger(t *testing.T) { TestingT(t) } |
1241 | + |
1242 | +type loggerSuite struct{} |
1243 | + |
1244 | +var _ = Suite(&loggerSuite{}) |
1245 | + |
1246 | +func (s *loggerSuite) TestErrorf(c *C) { |
1247 | + buf := &bytes.Buffer{} |
1248 | + logger := NewSimpleLogger(buf, "error") |
1249 | + logger.Errorf("%v %d", "error", 1) |
1250 | + c.Check(buf.String(), Matches, ".* ERROR error 1\n") |
1251 | +} |
1252 | + |
1253 | +func (s *loggerSuite) TestFatalf(c *C) { |
1254 | + defer func() { |
1255 | + osExit = os.Exit |
1256 | + }() |
1257 | + var exitCode int |
1258 | + osExit = func(code int) { |
1259 | + exitCode = code |
1260 | + } |
1261 | + buf := &bytes.Buffer{} |
1262 | + logger := NewSimpleLogger(buf, "error") |
1263 | + logger.Fatalf("%v %v", "error", "fatal") |
1264 | + c.Check(buf.String(), Matches, ".* ERROR error fatal\n") |
1265 | + c.Check(exitCode, Equals, 1) |
1266 | +} |
1267 | + |
1268 | +func (s *loggerSuite) TestInfof(c *C) { |
1269 | + buf := &bytes.Buffer{} |
1270 | + logger := NewSimpleLogger(buf, "info") |
1271 | + logger.Infof("%v %d", "info", 1) |
1272 | + c.Check(buf.String(), Matches, ".* INFO info 1\n") |
1273 | +} |
1274 | + |
1275 | +func (s *loggerSuite) TestDebugf(c *C) { |
1276 | + buf := &bytes.Buffer{} |
1277 | + logger := NewSimpleLogger(buf, "debug") |
1278 | + logger.Debugf("%v %d", "debug", 1) |
1279 | + c.Check(buf.String(), Matches, `.* DEBUG debug 1\n`) |
1280 | +} |
1281 | + |
1282 | +func (s *loggerSuite) TestFormat(c *C) { |
1283 | + buf := &bytes.Buffer{} |
1284 | + logger := NewSimpleLogger(buf, "error") |
1285 | + logger.Errorf("%v %d", "error", 2) |
1286 | + c.Check(buf.String(), Matches, `.* .*\.\d+ ERROR error 2\n`) |
1287 | +} |
1288 | + |
1289 | +func (s *loggerSuite) TestLevel(c *C) { |
1290 | + buf := &bytes.Buffer{} |
1291 | + logger := NewSimpleLogger(buf, "error") |
1292 | + logger.Errorf("%s%d", "e", 3) |
1293 | + logger.Infof("%s%d", "i", 3) |
1294 | + logger.Debugf("%s%d", "d", 3) |
1295 | + c.Check(buf.String(), Matches, `.* ERROR e3\n`) |
1296 | + |
1297 | + buf.Reset() |
1298 | + logger = NewSimpleLogger(buf, "info") |
1299 | + logger.Errorf("%s%d", "e", 4) |
1300 | + logger.Debugf("%s%d", "d", 4) |
1301 | + logger.Infof("%s%d", "i", 4) |
1302 | + c.Check(buf.String(), Matches, `.* ERROR e4\n.* INFO i4\n`) |
1303 | + |
1304 | + buf.Reset() |
1305 | + logger = NewSimpleLogger(buf, "debug") |
1306 | + logger.Errorf("%s%d", "e", 5) |
1307 | + logger.Debugf("%s%d", "d", 5) |
1308 | + logger.Infof("%s%d", "i", 5) |
1309 | + c.Check(buf.String(), Matches, `.* ERROR e5\n.* DEBUG d5\n.* INFO i5\n`) |
1310 | +} |
1311 | + |
1312 | +func panicAndRecover(logger Logger, n int, doPanic bool) { |
1313 | + defer logger.Recoverf("%v %d", "panic", n) |
1314 | + if doPanic { |
1315 | + panic("Troubles") |
1316 | + } |
1317 | +} |
1318 | + |
1319 | +func (s *loggerSuite) TestRecoverf(c *C) { |
1320 | + buf := &bytes.Buffer{} |
1321 | + logger := NewSimpleLogger(buf, "error") |
1322 | + panicAndRecover(logger, 6, true) |
1323 | + c.Check(buf.String(), Matches, "(?s).* ERROR panic Troubles!! panic 6:.*panicAndRecover.*") |
1324 | +} |
1325 | + |
1326 | +func (s *loggerSuite) TestRecoverfNop(c *C) { |
1327 | + buf := &bytes.Buffer{} |
1328 | + logger := NewSimpleLogger(buf, "error") |
1329 | + panicAndRecover(logger, 6, false) |
1330 | + c.Check(buf.String(), Equals, "") |
1331 | +} |
1332 | |
1333 | === added directory 'protocol' |
1334 | === added file 'protocol/messages.go' |
1335 | --- protocol/messages.go 1970-01-01 00:00:00 +0000 |
1336 | +++ protocol/messages.go 2014-01-14 15:09:46 +0000 |
1337 | @@ -0,0 +1,103 @@ |
1338 | +/* |
1339 | + Copyright 2013-2014 Canonical Ltd. |
1340 | + |
1341 | + This program is free software: you can redistribute it and/or modify it |
1342 | + under the terms of the GNU General Public License version 3, as published |
1343 | + by the Free Software Foundation. |
1344 | + |
1345 | + This program is distributed in the hope that it will be useful, but |
1346 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
1347 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1348 | + PURPOSE. See the GNU General Public License for more details. |
1349 | + |
1350 | + You should have received a copy of the GNU General Public License along |
1351 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
1352 | +*/ |
1353 | + |
1354 | +package protocol |
1355 | + |
1356 | +// representative struct for messages |
1357 | + |
1358 | +import ( |
1359 | + "encoding/json" |
1360 | + // "log" |
1361 | +) |
1362 | + |
1363 | +// System channel id using a shortened hex encoded form for the NIL UUID |
1364 | +const SystemChannelId = "0" |
1365 | + |
1366 | +// CONNECT message |
1367 | +type ConnectMsg struct { |
1368 | + Type string `json:"T"` |
1369 | + ClientVer string |
1370 | + DeviceId string |
1371 | + Info map[string]interface{} `json:",omitempty"` // platform etc... |
1372 | + // maps channel ids (hex encoded UUIDs) to known client channel levels |
1373 | + Levels map[string]int64 |
1374 | +} |
1375 | + |
1376 | +// PING/PONG messages |
1377 | +type PingPongMsg struct { |
1378 | + Type string `json:"T"` |
1379 | +} |
1380 | + |
1381 | +const maxPayloadSize = 62 * 1024 |
1382 | + |
1383 | +// SplittableMsg are messages that may require and are capable of splitting. |
1384 | +type SplittableMsg interface { |
1385 | + Split() (done bool) |
1386 | +} |
1387 | + |
1388 | +// BROADCAST messages |
1389 | +type BroadcastMsg struct { |
1390 | + Type string `json:"T"` |
1391 | + AppId string `json:",omitempty"` |
1392 | + ChanId string |
1393 | + TopLevel int64 |
1394 | + Payloads []json.RawMessage |
1395 | + splitting int |
1396 | +} |
1397 | + |
1398 | +func (m *BroadcastMsg) Split() bool { |
1399 | + var prevTop int64 |
1400 | + if m.splitting == 0 { |
1401 | + prevTop = m.TopLevel - int64(len(m.Payloads)) |
1402 | + } else { |
1403 | + prevTop = m.TopLevel |
1404 | + m.Payloads = m.Payloads[len(m.Payloads):m.splitting] |
1405 | + m.TopLevel = prevTop + int64(len(m.Payloads)) |
1406 | + } |
1407 | + payloads := m.Payloads |
1408 | + var size int |
1409 | + for i := range payloads { |
1410 | + size += len(payloads[i]) |
1411 | + if size > maxPayloadSize { |
1412 | + m.TopLevel = prevTop + int64(i) |
1413 | + m.splitting = len(payloads) |
1414 | + m.Payloads = payloads[:i] |
1415 | + return false |
1416 | + } |
1417 | + } |
1418 | + return true |
1419 | +} |
1420 | + |
1421 | +// NOTIFICATIONS message |
1422 | +type NotificationsMsg struct { |
1423 | + Type string `json:"T"` |
1424 | + Notifications []Notification |
1425 | +} |
1426 | + |
1427 | +// A single unicast notification |
1428 | +type Notification struct { |
1429 | + AppId string `json:"A"` |
1430 | + MsgId string `json:"M"` |
1431 | + // payload |
1432 | + Payload json.RawMessage `json:"P"` |
1433 | +} |
1434 | + |
1435 | +// ACKnowledgement message |
1436 | +type AckMsg struct { |
1437 | + Type string `json:"T"` |
1438 | +} |
1439 | + |
1440 | +// xxx ... query levels messages |
1441 | |
1442 | === added file 'protocol/messages_test.go' |
1443 | --- protocol/messages_test.go 1970-01-01 00:00:00 +0000 |
1444 | +++ protocol/messages_test.go 2014-01-14 15:09:46 +0000 |
1445 | @@ -0,0 +1,101 @@ |
1446 | +/* |
1447 | + Copyright 2013-2014 Canonical Ltd. |
1448 | + |
1449 | + This program is free software: you can redistribute it and/or modify it |
1450 | + under the terms of the GNU General Public License version 3, as published |
1451 | + by the Free Software Foundation. |
1452 | + |
1453 | + This program is distributed in the hope that it will be useful, but |
1454 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
1455 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1456 | + PURPOSE. See the GNU General Public License for more details. |
1457 | + |
1458 | + You should have received a copy of the GNU General Public License along |
1459 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
1460 | +*/ |
1461 | + |
1462 | +package protocol |
1463 | + |
1464 | +import ( |
1465 | + "encoding/json" |
1466 | + "fmt" |
1467 | + . "launchpad.net/gocheck" |
1468 | + "strings" |
1469 | +) |
1470 | + |
1471 | +type messagesSuite struct{} |
1472 | + |
1473 | +var _ = Suite(&messagesSuite{}) |
1474 | + |
1475 | +func (s *messagesSuite) TestSplitBroadcastMsgNop(c *C) { |
1476 | + b := &BroadcastMsg{ |
1477 | + Type: "broadcast", |
1478 | + AppId: "APP", |
1479 | + ChanId: "0", |
1480 | + TopLevel: 2, |
1481 | + Payloads: []json.RawMessage{json.RawMessage(`{b:1}`), json.RawMessage(`{b:2}`)}, |
1482 | + } |
1483 | + done := b.Split() |
1484 | + c.Check(done, Equals, true) |
1485 | + c.Check(b.TopLevel, Equals, int64(2)) |
1486 | + c.Check(cap(b.Payloads), Equals, 2) |
1487 | + c.Check(len(b.Payloads), Equals, 2) |
1488 | +} |
1489 | + |
1490 | +var payloadFmt = fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
1491 | + |
1492 | +func manyParts(c int) []json.RawMessage { |
1493 | + payloads := make([]json.RawMessage, 0, 1) |
1494 | + for i := 0; i < c; i++ { |
1495 | + payloads = append(payloads, json.RawMessage(fmt.Sprintf(payloadFmt, i))) |
1496 | + } |
1497 | + return payloads |
1498 | +} |
1499 | + |
1500 | +func (s *messagesSuite) TestSplitBroadcastMsgManyParts(c *C) { |
1501 | + payloads := manyParts(33) |
1502 | + n := len(payloads) |
1503 | + // more interesting this way |
1504 | + c.Assert(cap(payloads), Not(Equals), n) |
1505 | + b := &BroadcastMsg{ |
1506 | + Type: "broadcast", |
1507 | + AppId: "APP", |
1508 | + ChanId: "0", |
1509 | + TopLevel: 500, |
1510 | + Payloads: payloads, |
1511 | + } |
1512 | + done := b.Split() |
1513 | + c.Assert(done, Equals, false) |
1514 | + n1 := len(b.Payloads) |
1515 | + c.Check(b.TopLevel, Equals, int64(500-n+n1)) |
1516 | + buf, err := json.Marshal(b) |
1517 | + c.Assert(err, IsNil) |
1518 | + c.Assert(len(buf) <= 65535, Equals, true) |
1519 | + c.Check(len(buf)+len(payloads[n1]) > maxPayloadSize, Equals, true) |
1520 | + done = b.Split() |
1521 | + c.Assert(done, Equals, true) |
1522 | + n2 := len(b.Payloads) |
1523 | + c.Check(b.TopLevel, Equals, int64(500)) |
1524 | + c.Check(n1+n2, Equals, n) |
1525 | + |
1526 | + payloads = manyParts(61) |
1527 | + n = len(payloads) |
1528 | + b = &BroadcastMsg{ |
1529 | + Type: "broadcast", |
1530 | + AppId: "APP", |
1531 | + ChanId: "0", |
1532 | + TopLevel: int64(n), |
1533 | + Payloads: payloads, |
1534 | + } |
1535 | + done = b.Split() |
1536 | + c.Assert(done, Equals, false) |
1537 | + n1 = len(b.Payloads) |
1538 | + done = b.Split() |
1539 | + c.Assert(done, Equals, false) |
1540 | + n2 = len(b.Payloads) |
1541 | + done = b.Split() |
1542 | + c.Assert(done, Equals, true) |
1543 | + n3 := len(b.Payloads) |
1544 | + c.Check(b.TopLevel, Equals, int64(n)) |
1545 | + c.Check(n1+n2+n3, Equals, n) |
1546 | +} |
1547 | |
1548 | === added file 'protocol/protocol.go' |
1549 | --- protocol/protocol.go 1970-01-01 00:00:00 +0000 |
1550 | +++ protocol/protocol.go 2014-01-14 15:09:46 +0000 |
1551 | @@ -0,0 +1,103 @@ |
1552 | +/* |
1553 | + Copyright 2013-2014 Canonical Ltd. |
1554 | + |
1555 | + This program is free software: you can redistribute it and/or modify it |
1556 | + under the terms of the GNU General Public License version 3, as published |
1557 | + by the Free Software Foundation. |
1558 | + |
1559 | + This program is distributed in the hope that it will be useful, but |
1560 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
1561 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1562 | + PURPOSE. See the GNU General Public License for more details. |
1563 | + |
1564 | + You should have received a copy of the GNU General Public License along |
1565 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
1566 | +*/ |
1567 | + |
1568 | +// Package protocol has code to talk the client-daemon<->push-server protocol. |
1569 | +package protocol |
1570 | + |
1571 | +import ( |
1572 | + "bytes" |
1573 | + "encoding/binary" |
1574 | + "encoding/json" |
1575 | + "fmt" |
1576 | + "io" |
1577 | + "net" |
1578 | + "time" |
1579 | +) |
1580 | + |
1581 | +// Protocol is a connection capable of writing and reading the wire format of protocol messages. |
1582 | +type Protocol interface { |
1583 | + SetDeadline(t time.Time) |
1584 | + ReadMessage(msg interface{}) error |
1585 | + WriteMessage(msg interface{}) error |
1586 | +} |
1587 | + |
1588 | +func ReadWireFormatVersion(conn net.Conn, exchangeTimeout time.Duration) (ver int, err error) { |
1589 | + var buf1 [1]byte |
1590 | + err = conn.SetReadDeadline(time.Now().Add(exchangeTimeout)) |
1591 | + if err != nil { |
1592 | + panic(fmt.Errorf("can't set deadline: %v", err)) |
1593 | + } |
1594 | + _, err = conn.Read(buf1[:]) |
1595 | + ver = int(buf1[0]) |
1596 | + return |
1597 | +} |
1598 | + |
1599 | +const ProtocolWireVersion = 0 |
1600 | + |
1601 | +// protocol0 handles version 0 of the wire format |
1602 | +type protocol0 struct { |
1603 | + buffer *bytes.Buffer |
1604 | + enc *json.Encoder |
1605 | + conn net.Conn |
1606 | +} |
1607 | + |
1608 | +// NewProtocol0 creates and initialises a protocol with wire format version 0. |
1609 | +func NewProtocol0(conn net.Conn) *protocol0 { |
1610 | + buf := bytes.NewBuffer(make([]byte, 5000)) |
1611 | + return &protocol0{ |
1612 | + buffer: buf, |
1613 | + enc: json.NewEncoder(buf), |
1614 | + conn: conn} |
1615 | +} |
1616 | + |
1617 | +// SetDeadline sets the deadline for the subsequent WriteMessage/ReadMessage exchange |
1618 | +func (c *protocol0) SetDeadline(t time.Time) { |
1619 | + err := c.conn.SetDeadline(t) |
1620 | + if err != nil { |
1621 | + panic(fmt.Errorf("can't set deadline: %v", err)) |
1622 | + } |
1623 | +} |
1624 | + |
1625 | +// ReadMessage reads one message made of big endian uint16 length, JSON body of length from the connection. |
1626 | +func (c *protocol0) ReadMessage(msg interface{}) error { |
1627 | + c.buffer.Reset() |
1628 | + _, err := io.CopyN(c.buffer, c.conn, 2) |
1629 | + if err != nil { |
1630 | + return err |
1631 | + } |
1632 | + length := binary.BigEndian.Uint16(c.buffer.Bytes()) |
1633 | + c.buffer.Reset() |
1634 | + _, err = io.CopyN(c.buffer, c.conn, int64(length)) |
1635 | + if err != nil { |
1636 | + return err |
1637 | + } |
1638 | + return json.Unmarshal(c.buffer.Bytes(), msg) |
1639 | +} |
1640 | + |
1641 | +// WriteMessage writes one message made of big endian uint16 length, JSON body of length to the connection. |
1642 | +func (c *protocol0) WriteMessage(msg interface{}) error { |
1643 | + c.buffer.Reset() |
1644 | + c.buffer.WriteString("\x00\x00") // placeholder for length |
1645 | + err := c.enc.Encode(msg) |
1646 | + if err != nil { |
1647 | + panic(fmt.Errorf("WriteMessage got: %v", err)) |
1648 | + } |
1649 | + msgLen := c.buffer.Len() - 3 // length space, extra newline |
1650 | + toWrite := c.buffer.Bytes() |
1651 | + binary.BigEndian.PutUint16(toWrite[:2], uint16(msgLen)) |
1652 | + _, err = c.conn.Write(toWrite[:msgLen+2]) |
1653 | + return err |
1654 | +} |
1655 | |
1656 | === added file 'protocol/protocol_test.go' |
1657 | --- protocol/protocol_test.go 1970-01-01 00:00:00 +0000 |
1658 | +++ protocol/protocol_test.go 2014-01-14 15:09:46 +0000 |
1659 | @@ -0,0 +1,227 @@ |
1660 | +/* |
1661 | + Copyright 2013-2014 Canonical Ltd. |
1662 | + |
1663 | + This program is free software: you can redistribute it and/or modify it |
1664 | + under the terms of the GNU General Public License version 3, as published |
1665 | + by the Free Software Foundation. |
1666 | + |
1667 | + This program is distributed in the hope that it will be useful, but |
1668 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
1669 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1670 | + PURPOSE. See the GNU General Public License for more details. |
1671 | + |
1672 | + You should have received a copy of the GNU General Public License along |
1673 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
1674 | +*/ |
1675 | + |
1676 | +package protocol |
1677 | + |
1678 | +import ( |
1679 | + "encoding/binary" |
1680 | + "encoding/json" |
1681 | + // "fmt" |
1682 | + "io" |
1683 | + . "launchpad.net/gocheck" |
1684 | + "net" |
1685 | + "testing" |
1686 | + "time" |
1687 | +) |
1688 | + |
1689 | +func TestProtocol(t *testing.T) { TestingT(t) } |
1690 | + |
1691 | +type protocolSuite struct{} |
1692 | + |
1693 | +var _ = Suite(&protocolSuite{}) |
1694 | + |
1695 | +type deadline struct { |
1696 | + kind string |
1697 | + deadAfter time.Duration |
1698 | +} |
1699 | + |
1700 | +func (d *deadline) setDeadAfter(t time.Time) { |
1701 | + deadAfter := t.Sub(time.Now()) |
1702 | + d.deadAfter = (deadAfter + time.Millisecond/2) / time.Millisecond * time.Millisecond |
1703 | +} |
1704 | + |
1705 | +type rw struct { |
1706 | + buf []byte |
1707 | + n int |
1708 | + err error |
1709 | +} |
1710 | + |
1711 | +type testConn struct { |
1712 | + deadlines []*deadline |
1713 | + reads []rw |
1714 | + writes []*rw |
1715 | +} |
1716 | + |
1717 | +func (tc *testConn) LocalAddr() net.Addr { |
1718 | + return nil |
1719 | +} |
1720 | + |
1721 | +func (tc *testConn) RemoteAddr() net.Addr { |
1722 | + return nil |
1723 | +} |
1724 | + |
1725 | +func (tc *testConn) Close() error { |
1726 | + return nil |
1727 | +} |
1728 | + |
1729 | +func (tc *testConn) SetDeadline(t time.Time) error { |
1730 | + deadline := tc.deadlines[0] |
1731 | + deadline.kind = "both" |
1732 | + deadline.setDeadAfter(t) |
1733 | + tc.deadlines = tc.deadlines[1:] |
1734 | + return nil |
1735 | +} |
1736 | + |
1737 | +func (tc *testConn) SetReadDeadline(t time.Time) error { |
1738 | + deadline := tc.deadlines[0] |
1739 | + deadline.kind = "read" |
1740 | + deadline.setDeadAfter(t) |
1741 | + tc.deadlines = tc.deadlines[1:] |
1742 | + return nil |
1743 | +} |
1744 | + |
1745 | +func (tc *testConn) SetWriteDeadline(t time.Time) error { |
1746 | + deadline := tc.deadlines[0] |
1747 | + deadline.kind = "write" |
1748 | + deadline.setDeadAfter(t) |
1749 | + tc.deadlines = tc.deadlines[1:] |
1750 | + return nil |
1751 | +} |
1752 | + |
1753 | +func (tc *testConn) Read(buf []byte) (n int, err error) { |
1754 | + read := tc.reads[0] |
1755 | + copy(buf, read.buf) |
1756 | + tc.reads = tc.reads[1:] |
1757 | + return read.n, read.err |
1758 | +} |
1759 | + |
1760 | +func (tc *testConn) Write(buf []byte) (n int, err error) { |
1761 | + write := tc.writes[0] |
1762 | + n = copy(write.buf, buf) |
1763 | + write.buf = write.buf[:n] |
1764 | + write.n = n |
1765 | + err = write.err |
1766 | + tc.writes = tc.writes[1:] |
1767 | + return |
1768 | +} |
1769 | + |
1770 | +func (s *protocolSuite) TestReadWireFormatVersion(c *C) { |
1771 | + deadl := deadline{} |
1772 | + read1 := rw{buf: []byte{42}, n: 1} |
1773 | + tc := &testConn{reads: []rw{read1}, deadlines: []*deadline{&deadl}} |
1774 | + ver, err := ReadWireFormatVersion(tc, time.Minute) |
1775 | + c.Check(err, IsNil) |
1776 | + c.Check(ver, Equals, 42) |
1777 | + c.Check(deadl.kind, Equals, "read") |
1778 | + c.Check(deadl.deadAfter, Equals, time.Minute) |
1779 | +} |
1780 | + |
1781 | +func (s *protocolSuite) TestReadWireFormatVersionError(c *C) { |
1782 | + deadl := deadline{} |
1783 | + read1 := rw{err: io.EOF} |
1784 | + tc := &testConn{reads: []rw{read1}, deadlines: []*deadline{&deadl}} |
1785 | + _, err := ReadWireFormatVersion(tc, time.Minute) |
1786 | + c.Check(err, Equals, io.EOF) |
1787 | +} |
1788 | + |
1789 | +func (s *protocolSuite) TestSetDeadline(c *C) { |
1790 | + deadl := deadline{} |
1791 | + tc := &testConn{deadlines: []*deadline{&deadl}} |
1792 | + pc := NewProtocol0(tc) |
1793 | + pc.SetDeadline(time.Now().Add(time.Minute)) |
1794 | + c.Check(deadl.kind, Equals, "both") |
1795 | + c.Check(deadl.deadAfter, Equals, time.Minute) |
1796 | +} |
1797 | + |
1798 | +type testMsg struct { |
1799 | + Type string `json:"T"` |
1800 | + A uint64 |
1801 | +} |
1802 | + |
1803 | +func lengthAsBytes(length uint16) []byte { |
1804 | + var buf [2]byte |
1805 | + var res = buf[:] |
1806 | + binary.BigEndian.PutUint16(res, length) |
1807 | + return res |
1808 | +} |
1809 | + |
1810 | +func (s *protocolSuite) TestReadMessage(c *C) { |
1811 | + msgBuf, _ := json.Marshal(testMsg{Type: "msg", A: 2000}) |
1812 | + readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2} |
1813 | + readMsgBody := rw{buf: msgBuf, n: len(msgBuf)} |
1814 | + tc := &testConn{reads: []rw{readMsgLen, readMsgBody}} |
1815 | + pc := NewProtocol0(tc) |
1816 | + var recvMsg testMsg |
1817 | + err := pc.ReadMessage(&recvMsg) |
1818 | + c.Check(err, IsNil) |
1819 | + c.Check(recvMsg, DeepEquals, testMsg{Type: "msg", A: 2000}) |
1820 | +} |
1821 | + |
1822 | +func (s *protocolSuite) TestReadMessageBits(c *C) { |
1823 | + msgBuf, _ := json.Marshal(testMsg{Type: "msg", A: 2000}) |
1824 | + readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2} |
1825 | + readMsgBody1 := rw{buf: msgBuf[:5], n: 5} |
1826 | + readMsgBody2 := rw{buf: msgBuf[5:], n: len(msgBuf) - 5} |
1827 | + tc := &testConn{reads: []rw{readMsgLen, readMsgBody1, readMsgBody2}} |
1828 | + pc := NewProtocol0(tc) |
1829 | + var recvMsg testMsg |
1830 | + err := pc.ReadMessage(&recvMsg) |
1831 | + c.Check(err, IsNil) |
1832 | + c.Check(recvMsg, DeepEquals, testMsg{Type: "msg", A: 2000}) |
1833 | +} |
1834 | + |
1835 | +func (s *protocolSuite) TestReadMessageIOErrors(c *C) { |
1836 | + msgBuf, _ := json.Marshal(testMsg{Type: "msg", A: 2000}) |
1837 | + readMsgLenErr := rw{n: 1, err: io.ErrClosedPipe} |
1838 | + tc1 := &testConn{reads: []rw{readMsgLenErr}} |
1839 | + pc1 := NewProtocol0(tc1) |
1840 | + var recvMsg testMsg |
1841 | + err := pc1.ReadMessage(&recvMsg) |
1842 | + c.Check(err, Equals, io.ErrClosedPipe) |
1843 | + |
1844 | + readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2} |
1845 | + readMsgBody1 := rw{buf: msgBuf[:5], n: 5} |
1846 | + readMsgBody2Err := rw{n: 2, err: io.EOF} |
1847 | + tc2 := &testConn{reads: []rw{readMsgLen, readMsgBody1, readMsgBody2Err}} |
1848 | + pc2 := NewProtocol0(tc2) |
1849 | + err = pc2.ReadMessage(&recvMsg) |
1850 | + c.Check(err, Equals, io.EOF) |
1851 | +} |
1852 | + |
1853 | +func (s *protocolSuite) TestReadMessageBrokenJSON(c *C) { |
1854 | + msgBuf := []byte("{\"T\"}") |
1855 | + readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2} |
1856 | + readMsgBody := rw{buf: msgBuf, n: len(msgBuf)} |
1857 | + tc := &testConn{reads: []rw{readMsgLen, readMsgBody}} |
1858 | + pc := NewProtocol0(tc) |
1859 | + var recvMsg testMsg |
1860 | + err := pc.ReadMessage(&recvMsg) |
1861 | + c.Check(err, FitsTypeOf, &json.SyntaxError{}) |
1862 | +} |
1863 | + |
1864 | +func (s *protocolSuite) TestWriteMessage(c *C) { |
1865 | + writeMsg := rw{buf: make([]byte, 64)} |
1866 | + tc := &testConn{writes: []*rw{&writeMsg}} |
1867 | + pc := NewProtocol0(tc) |
1868 | + msg := testMsg{Type: "m", A: 9999} |
1869 | + err := pc.WriteMessage(&msg) |
1870 | + c.Check(err, IsNil) |
1871 | + var msgLen int = int(binary.BigEndian.Uint16(writeMsg.buf[:2])) |
1872 | + c.Check(msgLen, Equals, len(writeMsg.buf)-2) |
1873 | + var wroteMsg testMsg |
1874 | + formatErr := json.Unmarshal(writeMsg.buf[2:], &wroteMsg) |
1875 | + c.Check(formatErr, IsNil) |
1876 | + c.Check(wroteMsg, DeepEquals, testMsg{Type: "m", A: 9999}) |
1877 | +} |
1878 | + |
1879 | +func (s *protocolSuite) TestWriteMessageIOErrors(c *C) { |
1880 | + writeMsgErr := rw{buf: make([]byte, 0), err: io.ErrClosedPipe} |
1881 | + tc1 := &testConn{writes: []*rw{&writeMsgErr}} |
1882 | + pc1 := NewProtocol0(tc1) |
1883 | + msg := testMsg{Type: "m", A: 9999} |
1884 | + err := pc1.WriteMessage(&msg) |
1885 | + c.Check(err, Equals, io.ErrClosedPipe) |
1886 | +} |
1887 | |
1888 | === added directory 'scripts' |
1889 | === added file 'scripts/check_fmt' |
1890 | --- scripts/check_fmt 1970-01-01 00:00:00 +0000 |
1891 | +++ scripts/check_fmt 2014-01-14 15:09:46 +0000 |
1892 | @@ -0,0 +1,14 @@ |
1893 | +#!/bin/bash |
1894 | +# check that all go files respect gofmt formatting |
1895 | +# assumes GOPATH is properly set |
1896 | +PROJECT=${1:?missing project} |
1897 | +PROBLEMS= |
1898 | +for pkg in $(go list ${PROJECT}/...) ; do |
1899 | + NONCOMPLIANT=$(gofmt -l ${GOPATH}/src/${pkg}/*.go) |
1900 | + if [ -n "${NONCOMPLIANT}" ]; then |
1901 | + echo pkg $pkg has some gofmt non-compliant files: |
1902 | + echo ${NONCOMPLIANT}|xargs -d ' ' -n1 basename |
1903 | + PROBLEMS="y" |
1904 | + fi |
1905 | +done |
1906 | +test -z "${PROBLEMS}" |
1907 | |
1908 | === added directory 'server' |
1909 | === added directory 'server/acceptance' |
1910 | === added file 'server/acceptance/acceptance.sh' |
1911 | --- server/acceptance/acceptance.sh 1970-01-01 00:00:00 +0000 |
1912 | +++ server/acceptance/acceptance.sh 2014-01-14 15:09:46 +0000 |
1913 | @@ -0,0 +1,6 @@ |
1914 | +# run acceptance tests, expects a properly set up GOPATH and deps |
1915 | +# can set extra build params like -race with BUILD_FLAGS envvar |
1916 | +set -ex |
1917 | +go test $BUILD_FLAGS -i |
1918 | +go build $BUILD_FLAGS -o testserver launchpad.net/ubuntu-push/server/dev |
1919 | +go test $BUILD_FLAGS -server ./testserver $* |
1920 | |
1921 | === added file 'server/acceptance/acceptance_test.go' |
1922 | --- server/acceptance/acceptance_test.go 1970-01-01 00:00:00 +0000 |
1923 | +++ server/acceptance/acceptance_test.go 2014-01-14 15:09:46 +0000 |
1924 | @@ -0,0 +1,487 @@ |
1925 | +/* |
1926 | + Copyright 2013-2014 Canonical Ltd. |
1927 | + |
1928 | + This program is free software: you can redistribute it and/or modify it |
1929 | + under the terms of the GNU General Public License version 3, as published |
1930 | + by the Free Software Foundation. |
1931 | + |
1932 | + This program is distributed in the hope that it will be useful, but |
1933 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
1934 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1935 | + PURPOSE. See the GNU General Public License for more details. |
1936 | + |
1937 | + You should have received a copy of the GNU General Public License along |
1938 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
1939 | +*/ |
1940 | + |
1941 | +package acceptance |
1942 | + |
1943 | +import ( |
1944 | + "bufio" |
1945 | + "bytes" |
1946 | + "encoding/json" |
1947 | + "flag" |
1948 | + "fmt" |
1949 | + "io/ioutil" |
1950 | + . "launchpad.net/gocheck" |
1951 | + "launchpad.net/ubuntu-push/protocol" |
1952 | + "launchpad.net/ubuntu-push/server/api" |
1953 | + "net" |
1954 | + "net/http" |
1955 | + "os" |
1956 | + "os/exec" |
1957 | + "path/filepath" |
1958 | + "regexp" |
1959 | + "runtime" |
1960 | + "strings" |
1961 | + "testing" |
1962 | + "time" |
1963 | +) |
1964 | + |
1965 | +func TestAcceptance(t *testing.T) { TestingT(t) } |
1966 | + |
1967 | +type acceptanceSuite struct { |
1968 | + server *exec.Cmd |
1969 | + serverAddr string |
1970 | + serverURL string |
1971 | + serverEvents <-chan string |
1972 | + httpClient *http.Client |
1973 | +} |
1974 | + |
1975 | +var _ = Suite(&acceptanceSuite{}) |
1976 | + |
1977 | +var serverCmd = flag.String("server", "", "server to test") |
1978 | + |
1979 | +// SourceRelative produces a path relative to the source code, makes |
1980 | +// sense only for tests when the code is available on disk. |
1981 | +// xxx later move it to a testing helpers package |
1982 | +func SourceRelative(relativePath string) string { |
1983 | + _, file, _, ok := runtime.Caller(1) |
1984 | + if !ok { |
1985 | + panic("failed to get source filename using Caller()") |
1986 | + } |
1987 | + return filepath.Join(filepath.Dir(file), relativePath) |
1988 | +} |
1989 | + |
1990 | +func testServerConfig(addr, httpAddr string) map[string]interface{} { |
1991 | + cfg := map[string]interface{}{ |
1992 | + "exchange_timeout": "0.1s", |
1993 | + "ping_interval": "0.5s", |
1994 | + "session_queue_size": 10, |
1995 | + "broker_queue_size": 100, |
1996 | + "addr": addr, |
1997 | + "key_pem_file": SourceRelative("config/testing.key"), |
1998 | + "cert_pem_file": SourceRelative("config/testing.cert"), |
1999 | + "http_addr": httpAddr, |
2000 | + "http_read_timeout": "1s", |
2001 | + "http_write_timeout": "1s", |
2002 | + } |
2003 | + return cfg |
2004 | +} |
2005 | + |
2006 | +func testClientSession(addr string, deviceId string, reportPings bool) *ClientSession { |
2007 | + certPEMBlock, err := ioutil.ReadFile(SourceRelative("config/testing.cert")) |
2008 | + if err != nil { |
2009 | + panic(fmt.Sprintf("could not read config/testing.cert: %v", err)) |
2010 | + } |
2011 | + return &ClientSession{ |
2012 | + ExchangeTimeout: 100 * time.Millisecond, |
2013 | + PingInterval: 500 * time.Millisecond, |
2014 | + ServerAddr: addr, |
2015 | + CertPEMBlock: certPEMBlock, |
2016 | + DeviceId: deviceId, |
2017 | + ReportPings: reportPings, |
2018 | + } |
2019 | +} |
2020 | + |
2021 | +const ( |
2022 | + devListeningOnPat = "INFO listening for devices on " |
2023 | + httpListeningOnPat = "INFO listening for http on " |
2024 | +) |
2025 | + |
2026 | +var rxLineInfo = regexp.MustCompile("^.*? ([[:alpha:]].*)\n") |
2027 | + |
2028 | +func extractListeningAddr(c *C, pat, line string) string { |
2029 | + if !strings.HasPrefix(line, pat) { |
2030 | + c.Fatalf("server: %v", line) |
2031 | + } |
2032 | + return line[len(pat):] |
2033 | +} |
2034 | + |
2035 | +// start a new server for each test |
2036 | +func (s *acceptanceSuite) SetUpTest(c *C) { |
2037 | + if *serverCmd == "" { |
2038 | + c.Skip("executable server not specified") |
2039 | + } |
2040 | + tmpDir := c.MkDir() |
2041 | + cfgFilename := filepath.Join(tmpDir, "config.json") |
2042 | + cfgJson, err := json.Marshal(testServerConfig("127.0.0.1:0", "127.0.0.1:0")) |
2043 | + if err != nil { |
2044 | + c.Fatal(err) |
2045 | + } |
2046 | + err = ioutil.WriteFile(cfgFilename, cfgJson, os.ModePerm) |
2047 | + if err != nil { |
2048 | + c.Fatal(err) |
2049 | + } |
2050 | + server := exec.Command(*serverCmd, cfgFilename) |
2051 | + stderr, err := server.StderrPipe() |
2052 | + if err != nil { |
2053 | + c.Fatal(err) |
2054 | + } |
2055 | + err = server.Start() |
2056 | + if err != nil { |
2057 | + c.Fatal(err) |
2058 | + } |
2059 | + bufErr := bufio.NewReaderSize(stderr, 5000) |
2060 | + getLineInfo := func() (string, error) { |
2061 | + line, err := bufErr.ReadString('\n') |
2062 | + if err != nil { |
2063 | + return "", err |
2064 | + } |
2065 | + extracted := rxLineInfo.FindStringSubmatch(line) |
2066 | + if extracted == nil { |
2067 | + return "", fmt.Errorf("unexpected server line: %#v", line) |
2068 | + } |
2069 | + return extracted[1], nil |
2070 | + } |
2071 | + infoHTTP, err := getLineInfo() |
2072 | + if err != nil { |
2073 | + c.Fatal(err) |
2074 | + } |
2075 | + serverHTTPAddr := extractListeningAddr(c, httpListeningOnPat, infoHTTP) |
2076 | + s.serverURL = fmt.Sprintf("http://%s", serverHTTPAddr) |
2077 | + info, err := getLineInfo() |
2078 | + if err != nil { |
2079 | + c.Fatal(err) |
2080 | + } |
2081 | + s.serverAddr = extractListeningAddr(c, devListeningOnPat, info) |
2082 | + s.server = server |
2083 | + serverEvents := make(chan string, 5) |
2084 | + s.serverEvents = serverEvents |
2085 | + go func() { |
2086 | + for { |
2087 | + info, err := getLineInfo() |
2088 | + if err != nil { |
2089 | + serverEvents <- fmt.Sprintf("ERROR: %v", err) |
2090 | + close(serverEvents) |
2091 | + return |
2092 | + } |
2093 | + serverEvents <- info |
2094 | + } |
2095 | + }() |
2096 | + s.httpClient = &http.Client{} |
2097 | +} |
2098 | + |
2099 | +func (s *acceptanceSuite) TearDownTest(c *C) { |
2100 | + if s.server != nil { |
2101 | + s.server.Process.Kill() |
2102 | + s.server = nil |
2103 | + } |
2104 | +} |
2105 | + |
2106 | +// nextEvent receives an event from given channel with a 5s timeout |
2107 | +func nextEvent(events <-chan string, errCh <-chan error) string { |
2108 | + select { |
2109 | + case <-time.After(5 * time.Second): |
2110 | + panic("too long stuck waiting for next event") |
2111 | + case err := <-errCh: |
2112 | + return err.Error() // will fail comparison typically |
2113 | + case evStr := <-events: |
2114 | + return evStr |
2115 | + } |
2116 | +} |
2117 | + |
2118 | +// Tests about connection, ping-pong, disconnection scenarios |
2119 | + |
2120 | +// typically combined with -gocheck.vv or test selection |
2121 | +var logTraffic = flag.Bool("logTraffic", false, "log traffic") |
2122 | + |
2123 | +type connInterceptor func(ic *interceptingConn, op string, b []byte) (bool, int, error) |
2124 | + |
2125 | +type interceptingConn struct { |
2126 | + net.Conn |
2127 | + totalRead int |
2128 | + totalWritten int |
2129 | + intercept connInterceptor |
2130 | +} |
2131 | + |
2132 | +func (ic *interceptingConn) Write(b []byte) (n int, err error) { |
2133 | + done := false |
2134 | + before := ic.totalWritten |
2135 | + if ic.intercept != nil { |
2136 | + done, n, err = ic.intercept(ic, "write", b) |
2137 | + } |
2138 | + if !done { |
2139 | + n, err = ic.Conn.Write(b) |
2140 | + } |
2141 | + ic.totalWritten += n |
2142 | + if *logTraffic { |
2143 | + fmt.Printf("W[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalWritten) |
2144 | + } |
2145 | + return |
2146 | +} |
2147 | + |
2148 | +func (ic *interceptingConn) Read(b []byte) (n int, err error) { |
2149 | + done := false |
2150 | + before := ic.totalRead |
2151 | + if ic.intercept != nil { |
2152 | + done, n, err = ic.intercept(ic, "read", b) |
2153 | + } |
2154 | + if !done { |
2155 | + n, err = ic.Conn.Read(b) |
2156 | + } |
2157 | + ic.totalRead += n |
2158 | + if *logTraffic { |
2159 | + fmt.Printf("R[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalRead) |
2160 | + } |
2161 | + return |
2162 | +} |
2163 | + |
2164 | +func (s *acceptanceSuite) TestConnectPingPing(c *C) { |
2165 | + errCh := make(chan error, 1) |
2166 | + events := make(chan string, 10) |
2167 | + sess := testClientSession(s.serverAddr, "DEVA", true) |
2168 | + err := sess.Dial() |
2169 | + c.Assert(err, IsNil) |
2170 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
2171 | + // would be 3rd ping read, based on logged traffic |
2172 | + if op == "read" && ic.totalRead >= 28 { |
2173 | + // exit the sess.Run() goroutine, client will close |
2174 | + runtime.Goexit() |
2175 | + } |
2176 | + return false, 0, nil |
2177 | + } |
2178 | + sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
2179 | + go func() { |
2180 | + errCh <- sess.Run(events) |
2181 | + }() |
2182 | + connectCli := nextEvent(events, errCh) |
2183 | + connectSrv := nextEvent(s.serverEvents, nil) |
2184 | + registeredSrv := nextEvent(s.serverEvents, nil) |
2185 | + tconnect := time.Now() |
2186 | + c.Assert(connectSrv, Matches, ".*session.* connected .*") |
2187 | + c.Assert(registeredSrv, Matches, ".*session.* registered DEVA") |
2188 | + c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true) |
2189 | + c.Assert(nextEvent(events, errCh), Equals, "Ping") |
2190 | + elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond) |
2191 | + c.Check(elapsedOfPing >= 1.0, Equals, true) |
2192 | + c.Check(elapsedOfPing < 1.05, Equals, true) |
2193 | + c.Assert(nextEvent(events, errCh), Equals, "Ping") |
2194 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF") |
2195 | + c.Check(len(errCh), Equals, 0) |
2196 | +} |
2197 | + |
2198 | +func (s *acceptanceSuite) TestConnectPingNeverPong(c *C) { |
2199 | + errCh := make(chan error, 1) |
2200 | + events := make(chan string, 10) |
2201 | + sess := testClientSession(s.serverAddr, "DEVB", true) |
2202 | + err := sess.Dial() |
2203 | + c.Assert(err, IsNil) |
2204 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
2205 | + // would be pong to 2nd ping, based on logged traffic |
2206 | + if op == "write" && ic.totalRead >= 28 { |
2207 | + time.Sleep(200 * time.Millisecond) |
2208 | + // exit the sess.Run() goroutine, client will close |
2209 | + runtime.Goexit() |
2210 | + } |
2211 | + return false, 0, nil |
2212 | + } |
2213 | + sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
2214 | + go func() { |
2215 | + errCh <- sess.Run(events) |
2216 | + }() |
2217 | + c.Assert(nextEvent(events, errCh), Matches, "connected .*") |
2218 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
2219 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*") |
2220 | + c.Assert(nextEvent(events, errCh), Equals, "Ping") |
2221 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`) |
2222 | + c.Check(len(errCh), Equals, 0) |
2223 | +} |
2224 | + |
2225 | +// Tests about broadcast |
2226 | + |
2227 | +func (s *acceptanceSuite) postRequest(path string, message interface{}) (string, error) { |
2228 | + packedMessage, err := json.Marshal(message) |
2229 | + if err != nil { |
2230 | + panic(err) |
2231 | + } |
2232 | + reader := bytes.NewReader(packedMessage) |
2233 | + |
2234 | + url := s.serverURL + path |
2235 | + request, _ := http.NewRequest("POST", url, reader) |
2236 | + request.ContentLength = int64(reader.Len()) |
2237 | + request.Header.Set("Content-Type", "application/json") |
2238 | + |
2239 | + resp, err := s.httpClient.Do(request) |
2240 | + if err != nil { |
2241 | + panic(err) |
2242 | + } |
2243 | + defer resp.Body.Close() |
2244 | + body, err := ioutil.ReadAll(resp.Body) |
2245 | + return string(body), err |
2246 | +} |
2247 | + |
2248 | +func (s *acceptanceSuite) startClient(c *C, devId string, intercept connInterceptor, levels map[string]int64) (<-chan string, <-chan error) { |
2249 | + errCh := make(chan error, 1) |
2250 | + events := make(chan string, 10) |
2251 | + sess := testClientSession(s.serverAddr, devId, false) |
2252 | + sess.Levels = levels |
2253 | + err := sess.Dial() |
2254 | + c.Assert(err, IsNil) |
2255 | + sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept} |
2256 | + go func() { |
2257 | + errCh <- sess.Run(events) |
2258 | + }() |
2259 | + c.Assert(nextEvent(events, errCh), Matches, "connected .*") |
2260 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*") |
2261 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId) |
2262 | + return events, errCh |
2263 | +} |
2264 | + |
2265 | +func (s *acceptanceSuite) TestBroadcastToConnected(c *C) { |
2266 | + clientShutdown := make(chan bool, 1) // abused as an atomic flag |
2267 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
2268 | + // read after ack |
2269 | + if op == "read" && len(clientShutdown) > 0 { |
2270 | + // exit the sess.Run() goroutine, client will close |
2271 | + runtime.Goexit() |
2272 | + } |
2273 | + return false, 0, nil |
2274 | + } |
2275 | + events, errCh := s.startClient(c, "DEVB", intercept, nil) |
2276 | + got, err := s.postRequest("/broadcast", &api.Broadcast{ |
2277 | + Channel: "system", |
2278 | + Data: json.RawMessage(`{"n": 42}`), |
2279 | + }) |
2280 | + c.Check(err, IsNil) |
2281 | + c.Check(got, Matches, ".*ok.*") |
2282 | + c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
2283 | + clientShutdown <- true |
2284 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
2285 | + c.Check(len(errCh), Equals, 0) |
2286 | +} |
2287 | + |
2288 | +func (s *acceptanceSuite) TestBroadcastPending(c *C) { |
2289 | + // send broadcast that will be pending |
2290 | + got, err := s.postRequest("/broadcast", &api.Broadcast{ |
2291 | + Channel: "system", |
2292 | + Data: json.RawMessage(`{"b": 1}`), |
2293 | + }) |
2294 | + c.Check(err, IsNil) |
2295 | + c.Check(got, Matches, ".*ok.*") |
2296 | + |
2297 | + clientShutdown := make(chan bool, 1) // abused as an atomic flag |
2298 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
2299 | + // read after ack |
2300 | + if op == "read" && len(clientShutdown) > 0 { |
2301 | + // exit the sess.Run() goroutine, client will close |
2302 | + runtime.Goexit() |
2303 | + } |
2304 | + return false, 0, nil |
2305 | + } |
2306 | + events, errCh := s.startClient(c, "DEVB", intercept, nil) |
2307 | + // getting pending on connect |
2308 | + c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
2309 | + clientShutdown <- true |
2310 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
2311 | + c.Check(len(errCh), Equals, 0) |
2312 | +} |
2313 | + |
2314 | +func (s *acceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) { |
2315 | + // send a bunch of broadcasts that will be pending |
2316 | + payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2)) |
2317 | + for i := 0; i < 32; i++ { |
2318 | + got, err := s.postRequest("/broadcast", &api.Broadcast{ |
2319 | + Channel: "system", |
2320 | + Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)), |
2321 | + }) |
2322 | + c.Check(err, IsNil) |
2323 | + c.Check(got, Matches, ".*ok.*") |
2324 | + } |
2325 | + |
2326 | + clientShutdown := make(chan bool, 1) // abused as an atomic flag |
2327 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
2328 | + // read after ack |
2329 | + if op == "read" && len(clientShutdown) > 0 { |
2330 | + // exit the sess.Run() goroutine, client will close |
2331 | + runtime.Goexit() |
2332 | + } |
2333 | + return false, 0, nil |
2334 | + } |
2335 | + events, errCh := s.startClient(c, "DEVC", intercept, nil) |
2336 | + // getting pending on connect |
2337 | + c.Check(nextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`) |
2338 | + c.Check(nextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`) |
2339 | + clientShutdown <- true |
2340 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
2341 | + c.Check(len(errCh), Equals, 0) |
2342 | +} |
2343 | + |
2344 | +func (s *acceptanceSuite) TestBroadcastDistribution2(c *C) { |
2345 | + clientShutdown := make(chan bool, 1) // abused as an atomic flag |
2346 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
2347 | + // read after ack |
2348 | + if op == "read" && len(clientShutdown) > 0 { |
2349 | + // exit the sess.Run() goroutine, client will close |
2350 | + runtime.Goexit() |
2351 | + } |
2352 | + return false, 0, nil |
2353 | + } |
2354 | + // start 1st client |
2355 | + events1, errCh1 := s.startClient(c, "DEV1", intercept, nil) |
2356 | + // start 2nd client |
2357 | + events2, errCh2 := s.startClient(c, "DEV2", intercept, nil) |
2358 | + // broadcast |
2359 | + got, err := s.postRequest("/broadcast", &api.Broadcast{ |
2360 | + Channel: "system", |
2361 | + Data: json.RawMessage(`{"n": 42}`), |
2362 | + }) |
2363 | + c.Check(err, IsNil) |
2364 | + c.Check(got, Matches, ".*ok.*") |
2365 | + c.Check(nextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
2366 | + c.Check(nextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`) |
2367 | + clientShutdown <- true |
2368 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
2369 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
2370 | + c.Check(len(errCh1), Equals, 0) |
2371 | + c.Check(len(errCh2), Equals, 0) |
2372 | +} |
2373 | + |
2374 | +func (s *acceptanceSuite) TestBroadcastFilterByLevel(c *C) { |
2375 | + clientShutdown := make(chan bool, 1) // abused as an atomic flag |
2376 | + intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) { |
2377 | + // read after ack |
2378 | + if op == "read" && len(clientShutdown) > 0 { |
2379 | + // exit the sess.Run() goroutine, client will close |
2380 | + runtime.Goexit() |
2381 | + } |
2382 | + return false, 0, nil |
2383 | + } |
2384 | + events, errCh := s.startClient(c, "DEVD", intercept, nil) |
2385 | + got, err := s.postRequest("/broadcast", &api.Broadcast{ |
2386 | + Channel: "system", |
2387 | + Data: json.RawMessage(`{"b": 1}`), |
2388 | + }) |
2389 | + c.Check(err, IsNil) |
2390 | + c.Check(got, Matches, ".*ok.*") |
2391 | + c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`) |
2392 | + clientShutdown <- true |
2393 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
2394 | + c.Check(len(errCh), Equals, 0) |
2395 | + // another broadcast |
2396 | + got, err = s.postRequest("/broadcast", &api.Broadcast{ |
2397 | + Channel: "system", |
2398 | + Data: json.RawMessage(`{"b": 2}`), |
2399 | + }) |
2400 | + c.Check(err, IsNil) |
2401 | + c.Check(got, Matches, ".*ok.*") |
2402 | + // reconnect, provide levels, get only later notification |
2403 | + <-clientShutdown // reset |
2404 | + events, errCh = s.startClient(c, "DEVD", intercept, map[string]int64{ |
2405 | + protocol.SystemChannelId: 1, |
2406 | + }) |
2407 | + c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`) |
2408 | + clientShutdown <- true |
2409 | + c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`) |
2410 | + c.Check(len(errCh), Equals, 0) |
2411 | +} |
2412 | |
2413 | === added file 'server/acceptance/acceptanceclient.go' |
2414 | --- server/acceptance/acceptanceclient.go 1970-01-01 00:00:00 +0000 |
2415 | +++ server/acceptance/acceptanceclient.go 2014-01-14 15:09:46 +0000 |
2416 | @@ -0,0 +1,123 @@ |
2417 | +/* |
2418 | + Copyright 2013-2014 Canonical Ltd. |
2419 | + |
2420 | + This program is free software: you can redistribute it and/or modify it |
2421 | + under the terms of the GNU General Public License version 3, as published |
2422 | + by the Free Software Foundation. |
2423 | + |
2424 | + This program is distributed in the hope that it will be useful, but |
2425 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
2426 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
2427 | + PURPOSE. See the GNU General Public License for more details. |
2428 | + |
2429 | + You should have received a copy of the GNU General Public License along |
2430 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
2431 | +*/ |
2432 | + |
2433 | +package acceptance |
2434 | + |
2435 | +import ( |
2436 | + "crypto/tls" |
2437 | + "crypto/x509" |
2438 | + "encoding/json" |
2439 | + "errors" |
2440 | + "fmt" |
2441 | + "launchpad.net/ubuntu-push/protocol" |
2442 | + "net" |
2443 | + "time" |
2444 | +) |
2445 | + |
2446 | +var wireVersionBytes = []byte{protocol.ProtocolWireVersion} |
2447 | + |
2448 | +// ClientSession holds a client<->server session and its configuration. |
2449 | +type ClientSession struct { |
2450 | + // configuration |
2451 | + DeviceId string |
2452 | + ServerAddr string |
2453 | + PingInterval time.Duration |
2454 | + ExchangeTimeout time.Duration |
2455 | + CertPEMBlock []byte |
2456 | + ReportPings bool |
2457 | + Levels map[string]int64 |
2458 | + // connection |
2459 | + Connection net.Conn |
2460 | +} |
2461 | + |
2462 | +// Dial connects to a server using the configuration in the ClientSession |
2463 | +// and sets up the connection. |
2464 | +func (sess *ClientSession) Dial() error { |
2465 | + conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout) |
2466 | + if err != nil { |
2467 | + return err |
2468 | + } |
2469 | + tlsConfig := &tls.Config{} |
2470 | + if sess.CertPEMBlock != nil { |
2471 | + cp := x509.NewCertPool() |
2472 | + ok := cp.AppendCertsFromPEM(sess.CertPEMBlock) |
2473 | + if !ok { |
2474 | + return errors.New("dial: could not parse certificate") |
2475 | + } |
2476 | + tlsConfig.RootCAs = cp |
2477 | + } |
2478 | + sess.Connection = tls.Client(conn, tlsConfig) |
2479 | + return nil |
2480 | +} |
2481 | + |
2482 | +type serverMsg struct { |
2483 | + Type string `json:"T"` |
2484 | + protocol.BroadcastMsg |
2485 | + protocol.NotificationsMsg |
2486 | +} |
2487 | + |
2488 | +// Run runs the session with the server, emitting a stream of events. |
2489 | +func (sess *ClientSession) Run(events chan<- string) error { |
2490 | + conn := sess.Connection |
2491 | + defer conn.Close() |
2492 | + conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
2493 | + _, err := conn.Write(wireVersionBytes) |
2494 | + if err != nil { |
2495 | + return err |
2496 | + } |
2497 | + proto := protocol.NewProtocol0(conn) |
2498 | + err = proto.WriteMessage(protocol.ConnectMsg{ |
2499 | + Type: "connect", |
2500 | + DeviceId: sess.DeviceId, |
2501 | + Levels: sess.Levels, |
2502 | + }) |
2503 | + if err != nil { |
2504 | + return err |
2505 | + } |
2506 | + events <- fmt.Sprintf("connected %v", conn.LocalAddr()) |
2507 | + var recv serverMsg |
2508 | + for { |
2509 | + deadAfter := sess.PingInterval + sess.ExchangeTimeout |
2510 | + conn.SetDeadline(time.Now().Add(deadAfter)) |
2511 | + err = proto.ReadMessage(&recv) |
2512 | + if err != nil { |
2513 | + return err |
2514 | + } |
2515 | + switch recv.Type { |
2516 | + case "ping": |
2517 | + conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
2518 | + err := proto.WriteMessage(protocol.PingPongMsg{Type: "pong"}) |
2519 | + if err != nil { |
2520 | + return err |
2521 | + } |
2522 | + if sess.ReportPings { |
2523 | + events <- "Ping" |
2524 | + } |
2525 | + case "broadcast": |
2526 | + conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
2527 | + err := proto.WriteMessage(protocol.PingPongMsg{Type: "ack"}) |
2528 | + if err != nil { |
2529 | + return err |
2530 | + } |
2531 | + pack, err := json.Marshal(recv.Payloads) |
2532 | + if err != nil { |
2533 | + return err |
2534 | + } |
2535 | + events <- fmt.Sprintf("broadcast chan:%v app:%v topLevel:%d payloads:%s", recv.ChanId, recv.AppId, recv.TopLevel, pack) |
2536 | + } |
2537 | + } |
2538 | + return nil |
2539 | +} |
2540 | |
2541 | === added directory 'server/acceptance/cmd' |
2542 | === added file 'server/acceptance/cmd/acceptanceclient.go' |
2543 | --- server/acceptance/cmd/acceptanceclient.go 1970-01-01 00:00:00 +0000 |
2544 | +++ server/acceptance/cmd/acceptanceclient.go 2014-01-14 15:09:46 +0000 |
2545 | @@ -0,0 +1,79 @@ |
2546 | +/* |
2547 | + Copyright 2013-2014 Canonical Ltd. |
2548 | + |
2549 | + This program is free software: you can redistribute it and/or modify it |
2550 | + under the terms of the GNU General Public License version 3, as published |
2551 | + by the Free Software Foundation. |
2552 | + |
2553 | + This program is distributed in the hope that it will be useful, but |
2554 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
2555 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
2556 | + PURPOSE. See the GNU General Public License for more details. |
2557 | + |
2558 | + You should have received a copy of the GNU General Public License along |
2559 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
2560 | +*/ |
2561 | + |
2562 | +// acceptanceclient for playing |
2563 | +package main |
2564 | + |
2565 | +import ( |
2566 | + "launchpad.net/ubuntu-push/config" |
2567 | + "launchpad.net/ubuntu-push/server/acceptance" |
2568 | + "log" |
2569 | + "os" |
2570 | + "path/filepath" |
2571 | +) |
2572 | + |
2573 | +type configuration struct { |
2574 | + // session configuration |
2575 | + PingInterval config.ConfigTimeDuration `json:"ping_interval"` |
2576 | + ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"` |
2577 | + // server connection config |
2578 | + Addr config.ConfigHostPort |
2579 | + CertPEMFile string `json:"cert_pem_file"` |
2580 | +} |
2581 | + |
2582 | +func main() { |
2583 | + // xxx configure logging |
2584 | + switch { |
2585 | + case len(os.Args) < 2: |
2586 | + log.Fatal("missing config file") |
2587 | + case len(os.Args) < 3: |
2588 | + log.Fatal("missing device-id") |
2589 | + } |
2590 | + configFName := os.Args[1] |
2591 | + f, err := os.Open(configFName) |
2592 | + if err != nil { |
2593 | + log.Fatalf("reading config: %v", err) |
2594 | + } |
2595 | + cfg := &configuration{} |
2596 | + err = config.ReadConfig(f, cfg) |
2597 | + if err != nil { |
2598 | + log.Fatalf("reading config: %v", err) |
2599 | + } |
2600 | + session := &acceptance.ClientSession{ |
2601 | + ExchangeTimeout: cfg.ExchangeTimeout.TimeDuration(), |
2602 | + PingInterval: cfg.PingInterval.TimeDuration(), |
2603 | + ServerAddr: cfg.Addr.HostPort(), |
2604 | + DeviceId: os.Args[2], |
2605 | + } |
2606 | + session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName)) |
2607 | + if err != nil { |
2608 | + log.Fatalf("reading CertPEMFile: %v", err) |
2609 | + } |
2610 | + err = session.Dial() |
2611 | + if err != nil { |
2612 | + log.Fatalln(err) |
2613 | + } |
2614 | + events := make(chan string, 5) |
2615 | + go func() { |
2616 | + for { |
2617 | + log.Println(<-events) |
2618 | + } |
2619 | + }() |
2620 | + err = session.Run(events) |
2621 | + if err != nil { |
2622 | + log.Fatalln(err) |
2623 | + } |
2624 | +} |
2625 | |
2626 | === added directory 'server/acceptance/config' |
2627 | === added file 'server/acceptance/config/README' |
2628 | --- server/acceptance/config/README 1970-01-01 00:00:00 +0000 |
2629 | +++ server/acceptance/config/README 2014-01-14 15:09:46 +0000 |
2630 | @@ -0,0 +1,7 @@ |
2631 | +testing.key/testing.cert |
2632 | +------------------------- |
2633 | +generated with: |
2634 | + |
2635 | +go run /usr/lib/go/src/pkg/crypto/tls/generate_cert.go -ca -host localhost -rsa-bits 512 -duration 87600h |
2636 | + |
2637 | +and then renamed |
2638 | |
2639 | === added file 'server/acceptance/config/config.json' |
2640 | --- server/acceptance/config/config.json 1970-01-01 00:00:00 +0000 |
2641 | +++ server/acceptance/config/config.json 2014-01-14 15:09:46 +0000 |
2642 | @@ -0,0 +1,12 @@ |
2643 | +{ |
2644 | + "exchange_timeout": "0.5s", |
2645 | + "ping_interval": "2s", |
2646 | + "broker_queue_size": 100, |
2647 | + "session_queue_size": 10, |
2648 | + "addr": "127.0.0.1:9090", |
2649 | + "key_pem_file": "testing.key", |
2650 | + "cert_pem_file": "testing.cert", |
2651 | + "http_addr": "127.0.0.1:8888", |
2652 | + "http_read_timeout": "5s", |
2653 | + "http_write_timeout": "5s" |
2654 | +} |
2655 | |
2656 | === added file 'server/acceptance/config/testing.cert' |
2657 | --- server/acceptance/config/testing.cert 1970-01-01 00:00:00 +0000 |
2658 | +++ server/acceptance/config/testing.cert 2014-01-14 15:09:46 +0000 |
2659 | @@ -0,0 +1,10 @@ |
2660 | +-----BEGIN CERTIFICATE----- |
2661 | +MIIBYzCCAQ+gAwIBAgIBADALBgkqhkiG9w0BAQUwEjEQMA4GA1UEChMHQWNtZSBD |
2662 | +bzAeFw0xMzEyMTkyMDU1NDNaFw0yMzEyMTcyMDU1NDNaMBIxEDAOBgNVBAoTB0Fj |
2663 | +bWUgQ28wWjALBgkqhkiG9w0BAQEDSwAwSAJBAPw+niki17X2qALE2A2AzE1q5dvK |
2664 | +9CI4OduRtT9IgbFLC6psqAT21NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEA |
2665 | +AaNUMFIwDgYDVR0PAQH/BAQDAgCkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1Ud |
2666 | +EwEB/wQFMAMBAf8wGgYDVR0RBBMwEYIJbG9jYWxob3N0hwR/AAABMAsGCSqGSIb3 |
2667 | +DQEBBQNBAFqiVI+Km2XPSO+pxITaPvhmuzg+XG3l1+2di3gL+HlDobocjBqRctRU |
2668 | +YySO32W07acjGJmCHUKpCJuq9X8hpmk= |
2669 | +-----END CERTIFICATE----- |
2670 | |
2671 | === added file 'server/acceptance/config/testing.key' |
2672 | --- server/acceptance/config/testing.key 1970-01-01 00:00:00 +0000 |
2673 | +++ server/acceptance/config/testing.key 2014-01-14 15:09:46 +0000 |
2674 | @@ -0,0 +1,9 @@ |
2675 | +-----BEGIN RSA PRIVATE KEY----- |
2676 | +MIIBPAIBAAJBAPw+niki17X2qALE2A2AzE1q5dvK9CI4OduRtT9IgbFLC6psqAT2 |
2677 | +1NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEAAQJAKwXbIBULScP6QA6m8xam |
2678 | +wgWbkvN41GVWqPafPV32kPBvKwSc+M1e+JR7g3/xPZE7TCELcfYi4yXEHZZI3Pbh |
2679 | +oQIhAP/UsgJbsfH1GFv8Y8qGl5l/kmwwkwHhuKvEC87Yur9FAiEA/GlQv3ZfaXnT |
2680 | +lcCFT0aL02O0RDiRYyMUG/JAZQJs6CUCIQCHO5SZYIUwxIGK5mCNxxXOAzyQSiD7 |
2681 | +hqkKywf+4FvfDQIhALa0TLyqJFom0t7c4iIGAIRc8UlIYQSPiajI64+x9775AiEA |
2682 | +0v4fgSK/Rq059zW1753JjuB6aR0Uh+3RqJII4dUR1Wg= |
2683 | +-----END RSA PRIVATE KEY----- |
2684 | |
2685 | === added directory 'server/api' |
2686 | === added file 'server/api/handlers.go' |
2687 | --- server/api/handlers.go 1970-01-01 00:00:00 +0000 |
2688 | +++ server/api/handlers.go 2014-01-14 15:09:46 +0000 |
2689 | @@ -0,0 +1,244 @@ |
2690 | +/* |
2691 | + Copyright 2013-2014 Canonical Ltd. |
2692 | + |
2693 | + This program is free software: you can redistribute it and/or modify it |
2694 | + under the terms of the GNU General Public License version 3, as published |
2695 | + by the Free Software Foundation. |
2696 | + |
2697 | + This program is distributed in the hope that it will be useful, but |
2698 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
2699 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
2700 | + PURPOSE. See the GNU General Public License for more details. |
2701 | + |
2702 | + You should have received a copy of the GNU General Public License along |
2703 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
2704 | +*/ |
2705 | + |
2706 | +// Package api has code that offers a REST API for the applications that |
2707 | +// want to push messages. |
2708 | +package api |
2709 | + |
2710 | +import ( |
2711 | + "encoding/json" |
2712 | + "fmt" |
2713 | + "io" |
2714 | + "launchpad.net/ubuntu-push/logger" |
2715 | + "launchpad.net/ubuntu-push/server/broker" |
2716 | + "launchpad.net/ubuntu-push/server/store" |
2717 | + "log" |
2718 | + "net/http" |
2719 | +) |
2720 | + |
2721 | +const MaxRequestBodyBytes = 4 * 1024 |
2722 | +const JSONMediaType = "application/json" |
2723 | + |
2724 | +// APIError represents an API error (both internally and as JSON in a response). |
2725 | +type APIError struct { |
2726 | + // http status code |
2727 | + StatusCode int `json:"-"` |
2728 | + // machine readable label |
2729 | + ErrorLabel string `json:"error"` |
2730 | + // human message |
2731 | + Message string `json:"message"` |
2732 | +} |
2733 | + |
2734 | +// machine readable error labels |
2735 | +const ( |
2736 | + ioError = "io-error" |
2737 | + invalidRequest = "invalid-request" |
2738 | + unknownChannel = "unknown channel" |
2739 | + unavailable = "unavailable" |
2740 | + internalError = "internal" |
2741 | +) |
2742 | + |
2743 | +func (apiErr *APIError) Error() string { |
2744 | + return fmt.Sprintf("api %s: %s", apiErr.ErrorLabel, apiErr.Message) |
2745 | +} |
2746 | + |
2747 | +// Well-known prebuilt API errors |
2748 | +var ( |
2749 | + ErrNoContentLengthProvided = &APIError{ |
2750 | + http.StatusLengthRequired, |
2751 | + invalidRequest, |
2752 | + "A Content-Length must be provided", |
2753 | + } |
2754 | + ErrRequestBodyEmpty = &APIError{ |
2755 | + http.StatusBadRequest, |
2756 | + invalidRequest, |
2757 | + "Request body empty", |
2758 | + } |
2759 | + ErrRequestBodyTooLarge = &APIError{ |
2760 | + http.StatusRequestEntityTooLarge, |
2761 | + invalidRequest, |
2762 | + "Request body too large", |
2763 | + } |
2764 | + ErrWrongContentType = &APIError{ |
2765 | + http.StatusUnsupportedMediaType, |
2766 | + invalidRequest, |
2767 | + "Wrong content type, should be application/json", |
2768 | + } |
2769 | + ErrWrongRequestMethod = &APIError{ |
2770 | + http.StatusMethodNotAllowed, |
2771 | + invalidRequest, |
2772 | + "Wrong request method, should be POST", |
2773 | + } |
2774 | + ErrMalformedJSONObject = &APIError{ |
2775 | + http.StatusBadRequest, |
2776 | + invalidRequest, |
2777 | + "Malformed JSON Object", |
2778 | + } |
2779 | + ErrCouldNotReadBody = &APIError{ |
2780 | + http.StatusBadRequest, |
2781 | + ioError, |
2782 | + "Could not read request body", |
2783 | + } |
2784 | + ErrUnknownChannel = &APIError{ |
2785 | + http.StatusBadRequest, |
2786 | + unknownChannel, |
2787 | + "Unknown channel", |
2788 | + } |
2789 | + ErrUnknown = &APIError{ |
2790 | + http.StatusInternalServerError, |
2791 | + internalError, |
2792 | + "Unknown error", |
2793 | + } |
2794 | + ErrCouldNotStoreNotification = &APIError{ |
2795 | + http.StatusServiceUnavailable, |
2796 | + unavailable, |
2797 | + "Could not store notification", |
2798 | + } |
2799 | +) |
2800 | + |
2801 | +type Message struct { |
2802 | + Registration string `json:"registration"` |
2803 | + CoalesceTag string `json:"coalesce_tag"` |
2804 | + Data json.RawMessage `json:"data"` |
2805 | +} |
2806 | + |
2807 | +// Broadcast is the JSON object of a broadcast request. |
2808 | +type Broadcast struct { |
2809 | + Channel string `json:"channel"` |
2810 | + ExpireAfter uint8 `json:"expire_after"` |
2811 | + Data json.RawMessage `json:"data"` |
2812 | +} |
2813 | + |
2814 | +func respondError(writer http.ResponseWriter, apiErr *APIError) { |
2815 | + wireError, err := json.Marshal(apiErr) |
2816 | + if err != nil { |
2817 | + // xxx general 500 framework |
2818 | + log.Println("The provided string could not be marshaled into an error:", err) |
2819 | + http.Error(writer, "Internal Server Error", http.StatusInternalServerError) |
2820 | + return |
2821 | + } |
2822 | + writer.Header().Set("Content-type", JSONMediaType) |
2823 | + writer.WriteHeader(apiErr.StatusCode) |
2824 | + writer.Write(wireError) |
2825 | +} |
2826 | + |
2827 | +func checkContentLength(request *http.Request) *APIError { |
2828 | + if request.ContentLength == -1 { |
2829 | + return ErrNoContentLengthProvided |
2830 | + } |
2831 | + if request.ContentLength == 0 { |
2832 | + return ErrRequestBodyEmpty |
2833 | + } |
2834 | + if request.ContentLength > MaxRequestBodyBytes { |
2835 | + return ErrRequestBodyTooLarge |
2836 | + } |
2837 | + return nil |
2838 | +} |
2839 | + |
2840 | +func checkRequestAsPost(request *http.Request) *APIError { |
2841 | + if err := checkContentLength(request); err != nil { |
2842 | + return err |
2843 | + } |
2844 | + if request.Header.Get("Content-Type") != JSONMediaType { |
2845 | + return ErrWrongContentType |
2846 | + } |
2847 | + if request.Method != "POST" { |
2848 | + return ErrWrongRequestMethod |
2849 | + } |
2850 | + return nil |
2851 | +} |
2852 | + |
2853 | +func readBody(writer http.ResponseWriter, request *http.Request) ([]byte, *APIError) { |
2854 | + if err := checkRequestAsPost(request); err != nil { |
2855 | + return nil, err |
2856 | + } |
2857 | + |
2858 | + body := make([]byte, request.ContentLength) |
2859 | + _, err := io.ReadFull(request.Body, body) |
2860 | + |
2861 | + if err != nil { |
2862 | + return nil, ErrCouldNotReadBody |
2863 | + } |
2864 | + |
2865 | + return body, nil |
2866 | +} |
2867 | + |
2868 | +// state holds the interfaces to delegate to when serving requests |
2869 | +type state struct { |
2870 | + store store.PendingStore |
2871 | + broker broker.BrokerSending |
2872 | + logger logger.Logger |
2873 | +} |
2874 | + |
2875 | +type BroadcastHandler state |
2876 | + |
2877 | +func (h *BroadcastHandler) doBroadcast(bcast *Broadcast) *APIError { |
2878 | + chanId, err := h.store.GetInternalChannelId(bcast.Channel) |
2879 | + if err != nil { |
2880 | + switch err { |
2881 | + case store.ErrUnknownChannel: |
2882 | + return ErrUnknownChannel |
2883 | + default: |
2884 | + return ErrUnknown |
2885 | + } |
2886 | + } |
2887 | + // xxx ignoring expiration for now |
2888 | + err = h.store.AppendToChannel(chanId, bcast.Data) |
2889 | + if err != nil { |
2890 | + // assume this for now |
2891 | + return ErrCouldNotStoreNotification |
2892 | + } |
2893 | + |
2894 | + h.broker.Broadcast(chanId) |
2895 | + return nil |
2896 | +} |
2897 | + |
2898 | +func (h *BroadcastHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { |
2899 | + body, apiErr := readBody(writer, request) |
2900 | + |
2901 | + if apiErr != nil { |
2902 | + respondError(writer, apiErr) |
2903 | + return |
2904 | + } |
2905 | + |
2906 | + broadcast := &Broadcast{} |
2907 | + err := json.Unmarshal(body, broadcast) |
2908 | + |
2909 | + if err != nil { |
2910 | + respondError(writer, ErrMalformedJSONObject) |
2911 | + return |
2912 | + } |
2913 | + |
2914 | + apiErr = h.doBroadcast(broadcast) |
2915 | + if apiErr != nil { |
2916 | + respondError(writer, apiErr) |
2917 | + return |
2918 | + } |
2919 | + |
2920 | + writer.Header().Set("Content-Type", "application/json") |
2921 | + fmt.Fprintf(writer, `{"ok":true}`) |
2922 | +} |
2923 | + |
2924 | +// MakeHandlersMux makes a handler that dispatches for the various API endpoints. |
2925 | +func MakeHandlersMux(store store.PendingStore, broker broker.BrokerSending, logger logger.Logger) http.Handler { |
2926 | + mux := http.NewServeMux() |
2927 | + mux.Handle("/broadcast", &BroadcastHandler{ |
2928 | + store: store, |
2929 | + broker: broker, |
2930 | + logger: logger, |
2931 | + }) |
2932 | + return mux |
2933 | +} |
2934 | |
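The ServeHTTP flow in handlers.go (read the body, unmarshal it, delegate, respond) can be exercised without a network listener. What follows is a minimal sketch, not the branch's code: `broadcast`, `sketchHandler`, `writeError`, `post`, the JSON field names, the error labels and the 400 status codes are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// broadcast is a stand-in for the Broadcast request type.
// The JSON field names are assumptions, not taken from the diff.
type broadcast struct {
	Channel string          `json:"channel"`
	Data    json.RawMessage `json:"data"`
}

// sketchHandler mimics the decode/delegate/respond shape of ServeHTTP.
// The known map is a hypothetical stand-in for the pending store lookup.
type sketchHandler struct {
	known map[string]bool
}

// writeError sends a JSON error body; labels and status codes are illustrative.
func writeError(w http.ResponseWriter, status int, label string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	fmt.Fprintf(w, `{"error":%q}`, label)
}

func (h *sketchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		writeError(w, http.StatusBadRequest, "unreadable-body")
		return
	}
	msg := &broadcast{}
	if err := json.Unmarshal(body, msg); err != nil {
		writeError(w, http.StatusBadRequest, "malformed-json")
		return
	}
	if !h.known[msg.Channel] {
		writeError(w, http.StatusBadRequest, "unknown-channel")
		return
	}
	w.Header().Set("Content-Type", "application/json")
	fmt.Fprint(w, `{"ok":true}`)
}

// post drives the handler in-process and returns the status code and body.
func post(h http.Handler, body string) (int, string) {
	req := httptest.NewRequest("POST", "/broadcast", strings.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	h := &sketchHandler{known: map[string]bool{"system": true}}
	bodies := []string{
		`{"channel":"system","data":{"a":1}}`,
		`{"channel":"nope","data":{"a":1}}`,
		`{some bogus-message: `,
	}
	for _, body := range bodies {
		code, out := post(h, body)
		fmt.Println(code, out)
	}
}
```

Using httptest.NewRecorder keeps the sketch free of sockets; the tests in this branch use httptest.NewServer instead, which also covers the HTTP transport.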
2935 | === added file 'server/api/handlers_test.go' |
2936 | --- server/api/handlers_test.go 1970-01-01 00:00:00 +0000 |
2937 | +++ server/api/handlers_test.go 2014-01-14 15:09:46 +0000 |
2938 | @@ -0,0 +1,345 @@ |
2939 | +/* |
2940 | + Copyright 2013-2014 Canonical Ltd. |
2941 | + |
2942 | + This program is free software: you can redistribute it and/or modify it |
2943 | + under the terms of the GNU General Public License version 3, as published |
2944 | + by the Free Software Foundation. |
2945 | + |
2946 | + This program is distributed in the hope that it will be useful, but |
2947 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
2948 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
2949 | + PURPOSE. See the GNU General Public License for more details. |
2950 | + |
2951 | + You should have received a copy of the GNU General Public License along |
2952 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
2953 | +*/ |
2954 | + |
2955 | +package api |
2956 | + |
2957 | +import ( |
2958 | + "bytes" |
2959 | + "encoding/json" |
2960 | + "errors" |
2961 | + "fmt" |
2962 | + "io/ioutil" |
2963 | + . "launchpad.net/gocheck" |
2964 | + "launchpad.net/ubuntu-push/server/store" |
2965 | + "net/http" |
2966 | + "net/http/httptest" |
2967 | + "strings" |
2968 | + "testing" |
2969 | +) |
2970 | + |
2971 | +func TestHandlers(t *testing.T) { TestingT(t) } |
2972 | + |
2973 | +type handlersSuite struct { |
2974 | + messageEndpoint string |
2975 | + json string |
2976 | + client *http.Client |
2977 | + c *C |
2978 | +} |
2979 | + |
2980 | +var _ = Suite(&handlersSuite{}) |
2981 | + |
2982 | +func (s *handlersSuite) SetUpTest(c *C) { |
2983 | + s.client = &http.Client{} |
2984 | +} |
2985 | + |
2986 | +func (s *handlersSuite) TestAPIError(c *C) { |
2987 | + var apiErr error = &APIError{400, invalidRequest, "Message"} |
2988 | + c.Check(apiErr.Error(), Equals, "api invalid-request: Message") |
2989 | + wire, err := json.Marshal(apiErr) |
2990 | + c.Assert(err, IsNil) |
2991 | + c.Check(string(wire), Equals, `{"error":"invalid-request","message":"Message"}`) |
2992 | +} |
2993 | + |
2994 | +type checkBrokerSending struct { |
2995 | + store store.PendingStore |
2996 | + chanId store.InternalChannelId |
2997 | + err error |
2998 | + top int64 |
2999 | + payloads []json.RawMessage |
3000 | +} |
3001 | + |
3002 | +func (cbsend *checkBrokerSending) Broadcast(chanId store.InternalChannelId) { |
3003 | + top, payloads, err := cbsend.store.GetChannelSnapshot(chanId) |
3004 | + cbsend.err = err |
3005 | + cbsend.chanId = chanId |
3006 | + cbsend.top = top |
3007 | + cbsend.payloads = payloads |
3008 | +} |
3009 | + |
3010 | +func (s *handlersSuite) TestDoBroadcast(c *C) { |
3011 | + sto := store.NewInMemoryPendingStore() |
3012 | + bsend := &checkBrokerSending{store: sto} |
3013 | + bh := &BroadcastHandler{sto, bsend, nil} |
3014 | + payload := json.RawMessage(`{"a": 1}`) |
3015 | + apiErr := bh.doBroadcast(&Broadcast{ |
3016 | + Channel: "system", |
3017 | + Data: payload, |
3018 | + }) |
3019 | + c.Check(apiErr, IsNil) |
3020 | + c.Check(bsend.err, IsNil) |
3021 | + c.Check(bsend.chanId, Equals, store.SystemInternalChannelId) |
3022 | + c.Check(bsend.top, Equals, int64(1)) |
3023 | + c.Check(bsend.payloads, DeepEquals, []json.RawMessage{payload}) |
3024 | +} |
3025 | + |
3026 | +func (s *handlersSuite) TestDoBroadcastUnknownChannel(c *C) { |
3027 | + sto := store.NewInMemoryPendingStore() |
3028 | + bh := &BroadcastHandler{sto, nil, nil} |
3029 | + apiErr := bh.doBroadcast(&Broadcast{ |
3030 | + Channel: "unknown", |
3031 | + Data: json.RawMessage(`{"a": 1}`), |
3032 | + }) |
3033 | + c.Check(apiErr, Equals, ErrUnknownChannel) |
3034 | +} |
3035 | + |
3036 | +type interceptInMemoryPendingStore struct { |
3037 | + *store.InMemoryPendingStore |
3038 | + intercept func(meth string, err error) error |
3039 | +} |
3040 | + |
3041 | +func (isto *interceptInMemoryPendingStore) GetInternalChannelId(channel string) (store.InternalChannelId, error) { |
3042 | + chanId, err := isto.InMemoryPendingStore.GetInternalChannelId(channel) |
3043 | + return chanId, isto.intercept("GetInternalChannelId", err) |
3044 | +} |
3045 | + |
3046 | +func (isto *interceptInMemoryPendingStore) AppendToChannel(chanId store.InternalChannelId, payload json.RawMessage) error { |
3047 | + err := isto.InMemoryPendingStore.AppendToChannel(chanId, payload) |
3048 | + return isto.intercept("AppendToChannel", err) |
3049 | +} |
3050 | + |
3051 | +func (s *handlersSuite) TestDoBroadcastUnknownError(c *C) { |
3052 | + sto := &interceptInMemoryPendingStore{ |
3053 | + store.NewInMemoryPendingStore(), |
3054 | + func(meth string, err error) error { |
3055 | + return errors.New("other") |
3056 | + }, |
3057 | + } |
3058 | + bh := &BroadcastHandler{sto, nil, nil} |
3059 | + apiErr := bh.doBroadcast(&Broadcast{ |
3060 | + Channel: "system", |
3061 | + Data: json.RawMessage(`{"a": 1}`), |
3062 | + }) |
3063 | + c.Check(apiErr, Equals, ErrUnknown) |
3064 | +} |
3065 | + |
3066 | +func (s *handlersSuite) TestDoBroadcastCouldNotStoreNotification(c *C) { |
3067 | + sto := &interceptInMemoryPendingStore{ |
3068 | + store.NewInMemoryPendingStore(), |
3069 | + func(meth string, err error) error { |
3070 | + if meth == "AppendToChannel" { |
3071 | + return errors.New("fail") |
3072 | + } |
3073 | + return err |
3074 | + }, |
3075 | + } |
3076 | + bh := &BroadcastHandler{sto, nil, nil} |
3077 | + apiErr := bh.doBroadcast(&Broadcast{ |
3078 | + Channel: "system", |
3079 | + Data: json.RawMessage(`{"a": 1}`), |
3080 | + }) |
3081 | + c.Check(apiErr, Equals, ErrCouldNotStoreNotification) |
3082 | +} |
3083 | + |
3084 | +func newPostRequest(path string, message interface{}, server *httptest.Server) *http.Request { |
3085 | + packedMessage, err := json.Marshal(message) |
3086 | + if err != nil { |
3087 | + panic(err) |
3088 | + } |
3089 | + reader := bytes.NewReader(packedMessage) |
3090 | + |
3091 | + url := server.URL + path |
3092 | + request, _ := http.NewRequest("POST", url, reader) |
3093 | + request.ContentLength = int64(reader.Len()) |
3094 | + request.Header.Set("Content-Type", "application/json") |
3095 | + |
3096 | + return request |
3097 | +} |
3098 | + |
3099 | +func getResponseBody(response *http.Response) ([]byte, error) { |
3100 | + defer response.Body.Close() |
3101 | + return ioutil.ReadAll(response.Body) |
3102 | +} |
3103 | + |
3104 | +func checkError(c *C, response *http.Response, apiErr *APIError) { |
3105 | + c.Check(response.StatusCode, Equals, apiErr.StatusCode) |
3106 | + c.Check(response.Header.Get("Content-Type"), Equals, "application/json") |
3107 | + error := &APIError{StatusCode: response.StatusCode} |
3108 | + body, err := getResponseBody(response) |
3109 | + c.Assert(err, IsNil) |
3110 | + err = json.Unmarshal(body, error) |
3111 | + c.Assert(err, IsNil) |
3112 | + c.Check(error, DeepEquals, apiErr) |
3113 | +} |
3114 | + |
3115 | +type testBrokerSending struct { |
3116 | + chanId chan store.InternalChannelId |
3117 | +} |
3118 | + |
3119 | +func (bsend testBrokerSending) Broadcast(chanId store.InternalChannelId) { |
3120 | + bsend.chanId <- chanId |
3121 | +} |
3122 | + |
3123 | +func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) { |
3124 | + sto := store.NewInMemoryPendingStore() |
3125 | + bsend := testBrokerSending{make(chan store.InternalChannelId, 1)} |
3126 | + testServer := httptest.NewServer(MakeHandlersMux(sto, bsend, nil)) |
3127 | + defer testServer.Close() |
3128 | + |
3129 | + payload := json.RawMessage(`{"foo":"bar"}`) |
3130 | + |
3131 | + request := newPostRequest("/broadcast", &Broadcast{ |
3132 | + Channel: "system", |
3133 | + ExpireAfter: 60, |
3134 | + Data: payload, |
3135 | + }, testServer) |
3136 | + |
3137 | + response, err := s.client.Do(request) |
3138 | + c.Assert(err, IsNil) |
3139 | + |
3140 | + c.Check(response.StatusCode, Equals, http.StatusOK) |
3141 | + c.Check(response.Header.Get("Content-Type"), Equals, "application/json") |
3142 | + body, err := getResponseBody(response) |
3143 | + c.Assert(err, IsNil) |
3144 | + dest := make(map[string]bool) |
3145 | + err = json.Unmarshal(body, &dest) |
3146 | + c.Assert(err, IsNil) |
3147 | + c.Check(dest, DeepEquals, map[string]bool{"ok": true}) |
3148 | + |
3149 | + top, _, err := sto.GetChannelSnapshot(store.SystemInternalChannelId) |
3150 | + c.Assert(err, IsNil) |
3151 | + c.Check(top, Equals, int64(1)) |
3152 | + c.Check(<-bsend.chanId, Equals, store.SystemInternalChannelId) |
3153 | +} |
3154 | + |
3155 | +func (s *handlersSuite) TestFromBroadcastError(c *C) { |
3156 | + sto := store.NewInMemoryPendingStore() |
3157 | + testServer := httptest.NewServer(MakeHandlersMux(sto, nil, nil)) |
3158 | + defer testServer.Close() |
3159 | + |
3160 | + payload := json.RawMessage(`{"foo":"bar"}`) |
3161 | + |
3162 | + request := newPostRequest("/broadcast", &Broadcast{ |
3163 | + Channel: "unknown", |
3164 | + ExpireAfter: 60, |
3165 | + Data: payload, |
3166 | + }, testServer) |
3167 | + |
3168 | + response, err := s.client.Do(request) |
3169 | + c.Assert(err, IsNil) |
3170 | + checkError(c, response, ErrUnknownChannel) |
3171 | +} |
3172 | + |
3173 | +func (s *handlersSuite) TestCannotBroadcastMalformedData(c *C) { |
3174 | + testServer := httptest.NewServer(&BroadcastHandler{}) |
3175 | + defer testServer.Close() |
3176 | + |
3177 | + packedMessage := []byte("{some bogus-message: ") |
3178 | + reader := bytes.NewReader(packedMessage) |
3179 | + |
3180 | + request, err := http.NewRequest("POST", testServer.URL, reader) |
3181 | + c.Assert(err, IsNil) |
3182 | + request.ContentLength = int64(len(packedMessage)) |
3183 | + request.Header.Set("Content-Type", "application/json") |
3184 | + |
3185 | + response, err := s.client.Do(request) |
3186 | + c.Assert(err, IsNil) |
3187 | + checkError(c, response, ErrMalformedJSONObject) |
3188 | +} |
3189 | + |
3190 | +func (s *handlersSuite) TestCannotBroadcastTooBigMessages(c *C) { |
3191 | + testServer := httptest.NewServer(&BroadcastHandler{}) |
3192 | + defer testServer.Close() |
3193 | + |
3194 | + bigString := strings.Repeat("a", MaxRequestBodyBytes) |
3195 | + dataString := fmt.Sprintf(`"%v"`, bigString) |
3196 | + |
3197 | + request := newPostRequest("/", &Broadcast{ |
3198 | + Channel: "some-channel", |
3199 | + ExpireAfter: 60, |
3200 | + Data: json.RawMessage([]byte(dataString)), |
3201 | + }, testServer) |
3202 | + |
3203 | + response, err := s.client.Do(request) |
3204 | + c.Assert(err, IsNil) |
3205 | + checkError(c, response, ErrRequestBodyTooLarge) |
3206 | +} |
3207 | + |
3208 | +func (s *handlersSuite) TestCannotBroadcastWithoutContentLength(c *C) { |
3209 | + testServer := httptest.NewServer(&BroadcastHandler{}) |
3210 | + defer testServer.Close() |
3211 | + |
3212 | + dataString := `{"foo":"bar"}` |
3213 | + |
3214 | + request := newPostRequest("/", &Broadcast{ |
3215 | + Channel: "some-channel", |
3216 | + ExpireAfter: 60, |
3217 | + Data: json.RawMessage([]byte(dataString)), |
3218 | + }, testServer) |
3219 | + request.ContentLength = -1 |
3220 | + |
3221 | + response, err := s.client.Do(request) |
3222 | + c.Assert(err, IsNil) |
3223 | + checkError(c, response, ErrNoContentLengthProvided) |
3224 | +} |
3225 | + |
3226 | +func (s *handlersSuite) TestCannotBroadcastEmptyMessages(c *C) { |
3227 | + testServer := httptest.NewServer(&BroadcastHandler{}) |
3228 | + defer testServer.Close() |
3229 | + |
3230 | + packedMessage := make([]byte, 0) |
3231 | + reader := bytes.NewReader(packedMessage) |
3232 | + |
3233 | + request, err := http.NewRequest("POST", testServer.URL, reader) |
3234 | + c.Assert(err, IsNil) |
3235 | + request.ContentLength = int64(len(packedMessage)) |
3236 | + request.Header.Set("Content-Type", "application/json") |
3237 | + |
3238 | + response, err := s.client.Do(request) |
3239 | + c.Assert(err, IsNil) |
3240 | + checkError(c, response, ErrRequestBodyEmpty) |
3241 | +} |
3242 | + |
3243 | +func (s *handlersSuite) TestCannotBroadcastNonJSONMessages(c *C) { |
3244 | + testServer := httptest.NewServer(&BroadcastHandler{}) |
3245 | + defer testServer.Close() |
3246 | + |
3247 | + dataString := `{"foo":"bar"}` |
3248 | + |
3249 | + request := newPostRequest("/", &Broadcast{ |
3250 | + Channel: "some-channel", |
3251 | + ExpireAfter: 60, |
3252 | + Data: json.RawMessage([]byte(dataString)), |
3253 | + }, testServer) |
3254 | + request.Header.Set("Content-Type", "text/plain") |
3255 | + |
3256 | + response, err := s.client.Do(request) |
3257 | + c.Assert(err, IsNil) |
3258 | + checkError(c, response, ErrWrongContentType) |
3259 | +} |
3260 | + |
3261 | +func (s *handlersSuite) TestCannotBroadcastNonPostMessages(c *C) { |
3262 | + testServer := httptest.NewServer(&BroadcastHandler{}) |
3263 | + defer testServer.Close() |
3264 | + |
3265 | + dataString := `{"foo":"bar"}` |
3266 | + packedMessage, err := json.Marshal(&Broadcast{ |
3267 | + Channel: "some-channel", |
3268 | + ExpireAfter: 60, |
3269 | + Data: json.RawMessage([]byte(dataString)), |
3270 | + }) |
3271 | + c.Assert(err, IsNil) |
3272 | + reader := bytes.NewReader(packedMessage) |
3273 | + |
3274 | + request, err := http.NewRequest("GET", testServer.URL, reader) |
3275 | + c.Assert(err, IsNil) |
3276 | + request.ContentLength = int64(len(packedMessage)) |
3277 | + request.Header.Set("Content-Type", "application/json") |
3278 | + |
3279 | + response, err := s.client.Do(request) |
3280 | + c.Assert(err, IsNil) |
3281 | + |
3282 | + checkError(c, response, ErrWrongRequestMethod) |
3283 | +} |
3284 | |
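The interceptInMemoryPendingStore helper in handlers_test.go relies on struct embedding: the wrapper inherits every method of the embedded store and overrides only the ones whose errors the test wants to control. The sketch below shows the same pattern in isolation. All names in it (`memStore`, `appender`, `interceptStore`, `failOn`, `storeItem`) are hypothetical stand-ins, not types from this branch.

```go
package main

import (
	"errors"
	"fmt"
)

// memStore is a hypothetical in-memory store with a single mutating method.
type memStore struct{ items []string }

func (m *memStore) Append(item string) error {
	m.items = append(m.items, item)
	return nil
}

func (m *memStore) Len() int { return len(m.items) }

// appender is the narrow interface the code under test depends on.
type appender interface {
	Append(item string) error
}

// interceptStore embeds the real store and routes the result of each
// overridden method through intercept, which may replace the error.
type interceptStore struct {
	*memStore
	intercept func(meth string, err error) error
}

func (i *interceptStore) Append(item string) error {
	err := i.memStore.Append(item) // the real call still happens first
	return i.intercept("Append", err)
}

// failOn returns an intercept function that fails only the named method.
func failOn(target string) func(string, error) error {
	return func(meth string, err error) error {
		if meth == target {
			return errors.New("fail")
		}
		return err
	}
}

// storeItem is the code under test: it maps a store error to a label.
func storeItem(a appender, item string) string {
	if err := a.Append(item); err != nil {
		return "could-not-store"
	}
	return "ok"
}

func main() {
	plain := &memStore{}
	failing := &interceptStore{&memStore{}, failOn("Append")}
	fmt.Println(storeItem(plain, "x"))
	fmt.Println(storeItem(failing, "x"))
	fmt.Println(failing.Len()) // Len is promoted from the embedded store
}
```

Because the wrapper calls the embedded method before intercepting, the underlying store is still modified even when the injected error is returned. Tests that inspect the store after a simulated failure need to account for that.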
3285 | === added directory 'server/broker' |
3286 | === added file 'server/broker/broker.go' |
3287 | --- server/broker/broker.go 1970-01-01 00:00:00 +0000 |
3288 | +++ server/broker/broker.go 2014-01-14 15:09:46 +0000 |
3289 | @@ -0,0 +1,71 @@ |
3290 | +/* |
3291 | + Copyright 2013-2014 Canonical Ltd. |
3292 | + |
3293 | + This program is free software: you can redistribute it and/or modify it |
3294 | + under the terms of the GNU General Public License version 3, as published |
3295 | + by the Free Software Foundation. |
3296 | + |
3297 | + This program is distributed in the hope that it will be useful, but |
3298 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
3299 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
3300 | + PURPOSE. See the GNU General Public License for more details. |
3301 | + |
3302 | + You should have received a copy of the GNU General Public License along |
3303 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
3304 | +*/ |
3305 | + |
3306 | +// Package broker handles session registrations and delivery of messages |
3307 | +// through sessions. |
3308 | +package broker |
3309 | + |
3310 | +import ( |
3311 | + "fmt" |
3312 | + "launchpad.net/ubuntu-push/protocol" |
3313 | + "launchpad.net/ubuntu-push/server/store" |
3314 | +) |
3315 | + |
3316 | +// Broker is responsible for registering sessions and delivering messages through them. |
3317 | +type Broker interface { |
3318 | + // Register the session. |
3319 | + Register(*protocol.ConnectMsg) (BrokerSession, error) |
3320 | + // Unregister the session. |
3321 | + Unregister(BrokerSession) |
3322 | +} |
3323 | + |
3324 | +// BrokerSending is the notification sending facet of the broker. |
3325 | +type BrokerSending interface { |
3326 | + // Broadcast requests the broadcast for a channel. |
3327 | + Broadcast(chanId store.InternalChannelId) |
3328 | +} |
3329 | + |
3330 | +// Exchange guides the session through performing an exchange, typically delivery. |
3331 | +type Exchange interface { |
3332 | + Prepare(BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) |
3333 | + Acked(BrokerSession) error |
3334 | +} |
3335 | + |
3336 | +// BrokerSession holds broker session state. |
3337 | +type BrokerSession interface { |
3338 | + // SessionChannel returns the session control channel |
3339 | + // on which the session gets exchanges to perform. |
3340 | + SessionChannel() <-chan Exchange |
3341 | + // DeviceId returns the device id string. |
3342 | + DeviceId() string |
3343 | +} |
3344 | + |
3345 | +// ErrAbort is the error returned when a session is aborted. |
3346 | +type ErrAbort struct { |
3347 | + Reason string |
3348 | +} |
3349 | + |
3350 | +func (ea *ErrAbort) Error() string { |
3351 | + return fmt.Sprintf("session aborted (%s)", ea.Reason) |
3352 | +} |
3353 | + |
3354 | +// BrokerConfig gives access to the typical broker configuration. |
3355 | +type BrokerConfig interface { |
3356 | + // SessionQueueSize gives the session queue size. |
3357 | + SessionQueueSize() uint |
3358 | + // BrokerQueueSize gives the internal broker queue size. |
3359 | + BrokerQueueSize() uint |
3360 | +} |
3361 | |
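The Exchange and BrokerSession interfaces in broker.go imply a loop on the session side: take an exchange from the session channel, Prepare it, send the outgoing message, then call Acked once the client has answered. The session code itself is in server/session and is not shown in this part of the diff, so the following is only one plausible shape for that loop. The types are simplified stand-ins (Prepare returns a plain string rather than protocol messages), and `runSession`, `memSession` and `levelExchange` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// brokerSession and exchange are simplified stand-ins for the
// BrokerSession and Exchange interfaces.
type brokerSession interface {
	SessionChannel() <-chan exchange
	DeviceId() string
}

type exchange interface {
	Prepare(brokerSession) (out string, err error)
	Acked(brokerSession) error
}

// memSession is a hypothetical session that tracks a single level.
type memSession struct {
	id    string
	ch    chan exchange
	level int64
}

func (s *memSession) SessionChannel() <-chan exchange { return s.ch }
func (s *memSession) DeviceId() string                { return s.id }

// levelExchange delivers "everything up to topLevel" and, once acked,
// records that level on the session.
type levelExchange struct{ topLevel int64 }

func (e *levelExchange) Prepare(s brokerSession) (string, error) {
	return fmt.Sprintf("broadcast to %s up to level %d", s.DeviceId(), e.topLevel), nil
}

func (e *levelExchange) Acked(s brokerSession) error {
	ms, ok := s.(*memSession)
	if !ok {
		return errors.New("unexpected session type")
	}
	ms.level = e.topLevel
	return nil
}

// runSession performs exchanges until the channel is closed or a step
// fails. It returns the number of completed exchanges.
func runSession(sess brokerSession, send func(string) error) (int, error) {
	done := 0
	for exchg := range sess.SessionChannel() {
		out, err := exchg.Prepare(sess)
		if err != nil {
			return done, err
		}
		if err := send(out); err != nil {
			return done, err
		}
		if err := exchg.Acked(sess); err != nil {
			return done, err
		}
		done++
	}
	return done, nil
}

func main() {
	sess := &memSession{id: "dev-1", ch: make(chan exchange, 2)}
	sess.ch <- &levelExchange{topLevel: 1}
	sess.ch <- &levelExchange{topLevel: 3}
	close(sess.ch) // the sketch closes the channel to end the loop

	n, err := runSession(sess, func(msg string) error {
		fmt.Println("send:", msg)
		return nil
	})
	fmt.Println(n, err, sess.level)
}
```

In this sketch the send callback stands in for writing to the client and waiting for its reply. Keeping that behind a function makes the ordering of Prepare, send and Acked easy to test.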
3362 | === added file 'server/broker/simple.go' |
3363 | --- server/broker/simple.go 1970-01-01 00:00:00 +0000 |
3364 | +++ server/broker/simple.go 2014-01-14 15:09:46 +0000 |
3365 | @@ -0,0 +1,260 @@ |
3366 | +/* |
3367 | + Copyright 2013-2014 Canonical Ltd. |
3368 | + |
3369 | + This program is free software: you can redistribute it and/or modify it |
3370 | + under the terms of the GNU General Public License version 3, as published |
3371 | + by the Free Software Foundation. |
3372 | + |
3373 | + This program is distributed in the hope that it will be useful, but |
3374 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
3375 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
3376 | + PURPOSE. See the GNU General Public License for more details. |
3377 | + |
3378 | + You should have received a copy of the GNU General Public License along |
3379 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
3380 | +*/ |
3381 | + |
3382 | +package broker |
3383 | + |
3384 | +import ( |
3385 | + "encoding/json" |
3386 | + "launchpad.net/ubuntu-push/logger" |
3387 | + "launchpad.net/ubuntu-push/protocol" |
3388 | + "launchpad.net/ubuntu-push/server/store" |
3389 | + // "log" |
3390 | + "sync" |
3391 | +) |
3392 | + |
3393 | +// SimpleBroker implements broker.Broker for everything in just one process. |
3394 | +type SimpleBroker struct { |
3395 | + sto store.PendingStore |
3396 | + logger logger.Logger |
3397 | + // running state |
3398 | + runMutex sync.Mutex |
3399 | + running bool |
3400 | + stop chan bool |
3401 | + stopped chan bool |
3402 | + // sessions |
3403 | + sessionCh chan *simpleBrokerSession |
3404 | + registry map[string]*simpleBrokerSession |
3405 | + sessionQueueSize uint |
3406 | + // delivery |
3407 | + deliveryCh chan *delivery |
3408 | +} |
3409 | + |
3410 | +// simpleBrokerSession represents a session in the broker. |
3411 | +type simpleBrokerSession struct { |
3412 | + registered bool |
3413 | + deviceId string |
3414 | + done chan bool |
3415 | + exchanges chan Exchange |
3416 | + levels map[store.InternalChannelId]int64 |
3417 | + // for exchanges |
3418 | + broadcastMsg protocol.BroadcastMsg |
3419 | + ackMsg protocol.AckMsg |
3420 | +} |
3421 | + |
3422 | +type deliveryKind int |
3423 | + |
3424 | +const ( |
3425 | + broadcastDelivery deliveryKind = iota |
3426 | +) |
3427 | + |
3428 | +// delivery holds all the information to request a delivery |
3429 | +type delivery struct { |
3430 | + kind deliveryKind |
3431 | + chanId store.InternalChannelId |
3432 | +} |
3433 | + |
3434 | +func (sess *simpleBrokerSession) SessionChannel() <-chan Exchange { |
3435 | + return sess.exchanges |
3436 | +} |
3437 | + |
3438 | +func (sess *simpleBrokerSession) DeviceId() string { |
3439 | + return sess.deviceId |
3440 | +} |
3441 | + |
3442 | +// NewSimpleBroker makes a new SimpleBroker. |
3443 | +func NewSimpleBroker(sto store.PendingStore, cfg BrokerConfig, logger logger.Logger) *SimpleBroker { |
3444 | + sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize()) |
3445 | + deliveryCh := make(chan *delivery, cfg.BrokerQueueSize()) |
3446 | + registry := make(map[string]*simpleBrokerSession) |
3447 | + return &SimpleBroker{ |
3448 | + logger: logger, |
3449 | + sto: sto, |
3450 | + stop: make(chan bool), |
3451 | + stopped: make(chan bool), |
3452 | + registry: registry, |
3453 | + sessionCh: sessionCh, |
3454 | + deliveryCh: deliveryCh, |
3455 | + sessionQueueSize: cfg.SessionQueueSize(), |
3456 | + } |
3457 | +} |
3458 | + |
3459 | +// Start starts the broker. |
3460 | +func (b *SimpleBroker) Start() { |
3461 | + b.runMutex.Lock() |
3462 | + defer b.runMutex.Unlock() |
3463 | + if b.running { |
3464 | + return |
3465 | + } |
3466 | + b.running = true |
3467 | + go b.run() |
3468 | +} |
3469 | + |
3470 | +// Stop stops the broker. |
3471 | +func (b *SimpleBroker) Stop() { |
3472 | + b.runMutex.Lock() |
3473 | + defer b.runMutex.Unlock() |
3474 | + if !b.running { |
3475 | + return |
3476 | + } |
3477 | + b.stop <- true |
3478 | + <-b.stopped |
3479 | + b.running = false |
3480 | +} |
3481 | + |
3482 | +func (b *SimpleBroker) feedPending(sess *simpleBrokerSession) error { |
3483 | + // find relevant channels, for now only system |
3484 | + channels := []store.InternalChannelId{store.SystemInternalChannelId} |
3485 | + for _, chanId := range channels { |
3486 | + topLevel, payloads, err := b.sto.GetChannelSnapshot(chanId) |
3487 | + if err != nil { |
3488 | + // next broadcast will try again |
3489 | + b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err) |
3490 | + continue |
3491 | + } |
3492 | + clientLevel := sess.levels[chanId] |
3493 | + if clientLevel != topLevel { |
3494 | + broadcastExchg := &simpleBroadcastExchange{ |
3495 | + chanId: chanId, |
3496 | + topLevel: topLevel, |
3497 | + notificationPayloads: payloads, |
3498 | + } |
3499 | + sess.exchanges <- broadcastExchg |
3500 | + } |
3501 | + } |
3502 | + return nil |
3503 | +} |
3504 | + |
3505 | +// Register registers a session with the broker. It feeds the session |
3506 | +// pending notifications as well. |
3507 | +func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (BrokerSession, error) { |
3508 | + // xxx sanity check DeviceId |
3509 | + levels := map[store.InternalChannelId]int64{} |
3510 | + for hexId, v := range connect.Levels { |
3511 | + id, err := store.HexToInternalChannelId(hexId) |
3512 | + if err != nil { |
3513 | + return nil, &ErrAbort{err.Error()} |
3514 | + } |
3515 | + levels[id] = v |
3516 | + } |
3517 | + sess := &simpleBrokerSession{ |
3518 | + deviceId: connect.DeviceId, |
3519 | + done: make(chan bool), |
3520 | + exchanges: make(chan Exchange, b.sessionQueueSize), |
3521 | + levels: levels, |
3522 | + } |
3523 | + b.sessionCh <- sess |
3524 | + <-sess.done |
3525 | + err := b.feedPending(sess) |
3526 | + if err != nil { |
3527 | + return nil, err |
3528 | + } |
3529 | + return sess, nil |
3530 | +} |
3531 | + |
3532 | +// Unregister unregisters a session with the broker. Doesn't wait. |
3533 | +func (b *SimpleBroker) Unregister(s BrokerSession) { |
3534 | + sess := s.(*simpleBrokerSession) |
3535 | + b.sessionCh <- sess |
3536 | +} |
3537 | + |
3538 | +// run runs the agent logic of the broker. |
3539 | +func (b *SimpleBroker) run() { |
3540 | +Loop: |
3541 | + for { |
3542 | + select { |
3543 | + case <-b.stop: |
3544 | + b.stopped <- true |
3545 | + break Loop |
3546 | + case sess := <-b.sessionCh: |
3547 | + if sess.registered { // unregister |
3548 | + // unregister only current |
3549 | + if b.registry[sess.deviceId] == sess { |
3550 | + delete(b.registry, sess.deviceId) |
3551 | + } |
3552 | + } else { // register |
3553 | + b.registry[sess.deviceId] = sess |
3554 | + sess.registered = true |
3555 | + sess.done <- true |
3556 | + } |
3557 | + case delivery := <-b.deliveryCh: |
3558 | + switch delivery.kind { |
3559 | + case broadcastDelivery: |
3560 | + topLevel, payloads, err := b.sto.GetChannelSnapshot(delivery.chanId) |
3561 | + if err != nil { |
3562 | + // next broadcast will try again |
3563 | + b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err) |
3564 | + continue Loop |
3565 | + } |
3566 | + broadcastExchg := &simpleBroadcastExchange{ |
3567 | + chanId: delivery.chanId, |
3568 | + topLevel: topLevel, |
3569 | + notificationPayloads: payloads, |
3570 | + } |
3571 | + for _, sess := range b.registry { |
3572 | + sess.exchanges <- broadcastExchg |
3573 | + } |
3574 | + } |
3575 | + } |
3576 | + } |
3577 | +} |
3578 | + |
3579 | +// Broadcast requests the broadcast for a channel. |
3580 | +func (b *SimpleBroker) Broadcast(chanId store.InternalChannelId) { |
3581 | + b.deliveryCh <- &delivery{ |
3582 | + kind: broadcastDelivery, |
3583 | + chanId: chanId, |
3584 | + } |
3585 | +} |
3586 | + |
3587 | +// Exchanges |
3588 | + |
3589 | +type simpleBroadcastExchange struct { |
3590 | + chanId store.InternalChannelId |
3591 | + topLevel int64 |
3592 | + notificationPayloads []json.RawMessage |
3593 | +} |
3594 | + |
3595 | +func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { |
3596 | + c := int64(len(payloads)) |
3597 | + delta := topLevel - clientLevel |
3598 | + if delta < c { |
3599 | + return payloads[c-delta:] |
3600 | + } else { |
3601 | + return payloads |
3602 | + } |
3603 | +} |
3604 | + |
3605 | +func (sbe *simpleBroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
3606 | + simpleSess := sess.(*simpleBrokerSession) |
3607 | + simpleSess.broadcastMsg.Type = "broadcast" |
3608 | + clientLevel := simpleSess.levels[sbe.chanId] |
3609 | + payloads := filterByLevel(clientLevel, sbe.topLevel, sbe.notificationPayloads) |
3610 | + // xxx need an AppId as well, later |
3611 | + simpleSess.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.chanId) |
3612 | + simpleSess.broadcastMsg.TopLevel = sbe.topLevel |
3613 | + simpleSess.broadcastMsg.Payloads = payloads |
3614 | + return &simpleSess.broadcastMsg, &simpleSess.ackMsg, nil |
3615 | +} |
3616 | + |
3617 | +func (sbe *simpleBroadcastExchange) Acked(sess BrokerSession) error { |
3618 | + simpleSess := sess.(*simpleBrokerSession) |
3619 | + if simpleSess.ackMsg.Type != "ack" { |
3620 | + return &ErrAbort{"expected ACK message"} |
3621 | + } |
3622 | + // update levels |
3623 | + simpleSess.levels[sbe.chanId] = sbe.topLevel |
3624 | + return nil |
3625 | +} |
3626 | |
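filterByLevel in simple.go selects the tail of the pending payloads that a client has not seen yet, using the difference between the channel's top level and the client's reported level. As written in the diff, a client reporting a level above the top level yields a negative delta, and the slice expression then indexes past the end of the payloads. The sketch below reproduces the arithmetic and adds a guard for that case. The guard's policy (resend everything) is an assumption for illustration, not something this branch specifies, and `filterByLevelGuarded` and `raw` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// filterByLevelGuarded mirrors the arithmetic of filterByLevel and adds a
// guard for clients whose level is ahead of the store.
func filterByLevelGuarded(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
	c := int64(len(payloads))
	delta := topLevel - clientLevel
	if delta < 0 {
		// Assumed policy: a client ahead of the store gets everything.
		// Without this guard, payloads[c-delta:] would be out of range.
		return payloads
	}
	if delta < c {
		// The client is missing only the last delta payloads.
		return payloads[c-delta:]
	}
	// The client is missing at least as many payloads as are pending.
	return payloads
}

// raw is a small helper to build payload slices from string literals.
func raw(items ...string) []json.RawMessage {
	out := make([]json.RawMessage, len(items))
	for i, it := range items {
		out[i] = json.RawMessage(it)
	}
	return out
}

func main() {
	pending := raw(`{"n":1}`, `{"n":2}`, `{"n":3}`)
	for _, clientLevel := range []int64{0, 1, 3, 5} {
		got := filterByLevelGuarded(clientLevel, 3, pending)
		fmt.Printf("client level %d: %d payload(s) to send\n", clientLevel, len(got))
	}
}
```

With three pending payloads and a top level of 3, a client at level 1 receives the last two payloads, a client at level 3 receives none, and a client at level 0 receives all three.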
3627 | === added file 'server/broker/simple_test.go' |
3628 | --- server/broker/simple_test.go 1970-01-01 00:00:00 +0000 |
3629 | +++ server/broker/simple_test.go 2014-01-14 15:09:46 +0000 |
3630 | @@ -0,0 +1,327 @@ |
3631 | +/* |
3632 | + Copyright 2013-2014 Canonical Ltd. |
3633 | + |
3634 | + This program is free software: you can redistribute it and/or modify it |
3635 | + under the terms of the GNU General Public License version 3, as published |
3636 | + by the Free Software Foundation. |
3637 | + |
3638 | + This program is distributed in the hope that it will be useful, but |
3639 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
3640 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
3641 | + PURPOSE. See the GNU General Public License for more details. |
3642 | + |
3643 | + You should have received a copy of the GNU General Public License along |
3644 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
3645 | +*/ |
3646 | + |
3647 | +package broker |
3648 | + |
3649 | +import ( |
3650 | + "encoding/json" |
3651 | + "errors" |
3652 | + . "launchpad.net/gocheck" |
3653 | + "launchpad.net/ubuntu-push/logger" |
3654 | + "launchpad.net/ubuntu-push/protocol" |
3655 | + "launchpad.net/ubuntu-push/server/store" |
3656 | + helpers "launchpad.net/ubuntu-push/testing" |
3657 | + // "log" |
3658 | + "testing" |
3659 | + "time" |
3660 | +) |
3661 | + |
3662 | +func TestSimple(t *testing.T) { TestingT(t) } |
3663 | + |
3664 | +type simpleSuite struct{} |
3665 | + |
3666 | +var _ = Suite(&simpleSuite{}) |
3667 | + |
3668 | +type testBrokerConfig struct{} |
3669 | + |
3670 | +func (tbc *testBrokerConfig) SessionQueueSize() uint { |
3671 | + return 10 |
3672 | +} |
3673 | + |
3674 | +func (tbc *testBrokerConfig) BrokerQueueSize() uint { |
3675 | + return 5 |
3676 | +} |
3677 | + |
3678 | +func (s *simpleSuite) TestNew(c *C) { |
3679 | + sto := store.NewInMemoryPendingStore() |
3680 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3681 | + c.Check(cap(b.sessionCh), Equals, 5) |
3682 | + c.Check(len(b.registry), Equals, 0) |
3683 | + c.Check(b.sto, Equals, sto) |
3684 | +} |
3685 | + |
3686 | +func (s *simpleSuite) TestStartStop(c *C) { |
3687 | + b := NewSimpleBroker(nil, &testBrokerConfig{}, nil) |
3688 | + b.Start() |
3689 | + c.Check(b.running, Equals, true) |
3690 | + b.Start() |
3691 | + b.Stop() |
3692 | + c.Check(b.running, Equals, false) |
3693 | + b.Stop() |
3694 | +} |
3695 | + |
3696 | +func (s *simpleSuite) TestRegistration(c *C) { |
3697 | + sto := store.NewInMemoryPendingStore() |
3698 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3699 | + b.Start() |
3700 | + defer b.Stop() |
3701 | + sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"0": 5}}) |
3702 | + c.Assert(err, IsNil) |
3703 | + c.Assert(b.registry["dev-1"], Equals, sess) |
3704 | + c.Assert(sess.DeviceId(), Equals, "dev-1") |
3705 | + c.Check(sess.(*simpleBrokerSession).levels, DeepEquals, map[store.InternalChannelId]int64{ |
3706 | + store.SystemInternalChannelId: 5, |
3707 | + }) |
3708 | + b.Unregister(sess) |
3709 | + // just to make sure the unregister was processed |
3710 | + _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}) |
3711 | + c.Assert(err, IsNil) |
3712 | + c.Check(b.registry["dev-1"], IsNil) |
3713 | +} |
3714 | + |
3715 | +func (s *simpleSuite) TestRegistrationBrokenLevels(c *C) { |
3716 | + sto := store.NewInMemoryPendingStore() |
3717 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3718 | + b.Start() |
3719 | + defer b.Stop() |
3720 | + _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"z": 5}}) |
3721 | + c.Check(err, FitsTypeOf, &ErrAbort{}) |
3722 | +} |
3723 | + |
3724 | +func (s *simpleSuite) TestFeedPending(c *C) { |
3725 | + sto := store.NewInMemoryPendingStore() |
3726 | + notification1 := json.RawMessage(`{"m": "M"}`) |
3727 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
3728 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3729 | + sess := &simpleBrokerSession{ |
3730 | + exchanges: make(chan Exchange, 1), |
3731 | + } |
3732 | + b.feedPending(sess) |
3733 | + c.Assert(len(sess.exchanges), Equals, 1) |
3734 | + exchg1 := <-sess.exchanges |
3735 | + c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{ |
3736 | + chanId: store.SystemInternalChannelId, |
3737 | + topLevel: 1, |
3738 | + notificationPayloads: []json.RawMessage{notification1}, |
3739 | + }) |
3740 | +} |
3741 | + |
3742 | +func (s *simpleSuite) TestFeedPendingNop(c *C) { |
3743 | + sto := store.NewInMemoryPendingStore() |
3744 | + notification1 := json.RawMessage(`{"m": "M"}`) |
3745 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
3746 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3747 | + sess := &simpleBrokerSession{ |
3748 | + exchanges: make(chan Exchange, 1), |
3749 | + levels: map[store.InternalChannelId]int64{ |
3750 | + store.SystemInternalChannelId: 1, |
3751 | + }, |
3752 | + } |
3753 | + b.feedPending(sess) |
3754 | + c.Assert(len(sess.exchanges), Equals, 0) |
3755 | +} |
3756 | + |
3757 | +func (s *simpleSuite) TestRegistrationFeedPending(c *C) { |
3758 | + sto := store.NewInMemoryPendingStore() |
3759 | + notification1 := json.RawMessage(`{"m": "M"}`) |
3760 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
3761 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3762 | + b.Start() |
3763 | + defer b.Stop() |
3764 | + sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
3765 | + c.Assert(err, IsNil) |
3766 | + c.Check(len(sess.(*simpleBrokerSession).exchanges), Equals, 1) |
3767 | +} |
3768 | + |
3769 | +func (s *simpleSuite) TestRegistrationFeedPendingError(c *C) { |
3770 | + buf := &helpers.SyncedLogBuffer{} |
3771 | + logger := logger.NewSimpleLogger(buf, "error") |
3772 | + sto := &testFailingStore{} |
3773 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, logger) |
3774 | + b.Start() |
3775 | + defer b.Stop() |
3776 | + _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
3777 | + c.Assert(err, IsNil) |
3778 | + // but the failure is logged |
3779 | + c.Check(buf.String(), Matches, ".*ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n") |
3780 | +} |
3781 | + |
3782 | +func (s *simpleSuite) TestRegistrationLastWins(c *C) { |
3783 | + sto := store.NewInMemoryPendingStore() |
3784 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3785 | + b.Start() |
3786 | + defer b.Stop() |
3787 | + sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
3788 | + c.Assert(err, IsNil) |
3789 | + sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
3790 | + c.Assert(err, IsNil) |
3791 | + c.Assert(b.registry["dev-1"], Equals, sess2) |
3792 | + b.Unregister(sess1) |
3793 | + // just to make sure the unregister was processed |
3794 | + _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""}) |
3795 | + c.Assert(err, IsNil) |
3796 | + c.Check(b.registry["dev-1"], Equals, sess2) |
3797 | +} |
3798 | + |
3799 | +func (s *simpleSuite) TestBroadcastExchange(c *C) { |
3800 | + sess := &simpleBrokerSession{ |
3801 | + levels: map[store.InternalChannelId]int64{}, |
3802 | + } |
3803 | + exchg := &simpleBroadcastExchange{ |
3804 | + chanId: store.SystemInternalChannelId, |
3805 | + topLevel: 3, |
3806 | + notificationPayloads: []json.RawMessage{ |
3807 | + json.RawMessage(`{"a":"x"}`), |
3808 | + json.RawMessage(`{"a":"y"}`), |
3809 | + }, |
3810 | + } |
3811 | + inMsg, outMsg, err := exchg.Prepare(sess) |
3812 | + c.Assert(err, IsNil) |
3813 | + // check |
3814 | + marshalled, err := json.Marshal(inMsg) |
3815 | + c.Assert(err, IsNil) |
3816 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) |
3817 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
3818 | + c.Assert(err, IsNil) |
3819 | + err = exchg.Acked(sess) |
3820 | + c.Assert(err, IsNil) |
3821 | + c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3)) |
3822 | +} |
3823 | + |
3824 | +func (s *simpleSuite) TestBroadcastExchangeAckMismatch(c *C) { |
3825 | + sess := &simpleBrokerSession{ |
3826 | + levels: map[store.InternalChannelId]int64{}, |
3827 | + } |
3828 | + exchg := &simpleBroadcastExchange{ |
3829 | + chanId: store.SystemInternalChannelId, |
3830 | + topLevel: 3, |
3831 | + notificationPayloads: []json.RawMessage{ |
3832 | + json.RawMessage(`{"a":"y"}`), |
3833 | + }, |
3834 | + } |
3835 | + inMsg, outMsg, err := exchg.Prepare(sess) |
3836 | + c.Assert(err, IsNil) |
3837 | + // check |
3838 | + marshalled, err := json.Marshal(inMsg) |
3839 | + c.Assert(err, IsNil) |
3840 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
3841 | + err = json.Unmarshal([]byte(`{}`), outMsg) |
3842 | + c.Assert(err, IsNil) |
3843 | + err = exchg.Acked(sess) |
3844 | + c.Assert(err, Not(IsNil)) |
3845 | + c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0)) |
3846 | +} |
3847 | + |
3848 | +func (s *simpleSuite) TestFilterByLevel(c *C) { |
3849 | + payloads := []json.RawMessage{ |
3850 | + json.RawMessage(`{"a": 3}`), |
3851 | + json.RawMessage(`{"a": 4}`), |
3852 | + json.RawMessage(`{"a": 5}`), |
3853 | + } |
3854 | + res := filterByLevel(5, 5, payloads) |
3855 | + c.Check(len(res), Equals, 0) |
3856 | + res = filterByLevel(4, 5, payloads) |
3857 | + c.Check(len(res), Equals, 1) |
3858 | + c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`)) |
3859 | + res = filterByLevel(3, 5, payloads) |
3860 | + c.Check(len(res), Equals, 2) |
3861 | + c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`)) |
3862 | + res = filterByLevel(2, 5, payloads) |
3863 | + c.Check(len(res), Equals, 3) |
3864 | + res = filterByLevel(1, 5, payloads) |
3865 | + c.Check(len(res), Equals, 3) |
3866 | +} |
3867 | + |
3868 | +func (s *simpleSuite) TestBroadcastExchangeFilterByLevel(c *C) { |
3869 | + sess := &simpleBrokerSession{ |
3870 | + levels: map[store.InternalChannelId]int64{ |
3871 | + store.SystemInternalChannelId: 2, |
3872 | + }, |
3873 | + } |
3874 | + exchg := &simpleBroadcastExchange{ |
3875 | + chanId: store.SystemInternalChannelId, |
3876 | + topLevel: 3, |
3877 | + notificationPayloads: []json.RawMessage{ |
3878 | + json.RawMessage(`{"a":"x"}`), |
3879 | + json.RawMessage(`{"a":"y"}`), |
3880 | + }, |
3881 | + } |
3882 | + inMsg, outMsg, err := exchg.Prepare(sess) |
3883 | + c.Assert(err, IsNil) |
3884 | + // check |
3885 | + marshalled, err := json.Marshal(inMsg) |
3886 | + c.Assert(err, IsNil) |
3887 | + c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
3888 | + err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
3889 | + c.Assert(err, IsNil) |
3890 | + err = exchg.Acked(sess) |
3891 | + c.Assert(err, IsNil) |
3892 | +} |
3893 | + |
3894 | +func (s *simpleSuite) TestBroadcast(c *C) { |
3895 | + sto := store.NewInMemoryPendingStore() |
3896 | + notification1 := json.RawMessage(`{"m": "M"}`) |
3897 | + sto.AppendToChannel(store.SystemInternalChannelId, notification1) |
3898 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, nil) |
3899 | + b.Start() |
3900 | + defer b.Stop() |
3901 | + sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
3902 | + c.Assert(err, IsNil) |
3903 | + sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"}) |
3904 | + c.Assert(err, IsNil) |
3905 | + b.Broadcast(store.SystemInternalChannelId) |
3906 | + select { |
3907 | + case <-time.After(5 * time.Second): |
3908 | + c.Fatal("taking too long to get broadcast exchange") |
3909 | + case exchg1 := <-sess1.SessionChannel(): |
3910 | + c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{ |
3911 | + chanId: store.SystemInternalChannelId, |
3912 | + topLevel: 1, |
3913 | + notificationPayloads: []json.RawMessage{notification1}, |
3914 | + }) |
3915 | + } |
3916 | + select { |
3917 | + case <-time.After(5 * time.Second): |
3918 | + c.Fatal("taking too long to get broadcast exchange") |
3919 | + case exchg2 := <-sess2.SessionChannel(): |
3920 | + c.Check(exchg2, DeepEquals, &simpleBroadcastExchange{ |
3921 | + chanId: store.SystemInternalChannelId, |
3922 | + topLevel: 1, |
3923 | + notificationPayloads: []json.RawMessage{notification1}, |
3924 | + }) |
3925 | + } |
3926 | +} |
3927 | + |
3928 | +type testFailingStore struct { |
3929 | + store.InMemoryPendingStore |
3930 | + countdownToFail int |
3931 | +} |
3932 | + |
3933 | +func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []json.RawMessage, error) { |
3934 | + if sto.countdownToFail == 0 { |
3935 | + return 0, nil, errors.New("get channel snapshot fail") |
3936 | + } |
3937 | + sto.countdownToFail-- |
3938 | + return 0, nil, nil |
3939 | +} |
3940 | + |
3941 | +func (s *simpleSuite) TestBroadcastFail(c *C) { |
3942 | + buf := &helpers.SyncedLogBuffer{Written: make(chan bool, 1)} |
3943 | + logger := logger.NewSimpleLogger(buf, "error") |
3944 | + sto := &testFailingStore{countdownToFail: 1} |
3945 | + b := NewSimpleBroker(sto, &testBrokerConfig{}, logger) |
3946 | + b.Start() |
3947 | + defer b.Stop() |
3948 | + _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
3949 | + c.Assert(err, IsNil) |
3950 | + b.Broadcast(store.SystemInternalChannelId) |
3951 | + select { |
3952 | + case <-time.After(5 * time.Second): |
3953 | + c.Fatal("taking too long to log error") |
3954 | + case <-buf.Written: |
3955 | + } |
3956 | + c.Check(buf.String(), Matches, ".*ERROR unsuccessful broadcast, get channel snapshot for 0: get channel snapshot fail\n") |
3957 | +} |
3958 | |
3959 | === added directory 'server/dev' |
3960 | === added file 'server/dev/config.go' |
3961 | --- server/dev/config.go 1970-01-01 00:00:00 +0000 |
3962 | +++ server/dev/config.go 2014-01-14 15:09:46 +0000 |
3963 | @@ -0,0 +1,110 @@ |
3964 | +/* |
3965 | + Copyright 2013-2014 Canonical Ltd. |
3966 | + |
3967 | + This program is free software: you can redistribute it and/or modify it |
3968 | + under the terms of the GNU General Public License version 3, as published |
3969 | + by the Free Software Foundation. |
3970 | + |
3971 | + This program is distributed in the hope that it will be useful, but |
3972 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
3973 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
3974 | + PURPOSE. See the GNU General Public License for more details. |
3975 | + |
3976 | + You should have received a copy of the GNU General Public License along |
3977 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
3978 | +*/ |
3979 | + |
3980 | +package main |
3981 | + |
3982 | +import ( |
3983 | + "fmt" |
3984 | + "io" |
3985 | + "launchpad.net/ubuntu-push/config" |
3986 | + "time" |
3987 | +) |
3988 | + |
3989 | +// expectedConfiguration is used as target for JSON parsing the configuration. |
3990 | +type expectedConfiguration struct { |
3991 | + // session configuration |
3992 | + PingInterval config.ConfigTimeDuration `json:"ping_interval"` |
3993 | + ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"` |
3994 | + // broker configuration |
3995 | + SessionQueueSize config.ConfigQueueSize `json:"session_queue_size"` |
3996 | + BrokerQueueSize config.ConfigQueueSize `json:"broker_queue_size"` |
3997 | + // device listener configuration |
3998 | + Addr config.ConfigHostPort `json:"addr"` |
3999 | + KeyPEMFile string `json:"key_pem_file"` |
4000 | + CertPEMFile string `json:"cert_pem_file"` |
4001 | + // api http server configuration |
4002 | + HTTPAddr config.ConfigHostPort `json:"http_addr"` |
4003 | + HTTPReadTimeout config.ConfigTimeDuration `json:"http_read_timeout"` |
4004 | + HTTPWriteTimeout config.ConfigTimeDuration `json:"http_write_timeout"` |
4005 | +} |
4006 | + |
4007 | +// configuration holds the server configuration and gives access to it |
4008 | +// through the various component config interfaces. |
4009 | +type configuration struct { |
4010 | + parsed expectedConfiguration |
4011 | + certPEMBlock []byte |
4012 | + keyPEMBlock []byte |
4013 | +} |
4014 | + |
4015 | +func (cfg *configuration) PingInterval() time.Duration { |
4016 | + return cfg.parsed.PingInterval.TimeDuration() |
4017 | +} |
4018 | + |
4019 | +func (cfg *configuration) ExchangeTimeout() time.Duration { |
4020 | + return cfg.parsed.ExchangeTimeout.TimeDuration() |
4021 | +} |
4022 | + |
4023 | +func (cfg *configuration) SessionQueueSize() uint { |
4024 | + return cfg.parsed.SessionQueueSize.QueueSize() |
4025 | +} |
4026 | + |
4027 | +func (cfg *configuration) BrokerQueueSize() uint { |
4028 | + return cfg.parsed.BrokerQueueSize.QueueSize() |
4029 | +} |
4030 | + |
4031 | +func (cfg *configuration) Addr() string { |
4032 | + return cfg.parsed.Addr.HostPort() |
4033 | +} |
4034 | + |
4035 | +func (cfg *configuration) KeyPEMBlock() []byte { |
4036 | + return cfg.keyPEMBlock |
4037 | +} |
4038 | + |
4039 | +func (cfg *configuration) CertPEMBlock() []byte { |
4040 | + return cfg.certPEMBlock |
4041 | +} |
4042 | + |
4043 | +func (cfg *configuration) HTTPAddr() string { |
4044 | + return cfg.parsed.HTTPAddr.HostPort() |
4045 | +} |
4046 | + |
4047 | +func (cfg *configuration) HTTPReadTimeout() time.Duration { |
4048 | + return cfg.parsed.HTTPReadTimeout.TimeDuration() |
4049 | +} |
4050 | + |
4051 | +func (cfg *configuration) HTTPWriteTimeout() time.Duration { |
4052 | + return cfg.parsed.HTTPWriteTimeout.TimeDuration() |
4053 | +} |
4054 | + |
4055 | +// read reads and parses the configuration from the reader. It uses baseDir |
4056 | +// to load the files mentioned in the configuration. |
4057 | +func (cfg *configuration) read(r io.Reader, baseDir string) error { |
4058 | + err := config.ReadConfig(r, &cfg.parsed) |
4059 | + if err != nil { |
4060 | + return err |
4061 | + } |
4062 | + keyPEMBlock, err := config.LoadFile(cfg.parsed.KeyPEMFile, baseDir) |
4063 | + if err != nil { |
4064 | + return fmt.Errorf("reading key_pem_file: %v", err) |
4065 | + } |
4066 | + certPEMBlock, err := config.LoadFile(cfg.parsed.CertPEMFile, baseDir) |
4067 | + if err != nil { |
4068 | + return fmt.Errorf("reading cert_pem_file: %v", err) |
4069 | + } |
4070 | + cfg.keyPEMBlock = keyPEMBlock |
4071 | + cfg.certPEMBlock = certPEMBlock |
4072 | + return nil |
4073 | +} |
4074 | |
4075 | === added file 'server/dev/http.go' |
4076 | --- server/dev/http.go 1970-01-01 00:00:00 +0000 |
4077 | +++ server/dev/http.go 2014-01-14 15:09:46 +0000 |
4078 | @@ -0,0 +1,40 @@ |
4079 | +/* |
4080 | + Copyright 2013-2014 Canonical Ltd. |
4081 | + |
4082 | + This program is free software: you can redistribute it and/or modify it |
4083 | + under the terms of the GNU General Public License version 3, as published |
4084 | + by the Free Software Foundation. |
4085 | + |
4086 | + This program is distributed in the hope that it will be useful, but |
4087 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4088 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4089 | + PURPOSE. See the GNU General Public License for more details. |
4090 | + |
4091 | + You should have received a copy of the GNU General Public License along |
4092 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4093 | +*/ |
4094 | + |
4095 | +package main |
4096 | + |
4097 | +import ( |
4098 | + "net" |
4099 | + "net/http" |
4100 | + "time" |
4101 | +) |
4102 | + |
4103 | +// An HTTPServeConfig holds the HTTP server config. |
4104 | +type HTTPServeConfig interface { |
4105 | + HTTPAddr() string |
4106 | + HTTPReadTimeout() time.Duration |
4107 | + HTTPWriteTimeout() time.Duration |
4108 | +} |
4109 | + |
4110 | +// RunHTTPServe serves HTTP requests. |
4111 | +func RunHTTPServe(lst net.Listener, h http.Handler, cfg HTTPServeConfig) error { |
4112 | + srv := &http.Server{ |
4113 | + Handler: h, |
4114 | + ReadTimeout: cfg.HTTPReadTimeout(), |
4115 | + WriteTimeout: cfg.HTTPWriteTimeout(), |
4116 | + } |
4117 | + return srv.Serve(lst) |
4118 | +} |
4119 | |
4120 | === added file 'server/dev/http_test.go' |
4121 | --- server/dev/http_test.go 1970-01-01 00:00:00 +0000 |
4122 | +++ server/dev/http_test.go 2014-01-14 15:09:46 +0000 |
4123 | @@ -0,0 +1,70 @@ |
4124 | +/* |
4125 | + Copyright 2013-2014 Canonical Ltd. |
4126 | + |
4127 | + This program is free software: you can redistribute it and/or modify it |
4128 | + under the terms of the GNU General Public License version 3, as published |
4129 | + by the Free Software Foundation. |
4130 | + |
4131 | + This program is distributed in the hope that it will be useful, but |
4132 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4133 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4134 | + PURPOSE. See the GNU General Public License for more details. |
4135 | + |
4136 | + You should have received a copy of the GNU General Public License along |
4137 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4138 | +*/ |
4139 | + |
4140 | +package main |
4141 | + |
4142 | +import ( |
4143 | + "fmt" |
4144 | + "io/ioutil" |
4145 | + . "launchpad.net/gocheck" |
4146 | + // "log" |
4147 | + "net" |
4148 | + "net/http" |
4149 | + "time" |
4150 | +) |
4151 | + |
4152 | +type httpSuite struct{} |
4153 | + |
4154 | +var _ = Suite(&httpSuite{}) |
4155 | + |
4156 | +type testHTTPServeConfig struct{} |
4157 | + |
4158 | +func (cfg testHTTPServeConfig) HTTPAddr() string { |
4159 | + return "127.0.0.1:0" |
4160 | +} |
4161 | + |
4162 | +func (cfg testHTTPServeConfig) HTTPReadTimeout() time.Duration { |
4163 | + return 5 * time.Second |
4164 | +} |
4165 | + |
4166 | +func (cfg testHTTPServeConfig) HTTPWriteTimeout() time.Duration { |
4167 | + return 5 * time.Second |
4168 | +} |
4169 | + |
4170 | +func testHandle(w http.ResponseWriter, r *http.Request) { |
4171 | + fmt.Fprintf(w, "yay!\n") |
4172 | +} |
4173 | + |
4174 | +func (s *httpSuite) TestRunHTTPServe(c *C) { |
4175 | + cfg := testHTTPServeConfig{} |
4176 | + lst, err := net.Listen("tcp", cfg.HTTPAddr()) |
4177 | + c.Assert(err, IsNil) |
4178 | + defer lst.Close() |
4179 | + errCh := make(chan error, 1) |
4180 | + h := http.HandlerFunc(testHandle) |
4181 | + go func() { |
4182 | + errCh <- RunHTTPServe(lst, h, cfg) |
4183 | + }() |
4184 | + resp, err := http.Get(fmt.Sprintf("http://%s/", lst.Addr())) |
4185 | + c.Assert(err, IsNil) |
4186 | + defer resp.Body.Close() |
4187 | + c.Assert(resp.StatusCode, Equals, 200) |
4188 | + body, err := ioutil.ReadAll(resp.Body) |
4189 | + c.Assert(err, IsNil) |
4190 | + c.Check(string(body), Equals, "yay!\n") |
4191 | + lst.Close() |
4192 | + c.Check(<-errCh, ErrorMatches, ".*closed.*") |
4193 | +} |
4194 | |
4195 | === added file 'server/dev/server.go' |
4196 | --- server/dev/server.go 1970-01-01 00:00:00 +0000 |
4197 | +++ server/dev/server.go 2014-01-14 15:09:46 +0000 |
4198 | @@ -0,0 +1,77 @@ |
4199 | +/* |
4200 | + Copyright 2013-2014 Canonical Ltd. |
4201 | + |
4202 | + This program is free software: you can redistribute it and/or modify it |
4203 | + under the terms of the GNU General Public License version 3, as published |
4204 | + by the Free Software Foundation. |
4205 | + |
4206 | + This program is distributed in the hope that it will be useful, but |
4207 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4208 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4209 | + PURPOSE. See the GNU General Public License for more details. |
4210 | + |
4211 | + You should have received a copy of the GNU General Public License along |
4212 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4213 | +*/ |
4214 | + |
4215 | +// simple development server. |
4216 | +package main |
4217 | + |
4218 | +import ( |
4219 | + "launchpad.net/ubuntu-push/logger" |
4220 | + "launchpad.net/ubuntu-push/server/api" |
4221 | + "launchpad.net/ubuntu-push/server/broker" |
4222 | + "launchpad.net/ubuntu-push/server/listener" |
4223 | + "launchpad.net/ubuntu-push/server/session" |
4224 | + "launchpad.net/ubuntu-push/server/store" |
4225 | + "net" |
4226 | + "os" |
4227 | + "path/filepath" |
4228 | +) |
4229 | + |
4230 | +func main() { |
4231 | + logger := logger.NewSimpleLogger(os.Stderr, "debug") |
4232 | + if len(os.Args) < 2 { // xxx use flag |
4233 | + logger.Fatalf("missing config file") |
4234 | + } |
4235 | + configFName := os.Args[1] |
4236 | + f, err := os.Open(configFName) |
4237 | + if err != nil { |
4238 | + logger.Fatalf("reading config: %v", err) |
4239 | + } |
4240 | + cfg := &configuration{} |
4241 | + err = cfg.read(f, filepath.Dir(configFName)) |
4242 | + if err != nil { |
4243 | + logger.Fatalf("reading config: %v", err) |
4244 | + } |
4245 | + // setup a pending store and start the broker |
4246 | + sto := store.NewInMemoryPendingStore() |
4247 | + broker := broker.NewSimpleBroker(sto, cfg, logger) |
4248 | + broker.Start() |
4249 | + defer broker.Stop() |
4250 | + // serve the http api |
4251 | + httpLst, err := net.Listen("tcp", cfg.HTTPAddr()) |
4252 | + if err != nil { |
4253 | + logger.Fatalf("start http listening: %v", err) |
4254 | + } |
4255 | + handler := api.MakeHandlersMux(sto, broker, logger) |
4256 | + logger.Infof("listening for http on %v", httpLst.Addr()) |
4257 | + go func() { |
4258 | + err := RunHTTPServe(httpLst, handler, cfg) |
4259 | + if err != nil { |
4260 | + logger.Fatalf("accepting http connections: %v", err) |
4261 | + } |
4262 | + }() |
4263 | + // listen for device connections |
4264 | + lst, err := listener.DeviceListen(cfg) |
4265 | + if err != nil { |
4266 | + logger.Fatalf("start device listening: %v", err) |
4267 | + } |
4268 | + logger.Infof("listening for devices on %v", lst.Addr()) |
4269 | + err = lst.AcceptLoop(func(conn net.Conn) error { |
4270 | + return session.Session(conn, broker, cfg, logger) |
4271 | + }, logger) |
4272 | + if err != nil { |
4273 | + logger.Fatalf("accepting device connections: %v", err) |
4274 | + } |
4275 | +} |
4276 | |
4277 | === added file 'server/dev/server_test.go' |
4278 | --- server/dev/server_test.go 1970-01-01 00:00:00 +0000 |
4279 | +++ server/dev/server_test.go 2014-01-14 15:09:46 +0000 |
4280 | @@ -0,0 +1,105 @@ |
4281 | +/* |
4282 | + Copyright 2013-2014 Canonical Ltd. |
4283 | + |
4284 | + This program is free software: you can redistribute it and/or modify it |
4285 | + under the terms of the GNU General Public License version 3, as published |
4286 | + by the Free Software Foundation. |
4287 | + |
4288 | + This program is distributed in the hope that it will be useful, but |
4289 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4290 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4291 | + PURPOSE. See the GNU General Public License for more details. |
4292 | + |
4293 | + You should have received a copy of the GNU General Public License along |
4294 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4295 | +*/ |
4296 | + |
4297 | +package main |
4298 | + |
4299 | +import ( |
4300 | + // "fmt" |
4301 | + "bytes" |
4302 | + "io/ioutil" |
4303 | + . "launchpad.net/gocheck" |
4304 | + "os" |
4305 | + "path/filepath" |
4306 | + "testing" |
4307 | + "time" |
4308 | +) |
4309 | + |
4310 | +func TestDevserver(t *testing.T) { TestingT(t) } |
4311 | + |
4312 | +type devserverSuite struct{} |
4313 | + |
4314 | +var _ = Suite(&devserverSuite{}) |
4315 | + |
4316 | +func (s *devserverSuite) TestConfigRead(c *C) { |
4317 | + tmpDir := c.MkDir() |
4318 | + err := ioutil.WriteFile(filepath.Join(tmpDir, "key.key"), []byte("KeY"), os.ModePerm) |
4319 | + c.Assert(err, IsNil) |
4320 | + err = ioutil.WriteFile(filepath.Join(tmpDir, "cert.cert"), []byte("CeRt"), os.ModePerm) |
4321 | + c.Assert(err, IsNil) |
4322 | + buf := bytes.NewBufferString(`{ |
4323 | +"ping_interval": "5m", |
4324 | +"exchange_timeout": "10s", |
4325 | +"session_queue_size": 10, |
4326 | +"broker_queue_size": 100, |
4327 | +"addr": "127.0.0.1:9999", |
4328 | +"key_pem_file": "key.key", |
4329 | +"cert_pem_file": "cert.cert", |
4330 | +"http_addr": "127.0.0.1:8080", |
4331 | +"http_read_timeout": "5s", |
4332 | +"http_write_timeout": "10s" |
4333 | +}`) |
4334 | + cfg := &configuration{} |
4335 | + err = cfg.read(buf, tmpDir) |
4336 | + c.Assert(err, IsNil) |
4337 | + c.Check(cfg.PingInterval(), Equals, 5*time.Minute) |
4338 | + c.Check(cfg.ExchangeTimeout(), Equals, 10*time.Second) |
4339 | + c.Check(cfg.BrokerQueueSize(), Equals, uint(100)) |
4340 | + c.Check(cfg.SessionQueueSize(), Equals, uint(10)) |
4341 | + c.Check(cfg.Addr(), Equals, "127.0.0.1:9999") |
4342 | + c.Check(string(cfg.KeyPEMBlock()), Equals, "KeY") |
4343 | + c.Check(string(cfg.CertPEMBlock()), Equals, "CeRt") |
4344 | + c.Check(cfg.HTTPAddr(), Equals, "127.0.0.1:8080") |
4345 | + c.Check(cfg.HTTPReadTimeout(), Equals, 5*time.Second) |
4346 | + c.Check(cfg.HTTPWriteTimeout(), Equals, 10*time.Second) |
4347 | +} |
4348 | + |
4349 | +func (s *devserverSuite) TestConfigReadErrors(c *C) { |
4350 | + tmpDir := c.MkDir() |
4351 | + checkError := func(config, expectedErr string) { |
4352 | + cfg := &configuration{} |
4353 | + err := cfg.read(bytes.NewBufferString(config), tmpDir) |
4354 | + c.Check(err, ErrorMatches, expectedErr) |
4355 | + } |
4356 | + checkError("", "EOF") |
4357 | + checkError(`{"ping_interval": "1m"}`, "missing exchange_timeout") |
4358 | + checkError(`{"ping_interval": "1m", "exchange_timeout": "5s", "session_queue_size": "foo"}`, "session_queue_size:.*type uint") |
4359 | + checkError(`{ |
4360 | +"exchange_timeout": "10s", |
4361 | +"ping_interval": "5m", |
4362 | +"broker_queue_size": 100, |
4363 | +"session_queue_size": 10, |
4364 | +"addr": ":9000", |
4365 | +"key_pem_file": "doesntexist", |
4366 | +"cert_pem_file": "doesntexist", |
4367 | +"http_addr": ":8080", |
4368 | +"http_read_timeout": "5s", |
4369 | +"http_write_timeout": "10s" |
4370 | +}`, "reading key_pem_file:.*no such file.*") |
4371 | + err := ioutil.WriteFile(filepath.Join(tmpDir, "key.key"), []byte("KeY"), os.ModePerm) |
4372 | + c.Assert(err, IsNil) |
4373 | + checkError(`{ |
4374 | +"exchange_timeout": "10s", |
4375 | +"ping_interval": "5m", |
4376 | +"broker_queue_size": 100, |
4377 | +"session_queue_size": 10, |
4378 | +"addr": ":9000", |
4379 | +"key_pem_file": "key.key", |
4380 | +"cert_pem_file": "doesntexist", |
4381 | +"http_addr": ":8080", |
4382 | +"http_read_timeout": "5s", |
4383 | +"http_write_timeout": "10s" |
4384 | +}`, "reading cert_pem_file:.*no such file.*") |
4385 | +} |
4386 | |
4387 | === added directory 'server/listener' |
4388 | === added file 'server/listener/listener.go' |
4389 | --- server/listener/listener.go 1970-01-01 00:00:00 +0000 |
4390 | +++ server/listener/listener.go 2014-01-14 15:09:46 +0000 |
4391 | @@ -0,0 +1,86 @@ |
4392 | +/* |
4393 | + Copyright 2013-2014 Canonical Ltd. |
4394 | + |
4395 | + This program is free software: you can redistribute it and/or modify it |
4396 | + under the terms of the GNU General Public License version 3, as published |
4397 | + by the Free Software Foundation. |
4398 | + |
4399 | + This program is distributed in the hope that it will be useful, but |
4400 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4401 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4402 | + PURPOSE. See the GNU General Public License for more details. |
4403 | + |
4404 | + You should have received a copy of the GNU General Public License along |
4405 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4406 | +*/ |
4407 | + |
4408 | +// Package listener has code to listen for device connections and set up sessions for them. |
4409 | +package listener |
4410 | + |
4411 | +import ( |
4412 | + "crypto/tls" |
4413 | + "launchpad.net/ubuntu-push/logger" |
4414 | + "net" |
4415 | + "time" |
4416 | +) |
4417 | + |
4418 | +// A DeviceListenerConfig offers the DeviceListener configuration. |
4419 | +type DeviceListenerConfig interface { |
4420 | + // Addr to listen on. |
4421 | + Addr() string |
4422 | + // TLS key |
4423 | + KeyPEMBlock() []byte |
4424 | + // TLS cert |
4425 | + CertPEMBlock() []byte |
4426 | +} |
4427 | + |
4428 | +// DeviceListener listens for device connections and sets up sessions for them. |
4429 | +type DeviceListener struct { |
4430 | + net.Listener |
4431 | +} |
4432 | + |
4433 | +// DeviceListen creates a DeviceListener for device connections based on config. |
4434 | +func DeviceListen(cfg DeviceListenerConfig) (*DeviceListener, error) { |
4435 | + cert, err := tls.X509KeyPair(cfg.CertPEMBlock(), cfg.KeyPEMBlock()) |
4436 | + if err != nil { |
4437 | + return nil, err |
4438 | + } |
4439 | + tlsCfg := &tls.Config{ |
4440 | + Certificates: []tls.Certificate{cert}, |
4441 | + SessionTicketsDisabled: true, |
4442 | + } |
4443 | + lst, err := tls.Listen("tcp", cfg.Addr(), tlsCfg) |
4444 | + return &DeviceListener{lst}, err |
4445 | +} |
4446 | + |
4447 | +// handleTemporary reports whether err is just a temporary network error; |
4448 | +// if so, it waits briefly first so that the caller can simply retry. |
4449 | +func handleTemporary(err error) bool { |
4450 | + if netError, isNetError := err.(net.Error); isNetError { |
4451 | + if netError.Temporary() { |
4452 | + // wait, xxx exponential backoff? |
4453 | + time.Sleep(100 * time.Millisecond) |
4454 | + return true |
4455 | + } |
4456 | + } |
4457 | + return false |
4458 | +} |
4459 | + |
4460 | +// AcceptLoop accepts connections and starts sessions for them. |
4461 | +func (dl *DeviceListener) AcceptLoop(session func(net.Conn) error, logger logger.Logger) error { |
4462 | + for { |
4463 | + // xxx enforce a connection limit |
4464 | + conn, err := dl.Listener.Accept() |
4465 | + if err != nil { |
4466 | + if handleTemporary(err) { |
4467 | + logger.Errorf("device listener: %s -- retrying", err) |
4468 | + continue |
4469 | + } |
4470 | + return err |
4471 | + } |
4472 | + go func() { |
4473 | + defer logger.Recoverf("terminating device connection") |
4474 | + session(conn) |
4475 | + }() |
4476 | + } |
4477 | +} |
4478 | |
4479 | === added file 'server/listener/listener_test.go' |
4480 | --- server/listener/listener_test.go 1970-01-01 00:00:00 +0000 |
4481 | +++ server/listener/listener_test.go 2014-01-14 15:09:46 +0000 |
4482 | @@ -0,0 +1,239 @@ |
4483 | +/* |
4484 | + Copyright 2013-2014 Canonical Ltd. |
4485 | + |
4486 | + This program is free software: you can redistribute it and/or modify it |
4487 | + under the terms of the GNU General Public License version 3, as published |
4488 | + by the Free Software Foundation. |
4489 | + |
4490 | + This program is distributed in the hope that it will be useful, but |
4491 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4492 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4493 | + PURPOSE. See the GNU General Public License for more details. |
4494 | + |
4495 | + You should have received a copy of the GNU General Public License along |
4496 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4497 | +*/ |
4498 | + |
4499 | +package listener |
4500 | + |
4501 | +import ( |
4502 | + "crypto/tls" |
4503 | + "crypto/x509" |
4504 | + . "launchpad.net/gocheck" |
4505 | + "launchpad.net/ubuntu-push/logger" |
4506 | + helpers "launchpad.net/ubuntu-push/testing" |
4507 | + "net" |
4508 | + "syscall" |
4509 | + "testing" |
4510 | + "time" |
4511 | +) |
4512 | + |
4513 | +func TestListener(t *testing.T) { TestingT(t) } |
4514 | + |
4515 | +type listenerSuite struct{} |
4516 | + |
4517 | +var _ = Suite(&listenerSuite{}) |
4518 | + |
4519 | +const NofileMax = 500 |
4520 | + |
4521 | +func (s *listenerSuite) SetUpSuite(*C) { |
4522 | + // make it easier to get a too many open files error |
4523 | + var nofileLimit syscall.Rlimit |
4524 | + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &nofileLimit) |
4525 | + if err != nil { |
4526 | + panic(err) |
4527 | + } |
4528 | + nofileLimit.Cur = NofileMax |
4529 | + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &nofileLimit) |
4530 | + if err != nil { |
4531 | + panic(err) |
4532 | + } |
4533 | +} |
4534 | + |
4535 | +type testDevListenerCfg struct { |
4536 | + addr string |
4537 | +} |
4538 | + |
4539 | +func (cfg *testDevListenerCfg) Addr() string { |
4540 | + return cfg.addr |
4541 | +} |
4542 | + |
4543 | +// key&cert generated with go run /usr/lib/go/src/pkg/crypto/tls/generate_cert.go -ca -host localhost -rsa-bits 512 -duration 87600h |
4544 | + |
4545 | +func (cfg *testDevListenerCfg) KeyPEMBlock() []byte { |
4546 | + return []byte(`-----BEGIN RSA PRIVATE KEY----- |
4547 | +MIIBPAIBAAJBAPw+niki17X2qALE2A2AzE1q5dvK9CI4OduRtT9IgbFLC6psqAT2 |
4548 | +1NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEAAQJAKwXbIBULScP6QA6m8xam |
4549 | +wgWbkvN41GVWqPafPV32kPBvKwSc+M1e+JR7g3/xPZE7TCELcfYi4yXEHZZI3Pbh |
4550 | +oQIhAP/UsgJbsfH1GFv8Y8qGl5l/kmwwkwHhuKvEC87Yur9FAiEA/GlQv3ZfaXnT |
4551 | +lcCFT0aL02O0RDiRYyMUG/JAZQJs6CUCIQCHO5SZYIUwxIGK5mCNxxXOAzyQSiD7 |
4552 | +hqkKywf+4FvfDQIhALa0TLyqJFom0t7c4iIGAIRc8UlIYQSPiajI64+x9775AiEA |
4553 | +0v4fgSK/Rq059zW1753JjuB6aR0Uh+3RqJII4dUR1Wg= |
4554 | +-----END RSA PRIVATE KEY-----`) |
4555 | +} |
4556 | + |
4557 | +func (cfg *testDevListenerCfg) CertPEMBlock() []byte { |
4558 | + return []byte(`-----BEGIN CERTIFICATE----- |
4559 | +MIIBYzCCAQ+gAwIBAgIBADALBgkqhkiG9w0BAQUwEjEQMA4GA1UEChMHQWNtZSBD |
4560 | +bzAeFw0xMzEyMTkyMDU1NDNaFw0yMzEyMTcyMDU1NDNaMBIxEDAOBgNVBAoTB0Fj |
4561 | +bWUgQ28wWjALBgkqhkiG9w0BAQEDSwAwSAJBAPw+niki17X2qALE2A2AzE1q5dvK |
4562 | +9CI4OduRtT9IgbFLC6psqAT21NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEA |
4563 | +AaNUMFIwDgYDVR0PAQH/BAQDAgCkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1Ud |
4564 | +EwEB/wQFMAMBAf8wGgYDVR0RBBMwEYIJbG9jYWxob3N0hwR/AAABMAsGCSqGSIb3 |
4565 | +DQEBBQNBAFqiVI+Km2XPSO+pxITaPvhmuzg+XG3l1+2di3gL+HlDobocjBqRctRU |
4566 | +YySO32W07acjGJmCHUKpCJuq9X8hpmk= |
4567 | +-----END CERTIFICATE-----`) |
4568 | +} |
4569 | + |
4570 | +func (s *listenerSuite) TestDeviceListen(c *C) { |
4571 | + lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"}) |
4572 | + c.Assert(err, IsNil) |
4573 | + defer lst.Close() |
4574 | + c.Check(lst.Addr().String(), Matches, `127.0.0.1:\d{5}`) |
4575 | +} |
4576 | + |
4577 | +func (s *listenerSuite) TestDeviceListenError(c *C) { |
4578 | + // assume tests are not running as root |
4579 | + _, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:99"}) |
4580 | + c.Check(err, ErrorMatches, ".*permission denied.*") |
4581 | +} |
4582 | + |
4583 | +type testNetError struct { |
4584 | + temp bool |
4585 | +} |
4586 | + |
4587 | +func (tne *testNetError) Error() string { |
4588 | + return "test net error" |
4589 | +} |
4590 | + |
4591 | +func (tne *testNetError) Temporary() bool { |
4592 | + return tne.temp |
4593 | +} |
4594 | + |
4595 | +func (tne *testNetError) Timeout() bool { |
4596 | + return false |
4597 | +} |
4598 | + |
4599 | +var _ net.Error = &testNetError{} // sanity check |
4600 | + |
4601 | +func (s *listenerSuite) TestHandleTemporary(c *C) { |
4602 | + c.Check(handleTemporary(&testNetError{true}), Equals, true) |
4603 | + c.Check(handleTemporary(&testNetError{false}), Equals, false) |
4604 | +} |
4605 | + |
4606 | +func testSession(conn net.Conn) error { |
4607 | + defer conn.Close() |
4608 | + conn.SetDeadline(time.Now().Add(2 * time.Second)) |
4609 | + var buf [1]byte |
4610 | + _, err := conn.Read(buf[:]) |
4611 | + if err != nil { |
4612 | + return err |
4613 | + } |
4614 | + _, err = conn.Write(buf[:]) |
4615 | + return err |
4616 | +} |
4617 | + |
4618 | +func testTlsDial(c *C, addr string) (net.Conn, error) { |
4619 | + cp := x509.NewCertPool() |
4620 | + ok := cp.AppendCertsFromPEM((&testDevListenerCfg{}).CertPEMBlock()) |
4621 | + c.Assert(ok, Equals, true) |
4622 | + return tls.Dial("tcp", addr, &tls.Config{RootCAs: cp}) |
4623 | +} |
4624 | + |
4625 | +func testWriteByte(c *C, conn net.Conn, toWrite uint32) { |
4626 | + conn.SetDeadline(time.Now().Add(2 * time.Second)) |
4627 | + _, err := conn.Write([]byte{byte(toWrite)}) |
4628 | + c.Assert(err, IsNil) |
4629 | +} |
4630 | + |
4631 | +func testReadByte(c *C, conn net.Conn, expected uint32) { |
4632 | + var buf [1]byte |
4633 | + _, err := conn.Read(buf[:]) |
4634 | + c.Check(err, IsNil) |
4635 | + c.Check(buf[0], Equals, byte(expected)) |
4636 | +} |
4637 | + |
4638 | +func (s *listenerSuite) TestDeviceAcceptLoop(c *C) { |
4639 | + buf := &helpers.SyncedLogBuffer{} |
4640 | + logger := logger.NewSimpleLogger(buf, "error") |
4641 | + lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"}) |
4642 | + c.Assert(err, IsNil) |
4643 | + defer lst.Close() |
4644 | + errCh := make(chan error) |
4645 | + go func() { |
4646 | + errCh <- lst.AcceptLoop(testSession, logger) |
4647 | + }() |
4648 | + listenerAddr := lst.Addr().String() |
4649 | + conn1, err := testTlsDial(c, listenerAddr) |
4650 | + c.Assert(err, IsNil) |
4651 | + defer conn1.Close() |
4652 | + testWriteByte(c, conn1, '1') |
4653 | + conn2, err := testTlsDial(c, listenerAddr) |
4654 | + c.Assert(err, IsNil) |
4655 | + defer conn2.Close() |
4656 | + testWriteByte(c, conn2, '2') |
4657 | + testReadByte(c, conn1, '1') |
4658 | + testReadByte(c, conn2, '2') |
4659 | + lst.Close() |
4660 | + c.Check(<-errCh, ErrorMatches, ".*use of closed.*") |
4661 | + c.Check(buf.String(), Equals, "") |
4662 | +} |
4663 | + |
4664 | +func (s *listenerSuite) TestDeviceAcceptLoopTemporaryError(c *C) { |
4665 | + buf := &helpers.SyncedLogBuffer{} |
4666 | + logger := logger.NewSimpleLogger(buf, "error") |
4667 | + // EMFILE is not the temporary network error we want to handle this way, |
4668 | + // but it is relatively easy to generate in a controlled way |
4669 | + var err error |
4670 | + lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"}) |
4671 | + c.Assert(err, IsNil) |
4672 | + defer lst.Close() |
4673 | + errCh := make(chan error) |
4674 | + go func() { |
4675 | + errCh <- lst.AcceptLoop(testSession, logger) |
4676 | + }() |
4677 | + listenerAddr := lst.Addr().String() |
4678 | + conns := make([]net.Conn, 0, NofileMax) |
4679 | + for i := 0; i < NofileMax; i++ { |
4680 | + var conn1 net.Conn |
4681 | + conn1, err = net.Dial("tcp", listenerAddr) |
4682 | + if err != nil { |
4683 | + break |
4684 | + } |
4685 | + defer conn1.Close() |
4686 | + conns = append(conns, conn1) |
4687 | + } |
4688 | + c.Assert(err, ErrorMatches, ".*too many open.*") |
4689 | + for _, conn := range conns { |
4690 | + conn.Close() |
4691 | + } |
4692 | + conn2, err := testTlsDial(c, listenerAddr) |
4693 | + c.Assert(err, IsNil) |
4694 | + defer conn2.Close() |
4695 | + testWriteByte(c, conn2, '2') |
4696 | + testReadByte(c, conn2, '2') |
4697 | + lst.Close() |
4698 | + c.Check(<-errCh, ErrorMatches, ".*use of closed.*") |
4699 | + c.Check(buf.String(), Matches, ".*device listener:.*accept.*too many open.*-- retrying\n") |
4700 | +} |
4701 | + |
4702 | +func (s *listenerSuite) TestDeviceAcceptLoopPanic(c *C) { |
4703 | + buf := &helpers.SyncedLogBuffer{} |
4704 | + logger := logger.NewSimpleLogger(buf, "error") |
4705 | + lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"}) |
4706 | + c.Assert(err, IsNil) |
4707 | + defer lst.Close() |
4708 | + errCh := make(chan error) |
4709 | + go func() { |
4710 | + errCh <- lst.AcceptLoop(func(conn net.Conn) error { |
4711 | + defer conn.Close() |
4712 | + panic("session panic") |
4713 | + }, logger) |
4714 | + }() |
4715 | + listenerAddr := lst.Addr().String() |
4716 | + _, err = testTlsDial(c, listenerAddr) |
4717 | + c.Assert(err, Not(IsNil)) |
4718 | + lst.Close() |
4719 | + c.Check(<-errCh, ErrorMatches, ".*use of closed.*") |
4720 | + c.Check(buf.String(), Matches, "(?s).*session panic!! terminating device connection:\n.*AcceptLoop.*") |
4721 | +} |
4722 | |
4723 | === added directory 'server/session' |
4724 | === added file 'server/session/session.go' |
4725 | --- server/session/session.go 1970-01-01 00:00:00 +0000 |
4726 | +++ server/session/session.go 2014-01-14 15:09:46 +0000 |
4727 | @@ -0,0 +1,157 @@ |
4728 | +/* |
4729 | + Copyright 2013-2014 Canonical Ltd. |
4730 | + |
4731 | + This program is free software: you can redistribute it and/or modify it |
4732 | + under the terms of the GNU General Public License version 3, as published |
4733 | + by the Free Software Foundation. |
4734 | + |
4735 | + This program is distributed in the hope that it will be useful, but |
4736 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4737 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4738 | + PURPOSE. See the GNU General Public License for more details. |
4739 | + |
4740 | + You should have received a copy of the GNU General Public License along |
4741 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4742 | +*/ |
4743 | + |
4744 | +// Package session has code handling long-lived connections from devices. |
4745 | +package session |
4746 | + |
4747 | +import ( |
4748 | + "launchpad.net/ubuntu-push/logger" |
4749 | + "launchpad.net/ubuntu-push/protocol" |
4750 | + "launchpad.net/ubuntu-push/server/broker" |
4751 | + "net" |
4752 | + "time" |
4753 | +) |
4754 | + |
4755 | +// SessionConfig is for carrying the session configuration. |
4756 | +type SessionConfig interface { |
4757 | + // a ping is sent after each ping interval without other exchanges |
4758 | + PingInterval() time.Duration |
4759 | + // sending a message and waiting for the response shouldn't take |
4760 | + // longer than the exchange timeout |
4761 | + ExchangeTimeout() time.Duration |
4762 | +} |
4763 | + |
4764 | +// sessionStart manages the start of the protocol session. |
4765 | +func sessionStart(proto protocol.Protocol, brkr broker.Broker, cfg SessionConfig) (broker.BrokerSession, error) { |
4766 | + var connMsg protocol.ConnectMsg |
4767 | + proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout())) |
4768 | + err := proto.ReadMessage(&connMsg) |
4769 | + if err != nil { |
4770 | + return nil, err |
4771 | + } |
4772 | + if connMsg.Type != "connect" { |
4773 | + return nil, &broker.ErrAbort{"expected CONNECT message"} |
4774 | + } |
4775 | + return brkr.Register(&connMsg) |
4776 | +} |
4777 | + |
4778 | +// exchange writes outMsg and reads the answer into inMsg, both under one exchangeTimeout deadline. |
4779 | +func exchange(proto protocol.Protocol, outMsg, inMsg interface{}, exchangeTimeout time.Duration) error { |
4780 | + proto.SetDeadline(time.Now().Add(exchangeTimeout)) |
4781 | + err := proto.WriteMessage(outMsg) |
4782 | + if err != nil { |
4783 | + return err |
4784 | + } |
4785 | + err = proto.ReadMessage(inMsg) |
4786 | + if err != nil { |
4787 | + return err |
4788 | + } |
4789 | + return nil |
4790 | +} |
4791 | + |
4792 | +// sessionLoop manages the exchanges of the protocol session. |
4793 | +func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig) error { |
4794 | + pingInterval := cfg.PingInterval() |
4795 | + exchangeTimeout := cfg.ExchangeTimeout() |
4796 | + pingTimer := time.NewTimer(pingInterval) |
4797 | + ch := sess.SessionChannel() |
4798 | + for { |
4799 | + select { |
4800 | + case <-pingTimer.C: |
4801 | + pingMsg := &protocol.PingPongMsg{"ping"} |
4802 | + var pongMsg protocol.PingPongMsg |
4803 | + err := exchange(proto, pingMsg, &pongMsg, exchangeTimeout) |
4804 | + if err != nil { |
4805 | + return err |
4806 | + } |
4807 | + if pongMsg.Type != "pong" { |
4808 | + return &broker.ErrAbort{"expected PONG message"} |
4809 | + } |
4810 | + pingTimer.Reset(pingInterval) |
4811 | + case exchg := <-ch: |
4812 | + // xxx later can use ch closing for shutdown/reset |
4813 | + pingTimer.Stop() |
4814 | + outMsg, inMsg, err := exchg.Prepare(sess) |
4815 | + if err != nil { |
4816 | + return err |
4817 | + } |
4818 | + for { |
4819 | + done := outMsg.Split() |
4820 | + err = exchange(proto, outMsg, inMsg, exchangeTimeout) |
4821 | + if err != nil { |
4822 | + return err |
4823 | + } |
4824 | + if done { |
4825 | + pingTimer.Reset(pingInterval) |
4826 | + } |
4827 | + err = exchg.Acked(sess) |
4828 | + if err != nil { |
4829 | + return err |
4830 | + } |
4831 | + if done { |
4832 | + break |
4833 | + } |
4834 | + } |
4835 | + } |
4836 | + } |
4837 | + return nil |
4838 | +} |
4839 | + |
4840 | +var sessionsEpoch = time.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano() |
4841 | + |
4842 | +// sessionTracker logs session events. |
4843 | +type sessionTracker struct { |
4844 | + logger.Logger |
4845 | + sessionId int64 // xxx use timeuuid later |
4846 | +} |
4847 | + |
4848 | +func (trk *sessionTracker) start(conn interface { |
4849 | + RemoteAddr() net.Addr |
4850 | +}) { |
4851 | + trk.sessionId = time.Now().UnixNano() - sessionsEpoch |
4852 | + trk.Debugf("session(%x) connected %v", trk.sessionId, conn.RemoteAddr()) |
4853 | +} |
4854 | + |
4855 | +func (trk *sessionTracker) registered(sess broker.BrokerSession) { |
4856 | + trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceId()) |
4857 | +} |
4858 | + |
4859 | +func (trk *sessionTracker) end(err error) error { |
4860 | + trk.Debugf("session(%x) ended with: %v", trk.sessionId, err) |
4861 | + return err |
4862 | +} |
4863 | + |
4864 | +// Session manages the session with a client. |
4865 | +func Session(conn net.Conn, brkr broker.Broker, cfg SessionConfig, logger logger.Logger) error { |
4866 | + defer conn.Close() |
4867 | + track := sessionTracker{Logger: logger} |
4868 | + track.start(conn) |
4869 | + v, err := protocol.ReadWireFormatVersion(conn, cfg.ExchangeTimeout()) |
4870 | + if err != nil { |
4871 | + return track.end(err) |
4872 | + } |
4873 | + if v != protocol.ProtocolWireVersion { |
4874 | + return track.end(&broker.ErrAbort{"unexpected wire format version"}) |
4875 | + } |
4876 | + proto := protocol.NewProtocol0(conn) |
4877 | + sess, err := sessionStart(proto, brkr, cfg) |
4878 | + if err != nil { |
4879 | + return track.end(err) |
4880 | + } |
4881 | + track.registered(sess) |
4882 | + defer brkr.Unregister(sess) |
4883 | + return track.end(sessionLoop(proto, sess, cfg)) |
4884 | +} |
4885 | |
4886 | === added file 'server/session/session_test.go' |
4887 | --- server/session/session_test.go 1970-01-01 00:00:00 +0000 |
4888 | +++ server/session/session_test.go 2014-01-14 15:09:46 +0000 |
4889 | @@ -0,0 +1,619 @@ |
4890 | +/* |
4891 | + Copyright 2013-2014 Canonical Ltd. |
4892 | + |
4893 | + This program is free software: you can redistribute it and/or modify it |
4894 | + under the terms of the GNU General Public License version 3, as published |
4895 | + by the Free Software Foundation. |
4896 | + |
4897 | + This program is distributed in the hope that it will be useful, but |
4898 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
4899 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
4900 | + PURPOSE. See the GNU General Public License for more details. |
4901 | + |
4902 | + You should have received a copy of the GNU General Public License along |
4903 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
4904 | +*/ |
4905 | + |
4906 | +package session |
4907 | + |
4908 | +import ( |
4909 | + "bufio" |
4910 | + "bytes" |
4911 | + "encoding/json" |
4912 | + "errors" |
4913 | + "fmt" |
4914 | + "io" |
4915 | + "io/ioutil" |
4916 | + . "launchpad.net/gocheck" |
4917 | + "launchpad.net/ubuntu-push/logger" |
4918 | + "launchpad.net/ubuntu-push/protocol" |
4919 | + "launchpad.net/ubuntu-push/server/broker" |
4920 | + "net" |
4921 | + "reflect" |
4922 | + "testing" |
4923 | + "time" |
4924 | +) |
4925 | + |
4926 | +func TestSession(t *testing.T) { TestingT(t) } |
4927 | + |
4928 | +type sessionSuite struct{} |
4929 | + |
4930 | +var _ = Suite(&sessionSuite{}) |
4931 | + |
4932 | +type testProtocol struct { |
4933 | + up chan interface{} |
4934 | + down chan interface{} |
4935 | +} |
4936 | + |
4937 | +// takeNext takes a value from given channel with a 5s timeout |
4938 | +func takeNext(ch <-chan interface{}) interface{} { |
4939 | + select { |
4940 | + case <-time.After(5 * time.Second): |
4941 | + panic("test protocol exchange stuck: too long waiting") |
4942 | + case v := <-ch: |
4943 | + return v |
4944 | + } |
4945 | + return nil |
4946 | +} |
4947 | + |
4948 | +func (c *testProtocol) SetDeadline(t time.Time) { |
4949 | + deadAfter := t.Sub(time.Now()) |
4950 | + deadAfter = (deadAfter + time.Millisecond/2) / time.Millisecond * time.Millisecond |
4951 | + c.down <- fmt.Sprintf("deadline %v", deadAfter) |
4952 | +} |
4953 | + |
4954 | +func (c *testProtocol) ReadMessage(dest interface{}) error { |
4955 | + switch v := takeNext(c.up).(type) { |
4956 | + case error: |
4957 | + return v |
4958 | + default: |
4959 | + // make sure json.Unmarshal works with dest |
4960 | + var marshalledMsg []byte |
4961 | + marshalledMsg, err := json.Marshal(v) |
4962 | + if err != nil { |
4963 | + return fmt.Errorf("can't jsonify test value: %v", v) |
4964 | + } |
4965 | + return json.Unmarshal(marshalledMsg, dest) |
4966 | + } |
4967 | + return nil |
4968 | +} |
4969 | + |
4970 | +func (c *testProtocol) WriteMessage(src interface{}) error { |
4971 | + // make sure json.Marshal works with src |
4972 | + _, err := json.Marshal(src) |
4973 | + if err != nil { |
4974 | + return err |
4975 | + } |
4976 | + val := reflect.ValueOf(src) |
4977 | + if val.Kind() == reflect.Ptr { |
4978 | + src = val.Elem().Interface() |
4979 | + } |
4980 | + c.down <- src |
4981 | + switch v := takeNext(c.up).(type) { |
4982 | + case error: |
4983 | + return v |
4984 | + } |
4985 | + return nil |
4986 | +} |
4987 | + |
4988 | +type testSessionConfig struct { |
4989 | + exchangeTimeout time.Duration |
4990 | + pingInterval time.Duration |
4991 | +} |
4992 | + |
4993 | +func (tsc *testSessionConfig) PingInterval() time.Duration { |
4994 | + return tsc.pingInterval |
4995 | +} |
4996 | + |
4997 | +func (tsc *testSessionConfig) ExchangeTimeout() time.Duration { |
4998 | + return tsc.exchangeTimeout |
4999 | +} |
5000 | + |
The diff has been truncated for viewing.
Bring it ON!