Merge lp:~pedronis/ubuntu-push/so-it-begins into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 41
Merged at revision: 41
Proposed branch: lp:~pedronis/ubuntu-push/so-it-begins
Merge into: lp:ubuntu-push
Diff against target: 5845 lines (+5616/-0)
43 files modified
.bzrignore (+3/-0)
COPYING (+674/-0)
LICENSE (+16/-0)
Makefile (+48/-0)
README (+37/-0)
config/config.go (+149/-0)
config/config_test.go (+135/-0)
dependencies.tsv (+1/-0)
logger/logger.go (+100/-0)
logger/logger_test.go (+117/-0)
protocol/messages.go (+103/-0)
protocol/messages_test.go (+101/-0)
protocol/protocol.go (+103/-0)
protocol/protocol_test.go (+227/-0)
scripts/check_fmt (+14/-0)
server/acceptance/acceptance.sh (+6/-0)
server/acceptance/acceptance_test.go (+487/-0)
server/acceptance/acceptanceclient.go (+123/-0)
server/acceptance/cmd/acceptanceclient.go (+79/-0)
server/acceptance/config/README (+7/-0)
server/acceptance/config/config.json (+12/-0)
server/acceptance/config/testing.cert (+10/-0)
server/acceptance/config/testing.key (+9/-0)
server/api/handlers.go (+244/-0)
server/api/handlers_test.go (+345/-0)
server/broker/broker.go (+71/-0)
server/broker/simple.go (+260/-0)
server/broker/simple_test.go (+327/-0)
server/dev/config.go (+110/-0)
server/dev/http.go (+40/-0)
server/dev/http_test.go (+70/-0)
server/dev/server.go (+77/-0)
server/dev/server_test.go (+105/-0)
server/listener/listener.go (+86/-0)
server/listener/listener_test.go (+239/-0)
server/session/session.go (+157/-0)
server/session/session_test.go (+619/-0)
server/store/inmemory.go (+66/-0)
server/store/inmemory_test.go (+62/-0)
server/store/store.go (+73/-0)
server/store/store_test.go (+52/-0)
tarmac_tests.sh (+5/-0)
testing/helpers.go (+47/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/so-it-begins
Reviewer: John Lenton (community)
Review status: Approve
Review via email: mp+201605@code.launchpad.net

Commit message

and as a start we get some protocol support and a development server and infrastructure

Description of the change

and as a start we get some protocol support and a development server and infrastructure

John Lenton (chipaca) wrote :

Bring it ON!

review: Approve

Preview Diff

1=== added file '.bzrignore'
2--- .bzrignore 1970-01-01 00:00:00 +0000
3+++ .bzrignore 2014-01-14 15:09:46 +0000
4@@ -0,0 +1,3 @@
5+acceptanceclient
6+testserver
7+coverhtml
8
9=== added file 'COPYING'
10--- COPYING 1970-01-01 00:00:00 +0000
11+++ COPYING 2014-01-14 15:09:46 +0000
12@@ -0,0 +1,674 @@
13+ GNU GENERAL PUBLIC LICENSE
14+ Version 3, 29 June 2007
15+
16+ Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
17+ Everyone is permitted to copy and distribute verbatim copies
18+ of this license document, but changing it is not allowed.
19+
20+ Preamble
21+
22+ The GNU General Public License is a free, copyleft license for
23+software and other kinds of works.
24+
25+ The licenses for most software and other practical works are designed
26+to take away your freedom to share and change the works. By contrast,
27+the GNU General Public License is intended to guarantee your freedom to
28+share and change all versions of a program--to make sure it remains free
29+software for all its users. We, the Free Software Foundation, use the
30+GNU General Public License for most of our software; it applies also to
31+any other work released this way by its authors. You can apply it to
32+your programs, too.
33+
34+ When we speak of free software, we are referring to freedom, not
35+price. Our General Public Licenses are designed to make sure that you
36+have the freedom to distribute copies of free software (and charge for
37+them if you wish), that you receive source code or can get it if you
38+want it, that you can change the software or use pieces of it in new
39+free programs, and that you know you can do these things.
40+
41+ To protect your rights, we need to prevent others from denying you
42+these rights or asking you to surrender the rights. Therefore, you have
43+certain responsibilities if you distribute copies of the software, or if
44+you modify it: responsibilities to respect the freedom of others.
45+
46+ For example, if you distribute copies of such a program, whether
47+gratis or for a fee, you must pass on to the recipients the same
48+freedoms that you received. You must make sure that they, too, receive
49+or can get the source code. And you must show them these terms so they
50+know their rights.
51+
52+ Developers that use the GNU GPL protect your rights with two steps:
53+(1) assert copyright on the software, and (2) offer you this License
54+giving you legal permission to copy, distribute and/or modify it.
55+
56+ For the developers' and authors' protection, the GPL clearly explains
57+that there is no warranty for this free software. For both users' and
58+authors' sake, the GPL requires that modified versions be marked as
59+changed, so that their problems will not be attributed erroneously to
60+authors of previous versions.
61+
62+ Some devices are designed to deny users access to install or run
63+modified versions of the software inside them, although the manufacturer
64+can do so. This is fundamentally incompatible with the aim of
65+protecting users' freedom to change the software. The systematic
66+pattern of such abuse occurs in the area of products for individuals to
67+use, which is precisely where it is most unacceptable. Therefore, we
68+have designed this version of the GPL to prohibit the practice for those
69+products. If such problems arise substantially in other domains, we
70+stand ready to extend this provision to those domains in future versions
71+of the GPL, as needed to protect the freedom of users.
72+
73+ Finally, every program is threatened constantly by software patents.
74+States should not allow patents to restrict development and use of
75+software on general-purpose computers, but in those that do, we wish to
76+avoid the special danger that patents applied to a free program could
77+make it effectively proprietary. To prevent this, the GPL assures that
78+patents cannot be used to render the program non-free.
79+
80+ The precise terms and conditions for copying, distribution and
81+modification follow.
82+
83+ TERMS AND CONDITIONS
84+
85+ 0. Definitions.
86+
87+ "This License" refers to version 3 of the GNU General Public License.
88+
89+ "Copyright" also means copyright-like laws that apply to other kinds of
90+works, such as semiconductor masks.
91+
92+ "The Program" refers to any copyrightable work licensed under this
93+License. Each licensee is addressed as "you". "Licensees" and
94+"recipients" may be individuals or organizations.
95+
96+ To "modify" a work means to copy from or adapt all or part of the work
97+in a fashion requiring copyright permission, other than the making of an
98+exact copy. The resulting work is called a "modified version" of the
99+earlier work or a work "based on" the earlier work.
100+
101+ A "covered work" means either the unmodified Program or a work based
102+on the Program.
103+
104+ To "propagate" a work means to do anything with it that, without
105+permission, would make you directly or secondarily liable for
106+infringement under applicable copyright law, except executing it on a
107+computer or modifying a private copy. Propagation includes copying,
108+distribution (with or without modification), making available to the
109+public, and in some countries other activities as well.
110+
111+ To "convey" a work means any kind of propagation that enables other
112+parties to make or receive copies. Mere interaction with a user through
113+a computer network, with no transfer of a copy, is not conveying.
114+
115+ An interactive user interface displays "Appropriate Legal Notices"
116+to the extent that it includes a convenient and prominently visible
117+feature that (1) displays an appropriate copyright notice, and (2)
118+tells the user that there is no warranty for the work (except to the
119+extent that warranties are provided), that licensees may convey the
120+work under this License, and how to view a copy of this License. If
121+the interface presents a list of user commands or options, such as a
122+menu, a prominent item in the list meets this criterion.
123+
124+ 1. Source Code.
125+
126+ The "source code" for a work means the preferred form of the work
127+for making modifications to it. "Object code" means any non-source
128+form of a work.
129+
130+ A "Standard Interface" means an interface that either is an official
131+standard defined by a recognized standards body, or, in the case of
132+interfaces specified for a particular programming language, one that
133+is widely used among developers working in that language.
134+
135+ The "System Libraries" of an executable work include anything, other
136+than the work as a whole, that (a) is included in the normal form of
137+packaging a Major Component, but which is not part of that Major
138+Component, and (b) serves only to enable use of the work with that
139+Major Component, or to implement a Standard Interface for which an
140+implementation is available to the public in source code form. A
141+"Major Component", in this context, means a major essential component
142+(kernel, window system, and so on) of the specific operating system
143+(if any) on which the executable work runs, or a compiler used to
144+produce the work, or an object code interpreter used to run it.
145+
146+ The "Corresponding Source" for a work in object code form means all
147+the source code needed to generate, install, and (for an executable
148+work) run the object code and to modify the work, including scripts to
149+control those activities. However, it does not include the work's
150+System Libraries, or general-purpose tools or generally available free
151+programs which are used unmodified in performing those activities but
152+which are not part of the work. For example, Corresponding Source
153+includes interface definition files associated with source files for
154+the work, and the source code for shared libraries and dynamically
155+linked subprograms that the work is specifically designed to require,
156+such as by intimate data communication or control flow between those
157+subprograms and other parts of the work.
158+
159+ The Corresponding Source need not include anything that users
160+can regenerate automatically from other parts of the Corresponding
161+Source.
162+
163+ The Corresponding Source for a work in source code form is that
164+same work.
165+
166+ 2. Basic Permissions.
167+
168+ All rights granted under this License are granted for the term of
169+copyright on the Program, and are irrevocable provided the stated
170+conditions are met. This License explicitly affirms your unlimited
171+permission to run the unmodified Program. The output from running a
172+covered work is covered by this License only if the output, given its
173+content, constitutes a covered work. This License acknowledges your
174+rights of fair use or other equivalent, as provided by copyright law.
175+
176+ You may make, run and propagate covered works that you do not
177+convey, without conditions so long as your license otherwise remains
178+in force. You may convey covered works to others for the sole purpose
179+of having them make modifications exclusively for you, or provide you
180+with facilities for running those works, provided that you comply with
181+the terms of this License in conveying all material for which you do
182+not control copyright. Those thus making or running the covered works
183+for you must do so exclusively on your behalf, under your direction
184+and control, on terms that prohibit them from making any copies of
185+your copyrighted material outside their relationship with you.
186+
187+ Conveying under any other circumstances is permitted solely under
188+the conditions stated below. Sublicensing is not allowed; section 10
189+makes it unnecessary.
190+
191+ 3. Protecting Users' Legal Rights From Anti-Circumvention Law.
192+
193+ No covered work shall be deemed part of an effective technological
194+measure under any applicable law fulfilling obligations under article
195+11 of the WIPO copyright treaty adopted on 20 December 1996, or
196+similar laws prohibiting or restricting circumvention of such
197+measures.
198+
199+ When you convey a covered work, you waive any legal power to forbid
200+circumvention of technological measures to the extent such circumvention
201+is effected by exercising rights under this License with respect to
202+the covered work, and you disclaim any intention to limit operation or
203+modification of the work as a means of enforcing, against the work's
204+users, your or third parties' legal rights to forbid circumvention of
205+technological measures.
206+
207+ 4. Conveying Verbatim Copies.
208+
209+ You may convey verbatim copies of the Program's source code as you
210+receive it, in any medium, provided that you conspicuously and
211+appropriately publish on each copy an appropriate copyright notice;
212+keep intact all notices stating that this License and any
213+non-permissive terms added in accord with section 7 apply to the code;
214+keep intact all notices of the absence of any warranty; and give all
215+recipients a copy of this License along with the Program.
216+
217+ You may charge any price or no price for each copy that you convey,
218+and you may offer support or warranty protection for a fee.
219+
220+ 5. Conveying Modified Source Versions.
221+
222+ You may convey a work based on the Program, or the modifications to
223+produce it from the Program, in the form of source code under the
224+terms of section 4, provided that you also meet all of these conditions:
225+
226+ a) The work must carry prominent notices stating that you modified
227+ it, and giving a relevant date.
228+
229+ b) The work must carry prominent notices stating that it is
230+ released under this License and any conditions added under section
231+ 7. This requirement modifies the requirement in section 4 to
232+ "keep intact all notices".
233+
234+ c) You must license the entire work, as a whole, under this
235+ License to anyone who comes into possession of a copy. This
236+ License will therefore apply, along with any applicable section 7
237+ additional terms, to the whole of the work, and all its parts,
238+ regardless of how they are packaged. This License gives no
239+ permission to license the work in any other way, but it does not
240+ invalidate such permission if you have separately received it.
241+
242+ d) If the work has interactive user interfaces, each must display
243+ Appropriate Legal Notices; however, if the Program has interactive
244+ interfaces that do not display Appropriate Legal Notices, your
245+ work need not make them do so.
246+
247+ A compilation of a covered work with other separate and independent
248+works, which are not by their nature extensions of the covered work,
249+and which are not combined with it such as to form a larger program,
250+in or on a volume of a storage or distribution medium, is called an
251+"aggregate" if the compilation and its resulting copyright are not
252+used to limit the access or legal rights of the compilation's users
253+beyond what the individual works permit. Inclusion of a covered work
254+in an aggregate does not cause this License to apply to the other
255+parts of the aggregate.
256+
257+ 6. Conveying Non-Source Forms.
258+
259+ You may convey a covered work in object code form under the terms
260+of sections 4 and 5, provided that you also convey the
261+machine-readable Corresponding Source under the terms of this License,
262+in one of these ways:
263+
264+ a) Convey the object code in, or embodied in, a physical product
265+ (including a physical distribution medium), accompanied by the
266+ Corresponding Source fixed on a durable physical medium
267+ customarily used for software interchange.
268+
269+ b) Convey the object code in, or embodied in, a physical product
270+ (including a physical distribution medium), accompanied by a
271+ written offer, valid for at least three years and valid for as
272+ long as you offer spare parts or customer support for that product
273+ model, to give anyone who possesses the object code either (1) a
274+ copy of the Corresponding Source for all the software in the
275+ product that is covered by this License, on a durable physical
276+ medium customarily used for software interchange, for a price no
277+ more than your reasonable cost of physically performing this
278+ conveying of source, or (2) access to copy the
279+ Corresponding Source from a network server at no charge.
280+
281+ c) Convey individual copies of the object code with a copy of the
282+ written offer to provide the Corresponding Source. This
283+ alternative is allowed only occasionally and noncommercially, and
284+ only if you received the object code with such an offer, in accord
285+ with subsection 6b.
286+
287+ d) Convey the object code by offering access from a designated
288+ place (gratis or for a charge), and offer equivalent access to the
289+ Corresponding Source in the same way through the same place at no
290+ further charge. You need not require recipients to copy the
291+ Corresponding Source along with the object code. If the place to
292+ copy the object code is a network server, the Corresponding Source
293+ may be on a different server (operated by you or a third party)
294+ that supports equivalent copying facilities, provided you maintain
295+ clear directions next to the object code saying where to find the
296+ Corresponding Source. Regardless of what server hosts the
297+ Corresponding Source, you remain obligated to ensure that it is
298+ available for as long as needed to satisfy these requirements.
299+
300+ e) Convey the object code using peer-to-peer transmission, provided
301+ you inform other peers where the object code and Corresponding
302+ Source of the work are being offered to the general public at no
303+ charge under subsection 6d.
304+
305+ A separable portion of the object code, whose source code is excluded
306+from the Corresponding Source as a System Library, need not be
307+included in conveying the object code work.
308+
309+ A "User Product" is either (1) a "consumer product", which means any
310+tangible personal property which is normally used for personal, family,
311+or household purposes, or (2) anything designed or sold for incorporation
312+into a dwelling. In determining whether a product is a consumer product,
313+doubtful cases shall be resolved in favor of coverage. For a particular
314+product received by a particular user, "normally used" refers to a
315+typical or common use of that class of product, regardless of the status
316+of the particular user or of the way in which the particular user
317+actually uses, or expects or is expected to use, the product. A product
318+is a consumer product regardless of whether the product has substantial
319+commercial, industrial or non-consumer uses, unless such uses represent
320+the only significant mode of use of the product.
321+
322+ "Installation Information" for a User Product means any methods,
323+procedures, authorization keys, or other information required to install
324+and execute modified versions of a covered work in that User Product from
325+a modified version of its Corresponding Source. The information must
326+suffice to ensure that the continued functioning of the modified object
327+code is in no case prevented or interfered with solely because
328+modification has been made.
329+
330+ If you convey an object code work under this section in, or with, or
331+specifically for use in, a User Product, and the conveying occurs as
332+part of a transaction in which the right of possession and use of the
333+User Product is transferred to the recipient in perpetuity or for a
334+fixed term (regardless of how the transaction is characterized), the
335+Corresponding Source conveyed under this section must be accompanied
336+by the Installation Information. But this requirement does not apply
337+if neither you nor any third party retains the ability to install
338+modified object code on the User Product (for example, the work has
339+been installed in ROM).
340+
341+ The requirement to provide Installation Information does not include a
342+requirement to continue to provide support service, warranty, or updates
343+for a work that has been modified or installed by the recipient, or for
344+the User Product in which it has been modified or installed. Access to a
345+network may be denied when the modification itself materially and
346+adversely affects the operation of the network or violates the rules and
347+protocols for communication across the network.
348+
349+ Corresponding Source conveyed, and Installation Information provided,
350+in accord with this section must be in a format that is publicly
351+documented (and with an implementation available to the public in
352+source code form), and must require no special password or key for
353+unpacking, reading or copying.
354+
355+ 7. Additional Terms.
356+
357+ "Additional permissions" are terms that supplement the terms of this
358+License by making exceptions from one or more of its conditions.
359+Additional permissions that are applicable to the entire Program shall
360+be treated as though they were included in this License, to the extent
361+that they are valid under applicable law. If additional permissions
362+apply only to part of the Program, that part may be used separately
363+under those permissions, but the entire Program remains governed by
364+this License without regard to the additional permissions.
365+
366+ When you convey a copy of a covered work, you may at your option
367+remove any additional permissions from that copy, or from any part of
368+it. (Additional permissions may be written to require their own
369+removal in certain cases when you modify the work.) You may place
370+additional permissions on material, added by you to a covered work,
371+for which you have or can give appropriate copyright permission.
372+
373+ Notwithstanding any other provision of this License, for material you
374+add to a covered work, you may (if authorized by the copyright holders of
375+that material) supplement the terms of this License with terms:
376+
377+ a) Disclaiming warranty or limiting liability differently from the
378+ terms of sections 15 and 16 of this License; or
379+
380+ b) Requiring preservation of specified reasonable legal notices or
381+ author attributions in that material or in the Appropriate Legal
382+ Notices displayed by works containing it; or
383+
384+ c) Prohibiting misrepresentation of the origin of that material, or
385+ requiring that modified versions of such material be marked in
386+ reasonable ways as different from the original version; or
387+
388+ d) Limiting the use for publicity purposes of names of licensors or
389+ authors of the material; or
390+
391+ e) Declining to grant rights under trademark law for use of some
392+ trade names, trademarks, or service marks; or
393+
394+ f) Requiring indemnification of licensors and authors of that
395+ material by anyone who conveys the material (or modified versions of
396+ it) with contractual assumptions of liability to the recipient, for
397+ any liability that these contractual assumptions directly impose on
398+ those licensors and authors.
399+
400+ All other non-permissive additional terms are considered "further
401+restrictions" within the meaning of section 10. If the Program as you
402+received it, or any part of it, contains a notice stating that it is
403+governed by this License along with a term that is a further
404+restriction, you may remove that term. If a license document contains
405+a further restriction but permits relicensing or conveying under this
406+License, you may add to a covered work material governed by the terms
407+of that license document, provided that the further restriction does
408+not survive such relicensing or conveying.
409+
410+ If you add terms to a covered work in accord with this section, you
411+must place, in the relevant source files, a statement of the
412+additional terms that apply to those files, or a notice indicating
413+where to find the applicable terms.
414+
415+ Additional terms, permissive or non-permissive, may be stated in the
416+form of a separately written license, or stated as exceptions;
417+the above requirements apply either way.
418+
419+ 8. Termination.
420+
421+ You may not propagate or modify a covered work except as expressly
422+provided under this License. Any attempt otherwise to propagate or
423+modify it is void, and will automatically terminate your rights under
424+this License (including any patent licenses granted under the third
425+paragraph of section 11).
426+
427+ However, if you cease all violation of this License, then your
428+license from a particular copyright holder is reinstated (a)
429+provisionally, unless and until the copyright holder explicitly and
430+finally terminates your license, and (b) permanently, if the copyright
431+holder fails to notify you of the violation by some reasonable means
432+prior to 60 days after the cessation.
433+
434+ Moreover, your license from a particular copyright holder is
435+reinstated permanently if the copyright holder notifies you of the
436+violation by some reasonable means, this is the first time you have
437+received notice of violation of this License (for any work) from that
438+copyright holder, and you cure the violation prior to 30 days after
439+your receipt of the notice.
440+
441+ Termination of your rights under this section does not terminate the
442+licenses of parties who have received copies or rights from you under
443+this License. If your rights have been terminated and not permanently
444+reinstated, you do not qualify to receive new licenses for the same
445+material under section 10.
446+
447+ 9. Acceptance Not Required for Having Copies.
448+
449+ You are not required to accept this License in order to receive or
450+run a copy of the Program. Ancillary propagation of a covered work
451+occurring solely as a consequence of using peer-to-peer transmission
452+to receive a copy likewise does not require acceptance. However,
453+nothing other than this License grants you permission to propagate or
454+modify any covered work. These actions infringe copyright if you do
455+not accept this License. Therefore, by modifying or propagating a
456+covered work, you indicate your acceptance of this License to do so.
457+
458+ 10. Automatic Licensing of Downstream Recipients.
459+
460+ Each time you convey a covered work, the recipient automatically
461+receives a license from the original licensors, to run, modify and
462+propagate that work, subject to this License. You are not responsible
463+for enforcing compliance by third parties with this License.
464+
465+ An "entity transaction" is a transaction transferring control of an
466+organization, or substantially all assets of one, or subdividing an
467+organization, or merging organizations. If propagation of a covered
468+work results from an entity transaction, each party to that
469+transaction who receives a copy of the work also receives whatever
470+licenses to the work the party's predecessor in interest had or could
471+give under the previous paragraph, plus a right to possession of the
472+Corresponding Source of the work from the predecessor in interest, if
473+the predecessor has it or can get it with reasonable efforts.
474+
475+ You may not impose any further restrictions on the exercise of the
476+rights granted or affirmed under this License. For example, you may
477+not impose a license fee, royalty, or other charge for exercise of
478+rights granted under this License, and you may not initiate litigation
479+(including a cross-claim or counterclaim in a lawsuit) alleging that
480+any patent claim is infringed by making, using, selling, offering for
481+sale, or importing the Program or any portion of it.
482+
483+ 11. Patents.
484+
485+ A "contributor" is a copyright holder who authorizes use under this
486+License of the Program or a work on which the Program is based. The
487+work thus licensed is called the contributor's "contributor version".
488+
489+ A contributor's "essential patent claims" are all patent claims
490+owned or controlled by the contributor, whether already acquired or
491+hereafter acquired, that would be infringed by some manner, permitted
492+by this License, of making, using, or selling its contributor version,
493+but do not include claims that would be infringed only as a
494+consequence of further modification of the contributor version. For
495+purposes of this definition, "control" includes the right to grant
496+patent sublicenses in a manner consistent with the requirements of
497+this License.
498+
499+ Each contributor grants you a non-exclusive, worldwide, royalty-free
500+patent license under the contributor's essential patent claims, to
501+make, use, sell, offer for sale, import and otherwise run, modify and
502+propagate the contents of its contributor version.
503+
504+ In the following three paragraphs, a "patent license" is any express
505+agreement or commitment, however denominated, not to enforce a patent
506+(such as an express permission to practice a patent or covenant not to
507+sue for patent infringement). To "grant" such a patent license to a
508+party means to make such an agreement or commitment not to enforce a
509+patent against the party.
510+
511+ If you convey a covered work, knowingly relying on a patent license,
512+and the Corresponding Source of the work is not available for anyone
513+to copy, free of charge and under the terms of this License, through a
514+publicly available network server or other readily accessible means,
515+then you must either (1) cause the Corresponding Source to be so
516+available, or (2) arrange to deprive yourself of the benefit of the
517+patent license for this particular work, or (3) arrange, in a manner
518+consistent with the requirements of this License, to extend the patent
519+license to downstream recipients. "Knowingly relying" means you have
520+actual knowledge that, but for the patent license, your conveying the
521+covered work in a country, or your recipient's use of the covered work
522+in a country, would infringe one or more identifiable patents in that
523+country that you have reason to believe are valid.
524+
525+ If, pursuant to or in connection with a single transaction or
526+arrangement, you convey, or propagate by procuring conveyance of, a
527+covered work, and grant a patent license to some of the parties
528+receiving the covered work authorizing them to use, propagate, modify
529+or convey a specific copy of the covered work, then the patent license
530+you grant is automatically extended to all recipients of the covered
531+work and works based on it.
532+
533+ A patent license is "discriminatory" if it does not include within
534+the scope of its coverage, prohibits the exercise of, or is
535+conditioned on the non-exercise of one or more of the rights that are
536+specifically granted under this License. You may not convey a covered
537+work if you are a party to an arrangement with a third party that is
538+in the business of distributing software, under which you make payment
539+to the third party based on the extent of your activity of conveying
540+the work, and under which the third party grants, to any of the
541+parties who would receive the covered work from you, a discriminatory
542+patent license (a) in connection with copies of the covered work
543+conveyed by you (or copies made from those copies), or (b) primarily
544+for and in connection with specific products or compilations that
545+contain the covered work, unless you entered into that arrangement,
546+or that patent license was granted, prior to 28 March 2007.
547+
548+ Nothing in this License shall be construed as excluding or limiting
549+any implied license or other defenses to infringement that may
550+otherwise be available to you under applicable patent law.
551+
552+ 12. No Surrender of Others' Freedom.
553+
554+ If conditions are imposed on you (whether by court order, agreement or
555+otherwise) that contradict the conditions of this License, they do not
556+excuse you from the conditions of this License. If you cannot convey a
557+covered work so as to satisfy simultaneously your obligations under this
558+License and any other pertinent obligations, then as a consequence you may
559+not convey it at all. For example, if you agree to terms that obligate you
560+to collect a royalty for further conveying from those to whom you convey
561+the Program, the only way you could satisfy both those terms and this
562+License would be to refrain entirely from conveying the Program.
563+
564+ 13. Use with the GNU Affero General Public License.
565+
566+ Notwithstanding any other provision of this License, you have
567+permission to link or combine any covered work with a work licensed
568+under version 3 of the GNU Affero General Public License into a single
569+combined work, and to convey the resulting work. The terms of this
570+License will continue to apply to the part which is the covered work,
571+but the special requirements of the GNU Affero General Public License,
572+section 13, concerning interaction through a network will apply to the
573+combination as such.
574+
575+ 14. Revised Versions of this License.
576+
577+ The Free Software Foundation may publish revised and/or new versions of
578+the GNU General Public License from time to time. Such new versions will
579+be similar in spirit to the present version, but may differ in detail to
580+address new problems or concerns.
581+
582+ Each version is given a distinguishing version number. If the
583+Program specifies that a certain numbered version of the GNU General
584+Public License "or any later version" applies to it, you have the
585+option of following the terms and conditions either of that numbered
586+version or of any later version published by the Free Software
587+Foundation. If the Program does not specify a version number of the
588+GNU General Public License, you may choose any version ever published
589+by the Free Software Foundation.
590+
591+ If the Program specifies that a proxy can decide which future
592+versions of the GNU General Public License can be used, that proxy's
593+public statement of acceptance of a version permanently authorizes you
594+to choose that version for the Program.
595+
596+ Later license versions may give you additional or different
597+permissions. However, no additional obligations are imposed on any
598+author or copyright holder as a result of your choosing to follow a
599+later version.
600+
601+ 15. Disclaimer of Warranty.
602+
603+ THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
604+APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
605+HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
606+OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
607+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
608+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
609+IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
610+ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
611+
612+ 16. Limitation of Liability.
613+
614+ IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
615+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
616+THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
617+GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
618+USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
619+DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
620+PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
621+EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
622+SUCH DAMAGES.
623+
624+ 17. Interpretation of Sections 15 and 16.
625+
626+ If the disclaimer of warranty and limitation of liability provided
627+above cannot be given local legal effect according to their terms,
628+reviewing courts shall apply local law that most closely approximates
629+an absolute waiver of all civil liability in connection with the
630+Program, unless a warranty or assumption of liability accompanies a
631+copy of the Program in return for a fee.
632+
633+ END OF TERMS AND CONDITIONS
634+
635+ How to Apply These Terms to Your New Programs
636+
637+ If you develop a new program, and you want it to be of the greatest
638+possible use to the public, the best way to achieve this is to make it
639+free software which everyone can redistribute and change under these terms.
640+
641+ To do so, attach the following notices to the program. It is safest
642+to attach them to the start of each source file to most effectively
643+state the exclusion of warranty; and each file should have at least
644+the "copyright" line and a pointer to where the full notice is found.
645+
646+ <one line to give the program's name and a brief idea of what it does.>
647+ Copyright (C) <year> <name of author>
648+
649+ This program is free software: you can redistribute it and/or modify
650+ it under the terms of the GNU General Public License as published by
651+ the Free Software Foundation, either version 3 of the License, or
652+ (at your option) any later version.
653+
654+ This program is distributed in the hope that it will be useful,
655+ but WITHOUT ANY WARRANTY; without even the implied warranty of
656+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
657+ GNU General Public License for more details.
658+
659+ You should have received a copy of the GNU General Public License
660+ along with this program. If not, see <http://www.gnu.org/licenses/>.
661+
662+Also add information on how to contact you by electronic and paper mail.
663+
664+ If the program does terminal interaction, make it output a short
665+notice like this when it starts in an interactive mode:
666+
667+ <program> Copyright (C) <year> <name of author>
668+ This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
669+ This is free software, and you are welcome to redistribute it
670+ under certain conditions; type `show c' for details.
671+
672+The hypothetical commands `show w' and `show c' should show the appropriate
673+parts of the General Public License. Of course, your program's commands
674+might be different; for a GUI interface, you would use an "about box".
675+
676+ You should also get your employer (if you work as a programmer) or school,
677+if any, to sign a "copyright disclaimer" for the program, if necessary.
678+For more information on this, and how to apply and follow the GNU GPL, see
679+<http://www.gnu.org/licenses/>.
680+
681+ The GNU General Public License does not permit incorporating your program
682+into proprietary programs. If your program is a subroutine library, you
683+may consider it more useful to permit linking proprietary applications with
684+the library. If this is what you want to do, use the GNU Lesser General
685+Public License instead of this License. But first, please read
686+<http://www.gnu.org/philosophy/why-not-lgpl.html>.
687
688=== added file 'LICENSE'
689--- LICENSE 1970-01-01 00:00:00 +0000
690+++ LICENSE 2014-01-14 15:09:46 +0000
691@@ -0,0 +1,16 @@
692+/*
693+ Copyright 2013-2014 Canonical Ltd.
694+
695+ This program is free software: you can redistribute it and/or modify it
696+ under the terms of the GNU General Public License version 3, as published
697+ by the Free Software Foundation.
698+
699+ This program is distributed in the hope that it will be useful, but
700+ WITHOUT ANY WARRANTY; without even the implied warranties of
701+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
702+ PURPOSE. See the GNU General Public License for more details.
703+
704+ You should have received a copy of the GNU General Public License along
705+ with this program. If not, see <http://www.gnu.org/licenses/>.
706+*/
707+
708
709=== added file 'Makefile'
710--- Makefile 1970-01-01 00:00:00 +0000
711+++ Makefile 2014-01-14 15:09:46 +0000
712@@ -0,0 +1,48 @@
713+GOPATH := $(shell cd ../../..; pwd)
714+export GOPATH
715+
716+PROJECT = launchpad.net/ubuntu-push
717+
718+ifneq ($(CURDIR),$(GOPATH)/src/launchpad.net/ubuntu-push)
719+$(error unexpected curdir and/or layout)
720+endif
721+
722+GODEPS = launchpad.net/gocheck
723+
724+bootstrap:
725+ mkdir -p $(GOPATH)/bin
726+ mkdir -p $(GOPATH)/pkg
727+ go get -u launchpad.net/godeps
728+ go get -d -u $(GODEPS)
729+ $(GOPATH)/bin/godeps -u dependencies.tsv
730+ go install $(GODEPS)
731+
732+check:
733+ go test $(PROJECT)/...
734+
735+check-race:
736+ go test -race $(PROJECT)/...
737+
738+coverage-summary:
739+ go test -a -cover $(PROJECT)/...
740+
741+coverage-html:
742+ mkdir -p coverhtml
743+ for pkg in $$(go list $(PROJECT)/...|grep -v acceptance ); do \
744+ relname="$${pkg#$(PROJECT)/}" ; \
745+ mkdir -p coverhtml/$$(dirname $${relname}) ; \
746+ go test -a -coverprofile=coverhtml/$${relname}.out $$pkg ; \
747+ if [ -f coverhtml/$${relname}.out ] ; then \
748+ go tool cover -html=coverhtml/$${relname}.out -o coverhtml/$${relname}.html ; \
749+ go tool cover -func=coverhtml/$${relname}.out -o coverhtml/$${relname}.txt ; \
750+ fi \
751+ done
752+
753+format:
754+ go fmt $(PROJECT)/...
755+
756+check-format:
757+ scripts/check_fmt $(PROJECT)
758+
759+.PHONY: bootstrap check check-race format check-format coverage-summary \
760+ coverage-html
761
762=== added file 'README'
763--- README 1970-01-01 00:00:00 +0000
764+++ README 2014-01-14 15:09:46 +0000
765@@ -0,0 +1,37 @@
766+Ubuntu Push Notifications
767+--------------------------
768+
769+Protocol, client, and development code for Ubuntu Push Notifications.
770+
771+The code expects to be checked out as launchpad.net/ubuntu-push in a Go
772+workspace; see "go help gopath".
773+
774+To set up the Go dependencies, run:
775+
776+ make bootstrap
777+
778+To run tests:
779+
780+ make check
781+
782+To produce coverage reports, Go 1.2 (the default on trusty) and the cover
783+tool are needed (the latter can currently be obtained with something like:
784+sudo GOPATH=<go-workspace> go get code.google.com/p/go.tools/cmd/cover
785+)
786+
787+then run:
788+
789+ make coverage-summary
790+
791+for a summary report, or:
792+
793+ make coverage-html
794+
795+for per-package HTML with annotated code in coverhtml/<package-name>.html
796+
797+(it also produces textual coverhtml/<package-name>.txt reports).
798+
799+To run the acceptance tests, go into the server/acceptance subdir and run:
800+
801+ ./acceptance.sh
802+
803
804=== added directory 'config'
805=== added file 'config/config.go'
806--- config/config.go 1970-01-01 00:00:00 +0000
807+++ config/config.go 2014-01-14 15:09:46 +0000
808@@ -0,0 +1,149 @@
809+/*
810+ Copyright 2013-2014 Canonical Ltd.
811+
812+ This program is free software: you can redistribute it and/or modify it
813+ under the terms of the GNU General Public License version 3, as published
814+ by the Free Software Foundation.
815+
816+ This program is distributed in the hope that it will be useful, but
817+ WITHOUT ANY WARRANTY; without even the implied warranties of
818+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
819+ PURPOSE. See the GNU General Public License for more details.
820+
821+ You should have received a copy of the GNU General Public License along
822+ with this program. If not, see <http://www.gnu.org/licenses/>.
823+*/
824+
825+// Package config has helpers to parse and use JSON based configuration.
826+package config
827+
828+import (
829+ "encoding/json"
830+ "errors"
831+ "fmt"
832+ "io"
833+ "io/ioutil"
834+ "net"
835+ "path/filepath"
836+ "reflect"
837+ "strings"
838+ "time"
839+)
840+
841+// ReadConfig reads a JSON configuration into destConfig, which should
842+// be a pointer to a structure. It does more configuration-specific
843+// error checking than plain JSON decoding and mentions the offending
844+// fields in errors. Configuration fields are expected to start with
845+// lower case in the JSON object.
846+func ReadConfig(r io.Reader, destConfig interface{}) error {
847+ destValue := reflect.ValueOf(destConfig)
848+ if destValue.Kind() != reflect.Ptr || destValue.Elem().Kind() != reflect.Struct {
849+ return errors.New("destConfig not *struct")
850+ }
851+ // do the parsing in two phases for better error handling
852+ var p1 map[string]json.RawMessage
853+ err := json.NewDecoder(r).Decode(&p1)
854+ if err != nil {
855+ return err
856+ }
857+ destStruct := destValue.Elem()
858+ structType := destStruct.Type()
859+ n := structType.NumField()
860+ for i := 0; i < n; i++ {
861+ fld := structType.Field(i)
862+ configName := strings.Split(fld.Tag.Get("json"), ",")[0]
863+ if configName == "" {
864+ configName = strings.ToLower(fld.Name[:1]) + fld.Name[1:]
865+ }
866+ raw, found := p1[configName]
867+ if !found { // assume all fields are mandatory for now
868+ return fmt.Errorf("missing %s", configName)
869+ }
870+ dest := destStruct.Field(i).Addr().Interface()
871+ err = json.Unmarshal([]byte(raw), dest)
872+ if err != nil {
873+ return fmt.Errorf("%s: %v", configName, err)
874+ }
875+ }
876+ return nil
877+}
878+
879+// ConfigTimeDuration can hold a time.Duration in a configuration struct,
880+// that is parsed from a string as supported by time.ParseDuration.
881+type ConfigTimeDuration struct {
882+ time.Duration
883+}
884+
885+func (ctd *ConfigTimeDuration) UnmarshalJSON(b []byte) error {
886+ var enc string
887+ var v time.Duration
888+ err := json.Unmarshal(b, &enc)
889+ if err != nil {
890+ return err
891+ }
892+ v, err = time.ParseDuration(enc)
893+ if err != nil {
894+ return err
895+ }
896+ *ctd = ConfigTimeDuration{v}
897+ return nil
898+}
899+
900+// TimeDuration returns the time.Duration held in ctd.
901+func (ctd ConfigTimeDuration) TimeDuration() time.Duration {
902+ return ctd.Duration
903+}
904+
905+// ConfigHostPort can hold a host:port string in a configuration struct.
906+type ConfigHostPort string
907+
908+func (chp *ConfigHostPort) UnmarshalJSON(b []byte) error {
909+ var enc string
910+ err := json.Unmarshal(b, &enc)
911+ if err != nil {
912+ return err
913+ }
914+ _, _, err = net.SplitHostPort(enc)
915+ if err != nil {
916+ return err
917+ }
918+ *chp = ConfigHostPort(enc)
919+ return nil
920+}
921+
922+// HostPort returns the host:port string held in chp.
923+func (chp ConfigHostPort) HostPort() string {
924+ return string(chp)
925+}
926+
927+// ConfigQueueSize can hold a queue size in a configuration struct.
928+type ConfigQueueSize uint
929+
930+func (cqs *ConfigQueueSize) UnmarshalJSON(b []byte) error {
931+ var enc uint
932+ err := json.Unmarshal(b, &enc)
933+ if err != nil {
934+ return err
935+ }
936+ if enc == 0 {
937+ return errors.New("queue size should be > 0")
938+ }
939+ *cqs = ConfigQueueSize(enc)
940+ return nil
941+}
942+
943+// QueueSize returns the queue size held in cqs.
944+func (cqs ConfigQueueSize) QueueSize() uint {
945+ return uint(cqs)
946+}
947+
948+// LoadFile reads a file possibly relative to a base dir.
949+func LoadFile(p, baseDir string) ([]byte, error) {
950+ if p == "" {
951+ return nil, nil
952+ }
953+ if !filepath.IsAbs(p) {
954+ p = filepath.Join(baseDir, p)
955+ }
956+ return ioutil.ReadFile(p)
957+}
958
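The two-phase decoding in ReadConfig exists because plain json.Unmarshal silently leaves absent fields at their zero value. The following is a minimal stand-alone sketch of that idea, not code from this branch: requireKeys is a hypothetical helper that covers only the "missing field" check, and it takes the expected key names as arguments instead of deriving them by reflection.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// requireKeys is a hypothetical helper (not part of the branch). It decodes
// the object into raw messages first, so that absent keys can be reported
// by name before any typed decoding happens.
func requireKeys(raw string, keys ...string) error {
	var p1 map[string]json.RawMessage
	if err := json.Unmarshal([]byte(raw), &p1); err != nil {
		return err
	}
	for _, k := range keys {
		if _, found := p1[k]; !found {
			return fmt.Errorf("missing %s", k)
		}
	}
	return nil
}

func main() {
	var plain struct {
		A int
		B string
	}
	// Plain decoding accepts the input although "a" is absent.
	err := json.Unmarshal([]byte(`{"b": "foo"}`), &plain)
	fmt.Println(plain.A, err)

	// The two-phase check names the missing field.
	fmt.Println(requireKeys(`{"b": "foo"}`, "a", "b"))
}
```

In the branch itself the second phase also unmarshals each raw message into the matching struct field, which is what lets type errors be prefixed with the field name.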
959=== added file 'config/config_test.go'
960--- config/config_test.go 1970-01-01 00:00:00 +0000
961+++ config/config_test.go 2014-01-14 15:09:46 +0000
962@@ -0,0 +1,135 @@
963+/*
964+ Copyright 2013-2014 Canonical Ltd.
965+
966+ This program is free software: you can redistribute it and/or modify it
967+ under the terms of the GNU General Public License version 3, as published
968+ by the Free Software Foundation.
969+
970+ This program is distributed in the hope that it will be useful, but
971+ WITHOUT ANY WARRANTY; without even the implied warranties of
972+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
973+ PURPOSE. See the GNU General Public License for more details.
974+
975+ You should have received a copy of the GNU General Public License along
976+ with this program. If not, see <http://www.gnu.org/licenses/>.
977+*/
978+
979+package config
980+
981+import (
982+ "bytes"
983+ "io/ioutil"
984+ . "launchpad.net/gocheck"
985+ "os"
986+ "path/filepath"
987+ "testing"
988+ "time"
989+)
990+
991+func TestConfig(t *testing.T) { TestingT(t) }
992+
993+type configSuite struct{}
994+
995+var _ = Suite(&configSuite{})
996+
997+type testConfig1 struct {
998+ A int
999+ B string
1000+ C []string `json:"c_list"`
1001+}
1002+
1003+func (s *configSuite) TestReadConfig(c *C) {
1004+ buf := bytes.NewBufferString(`{"a": 1, "b": "foo", "c_list": ["c", "d", "e"]}`)
1005+ var cfg testConfig1
1006+ err := ReadConfig(buf, &cfg)
1007+ c.Check(err, IsNil)
1008+ c.Check(cfg, DeepEquals, testConfig1{A: 1, B: "foo", C: []string{"c", "d", "e"}})
1009+}
1010+
1011+func checkError(c *C, config string, dest interface{}, expectedError string) {
1012+ buf := bytes.NewBufferString(config)
1013+ err := ReadConfig(buf, dest)
1014+ c.Check(err, ErrorMatches, expectedError)
1015+}
1016+
1017+func (s *configSuite) TestReadConfigErrors(c *C) {
1018+ var cfg testConfig1
1019+ checkError(c, "", cfg, `destConfig not \*struct`)
1020+ var i int
1021+ checkError(c, "", &i, `destConfig not \*struct`)
1022+ checkError(c, "", &cfg, `EOF`)
1023+ checkError(c, `{"a": "1"}`, &cfg, `a: .*type int`)
1024+ checkError(c, `{"b": "1"}`, &cfg, `missing a`)
1025+ checkError(c, `{"A": "1"}`, &cfg, `missing a`)
1026+ checkError(c, `{"a": 1, "b": "foo"}`, &cfg, `missing c_list`)
1027+}
1028+
1029+type testTimeDurationConfig struct {
1030+ D ConfigTimeDuration
1031+}
1032+
1033+func (s *configSuite) TestReadConfigTimeDuration(c *C) {
1034+ buf := bytes.NewBufferString(`{"d": "2s"}`)
1035+ var cfg testTimeDurationConfig
1036+ err := ReadConfig(buf, &cfg)
1037+ c.Assert(err, IsNil)
1038+ c.Check(cfg.D.TimeDuration(), Equals, 2*time.Second)
1039+}
1040+
1041+func (s *configSuite) TestReadConfigTimeDurationErrors(c *C) {
1042+ var cfg testTimeDurationConfig
1043+ checkError(c, `{"d": 1}`, &cfg, "d:.*type string")
1044+ checkError(c, `{"d": "2"}`, &cfg, "d:.*missing unit.*")
1045+}
1046+
1047+type testHostPortConfig struct {
1048+ H ConfigHostPort
1049+}
1050+
1051+func (s *configSuite) TestReadConfigHostPort(c *C) {
1052+ buf := bytes.NewBufferString(`{"h": "127.0.0.1:9999"}`)
1053+ var cfg testHostPortConfig
1054+ err := ReadConfig(buf, &cfg)
1055+ c.Assert(err, IsNil)
1056+ c.Check(cfg.H.HostPort(), Equals, "127.0.0.1:9999")
1057+}
1058+
1059+func (s *configSuite) TestReadConfigHostPortErrors(c *C) {
1060+ var cfg testHostPortConfig
1061+ checkError(c, `{"h": 1}`, &cfg, "h:.*type string")
1062+ checkError(c, `{"h": ""}`, &cfg, "h: missing port in address")
1063+}
1064+
1065+type testQueueSizeConfig struct {
1066+ QS ConfigQueueSize
1067+}
1068+
1069+func (s *configSuite) TestReadConfigQueueSize(c *C) {
1070+ buf := bytes.NewBufferString(`{"qS": 1}`)
1071+ var cfg testQueueSizeConfig
1072+ err := ReadConfig(buf, &cfg)
1073+ c.Assert(err, IsNil)
1074+ c.Check(cfg.QS.QueueSize(), Equals, uint(1))
1075+}
1076+
1077+func (s *configSuite) TestReadConfigQueueSizeErrors(c *C) {
1078+ var cfg testQueueSizeConfig
1079+ checkError(c, `{"qS": "x"}`, &cfg, "qS: .*type uint")
1080+ checkError(c, `{"qS": 0}`, &cfg, "qS: queue size should be > 0")
1081+}
1082+
1083+func (s *configSuite) TestLoadFile(c *C) {
1084+ tmpDir := c.MkDir()
1085+ d, err := LoadFile("", tmpDir)
1086+ c.Check(err, IsNil)
1087+ c.Check(d, IsNil)
1088+ fullPath := filepath.Join(tmpDir, "example.file")
1089+ err = ioutil.WriteFile(fullPath, []byte("Example"), os.ModePerm)
1090+ c.Assert(err, IsNil)
1091+ d, err = LoadFile("example.file", tmpDir)
1092+ c.Check(err, IsNil)
1093+ c.Check(string(d), Equals, "Example")
1094+ d, err = LoadFile(fullPath, tmpDir)
1095+ c.Check(err, IsNil)
1096+ c.Check(string(d), Equals, "Example")
1097+}
1098
1099=== added file 'dependencies.tsv'
1100--- dependencies.tsv 1970-01-01 00:00:00 +0000
1101+++ dependencies.tsv 2014-01-14 15:09:46 +0000
1102@@ -0,0 +1,1 @@
1103+launchpad.net/gocheck bzr gustavo@niemeyer.net-20130302024745-6ikofwq2c03h7giu 85
1104
1105=== added directory 'logger'
1106=== added file 'logger/logger.go'
1107--- logger/logger.go 1970-01-01 00:00:00 +0000
1108+++ logger/logger.go 2014-01-14 15:09:46 +0000
1109@@ -0,0 +1,100 @@
1110+/*
1111+ Copyright 2013-2014 Canonical Ltd.
1112+
1113+ This program is free software: you can redistribute it and/or modify it
1114+ under the terms of the GNU General Public License version 3, as published
1115+ by the Free Software Foundation.
1116+
1117+ This program is distributed in the hope that it will be useful, but
1118+ WITHOUT ANY WARRANTY; without even the implied warranties of
1119+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1120+ PURPOSE. See the GNU General Public License for more details.
1121+
1122+ You should have received a copy of the GNU General Public License along
1123+ with this program. If not, see <http://www.gnu.org/licenses/>.
1124+*/
1125+
1126+// Package logger defines a simple logger API with level of logging control.
1127+package logger
1128+
1129+import (
1130+ "fmt"
1131+ "io"
1132+ "log"
1133+ "os"
1134+ "runtime"
1135+)
1136+
1137+// Logger is a simple logger interface with logging at levels.
1138+type Logger interface {
1139+ // Errorf logs an error.
1140+ Errorf(format string, v ...interface{})
1141+ // Fatalf logs an error and exits the program with os.Exit(1).
1142+ Fatalf(format string, v ...interface{})
1143+ // Recoverf recovers from a possible panic and logs it.
1144+ Recoverf(format string, v ...interface{})
1145+ // Infof logs an info message.
1146+ Infof(format string, v ...interface{})
1147+ // Debugf logs a debug message.
1148+ Debugf(format string, v ...interface{})
1149+}
1150+
1151+type simpleLogger struct {
1152+ *log.Logger
1153+ nlevel int
1154+}
1155+
1156+const (
1157+ lError = iota
1158+ lInfo
1159+ lDebug
1160+)
1161+
1162+var levelToNLevel = map[string]int{
1163+ "error": lError,
1164+ "info": lInfo,
1165+ "debug": lDebug,
1166+}
1167+
1168+// NewSimpleLogger creates a logger logging only up to the given level.
1169+// level can be in order: "error", "info", "debug".
1170+func NewSimpleLogger(w io.Writer, level string) Logger {
1171+ nlevel := levelToNLevel[level]
1172+ return &simpleLogger{
1173+ log.New(w, "", log.Ldate|log.Ltime|log.Lmicroseconds),
1174+ nlevel,
1175+ }
1176+}
1177+
1178+func (lg *simpleLogger) Errorf(format string, v ...interface{}) {
1179+ lg.Printf("ERROR "+format, v...)
1180+}
1181+
1182+var osExit = os.Exit // for testing
1183+
1184+func (lg *simpleLogger) Fatalf(format string, v ...interface{}) {
1185+ lg.Printf("ERROR "+format, v...)
1186+ osExit(1)
1187+}
1188+
1189+func (lg *simpleLogger) Recoverf(format string, v ...interface{}) {
1190+ if err := recover(); err != nil {
1191+ msg := fmt.Sprintf(format, v...)
1192+ stack := make([]byte, 8*1024) // Stack writes less but doesn't fail
1193+ stackWritten := runtime.Stack(stack, false)
1194+ stack = stack[:stackWritten]
1195+ lg.Printf("ERROR panic %v!! %s:\n%s", err, msg, stack)
1196+ }
1197+}
1198+
1199+func (lg *simpleLogger) Infof(format string, v ...interface{}) {
1200+ if lg.nlevel >= lInfo {
1201+ lg.Printf("INFO "+format, v...)
1202+ }
1203+}
1204+
1205+func (lg *simpleLogger) Debugf(format string, v ...interface{}) {
1206+ if lg.nlevel >= lDebug {
1207+ lg.Printf("DEBUG "+format, v...)
1208+ }
1209+}
1210
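Recoverf calls recover() itself, so it only has an effect when it is the deferred call, as in "defer logger.Recoverf(...)". Wrapping it in another deferred closure would make recover() return nil. The sketch below illustrates the intended usage with a stand-alone function; recoverf, risky and run are hypothetical names, and the stack dump of the real implementation is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// recoverf mirrors the shape of simpleLogger.Recoverf without the stack
// trace; it is a hypothetical stand-alone copy for illustration.
func recoverf(lg *log.Logger, format string, v ...interface{}) {
	if err := recover(); err != nil {
		lg.Printf("ERROR panic %v!! %s", err, fmt.Sprintf(format, v...))
	}
}

// risky defers recoverf directly, so a panic is logged instead of
// propagating to the caller.
func risky(lg *log.Logger, doPanic bool) {
	defer recoverf(lg, "while handling %s", "request")
	if doPanic {
		panic("Troubles")
	}
}

// run returns whatever was logged during one call to risky.
func run(doPanic bool) string {
	buf := &bytes.Buffer{}
	lg := log.New(buf, "", 0)
	risky(lg, doPanic)
	return buf.String()
}

func main() {
	fmt.Print(run(true))
}
```

When no panic is in flight the deferred call logs nothing, which matches what TestRecoverfNop checks.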
1211=== added file 'logger/logger_test.go'
1212--- logger/logger_test.go 1970-01-01 00:00:00 +0000
1213+++ logger/logger_test.go 2014-01-14 15:09:46 +0000
1214@@ -0,0 +1,117 @@
1215+/*
1216+ Copyright 2013-2014 Canonical Ltd.
1217+
1218+ This program is free software: you can redistribute it and/or modify it
1219+ under the terms of the GNU General Public License version 3, as published
1220+ by the Free Software Foundation.
1221+
1222+ This program is distributed in the hope that it will be useful, but
1223+ WITHOUT ANY WARRANTY; without even the implied warranties of
1224+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1225+ PURPOSE. See the GNU General Public License for more details.
1226+
1227+ You should have received a copy of the GNU General Public License along
1228+ with this program. If not, see <http://www.gnu.org/licenses/>.
1229+*/
1230+
1231+package logger
1232+
1233+import (
1234+ "bytes"
1235+ . "launchpad.net/gocheck"
1236+ "os"
1237+ "testing"
1238+)
1239+
1240+func TestLogger(t *testing.T) { TestingT(t) }
1241+
1242+type loggerSuite struct{}
1243+
1244+var _ = Suite(&loggerSuite{})
1245+
1246+func (s *loggerSuite) TestErrorf(c *C) {
1247+ buf := &bytes.Buffer{}
1248+ logger := NewSimpleLogger(buf, "error")
1249+ logger.Errorf("%v %d", "error", 1)
1250+ c.Check(buf.String(), Matches, ".* ERROR error 1\n")
1251+}
1252+
1253+func (s *loggerSuite) TestFatalf(c *C) {
1254+ defer func() {
1255+ osExit = os.Exit
1256+ }()
1257+ var exitCode int
1258+ osExit = func(code int) {
1259+ exitCode = code
1260+ }
1261+ buf := &bytes.Buffer{}
1262+ logger := NewSimpleLogger(buf, "error")
1263+ logger.Fatalf("%v %v", "error", "fatal")
1264+ c.Check(buf.String(), Matches, ".* ERROR error fatal\n")
1265+ c.Check(exitCode, Equals, 1)
1266+}
1267+
1268+func (s *loggerSuite) TestInfof(c *C) {
1269+ buf := &bytes.Buffer{}
1270+ logger := NewSimpleLogger(buf, "info")
1271+ logger.Infof("%v %d", "info", 1)
1272+ c.Check(buf.String(), Matches, ".* INFO info 1\n")
1273+}
1274+
1275+func (s *loggerSuite) TestDebugf(c *C) {
1276+ buf := &bytes.Buffer{}
1277+ logger := NewSimpleLogger(buf, "debug")
1278+ logger.Debugf("%v %d", "debug", 1)
1279+ c.Check(buf.String(), Matches, `.* DEBUG debug 1\n`)
1280+}
1281+
1282+func (s *loggerSuite) TestFormat(c *C) {
1283+ buf := &bytes.Buffer{}
1284+ logger := NewSimpleLogger(buf, "error")
1285+ logger.Errorf("%v %d", "error", 2)
1286+ c.Check(buf.String(), Matches, `.* .*\.\d+ ERROR error 2\n`)
1287+}
1288+
1289+func (s *loggerSuite) TestLevel(c *C) {
1290+ buf := &bytes.Buffer{}
1291+ logger := NewSimpleLogger(buf, "error")
1292+ logger.Errorf("%s%d", "e", 3)
1293+ logger.Infof("%s%d", "i", 3)
1294+ logger.Debugf("%s%d", "d", 3)
1295+ c.Check(buf.String(), Matches, `.* ERROR e3\n`)
1296+
1297+ buf.Reset()
1298+ logger = NewSimpleLogger(buf, "info")
1299+ logger.Errorf("%s%d", "e", 4)
1300+ logger.Debugf("%s%d", "d", 4)
1301+ logger.Infof("%s%d", "i", 4)
1302+ c.Check(buf.String(), Matches, `.* ERROR e4\n.* INFO i4\n`)
1303+
1304+ buf.Reset()
1305+ logger = NewSimpleLogger(buf, "debug")
1306+ logger.Errorf("%s%d", "e", 5)
1307+ logger.Debugf("%s%d", "d", 5)
1308+ logger.Infof("%s%d", "i", 5)
1309+ c.Check(buf.String(), Matches, `.* ERROR e5\n.* DEBUG d5\n.* INFO i5\n`)
1310+}
1311+
1312+func panicAndRecover(logger Logger, n int, doPanic bool) {
1313+ defer logger.Recoverf("%v %d", "panic", n)
1314+ if doPanic {
1315+ panic("Troubles")
1316+ }
1317+}
1318+
1319+func (s *loggerSuite) TestRecoverf(c *C) {
1320+ buf := &bytes.Buffer{}
1321+ logger := NewSimpleLogger(buf, "error")
1322+ panicAndRecover(logger, 6, true)
1323+ c.Check(buf.String(), Matches, "(?s).* ERROR panic Troubles!! panic 6:.*panicAndRecover.*")
1324+}
1325+
1326+func (s *loggerSuite) TestRecoverfNop(c *C) {
1327+ buf := &bytes.Buffer{}
1328+ logger := NewSimpleLogger(buf, "error")
1329+ panicAndRecover(logger, 6, false)
1330+ c.Check(buf.String(), Equals, "")
1331+}
1332
1333=== added directory 'protocol'
1334=== added file 'protocol/messages.go'
1335--- protocol/messages.go 1970-01-01 00:00:00 +0000
1336+++ protocol/messages.go 2014-01-14 15:09:46 +0000
1337@@ -0,0 +1,103 @@
1338+/*
1339+ Copyright 2013-2014 Canonical Ltd.
1340+
1341+ This program is free software: you can redistribute it and/or modify it
1342+ under the terms of the GNU General Public License version 3, as published
1343+ by the Free Software Foundation.
1344+
1345+ This program is distributed in the hope that it will be useful, but
1346+ WITHOUT ANY WARRANTY; without even the implied warranties of
1347+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1348+ PURPOSE. See the GNU General Public License for more details.
1349+
1350+ You should have received a copy of the GNU General Public License along
1351+ with this program. If not, see <http://www.gnu.org/licenses/>.
1352+*/
1353+
1354+package protocol
1355+
1356+// representative structs for messages
1357+
1358+import (
1359+ "encoding/json"
1360+ // "log"
1361+)
1362+
1363+// System channel id using a shortened hex encoded form for the NIL UUID
1364+const SystemChannelId = "0"
1365+
1366+// CONNECT message
1367+type ConnectMsg struct {
1368+ Type string `json:"T"`
1369+ ClientVer string
1370+ DeviceId string
1371+ Info map[string]interface{} `json:",omitempty"` // platform etc...
1372+ // maps channel ids (hex encoded UUIDs) to known client channel levels
1373+ Levels map[string]int64
1374+}
1375+
1376+// PING/PONG messages
1377+type PingPongMsg struct {
1378+ Type string `json:"T"`
1379+}
1380+
1381+const maxPayloadSize = 62 * 1024
1382+
1383+// SplittableMsg are messages that may require and are capable of splitting.
1384+type SplittableMsg interface {
1385+ Split() (done bool)
1386+}
1387+
1388+// BROADCAST messages
1389+type BroadcastMsg struct {
1390+ Type string `json:"T"`
1391+ AppId string `json:",omitempty"`
1392+ ChanId string
1393+ TopLevel int64
1394+ Payloads []json.RawMessage
1395+ splitting int
1396+}
1397+
1398+func (m *BroadcastMsg) Split() bool {
1399+ var prevTop int64
1400+ if m.splitting == 0 {
1401+ prevTop = m.TopLevel - int64(len(m.Payloads))
1402+ } else {
1403+ prevTop = m.TopLevel
1404+ m.Payloads = m.Payloads[len(m.Payloads):m.splitting]
1405+ m.TopLevel = prevTop + int64(len(m.Payloads))
1406+ }
1407+ payloads := m.Payloads
1408+ var size int
1409+ for i := range payloads {
1410+ size += len(payloads[i])
1411+ if size > maxPayloadSize {
1412+ m.TopLevel = prevTop + int64(i)
1413+ m.splitting = len(payloads)
1414+ m.Payloads = payloads[:i]
1415+ return false
1416+ }
1417+ }
1418+ return true
1419+}
1420+
1421+// NOTIFICATIONS message
1422+type NotificationsMsg struct {
1423+ Type string `json:"T"`
1424+ Notifications []Notification
1425+}
1426+
1427+// A single unicast notification
1428+type Notification struct {
1429+ AppId string `json:"A"`
1430+ MsgId string `json:"M"`
1431+ // payload
1432+ Payload json.RawMessage `json:"P"`
1433+}
1434+
1435+// ACKnowledgement message
1436+type AckMsg struct {
1437+ Type string `json:"T"`
1438+}
1439+
1440+// xxx ... query levels messages
1441
1442=== added file 'protocol/messages_test.go'
1443--- protocol/messages_test.go 1970-01-01 00:00:00 +0000
1444+++ protocol/messages_test.go 2014-01-14 15:09:46 +0000
1445@@ -0,0 +1,101 @@
1446+/*
1447+ Copyright 2013-2014 Canonical Ltd.
1448+
1449+ This program is free software: you can redistribute it and/or modify it
1450+ under the terms of the GNU General Public License version 3, as published
1451+ by the Free Software Foundation.
1452+
1453+ This program is distributed in the hope that it will be useful, but
1454+ WITHOUT ANY WARRANTY; without even the implied warranties of
1455+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1456+ PURPOSE. See the GNU General Public License for more details.
1457+
1458+ You should have received a copy of the GNU General Public License along
1459+ with this program. If not, see <http://www.gnu.org/licenses/>.
1460+*/
1461+
1462+package protocol
1463+
1464+import (
1465+ "encoding/json"
1466+ "fmt"
1467+ . "launchpad.net/gocheck"
1468+ "strings"
1469+)
1470+
1471+type messagesSuite struct{}
1472+
1473+var _ = Suite(&messagesSuite{})
1474+
1475+func (s *messagesSuite) TestSplitBroadcastMsgNop(c *C) {
1476+ b := &BroadcastMsg{
1477+ Type: "broadcast",
1478+ AppId: "APP",
1479+ ChanId: "0",
1480+ TopLevel: 2,
1481+ Payloads: []json.RawMessage{json.RawMessage(`{b:1}`), json.RawMessage(`{b:2}`)},
1482+ }
1483+ done := b.Split()
1484+ c.Check(done, Equals, true)
1485+ c.Check(b.TopLevel, Equals, int64(2))
1486+ c.Check(cap(b.Payloads), Equals, 2)
1487+ c.Check(len(b.Payloads), Equals, 2)
1488+}
1489+
1490+var payloadFmt = fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
1491+
1492+func manyParts(c int) []json.RawMessage {
1493+ payloads := make([]json.RawMessage, 0, 1)
1494+ for i := 0; i < c; i++ {
1495+ payloads = append(payloads, json.RawMessage(fmt.Sprintf(payloadFmt, i)))
1496+ }
1497+ return payloads
1498+}
1499+
1500+func (s *messagesSuite) TestSplitBroadcastMsgManyParts(c *C) {
1501+ payloads := manyParts(33)
1502+ n := len(payloads)
1503+ // more interesting this way
1504+ c.Assert(cap(payloads), Not(Equals), n)
1505+ b := &BroadcastMsg{
1506+ Type: "broadcast",
1507+ AppId: "APP",
1508+ ChanId: "0",
1509+ TopLevel: 500,
1510+ Payloads: payloads,
1511+ }
1512+ done := b.Split()
1513+ c.Assert(done, Equals, false)
1514+ n1 := len(b.Payloads)
1515+ c.Check(b.TopLevel, Equals, int64(500-n+n1))
1516+ buf, err := json.Marshal(b)
1517+ c.Assert(err, IsNil)
1518+ c.Assert(len(buf) <= 65535, Equals, true)
1519+ c.Check(len(buf)+len(payloads[n1]) > maxPayloadSize, Equals, true)
1520+ done = b.Split()
1521+ c.Assert(done, Equals, true)
1522+ n2 := len(b.Payloads)
1523+ c.Check(b.TopLevel, Equals, int64(500))
1524+ c.Check(n1+n2, Equals, n)
1525+
1526+ payloads = manyParts(61)
1527+ n = len(payloads)
1528+ b = &BroadcastMsg{
1529+ Type: "broadcast",
1530+ AppId: "APP",
1531+ ChanId: "0",
1532+ TopLevel: int64(n),
1533+ Payloads: payloads,
1534+ }
1535+ done = b.Split()
1536+ c.Assert(done, Equals, false)
1537+ n1 = len(b.Payloads)
1538+ done = b.Split()
1539+ c.Assert(done, Equals, false)
1540+ n2 = len(b.Payloads)
1541+ done = b.Split()
1542+ c.Assert(done, Equals, true)
1543+ n3 := len(b.Payloads)
1544+ c.Check(b.TopLevel, Equals, int64(n))
1545+ c.Check(n1+n2+n3, Equals, n)
1546+}
1547
1548=== added file 'protocol/protocol.go'
1549--- protocol/protocol.go 1970-01-01 00:00:00 +0000
1550+++ protocol/protocol.go 2014-01-14 15:09:46 +0000
1551@@ -0,0 +1,103 @@
1552+/*
1553+ Copyright 2013-2014 Canonical Ltd.
1554+
1555+ This program is free software: you can redistribute it and/or modify it
1556+ under the terms of the GNU General Public License version 3, as published
1557+ by the Free Software Foundation.
1558+
1559+ This program is distributed in the hope that it will be useful, but
1560+ WITHOUT ANY WARRANTY; without even the implied warranties of
1561+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1562+ PURPOSE. See the GNU General Public License for more details.
1563+
1564+ You should have received a copy of the GNU General Public License along
1565+ with this program. If not, see <http://www.gnu.org/licenses/>.
1566+*/
1567+
1568+// Package protocol has code to speak the client-daemon<->push-server protocol.
1569+package protocol
1570+
1571+import (
1572+ "bytes"
1573+ "encoding/binary"
1574+ "encoding/json"
1575+ "fmt"
1576+ "io"
1577+ "net"
1578+ "time"
1579+)
1580+
1581+// Protocol is a connection capable of writing and reading the wire format of protocol messages.
1582+type Protocol interface {
1583+ SetDeadline(t time.Time)
1584+ ReadMessage(msg interface{}) error
1585+ WriteMessage(msg interface{}) error
1586+}
1587+
1588+func ReadWireFormatVersion(conn net.Conn, exchangeTimeout time.Duration) (ver int, err error) {
1589+ var buf1 [1]byte
1590+ err = conn.SetReadDeadline(time.Now().Add(exchangeTimeout))
1591+ if err != nil {
1592+ panic(fmt.Errorf("can't set deadline: %v", err))
1593+ }
1594+ _, err = conn.Read(buf1[:])
1595+ ver = int(buf1[0])
1596+ return
1597+}
1598+
1599+const ProtocolWireVersion = 0
1600+
1601+// protocol0 handles version 0 of the wire format
1602+type protocol0 struct {
1603+ buffer *bytes.Buffer
1604+ enc *json.Encoder
1605+ conn net.Conn
1606+}
1607+
1608+// NewProtocol0 creates and initialises a protocol with wire format version 0.
1609+func NewProtocol0(conn net.Conn) *protocol0 {
1610+ buf := bytes.NewBuffer(make([]byte, 5000))
1611+ return &protocol0{
1612+ buffer: buf,
1613+ enc: json.NewEncoder(buf),
1614+ conn: conn}
1615+}
1616+
1617+// SetDeadline sets the deadline for the subsequent WriteMessage/ReadMessage exchange.
1618+func (c *protocol0) SetDeadline(t time.Time) {
1619+ err := c.conn.SetDeadline(t)
1620+ if err != nil {
1621+ panic(fmt.Errorf("can't set deadline: %v", err))
1622+ }
1623+}
1624+
1625+// ReadMessage reads one message from the connection: a big-endian uint16 length followed by a JSON body of that length.
1626+func (c *protocol0) ReadMessage(msg interface{}) error {
1627+ c.buffer.Reset()
1628+ _, err := io.CopyN(c.buffer, c.conn, 2)
1629+ if err != nil {
1630+ return err
1631+ }
1632+ length := binary.BigEndian.Uint16(c.buffer.Bytes())
1633+ c.buffer.Reset()
1634+ _, err = io.CopyN(c.buffer, c.conn, int64(length))
1635+ if err != nil {
1636+ return err
1637+ }
1638+ return json.Unmarshal(c.buffer.Bytes(), msg)
1639+}
1640+
1641+// WriteMessage writes one message to the connection: a big-endian uint16 length followed by a JSON body of that length.
1642+func (c *protocol0) WriteMessage(msg interface{}) error {
1643+ c.buffer.Reset()
1644+ c.buffer.WriteString("\x00\x00") // placeholder for length
1645+ err := c.enc.Encode(msg)
1646+ if err != nil {
1647+ panic(fmt.Errorf("WriteMessage got: %v", err))
1648+ }
1649+ msgLen := c.buffer.Len() - 3 // length space, extra newline
1650+ toWrite := c.buffer.Bytes()
1651+ binary.BigEndian.PutUint16(toWrite[:2], uint16(msgLen))
1652+ _, err = c.conn.Write(toWrite[:msgLen+2])
1653+ return err
1654+}
1655
1656=== added file 'protocol/protocol_test.go'
1657--- protocol/protocol_test.go 1970-01-01 00:00:00 +0000
1658+++ protocol/protocol_test.go 2014-01-14 15:09:46 +0000
1659@@ -0,0 +1,227 @@
1660+/*
1661+ Copyright 2013-2014 Canonical Ltd.
1662+
1663+ This program is free software: you can redistribute it and/or modify it
1664+ under the terms of the GNU General Public License version 3, as published
1665+ by the Free Software Foundation.
1666+
1667+ This program is distributed in the hope that it will be useful, but
1668+ WITHOUT ANY WARRANTY; without even the implied warranties of
1669+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1670+ PURPOSE. See the GNU General Public License for more details.
1671+
1672+ You should have received a copy of the GNU General Public License along
1673+ with this program. If not, see <http://www.gnu.org/licenses/>.
1674+*/
1675+
1676+package protocol
1677+
1678+import (
1679+ "encoding/binary"
1680+ "encoding/json"
1681+ // "fmt"
1682+ "io"
1683+ . "launchpad.net/gocheck"
1684+ "net"
1685+ "testing"
1686+ "time"
1687+)
1688+
1689+func TestProtocol(t *testing.T) { TestingT(t) }
1690+
1691+type protocolSuite struct{}
1692+
1693+var _ = Suite(&protocolSuite{})
1694+
1695+type deadline struct {
1696+ kind string
1697+ deadAfter time.Duration
1698+}
1699+
1700+func (d *deadline) setDeadAfter(t time.Time) {
1701+ deadAfter := t.Sub(time.Now())
1702+ d.deadAfter = (deadAfter + time.Millisecond/2) / time.Millisecond * time.Millisecond
1703+}
1704+
1705+type rw struct {
1706+ buf []byte
1707+ n int
1708+ err error
1709+}
1710+
1711+type testConn struct {
1712+ deadlines []*deadline
1713+ reads []rw
1714+ writes []*rw
1715+}
1716+
1717+func (tc *testConn) LocalAddr() net.Addr {
1718+ return nil
1719+}
1720+
1721+func (tc *testConn) RemoteAddr() net.Addr {
1722+ return nil
1723+}
1724+
1725+func (tc *testConn) Close() error {
1726+ return nil
1727+}
1728+
1729+func (tc *testConn) SetDeadline(t time.Time) error {
1730+ deadline := tc.deadlines[0]
1731+ deadline.kind = "both"
1732+ deadline.setDeadAfter(t)
1733+ tc.deadlines = tc.deadlines[1:]
1734+ return nil
1735+}
1736+
1737+func (tc *testConn) SetReadDeadline(t time.Time) error {
1738+ deadline := tc.deadlines[0]
1739+ deadline.kind = "read"
1740+ deadline.setDeadAfter(t)
1741+ tc.deadlines = tc.deadlines[1:]
1742+ return nil
1743+}
1744+
1745+func (tc *testConn) SetWriteDeadline(t time.Time) error {
1746+ deadline := tc.deadlines[0]
1747+ deadline.kind = "write"
1748+ deadline.setDeadAfter(t)
1749+ tc.deadlines = tc.deadlines[1:]
1750+ return nil
1751+}
1752+
1753+func (tc *testConn) Read(buf []byte) (n int, err error) {
1754+ read := tc.reads[0]
1755+ copy(buf, read.buf)
1756+ tc.reads = tc.reads[1:]
1757+ return read.n, read.err
1758+}
1759+
1760+func (tc *testConn) Write(buf []byte) (n int, err error) {
1761+ write := tc.writes[0]
1762+ n = copy(write.buf, buf)
1763+ write.buf = write.buf[:n]
1764+ write.n = n
1765+ err = write.err
1766+ tc.writes = tc.writes[1:]
1767+ return
1768+}
1769+
1770+func (s *protocolSuite) TestReadWireFormatVersion(c *C) {
1771+ deadl := deadline{}
1772+ read1 := rw{buf: []byte{42}, n: 1}
1773+ tc := &testConn{reads: []rw{read1}, deadlines: []*deadline{&deadl}}
1774+ ver, err := ReadWireFormatVersion(tc, time.Minute)
1775+ c.Check(err, IsNil)
1776+ c.Check(ver, Equals, 42)
1777+ c.Check(deadl.kind, Equals, "read")
1778+ c.Check(deadl.deadAfter, Equals, time.Minute)
1779+}
1780+
1781+func (s *protocolSuite) TestReadWireFormatVersionError(c *C) {
1782+ deadl := deadline{}
1783+ read1 := rw{err: io.EOF}
1784+ tc := &testConn{reads: []rw{read1}, deadlines: []*deadline{&deadl}}
1785+ _, err := ReadWireFormatVersion(tc, time.Minute)
1786+ c.Check(err, Equals, io.EOF)
1787+}
1788+
1789+func (s *protocolSuite) TestSetDeadline(c *C) {
1790+ deadl := deadline{}
1791+ tc := &testConn{deadlines: []*deadline{&deadl}}
1792+ pc := NewProtocol0(tc)
1793+ pc.SetDeadline(time.Now().Add(time.Minute))
1794+ c.Check(deadl.kind, Equals, "both")
1795+ c.Check(deadl.deadAfter, Equals, time.Minute)
1796+}
1797+
1798+type testMsg struct {
1799+ Type string `json:"T"`
1800+ A uint64
1801+}
1802+
1803+func lengthAsBytes(length uint16) []byte {
1804+ var buf [2]byte
1805+ var res = buf[:]
1806+ binary.BigEndian.PutUint16(res, length)
1807+ return res
1808+}
1809+
1810+func (s *protocolSuite) TestReadMessage(c *C) {
1811+ msgBuf, _ := json.Marshal(testMsg{Type: "msg", A: 2000})
1812+ readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2}
1813+ readMsgBody := rw{buf: msgBuf, n: len(msgBuf)}
1814+ tc := &testConn{reads: []rw{readMsgLen, readMsgBody}}
1815+ pc := NewProtocol0(tc)
1816+ var recvMsg testMsg
1817+ err := pc.ReadMessage(&recvMsg)
1818+ c.Check(err, IsNil)
1819+ c.Check(recvMsg, DeepEquals, testMsg{Type: "msg", A: 2000})
1820+}
1821+
1822+func (s *protocolSuite) TestReadMessageBits(c *C) {
1823+ msgBuf, _ := json.Marshal(testMsg{Type: "msg", A: 2000})
1824+ readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2}
1825+ readMsgBody1 := rw{buf: msgBuf[:5], n: 5}
1826+ readMsgBody2 := rw{buf: msgBuf[5:], n: len(msgBuf) - 5}
1827+ tc := &testConn{reads: []rw{readMsgLen, readMsgBody1, readMsgBody2}}
1828+ pc := NewProtocol0(tc)
1829+ var recvMsg testMsg
1830+ err := pc.ReadMessage(&recvMsg)
1831+ c.Check(err, IsNil)
1832+ c.Check(recvMsg, DeepEquals, testMsg{Type: "msg", A: 2000})
1833+}
1834+
1835+func (s *protocolSuite) TestReadMessageIOErrors(c *C) {
1836+ msgBuf, _ := json.Marshal(testMsg{Type: "msg", A: 2000})
1837+ readMsgLenErr := rw{n: 1, err: io.ErrClosedPipe}
1838+ tc1 := &testConn{reads: []rw{readMsgLenErr}}
1839+ pc1 := NewProtocol0(tc1)
1840+ var recvMsg testMsg
1841+ err := pc1.ReadMessage(&recvMsg)
1842+ c.Check(err, Equals, io.ErrClosedPipe)
1843+
1844+ readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2}
1845+ readMsgBody1 := rw{buf: msgBuf[:5], n: 5}
1846+ readMsgBody2Err := rw{n: 2, err: io.EOF}
1847+ tc2 := &testConn{reads: []rw{readMsgLen, readMsgBody1, readMsgBody2Err}}
1848+ pc2 := NewProtocol0(tc2)
1849+ err = pc2.ReadMessage(&recvMsg)
1850+ c.Check(err, Equals, io.EOF)
1851+}
1852+
1853+func (s *protocolSuite) TestReadMessageBrokenJSON(c *C) {
1854+ msgBuf := []byte("{\"T\"}")
1855+ readMsgLen := rw{buf: lengthAsBytes(uint16(len(msgBuf))), n: 2}
1856+ readMsgBody := rw{buf: msgBuf, n: len(msgBuf)}
1857+ tc := &testConn{reads: []rw{readMsgLen, readMsgBody}}
1858+ pc := NewProtocol0(tc)
1859+ var recvMsg testMsg
1860+ err := pc.ReadMessage(&recvMsg)
1861+ c.Check(err, FitsTypeOf, &json.SyntaxError{})
1862+}
1863+
1864+func (s *protocolSuite) TestWriteMessage(c *C) {
1865+ writeMsg := rw{buf: make([]byte, 64)}
1866+ tc := &testConn{writes: []*rw{&writeMsg}}
1867+ pc := NewProtocol0(tc)
1868+ msg := testMsg{Type: "m", A: 9999}
1869+ err := pc.WriteMessage(&msg)
1870+ c.Check(err, IsNil)
1871+ var msgLen int = int(binary.BigEndian.Uint16(writeMsg.buf[:2]))
1872+ c.Check(msgLen, Equals, len(writeMsg.buf)-2)
1873+ var wroteMsg testMsg
1874+ formatErr := json.Unmarshal(writeMsg.buf[2:], &wroteMsg)
1875+ c.Check(formatErr, IsNil)
1876+ c.Check(wroteMsg, DeepEquals, testMsg{Type: "m", A: 9999})
1877+}
1878+
1879+func (s *protocolSuite) TestWriteMessageIOErrors(c *C) {
1880+ writeMsgErr := rw{buf: make([]byte, 0), err: io.ErrClosedPipe}
1881+ tc1 := &testConn{writes: []*rw{&writeMsgErr}}
1882+ pc1 := NewProtocol0(tc1)
1883+ msg := testMsg{Type: "m", A: 9999}
1884+ err := pc1.WriteMessage(&msg)
1885+ c.Check(err, Equals, io.ErrClosedPipe)
1886+}
1887
1888=== added directory 'scripts'
1889=== added file 'scripts/check_fmt'
1890--- scripts/check_fmt 1970-01-01 00:00:00 +0000
1891+++ scripts/check_fmt 2014-01-14 15:09:46 +0000
1892@@ -0,0 +1,14 @@
1893+#!/bin/bash
1894+# check that all go files respect gofmt formatting
1895+# assumes GOPATH is properly set
1896+PROJECT=${1:?missing project}
1897+PROBLEMS=
1898+for pkg in $(go list ${PROJECT}/...) ; do
1899+ NONCOMPLIANT=$(gofmt -l ${GOPATH}/src/${pkg}/*.go)
1900+ if [ -n "${NONCOMPLIANT}" ]; then
1901+ echo pkg $pkg has some gofmt non-compliant files:
1902+ echo ${NONCOMPLIANT}|xargs -d ' ' -n1 basename
1903+ PROBLEMS="y"
1904+ fi
1905+done
1906+test -z "${PROBLEMS}"
1907
1908=== added directory 'server'
1909=== added directory 'server/acceptance'
1910=== added file 'server/acceptance/acceptance.sh'
1911--- server/acceptance/acceptance.sh 1970-01-01 00:00:00 +0000
1912+++ server/acceptance/acceptance.sh 2014-01-14 15:09:46 +0000
1913@@ -0,0 +1,6 @@
1914+# run acceptance tests, expects a properly set up GOPATH and deps
1915+# can set extra build params like -race with BUILD_FLAGS envvar
1916+set -ex
1917+go test $BUILD_FLAGS -i
1918+go build $BUILD_FLAGS -o testserver launchpad.net/ubuntu-push/server/dev
1919+go test $BUILD_FLAGS -server ./testserver $*
1920
1921=== added file 'server/acceptance/acceptance_test.go'
1922--- server/acceptance/acceptance_test.go 1970-01-01 00:00:00 +0000
1923+++ server/acceptance/acceptance_test.go 2014-01-14 15:09:46 +0000
1924@@ -0,0 +1,487 @@
1925+/*
1926+ Copyright 2013-2014 Canonical Ltd.
1927+
1928+ This program is free software: you can redistribute it and/or modify it
1929+ under the terms of the GNU General Public License version 3, as published
1930+ by the Free Software Foundation.
1931+
1932+ This program is distributed in the hope that it will be useful, but
1933+ WITHOUT ANY WARRANTY; without even the implied warranties of
1934+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1935+ PURPOSE. See the GNU General Public License for more details.
1936+
1937+ You should have received a copy of the GNU General Public License along
1938+ with this program. If not, see <http://www.gnu.org/licenses/>.
1939+*/
1940+
1941+package acceptance
1942+
1943+import (
1944+ "bufio"
1945+ "bytes"
1946+ "encoding/json"
1947+ "flag"
1948+ "fmt"
1949+ "io/ioutil"
1950+ . "launchpad.net/gocheck"
1951+ "launchpad.net/ubuntu-push/protocol"
1952+ "launchpad.net/ubuntu-push/server/api"
1953+ "net"
1954+ "net/http"
1955+ "os"
1956+ "os/exec"
1957+ "path/filepath"
1958+ "regexp"
1959+ "runtime"
1960+ "strings"
1961+ "testing"
1962+ "time"
1963+)
1964+
1965+func TestAcceptance(t *testing.T) { TestingT(t) }
1966+
1967+type acceptanceSuite struct {
1968+ server *exec.Cmd
1969+ serverAddr string
1970+ serverURL string
1971+ serverEvents <-chan string
1972+ httpClient *http.Client
1973+}
1974+
1975+var _ = Suite(&acceptanceSuite{})
1976+
1977+var serverCmd = flag.String("server", "", "server to test")
1978+
1979+// SourceRelative produces a path relative to the source code, makes
1980+// sense only for tests when the code is available on disk.
1981+// xxx later move it to a testing helpers package
1982+func SourceRelative(relativePath string) string {
1983+ _, file, _, ok := runtime.Caller(1)
1984+ if !ok {
1985+ panic("failed to get source filename using Caller()")
1986+ }
1987+ return filepath.Join(filepath.Dir(file), relativePath)
1988+}
1989+
1990+func testServerConfig(addr, httpAddr string) map[string]interface{} {
1991+ cfg := map[string]interface{}{
1992+ "exchange_timeout": "0.1s",
1993+ "ping_interval": "0.5s",
1994+ "session_queue_size": 10,
1995+ "broker_queue_size": 100,
1996+ "addr": addr,
1997+ "key_pem_file": SourceRelative("config/testing.key"),
1998+ "cert_pem_file": SourceRelative("config/testing.cert"),
1999+ "http_addr": httpAddr,
2000+ "http_read_timeout": "1s",
2001+ "http_write_timeout": "1s",
2002+ }
2003+ return cfg
2004+}
2005+
2006+func testClientSession(addr string, deviceId string, reportPings bool) *ClientSession {
2007+ certPEMBlock, err := ioutil.ReadFile(SourceRelative("config/testing.cert"))
2008+ if err != nil {
2009+ panic(fmt.Sprintf("could not read config/testing.cert: %v", err))
2010+ }
2011+ return &ClientSession{
2012+ ExchangeTimeout: 100 * time.Millisecond,
2013+ PingInterval: 500 * time.Millisecond,
2014+ ServerAddr: addr,
2015+ CertPEMBlock: certPEMBlock,
2016+ DeviceId: deviceId,
2017+ ReportPings: reportPings,
2018+ }
2019+}
2020+
2021+const (
2022+ devListeningOnPat = "INFO listening for devices on "
2023+ httpListeningOnPat = "INFO listening for http on "
2024+)
2025+
2026+var rxLineInfo = regexp.MustCompile("^.*? ([[:alpha:]].*)\n")
2027+
2028+func extractListeningAddr(c *C, pat, line string) string {
2029+ if !strings.HasPrefix(line, pat) {
2030+ c.Fatalf("server: %v", line)
2031+ }
2032+ return line[len(pat):]
2033+}
2034+
2035+// start a new server for each test
2036+func (s *acceptanceSuite) SetUpTest(c *C) {
2037+ if *serverCmd == "" {
2038+ c.Skip("executable server not specified")
2039+ }
2040+ tmpDir := c.MkDir()
2041+ cfgFilename := filepath.Join(tmpDir, "config.json")
2042+ cfgJson, err := json.Marshal(testServerConfig("127.0.0.1:0", "127.0.0.1:0"))
2043+ if err != nil {
2044+ c.Fatal(err)
2045+ }
2046+ err = ioutil.WriteFile(cfgFilename, cfgJson, os.ModePerm)
2047+ if err != nil {
2048+ c.Fatal(err)
2049+ }
2050+ server := exec.Command(*serverCmd, cfgFilename)
2051+ stderr, err := server.StderrPipe()
2052+ if err != nil {
2053+ c.Fatal(err)
2054+ }
2055+ err = server.Start()
2056+ if err != nil {
2057+ c.Fatal(err)
2058+ }
2059+ bufErr := bufio.NewReaderSize(stderr, 5000)
2060+ getLineInfo := func() (string, error) {
2061+ line, err := bufErr.ReadString('\n')
2062+ if err != nil {
2063+ return "", err
2064+ }
2065+ extracted := rxLineInfo.FindStringSubmatch(line)
2066+ if extracted == nil {
2067+ return "", fmt.Errorf("unexpected server line: %#v", line)
2068+ }
2069+ return extracted[1], nil
2070+ }
2071+ infoHTTP, err := getLineInfo()
2072+ if err != nil {
2073+ c.Fatal(err)
2074+ }
2075+ serverHTTPAddr := extractListeningAddr(c, httpListeningOnPat, infoHTTP)
2076+ s.serverURL = fmt.Sprintf("http://%s", serverHTTPAddr)
2077+ info, err := getLineInfo()
2078+ if err != nil {
2079+ c.Fatal(err)
2080+ }
2081+ s.serverAddr = extractListeningAddr(c, devListeningOnPat, info)
2082+ s.server = server
2083+ serverEvents := make(chan string, 5)
2084+ s.serverEvents = serverEvents
2085+ go func() {
2086+ for {
2087+ info, err := getLineInfo()
2088+ if err != nil {
2089+ serverEvents <- fmt.Sprintf("ERROR: %v", err)
2090+ close(serverEvents)
2091+ return
2092+ }
2093+ serverEvents <- info
2094+ }
2095+ }()
2096+ s.httpClient = &http.Client{}
2097+}
2098+
2099+func (s *acceptanceSuite) TearDownTest(c *C) {
2100+ if s.server != nil {
2101+ s.server.Process.Kill()
2102+ s.server = nil
2103+ }
2104+}
2105+
2106+// nextEvent receives an event from given channel with a 5s timeout
2107+func nextEvent(events <-chan string, errCh <-chan error) string {
2108+ select {
2109+ case <-time.After(5 * time.Second):
2110+ panic("too long stuck waiting for next event")
2111+ case err := <-errCh:
2112+ return err.Error() // will fail comparison typically
2113+ case evStr := <-events:
2114+ return evStr
2115+ }
2116+}
2117+
2118+// Tests about connection, ping-pong, disconnection scenarios
2119+
2120+// typically combined with -gocheck.vv or test selection
2121+var logTraffic = flag.Bool("logTraffic", false, "log traffic")
2122+
2123+type connInterceptor func(ic *interceptingConn, op string, b []byte) (bool, int, error)
2124+
2125+type interceptingConn struct {
2126+ net.Conn
2127+ totalRead int
2128+ totalWritten int
2129+ intercept connInterceptor
2130+}
2131+
2132+func (ic *interceptingConn) Write(b []byte) (n int, err error) {
2133+ done := false
2134+ before := ic.totalWritten
2135+ if ic.intercept != nil {
2136+ done, n, err = ic.intercept(ic, "write", b)
2137+ }
2138+ if !done {
2139+ n, err = ic.Conn.Write(b)
2140+ }
2141+ ic.totalWritten += n
2142+ if *logTraffic {
2143+ fmt.Printf("W[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalWritten)
2144+ }
2145+ return
2146+}
2147+
2148+func (ic *interceptingConn) Read(b []byte) (n int, err error) {
2149+ done := false
2150+ before := ic.totalRead
2151+ if ic.intercept != nil {
2152+ done, n, err = ic.intercept(ic, "read", b)
2153+ }
2154+ if !done {
2155+ n, err = ic.Conn.Read(b)
2156+ }
2157+ ic.totalRead += n
2158+ if *logTraffic {
2159+ fmt.Printf("R[%v]: %d %#v %v %d\n", ic.Conn.LocalAddr(), before, string(b[:n]), err, ic.totalRead)
2160+ }
2161+ return
2162+}
2163+
2164+func (s *acceptanceSuite) TestConnectPingPing(c *C) {
2165+ errCh := make(chan error, 1)
2166+ events := make(chan string, 10)
2167+ sess := testClientSession(s.serverAddr, "DEVA", true)
2168+ err := sess.Dial()
2169+ c.Assert(err, IsNil)
2170+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
2171+ // would be 3rd ping read, based on logged traffic
2172+ if op == "read" && ic.totalRead >= 28 {
2173+ // exit the sess.Run() goroutine, client will close
2174+ runtime.Goexit()
2175+ }
2176+ return false, 0, nil
2177+ }
2178+ sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
2179+ go func() {
2180+ errCh <- sess.Run(events)
2181+ }()
2182+ connectCli := nextEvent(events, errCh)
2183+ connectSrv := nextEvent(s.serverEvents, nil)
2184+ registeredSrv := nextEvent(s.serverEvents, nil)
2185+ tconnect := time.Now()
2186+ c.Assert(connectSrv, Matches, ".*session.* connected .*")
2187+ c.Assert(registeredSrv, Matches, ".*session.* registered DEVA")
2188+ c.Assert(strings.HasSuffix(connectSrv, connectCli), Equals, true)
2189+ c.Assert(nextEvent(events, errCh), Equals, "Ping")
2190+ elapsedOfPing := float64(time.Since(tconnect)) / float64(500*time.Millisecond)
2191+ c.Check(elapsedOfPing >= 1.0, Equals, true)
2192+ c.Check(elapsedOfPing < 1.05, Equals, true)
2193+ c.Assert(nextEvent(events, errCh), Equals, "Ping")
2194+ c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* ended with: EOF")
2195+ c.Check(len(errCh), Equals, 0)
2196+}
2197+
2198+func (s *acceptanceSuite) TestConnectPingNeverPong(c *C) {
2199+ errCh := make(chan error, 1)
2200+ events := make(chan string, 10)
2201+ sess := testClientSession(s.serverAddr, "DEVB", true)
2202+ err := sess.Dial()
2203+ c.Assert(err, IsNil)
2204+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
2205+ // would be pong to 2nd ping, based on logged traffic
2206+ if op == "write" && ic.totalRead >= 28 {
2207+ time.Sleep(200 * time.Millisecond)
2208+ // exit the sess.Run() goroutine, client will close
2209+ runtime.Goexit()
2210+ }
2211+ return false, 0, nil
2212+ }
2213+ sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
2214+ go func() {
2215+ errCh <- sess.Run(events)
2216+ }()
2217+ c.Assert(nextEvent(events, errCh), Matches, "connected .*")
2218+ c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
2219+ c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* registered .*")
2220+ c.Assert(nextEvent(events, errCh), Equals, "Ping")
2221+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*timeout`)
2222+ c.Check(len(errCh), Equals, 0)
2223+}
2224+
2225+// Tests about broadcast
2226+
2227+func (s *acceptanceSuite) postRequest(path string, message interface{}) (string, error) {
2228+ packedMessage, err := json.Marshal(message)
2229+ if err != nil {
2230+ panic(err)
2231+ }
2232+ reader := bytes.NewReader(packedMessage)
2233+
2234+ url := s.serverURL + path
2235+ request, _ := http.NewRequest("POST", url, reader)
2236+ request.ContentLength = int64(reader.Len())
2237+ request.Header.Set("Content-Type", "application/json")
2238+
2239+ resp, err := s.httpClient.Do(request)
2240+ if err != nil {
2241+ panic(err)
2242+ }
2243+ defer resp.Body.Close()
2244+ body, err := ioutil.ReadAll(resp.Body)
2245+ return string(body), err
2246+}
2247+
2248+func (s *acceptanceSuite) startClient(c *C, devId string, intercept connInterceptor, levels map[string]int64) (<-chan string, <-chan error) {
2249+ errCh := make(chan error, 1)
2250+ events := make(chan string, 10)
2251+ sess := testClientSession(s.serverAddr, devId, false)
2252+ sess.Levels = levels
2253+ err := sess.Dial()
2254+ c.Assert(err, IsNil)
2255+ sess.Connection = &interceptingConn{sess.Connection, 0, 0, intercept}
2256+ go func() {
2257+ errCh <- sess.Run(events)
2258+ }()
2259+ c.Assert(nextEvent(events, errCh), Matches, "connected .*")
2260+ c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* connected .*")
2261+ c.Assert(nextEvent(s.serverEvents, nil), Matches, ".*session.* registered "+devId)
2262+ return events, errCh
2263+}
2264+
2265+func (s *acceptanceSuite) TestBroadcastToConnected(c *C) {
2266+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
2267+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
2268+ // read after ack
2269+ if op == "read" && len(clientShutdown) > 0 {
2270+ // exit the sess.Run() goroutine, client will close
2271+ runtime.Goexit()
2272+ }
2273+ return false, 0, nil
2274+ }
2275+ events, errCh := s.startClient(c, "DEVB", intercept, nil)
2276+ got, err := s.postRequest("/broadcast", &api.Broadcast{
2277+ Channel: "system",
2278+ Data: json.RawMessage(`{"n": 42}`),
2279+ })
2280+ c.Check(err, IsNil)
2281+ c.Check(got, Matches, ".*ok.*")
2282+ c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
2283+ clientShutdown <- true
2284+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
2285+ c.Check(len(errCh), Equals, 0)
2286+}
2287+
2288+func (s *acceptanceSuite) TestBroadcastPending(c *C) {
2289+ // send broadcast that will be pending
2290+ got, err := s.postRequest("/broadcast", &api.Broadcast{
2291+ Channel: "system",
2292+ Data: json.RawMessage(`{"b": 1}`),
2293+ })
2294+ c.Check(err, IsNil)
2295+ c.Check(got, Matches, ".*ok.*")
2296+
2297+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
2298+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
2299+ // read after ack
2300+ if op == "read" && len(clientShutdown) > 0 {
2301+ // exit the sess.Run() goroutine, client will close
2302+ runtime.Goexit()
2303+ }
2304+ return false, 0, nil
2305+ }
2306+ events, errCh := s.startClient(c, "DEVB", intercept, nil)
2307+ // getting pending on connect
2308+ c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
2309+ clientShutdown <- true
2310+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
2311+ c.Check(len(errCh), Equals, 0)
2312+}
2313+
2314+func (s *acceptanceSuite) TestBroadcasLargeNeedsSplitting(c *C) {
2315+ // send a bunch of broadcasts that will be pending
2316+ payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
2317+ for i := 0; i < 32; i++ {
2318+ got, err := s.postRequest("/broadcast", &api.Broadcast{
2319+ Channel: "system",
2320+ Data: json.RawMessage(fmt.Sprintf(payloadFmt, i)),
2321+ })
2322+ c.Check(err, IsNil)
2323+ c.Check(got, Matches, ".*ok.*")
2324+ }
2325+
2326+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
2327+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
2328+ // read after ack
2329+ if op == "read" && len(clientShutdown) > 0 {
2330+ // exit the sess.Run() goroutine, client will close
2331+ runtime.Goexit()
2332+ }
2333+ return false, 0, nil
2334+ }
2335+ events, errCh := s.startClient(c, "DEVC", intercept, nil)
2336+ // getting pending on connect
2337+ c.Check(nextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:30 payloads:\[{"b":0,.*`)
2338+ c.Check(nextEvent(events, errCh), Matches, `broadcast chan:0 app: topLevel:32 payloads:\[.*`)
2339+ clientShutdown <- true
2340+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
2341+ c.Check(len(errCh), Equals, 0)
2342+}
2343+
2344+func (s *acceptanceSuite) TestBroadcastDistribution2(c *C) {
2345+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
2346+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
2347+ // read after ack
2348+ if op == "read" && len(clientShutdown) > 0 {
2349+ // exit the sess.Run() goroutine, client will close
2350+ runtime.Goexit()
2351+ }
2352+ return false, 0, nil
2353+ }
2354+ // start 1st client
2355+ events1, errCh1 := s.startClient(c, "DEV1", intercept, nil)
2356+ // start 2nd client
2357+ events2, errCh2 := s.startClient(c, "DEV2", intercept, nil)
2358+ // broadcast
2359+ got, err := s.postRequest("/broadcast", &api.Broadcast{
2360+ Channel: "system",
2361+ Data: json.RawMessage(`{"n": 42}`),
2362+ })
2363+ c.Check(err, IsNil)
2364+ c.Check(got, Matches, ".*ok.*")
2365+ c.Check(nextEvent(events1, errCh1), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
2366+ c.Check(nextEvent(events2, errCh2), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"n":42}]`)
2367+ clientShutdown <- true
2368+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
2369+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
2370+ c.Check(len(errCh1), Equals, 0)
2371+ c.Check(len(errCh2), Equals, 0)
2372+}
2373+
2374+func (s *acceptanceSuite) TestBroadcastFilterByLevel(c *C) {
2375+ clientShutdown := make(chan bool, 1) // abused as an atomic flag
2376+ intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
2377+ // read after ack
2378+ if op == "read" && len(clientShutdown) > 0 {
2379+ // exit the sess.Run() goroutine, client will close
2380+ runtime.Goexit()
2381+ }
2382+ return false, 0, nil
2383+ }
2384+ events, errCh := s.startClient(c, "DEVD", intercept, nil)
2385+ got, err := s.postRequest("/broadcast", &api.Broadcast{
2386+ Channel: "system",
2387+ Data: json.RawMessage(`{"b": 1}`),
2388+ })
2389+ c.Check(err, IsNil)
2390+ c.Check(got, Matches, ".*ok.*")
2391+ c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[{"b":1}]`)
2392+ clientShutdown <- true
2393+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
2394+ c.Check(len(errCh), Equals, 0)
2395+ // another broadcast
2396+ got, err = s.postRequest("/broadcast", &api.Broadcast{
2397+ Channel: "system",
2398+ Data: json.RawMessage(`{"b": 2}`),
2399+ })
2400+ c.Check(err, IsNil)
2401+ c.Check(got, Matches, ".*ok.*")
2402+ // reconnect, provide levels, get only later notification
2403+ <-clientShutdown // reset
2404+ events, errCh = s.startClient(c, "DEVD", intercept, map[string]int64{
2405+ protocol.SystemChannelId: 1,
2406+ })
2407+ c.Check(nextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"b":2}]`)
2408+ clientShutdown <- true
2409+ c.Assert(nextEvent(s.serverEvents, nil), Matches, `.* ended with:.*EOF`)
2410+ c.Check(len(errCh), Equals, 0)
2411+}
2412
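Both acceptance tests above use a capacity-1 buffered channel as a flag that the intercept function polls with `len()`. The following is a minimal sketch of that idiom in isolation. The `flag` type and its method names are hypothetical and do not appear in the branch; the tests use the raw channel directly.

```go
package main

import "fmt"

// flag is a hypothetical wrapper around a capacity-1 buffered channel,
// mirroring how the tests use clientShutdown as a flag.
type flag chan bool

func newFlag() flag { return make(flag, 1) }

// set raises the flag; it does not block if the flag is already raised.
func (f flag) set() {
	select {
	case f <- true:
	default:
	}
}

// isSet reports whether the flag is raised without consuming it,
// like the len(clientShutdown) > 0 check in the intercept function.
func (f flag) isSet() bool { return len(f) > 0 }

// reset lowers the flag, like the "<-clientShutdown // reset" line.
func (f flag) reset() {
	select {
	case <-f:
	default:
	}
}

func main() {
	f := newFlag()
	fmt.Println(f.isSet())
	f.set()
	fmt.Println(f.isSet())
	f.reset()
	fmt.Println(f.isSet())
}
```

The channel works here because `len` on a buffered channel is safe to call from another goroutine; an integer from sync/atomic would probably be the more conventional choice outside of test code.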
2413=== added file 'server/acceptance/acceptanceclient.go'
2414--- server/acceptance/acceptanceclient.go 1970-01-01 00:00:00 +0000
2415+++ server/acceptance/acceptanceclient.go 2014-01-14 15:09:46 +0000
2416@@ -0,0 +1,123 @@
2417+/*
2418+ Copyright 2013-2014 Canonical Ltd.
2419+
2420+ This program is free software: you can redistribute it and/or modify it
2421+ under the terms of the GNU General Public License version 3, as published
2422+ by the Free Software Foundation.
2423+
2424+ This program is distributed in the hope that it will be useful, but
2425+ WITHOUT ANY WARRANTY; without even the implied warranties of
2426+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
2427+ PURPOSE. See the GNU General Public License for more details.
2428+
2429+ You should have received a copy of the GNU General Public License along
2430+ with this program. If not, see <http://www.gnu.org/licenses/>.
2431+*/
2432+
2433+package acceptance
2434+
2435+import (
2436+ "crypto/tls"
2437+ "crypto/x509"
2438+ "encoding/json"
2439+ "errors"
2440+ "fmt"
2441+ "launchpad.net/ubuntu-push/protocol"
2442+ "net"
2443+ "time"
2444+)
2445+
2446+var wireVersionBytes = []byte{protocol.ProtocolWireVersion}
2447+
2448+// ClientSession holds a client<->server session and its configuration.
2449+type ClientSession struct {
2450+ // configuration
2451+ DeviceId string
2452+ ServerAddr string
2453+ PingInterval time.Duration
2454+ ExchangeTimeout time.Duration
2455+ CertPEMBlock []byte
2456+ ReportPings bool
2457+ Levels map[string]int64
2458+ // connection
2459+ Connection net.Conn
2460+}
2461+
2462+// Dial connects to a server using the configuration in the ClientSession
2463+// and sets up the connection.
2464+func (sess *ClientSession) Dial() error {
2465+ conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout)
2466+ if err != nil {
2467+ return err
2468+ }
2469+ tlsConfig := &tls.Config{}
2470+ if sess.CertPEMBlock != nil {
2471+ cp := x509.NewCertPool()
2472+ ok := cp.AppendCertsFromPEM(sess.CertPEMBlock)
2473+ if !ok {
2474+ return errors.New("dial: could not parse certificate")
2475+ }
2476+ tlsConfig.RootCAs = cp
2477+ }
2478+ sess.Connection = tls.Client(conn, tlsConfig)
2479+ return nil
2480+}
2481+
2482+type serverMsg struct {
2483+ Type string `json:"T"`
2484+ protocol.BroadcastMsg
2485+ protocol.NotificationsMsg
2486+}
2487+
2488+// Run runs the session with the server, emitting a stream of events on the given channel.
2489+func (sess *ClientSession) Run(events chan<- string) error {
2490+ conn := sess.Connection
2491+ defer conn.Close()
2492+ conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
2493+ _, err := conn.Write(wireVersionBytes)
2494+ if err != nil {
2495+ return err
2496+ }
2497+ proto := protocol.NewProtocol0(conn)
2498+ err = proto.WriteMessage(protocol.ConnectMsg{
2499+ Type: "connect",
2500+ DeviceId: sess.DeviceId,
2501+ Levels: sess.Levels,
2502+ })
2503+ if err != nil {
2504+ return err
2505+ }
2506+ events <- fmt.Sprintf("connected %v", conn.LocalAddr())
2507+ var recv serverMsg
2508+ for {
2509+ deadAfter := sess.PingInterval + sess.ExchangeTimeout
2510+ conn.SetDeadline(time.Now().Add(deadAfter))
2511+ err = proto.ReadMessage(&recv)
2512+ if err != nil {
2513+ return err
2514+ }
2515+ switch recv.Type {
2516+ case "ping":
2517+ conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
2518+ err := proto.WriteMessage(protocol.PingPongMsg{Type: "pong"})
2519+ if err != nil {
2520+ return err
2521+ }
2522+ if sess.ReportPings {
2523+ events <- "Ping"
2524+ }
2525+ case "broadcast":
2526+ conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
2527+ err := proto.WriteMessage(protocol.PingPongMsg{Type: "ack"})
2528+ if err != nil {
2529+ return err
2530+ }
2531+ pack, err := json.Marshal(recv.Payloads)
2532+ if err != nil {
2533+ return err
2534+ }
2535+ events <- fmt.Sprintf("broadcast chan:%v app:%v topLevel:%d payloads:%s", recv.ChanId, recv.AppId, recv.TopLevel, pack)
2536+ }
2537+ }
2538+ return nil
2539+}
2540
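The `serverMsg` type above embeds two message structs next to its own `Type` field, and `Run` dispatches on that field. A rough sketch of how such a decode behaves with encoding/json follows. `BroadcastFields` and `PingFields` are stand-ins invented for this example, and the JSON keys `L` and `P` are assumptions, since the real definitions live in protocol/messages.go, which is not shown in this part of the diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BroadcastFields is a hypothetical stand-in for protocol.BroadcastMsg.
// The JSON keys "L" and "P" are assumptions for illustration only.
type BroadcastFields struct {
	Type     string            `json:"T"`
	TopLevel int64             `json:"L"`
	Payloads []json.RawMessage `json:"P"`
}

// PingFields is a hypothetical second embedded message type.
type PingFields struct {
	Type string `json:"T"`
}

// ServerMsg mirrors the shape of serverMsg: an outer Type field plus
// embedded message structs that each carry a field with the same key.
type ServerMsg struct {
	Type string `json:"T"`
	BroadcastFields
	PingFields
}

// decode unmarshals one message. encoding/json resolves the duplicated
// "T" key to the least nested field, which is the outer Type.
func decode(data []byte) (ServerMsg, error) {
	var m ServerMsg
	err := json.Unmarshal(data, &m)
	return m, err
}

func main() {
	m, err := decode([]byte(`{"T":"broadcast","L":2,"P":[{"b":2}]}`))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	switch m.Type {
	case "broadcast":
		fmt.Println("broadcast", m.TopLevel, len(m.Payloads))
	default:
		fmt.Println("other:", m.Type)
	}
}
```

One receive struct for every message type keeps the read loop simple, at the cost of fields from a previous message lingering unless the struct is reset between reads.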
2541=== added directory 'server/acceptance/cmd'
2542=== added file 'server/acceptance/cmd/acceptanceclient.go'
2543--- server/acceptance/cmd/acceptanceclient.go 1970-01-01 00:00:00 +0000
2544+++ server/acceptance/cmd/acceptanceclient.go 2014-01-14 15:09:46 +0000
2545@@ -0,0 +1,79 @@
2546+/*
2547+ Copyright 2013-2014 Canonical Ltd.
2548+
2549+ This program is free software: you can redistribute it and/or modify it
2550+ under the terms of the GNU General Public License version 3, as published
2551+ by the Free Software Foundation.
2552+
2553+ This program is distributed in the hope that it will be useful, but
2554+ WITHOUT ANY WARRANTY; without even the implied warranties of
2555+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
2556+ PURPOSE. See the GNU General Public License for more details.
2557+
2558+ You should have received a copy of the GNU General Public License along
2559+ with this program. If not, see <http://www.gnu.org/licenses/>.
2560+*/
2561+
2562+// Command acceptanceclient is a simple client for trying out a server by hand.
2563+package main
2564+
2565+import (
2566+ "launchpad.net/ubuntu-push/config"
2567+ "launchpad.net/ubuntu-push/server/acceptance"
2568+ "log"
2569+ "os"
2570+ "path/filepath"
2571+)
2572+
2573+type configuration struct {
2574+ // session configuration
2575+ PingInterval config.ConfigTimeDuration `json:"ping_interval"`
2576+ ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`
2577+ // server connection config
2578+ Addr config.ConfigHostPort
2579+ CertPEMFile string `json:"cert_pem_file"`
2580+}
2581+
2582+func main() {
2583+ // xxx configure logging
2584+ switch {
2585+ case len(os.Args) < 2:
2586+ log.Fatal("missing config file")
2587+ case len(os.Args) < 3:
2588+ log.Fatal("missing device-id")
2589+ }
2590+ configFName := os.Args[1]
2591+ f, err := os.Open(configFName)
2592+ if err != nil {
2593+ log.Fatalf("reading config: %v", err)
2594+ }
2595+ cfg := &configuration{}
2596+ err = config.ReadConfig(f, cfg)
2597+ if err != nil {
2598+ log.Fatalf("reading config: %v", err)
2599+ }
2600+ session := &acceptance.ClientSession{
2601+ ExchangeTimeout: cfg.ExchangeTimeout.TimeDuration(),
2602+ PingInterval: cfg.PingInterval.TimeDuration(),
2603+ ServerAddr: cfg.Addr.HostPort(),
2604+ DeviceId: os.Args[2],
2605+ }
2606+ session.CertPEMBlock, err = config.LoadFile(cfg.CertPEMFile, filepath.Dir(configFName))
2607+ if err != nil {
2608+ log.Fatalf("reading CertPEMFile: %v", err)
2609+ }
2610+ err = session.Dial()
2611+ if err != nil {
2612+ log.Fatalln(err)
2613+ }
2614+ events := make(chan string, 5)
2615+ go func() {
2616+ for {
2617+ log.Println(<-events)
2618+ }
2619+ }()
2620+ err = session.Run(events)
2621+ if err != nil {
2622+ log.Fatalln(err)
2623+ }
2624+}
2625
2626=== added directory 'server/acceptance/config'
2627=== added file 'server/acceptance/config/README'
2628--- server/acceptance/config/README 1970-01-01 00:00:00 +0000
2629+++ server/acceptance/config/README 2014-01-14 15:09:46 +0000
2630@@ -0,0 +1,7 @@
2631+testing.key/testing.cert
2632+-------------------------
2633+generated with:
2634+
2635+go run /usr/lib/go/src/pkg/crypto/tls/generate_cert.go -ca -host localhost -rsa-bits 512 -duration 87600h
2636+
2637+and then renamed
2638
2639=== added file 'server/acceptance/config/config.json'
2640--- server/acceptance/config/config.json 1970-01-01 00:00:00 +0000
2641+++ server/acceptance/config/config.json 2014-01-14 15:09:46 +0000
2642@@ -0,0 +1,12 @@
2643+{
2644+ "exchange_timeout": "0.5s",
2645+ "ping_interval": "2s",
2646+ "broker_queue_size": 100,
2647+ "session_queue_size": 10,
2648+ "addr": "127.0.0.1:9090",
2649+ "key_pem_file": "testing.key",
2650+ "cert_pem_file": "testing.cert",
2651+ "http_addr": "127.0.0.1:8888",
2652+ "http_read_timeout": "5s",
2653+ "http_write_timeout": "5s"
2654+}
2655
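The timeouts in config.json are written as duration strings such as "0.5s". The sketch below shows one way such values can be decoded with time.ParseDuration. The `Duration` wrapper and `sessionConfig` struct are hypothetical and only illustrate the idea; the branch's own `config.ConfigTimeDuration` may be implemented differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration is a hypothetical stand-in for config.ConfigTimeDuration.
type Duration struct{ time.Duration }

// UnmarshalJSON accepts a JSON string in time.ParseDuration syntax.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	v, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	d.Duration = v
	return nil
}

// sessionConfig picks out only the two session timeouts; other keys
// in the JSON object are ignored by json.Unmarshal.
type sessionConfig struct {
	ExchangeTimeout Duration `json:"exchange_timeout"`
	PingInterval    Duration `json:"ping_interval"`
}

func parse(data []byte) (sessionConfig, error) {
	var cfg sessionConfig
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parse([]byte(`{"exchange_timeout": "0.5s", "ping_interval": "2s", "addr": "127.0.0.1:9090"}`))
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	// The acceptance client waits ping_interval + exchange_timeout for each read.
	fmt.Println(cfg.PingInterval.Duration + cfg.ExchangeTimeout.Duration)
}
```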
2656=== added file 'server/acceptance/config/testing.cert'
2657--- server/acceptance/config/testing.cert 1970-01-01 00:00:00 +0000
2658+++ server/acceptance/config/testing.cert 2014-01-14 15:09:46 +0000
2659@@ -0,0 +1,10 @@
2660+-----BEGIN CERTIFICATE-----
2661+MIIBYzCCAQ+gAwIBAgIBADALBgkqhkiG9w0BAQUwEjEQMA4GA1UEChMHQWNtZSBD
2662+bzAeFw0xMzEyMTkyMDU1NDNaFw0yMzEyMTcyMDU1NDNaMBIxEDAOBgNVBAoTB0Fj
2663+bWUgQ28wWjALBgkqhkiG9w0BAQEDSwAwSAJBAPw+niki17X2qALE2A2AzE1q5dvK
2664+9CI4OduRtT9IgbFLC6psqAT21NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEA
2665+AaNUMFIwDgYDVR0PAQH/BAQDAgCkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1Ud
2666+EwEB/wQFMAMBAf8wGgYDVR0RBBMwEYIJbG9jYWxob3N0hwR/AAABMAsGCSqGSIb3
2667+DQEBBQNBAFqiVI+Km2XPSO+pxITaPvhmuzg+XG3l1+2di3gL+HlDobocjBqRctRU
2668+YySO32W07acjGJmCHUKpCJuq9X8hpmk=
2669+-----END CERTIFICATE-----
2670
2671=== added file 'server/acceptance/config/testing.key'
2672--- server/acceptance/config/testing.key 1970-01-01 00:00:00 +0000
2673+++ server/acceptance/config/testing.key 2014-01-14 15:09:46 +0000
2674@@ -0,0 +1,9 @@
2675+-----BEGIN RSA PRIVATE KEY-----
2676+MIIBPAIBAAJBAPw+niki17X2qALE2A2AzE1q5dvK9CI4OduRtT9IgbFLC6psqAT2
2677+1NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEAAQJAKwXbIBULScP6QA6m8xam
2678+wgWbkvN41GVWqPafPV32kPBvKwSc+M1e+JR7g3/xPZE7TCELcfYi4yXEHZZI3Pbh
2679+oQIhAP/UsgJbsfH1GFv8Y8qGl5l/kmwwkwHhuKvEC87Yur9FAiEA/GlQv3ZfaXnT
2680+lcCFT0aL02O0RDiRYyMUG/JAZQJs6CUCIQCHO5SZYIUwxIGK5mCNxxXOAzyQSiD7
2681+hqkKywf+4FvfDQIhALa0TLyqJFom0t7c4iIGAIRc8UlIYQSPiajI64+x9775AiEA
2682+0v4fgSK/Rq059zW1753JjuB6aR0Uh+3RqJII4dUR1Wg=
2683+-----END RSA PRIVATE KEY-----
2684
2685=== added directory 'server/api'
2686=== added file 'server/api/handlers.go'
2687--- server/api/handlers.go 1970-01-01 00:00:00 +0000
2688+++ server/api/handlers.go 2014-01-14 15:09:46 +0000
2689@@ -0,0 +1,244 @@
2690+/*
2691+ Copyright 2013-2014 Canonical Ltd.
2692+
2693+ This program is free software: you can redistribute it and/or modify it
2694+ under the terms of the GNU General Public License version 3, as published
2695+ by the Free Software Foundation.
2696+
2697+ This program is distributed in the hope that it will be useful, but
2698+ WITHOUT ANY WARRANTY; without even the implied warranties of
2699+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
2700+ PURPOSE. See the GNU General Public License for more details.
2701+
2702+ You should have received a copy of the GNU General Public License along
2703+ with this program. If not, see <http://www.gnu.org/licenses/>.
2704+*/
2705+
2706+// Package api has code that offers a REST API for the applications that
2707+// want to push messages.
2708+package api
2709+
2710+import (
2711+ "encoding/json"
2712+ "fmt"
2713+ "io"
2714+ "launchpad.net/ubuntu-push/logger"
2715+ "launchpad.net/ubuntu-push/server/broker"
2716+ "launchpad.net/ubuntu-push/server/store"
2717+ "log"
2718+ "net/http"
2719+)
2720+
2721+const MaxRequestBodyBytes = 4 * 1024
2722+const JSONMediaType = "application/json"
2723+
2724+// APIError represents an API error (both internally and as JSON in a response).
2725+type APIError struct {
2726+ // http status code
2727+ StatusCode int `json:"-"`
2728+ // machine readable label
2729+ ErrorLabel string `json:"error"`
2730+ // human message
2731+ Message string `json:"message"`
2732+}
2733+
2734+// machine readable error labels
2735+const (
2736+ ioError = "io-error"
2737+ invalidRequest = "invalid-request"
2738+ unknownChannel = "unknown channel"
2739+ unavailable = "unavailable"
2740+ internalError = "internal"
2741+)
2742+
2743+func (apiErr *APIError) Error() string {
2744+ return fmt.Sprintf("api %s: %s", apiErr.ErrorLabel, apiErr.Message)
2745+}
2746+
2747+// Well-known prebuilt API errors
2748+var (
2749+ ErrNoContentLengthProvided = &APIError{
2750+ http.StatusLengthRequired,
2751+ invalidRequest,
2752+ "A Content-Length must be provided",
2753+ }
2754+ ErrRequestBodyEmpty = &APIError{
2755+ http.StatusBadRequest,
2756+ invalidRequest,
2757+ "Request body empty",
2758+ }
2759+ ErrRequestBodyTooLarge = &APIError{
2760+ http.StatusRequestEntityTooLarge,
2761+ invalidRequest,
2762+ "Request body too large",
2763+ }
2764+ ErrWrongContentType = &APIError{
2765+ http.StatusUnsupportedMediaType,
2766+ invalidRequest,
2767+ "Wrong content type, should be application/json",
2768+ }
2769+ ErrWrongRequestMethod = &APIError{
2770+ http.StatusMethodNotAllowed,
2771+ invalidRequest,
2772+ "Wrong request method, should be POST",
2773+ }
2774+ ErrMalformedJSONObject = &APIError{
2775+ http.StatusBadRequest,
2776+ invalidRequest,
2777+ "Malformed JSON Object",
2778+ }
2779+ ErrCouldNotReadBody = &APIError{
2780+ http.StatusBadRequest,
2781+ ioError,
2782+ "Could not read request body",
2783+ }
2784+ ErrUnknownChannel = &APIError{
2785+ http.StatusBadRequest,
2786+ unknownChannel,
2787+ "Unknown channel",
2788+ }
2789+ ErrUnknown = &APIError{
2790+ http.StatusInternalServerError,
2791+ internalError,
2792+ "Unknown error",
2793+ }
2794+ ErrCouldNotStoreNotification = &APIError{
2795+ http.StatusServiceUnavailable,
2796+ unavailable,
2797+ "Could not store notification",
2798+ }
2799+)
2800+
2801+type Message struct {
2802+ Registration string `json:"registration"`
2803+ CoalesceTag string `json:"coalesce_tag"`
2804+ Data json.RawMessage `json:"data"`
2805+}
2806+
2807+// Broadcast request JSON object.
2808+type Broadcast struct {
2809+ Channel string `json:"channel"`
2810+ ExpireAfter uint8 `json:"expire_after"`
2811+ Data json.RawMessage `json:"data"`
2812+}
2813+
2814+func respondError(writer http.ResponseWriter, apiErr *APIError) {
2815+ wireError, err := json.Marshal(apiErr)
2816+ if err != nil {
2817+ // xxx general 500 framework
2818+ log.Println("The provided string could not be marshaled into an error:", err)
2819+ http.Error(writer, "Internal Server Error", http.StatusInternalServerError)
2820+ return
2821+ }
2822+ writer.Header().Set("Content-Type", JSONMediaType)
2823+ writer.WriteHeader(apiErr.StatusCode)
2824+ writer.Write(wireError)
2825+}
2826+
2827+func checkContentLength(request *http.Request) *APIError {
2828+ if request.ContentLength == -1 {
2829+ return ErrNoContentLengthProvided
2830+ }
2831+ if request.ContentLength == 0 {
2832+ return ErrRequestBodyEmpty
2833+ }
2834+ if request.ContentLength > MaxRequestBodyBytes {
2835+ return ErrRequestBodyTooLarge
2836+ }
2837+ return nil
2838+}
2839+
2840+func checkRequestAsPost(request *http.Request) *APIError {
2841+ if err := checkContentLength(request); err != nil {
2842+ return err
2843+ }
2844+ if request.Header.Get("Content-Type") != JSONMediaType {
2845+ return ErrWrongContentType
2846+ }
2847+ if request.Method != "POST" {
2848+ return ErrWrongRequestMethod
2849+ }
2850+ return nil
2851+}
2852+
2853+func readBody(writer http.ResponseWriter, request *http.Request) ([]byte, *APIError) {
2854+ if err := checkRequestAsPost(request); err != nil {
2855+ return nil, err
2856+ }
2857+
2858+ body := make([]byte, request.ContentLength)
2859+ _, err := io.ReadFull(request.Body, body)
2860+
2861+ if err != nil {
2862+ return nil, ErrCouldNotReadBody
2863+ }
2864+
2865+ return body, nil
2866+}
2867+
2868+// state holds the interfaces to delegate to serving requests
2869+type state struct {
2870+ store store.PendingStore
2871+ broker broker.BrokerSending
2872+ logger logger.Logger
2873+}
2874+
2875+type BroadcastHandler state
2876+
2877+func (h *BroadcastHandler) doBroadcast(bcast *Broadcast) *APIError {
2878+ chanId, err := h.store.GetInternalChannelId(bcast.Channel)
2879+ if err != nil {
2880+ switch err {
2881+ case store.ErrUnknownChannel:
2882+ return ErrUnknownChannel
2883+ default:
2884+ return ErrUnknown
2885+ }
2886+ }
2887+ // xxx ignoring expiration for now
2888+ err = h.store.AppendToChannel(chanId, bcast.Data)
2889+ if err != nil {
2890+ // assume this for now
2891+ return ErrCouldNotStoreNotification
2892+ }
2893+
2894+ h.broker.Broadcast(chanId)
2895+ return nil
2896+}
2897+
2898+func (h *BroadcastHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
2899+ body, apiErr := readBody(writer, request)
2900+
2901+ if apiErr != nil {
2902+ respondError(writer, apiErr)
2903+ return
2904+ }
2905+
2906+ broadcast := &Broadcast{}
2907+ err := json.Unmarshal(body, broadcast)
2908+
2909+ if err != nil {
2910+ respondError(writer, ErrMalformedJSONObject)
2911+ return
2912+ }
2913+
2914+ apiErr = h.doBroadcast(broadcast)
2915+ if apiErr != nil {
2916+ respondError(writer, apiErr)
2917+ return
2918+ }
2919+
2920+ writer.Header().Set("Content-Type", "application/json")
2921+ fmt.Fprintf(writer, `{"ok":true}`)
2922+}
2923+
2924+// MakeHandlersMux makes a handler that dispatches for the various API endpoints.
2925+func MakeHandlersMux(store store.PendingStore, broker broker.BrokerSending, logger logger.Logger) http.Handler {
2926+ mux := http.NewServeMux()
2927+ mux.Handle("/broadcast", &BroadcastHandler{
2928+ store: store,
2929+ broker: broker,
2930+ logger: logger,
2931+ })
2932+ return mux
2933+}
2934
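The order of the checks in `checkContentLength` and `checkRequestAsPost` decides which error a request with several problems receives. As a reading aid, the same order can be written as one pure function; `precheckStatus` and `maxBody` are hypothetical names for this sketch and are not part of the branch.

```go
package main

import (
	"fmt"
	"net/http"
)

// maxBody mirrors MaxRequestBodyBytes from handlers.go.
const maxBody = 4 * 1024

// precheckStatus is a hypothetical summary of the checks done before the
// body is read, in the same order as handlers.go: content length first,
// then content type, then request method.
func precheckStatus(contentLength int64, contentType, method string) int {
	switch {
	case contentLength == -1:
		return http.StatusLengthRequired
	case contentLength == 0:
		return http.StatusBadRequest
	case contentLength > maxBody:
		return http.StatusRequestEntityTooLarge
	case contentType != "application/json":
		return http.StatusUnsupportedMediaType
	case method != "POST":
		return http.StatusMethodNotAllowed
	}
	return http.StatusOK
}

func main() {
	fmt.Println(precheckStatus(13, "application/json", "POST"))
	fmt.Println(precheckStatus(13, "application/json", "GET"))
	// A body-less GET fails the length check before the method check.
	fmt.Println(precheckStatus(0, "", "GET"))
}
```

Two consequences follow from the code as written: a request without a body is reported as an empty body rather than a wrong method, and the content type is compared exactly, so a value carrying parameters such as a charset would be rejected.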
2935=== added file 'server/api/handlers_test.go'
2936--- server/api/handlers_test.go 1970-01-01 00:00:00 +0000
2937+++ server/api/handlers_test.go 2014-01-14 15:09:46 +0000
2938@@ -0,0 +1,345 @@
2939+/*
2940+ Copyright 2013-2014 Canonical Ltd.
2941+
2942+ This program is free software: you can redistribute it and/or modify it
2943+ under the terms of the GNU General Public License version 3, as published
2944+ by the Free Software Foundation.
2945+
2946+ This program is distributed in the hope that it will be useful, but
2947+ WITHOUT ANY WARRANTY; without even the implied warranties of
2948+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
2949+ PURPOSE. See the GNU General Public License for more details.
2950+
2951+ You should have received a copy of the GNU General Public License along
2952+ with this program. If not, see <http://www.gnu.org/licenses/>.
2953+*/
2954+
2955+package api
2956+
2957+import (
2958+ "bytes"
2959+ "encoding/json"
2960+ "errors"
2961+ "fmt"
2962+ "io/ioutil"
2963+ . "launchpad.net/gocheck"
2964+ "launchpad.net/ubuntu-push/server/store"
2965+ "net/http"
2966+ "net/http/httptest"
2967+ "strings"
2968+ "testing"
2969+)
2970+
2971+func TestHandlers(t *testing.T) { TestingT(t) }
2972+
2973+type handlersSuite struct {
2974+ messageEndpoint string
2975+ json string
2976+ client *http.Client
2977+ c *C
2978+}
2979+
2980+var _ = Suite(&handlersSuite{})
2981+
2982+func (s *handlersSuite) SetUpTest(c *C) {
2983+ s.client = &http.Client{}
2984+}
2985+
2986+func (s *handlersSuite) TestAPIError(c *C) {
2987+ var apiErr error = &APIError{400, invalidRequest, "Message"}
2988+ c.Check(apiErr.Error(), Equals, "api invalid-request: Message")
2989+ wire, err := json.Marshal(apiErr)
2990+ c.Assert(err, IsNil)
2991+ c.Check(string(wire), Equals, `{"error":"invalid-request","message":"Message"}`)
2992+}
2993+
2994+type checkBrokerSending struct {
2995+ store store.PendingStore
2996+ chanId store.InternalChannelId
2997+ err error
2998+ top int64
2999+ payloads []json.RawMessage
3000+}
3001+
3002+func (cbsend *checkBrokerSending) Broadcast(chanId store.InternalChannelId) {
3003+ top, payloads, err := cbsend.store.GetChannelSnapshot(chanId)
3004+ cbsend.err = err
3005+ cbsend.chanId = chanId
3006+ cbsend.top = top
3007+ cbsend.payloads = payloads
3008+}
3009+
3010+func (s *handlersSuite) TestDoBroadcast(c *C) {
3011+ sto := store.NewInMemoryPendingStore()
3012+ bsend := &checkBrokerSending{store: sto}
3013+ bh := &BroadcastHandler{sto, bsend, nil}
3014+ payload := json.RawMessage(`{"a": 1}`)
3015+ apiErr := bh.doBroadcast(&Broadcast{
3016+ Channel: "system",
3017+ Data: payload,
3018+ })
3019+ c.Check(apiErr, IsNil)
3020+ c.Check(bsend.err, IsNil)
3021+ c.Check(bsend.chanId, Equals, store.SystemInternalChannelId)
3022+ c.Check(bsend.top, Equals, int64(1))
3023+ c.Check(bsend.payloads, DeepEquals, []json.RawMessage{payload})
3024+}
3025+
3026+func (s *handlersSuite) TestDoBroadcastUnknownChannel(c *C) {
3027+ sto := store.NewInMemoryPendingStore()
3028+ bh := &BroadcastHandler{sto, nil, nil}
3029+ apiErr := bh.doBroadcast(&Broadcast{
3030+ Channel: "unknown",
3031+ Data: json.RawMessage(`{"a": 1}`),
3032+ })
3033+ c.Check(apiErr, Equals, ErrUnknownChannel)
3034+}
3035+
3036+type interceptInMemoryPendingStore struct {
3037+ *store.InMemoryPendingStore
3038+ intercept func(meth string, err error) error
3039+}
3040+
3041+func (isto *interceptInMemoryPendingStore) GetInternalChannelId(channel string) (store.InternalChannelId, error) {
3042+ chanId, err := isto.InMemoryPendingStore.GetInternalChannelId(channel)
3043+ return chanId, isto.intercept("GetInternalChannelId", err)
3044+}
3045+
3046+func (isto *interceptInMemoryPendingStore) AppendToChannel(chanId store.InternalChannelId, payload json.RawMessage) error {
3047+ err := isto.InMemoryPendingStore.AppendToChannel(chanId, payload)
3048+ return isto.intercept("AppendToChannel", err)
3049+}
3050+
3051+func (s *handlersSuite) TestDoBroadcastUnknownError(c *C) {
3052+ sto := &interceptInMemoryPendingStore{
3053+ store.NewInMemoryPendingStore(),
3054+ func(meth string, err error) error {
3055+ return errors.New("other")
3056+ },
3057+ }
3058+ bh := &BroadcastHandler{sto, nil, nil}
3059+ apiErr := bh.doBroadcast(&Broadcast{
3060+ Channel: "system",
3061+ Data: json.RawMessage(`{"a": 1}`),
3062+ })
3063+ c.Check(apiErr, Equals, ErrUnknown)
3064+}
3065+
3066+func (s *handlersSuite) TestDoBroadcastCouldNotStoreNotification(c *C) {
3067+ sto := &interceptInMemoryPendingStore{
3068+ store.NewInMemoryPendingStore(),
3069+ func(meth string, err error) error {
3070+ if meth == "AppendToChannel" {
3071+ return errors.New("fail")
3072+ }
3073+ return err
3074+ },
3075+ }
3076+ bh := &BroadcastHandler{sto, nil, nil}
3077+ apiErr := bh.doBroadcast(&Broadcast{
3078+ Channel: "system",
3079+ Data: json.RawMessage(`{"a": 1}`),
3080+ })
3081+ c.Check(apiErr, Equals, ErrCouldNotStoreNotification)
3082+}
3083+
3084+func newPostRequest(path string, message interface{}, server *httptest.Server) *http.Request {
3085+ packedMessage, err := json.Marshal(message)
3086+ if err != nil {
3087+ panic(err)
3088+ }
3089+ reader := bytes.NewReader(packedMessage)
3090+
3091+ url := server.URL + path
3092+ request, _ := http.NewRequest("POST", url, reader)
3093+ request.ContentLength = int64(reader.Len())
3094+ request.Header.Set("Content-Type", "application/json")
3095+
3096+ return request
3097+}
3098+
3099+func getResponseBody(response *http.Response) ([]byte, error) {
3100+ defer response.Body.Close()
3101+ return ioutil.ReadAll(response.Body)
3102+}
3103+
3104+func checkError(c *C, response *http.Response, apiErr *APIError) {
3105+ c.Check(response.StatusCode, Equals, apiErr.StatusCode)
3106+ c.Check(response.Header.Get("Content-Type"), Equals, "application/json")
3107+ error := &APIError{StatusCode: response.StatusCode}
3108+ body, err := getResponseBody(response)
3109+ c.Assert(err, IsNil)
3110+ err = json.Unmarshal(body, error)
3111+ c.Assert(err, IsNil)
3112+ c.Check(error, DeepEquals, apiErr)
3113+}
3114+
3115+type testBrokerSending struct {
3116+ chanId chan store.InternalChannelId
3117+}
3118+
3119+func (bsend testBrokerSending) Broadcast(chanId store.InternalChannelId) {
3120+ bsend.chanId <- chanId
3121+}
3122+
3123+func (s *handlersSuite) TestRespondsToBasicSystemBroadcast(c *C) {
3124+ sto := store.NewInMemoryPendingStore()
3125+ bsend := testBrokerSending{make(chan store.InternalChannelId, 1)}
3126+ testServer := httptest.NewServer(MakeHandlersMux(sto, bsend, nil))
3127+ defer testServer.Close()
3128+
3129+ payload := json.RawMessage(`{"foo":"bar"}`)
3130+
3131+ request := newPostRequest("/broadcast", &Broadcast{
3132+ Channel: "system",
3133+ ExpireAfter: 60,
3134+ Data: payload,
3135+ }, testServer)
3136+
3137+ response, err := s.client.Do(request)
3138+ c.Assert(err, IsNil)
3139+
3140+ c.Check(response.StatusCode, Equals, http.StatusOK)
3141+ c.Check(response.Header.Get("Content-Type"), Equals, "application/json")
3142+ body, err := getResponseBody(response)
3143+ c.Assert(err, IsNil)
3144+ dest := make(map[string]bool)
3145+ err = json.Unmarshal(body, &dest)
3146+ c.Assert(err, IsNil)
3147+ c.Check(dest, DeepEquals, map[string]bool{"ok": true})
3148+
3149+ top, _, err := sto.GetChannelSnapshot(store.SystemInternalChannelId)
3150+ c.Assert(err, IsNil)
3151+ c.Check(top, Equals, int64(1))
3152+ c.Check(<-bsend.chanId, Equals, store.SystemInternalChannelId)
3153+}
3154+
3155+func (s *handlersSuite) TestFromBroadcastError(c *C) {
3156+ sto := store.NewInMemoryPendingStore()
3157+ testServer := httptest.NewServer(MakeHandlersMux(sto, nil, nil))
3158+ defer testServer.Close()
3159+
3160+ payload := json.RawMessage(`{"foo":"bar"}`)
3161+
3162+ request := newPostRequest("/broadcast", &Broadcast{
3163+ Channel: "unknown",
3164+ ExpireAfter: 60,
3165+ Data: payload,
3166+ }, testServer)
3167+
3168+ response, err := s.client.Do(request)
3169+ c.Assert(err, IsNil)
3170+ checkError(c, response, ErrUnknownChannel)
3171+}
3172+
3173+func (s *handlersSuite) TestCannotBroadcastMalformedData(c *C) {
3174+ testServer := httptest.NewServer(&BroadcastHandler{})
3175+ defer testServer.Close()
3176+
3177+ packedMessage := []byte("{some bogus-message: ")
3178+ reader := bytes.NewReader(packedMessage)
3179+
3180+ request, err := http.NewRequest("POST", testServer.URL, reader)
3181+ c.Assert(err, IsNil)
3182+ request.ContentLength = int64(len(packedMessage))
3183+ request.Header.Set("Content-Type", "application/json")
3184+
3185+ response, err := s.client.Do(request)
3186+ c.Assert(err, IsNil)
3187+ checkError(c, response, ErrMalformedJSONObject)
3188+}
3189+
3190+func (s *handlersSuite) TestCannotBroadcastTooBigMessages(c *C) {
3191+ testServer := httptest.NewServer(&BroadcastHandler{})
3192+ defer testServer.Close()
3193+
3194+ bigString := strings.Repeat("a", MaxRequestBodyBytes)
3195+ dataString := fmt.Sprintf(`"%v"`, bigString)
3196+
3197+ request := newPostRequest("/", &Broadcast{
3198+ Channel: "some-channel",
3199+ ExpireAfter: 60,
3200+ Data: json.RawMessage([]byte(dataString)),
3201+ }, testServer)
3202+
3203+ response, err := s.client.Do(request)
3204+ c.Assert(err, IsNil)
3205+ checkError(c, response, ErrRequestBodyTooLarge)
3206+}
3207+
3208+func (s *handlersSuite) TestCannotBroadcastWithoutContentLength(c *C) {
3209+ testServer := httptest.NewServer(&BroadcastHandler{})
3210+ defer testServer.Close()
3211+
3212+ dataString := `{"foo":"bar"}`
3213+
3214+ request := newPostRequest("/", &Broadcast{
3215+ Channel: "some-channel",
3216+ ExpireAfter: 60,
3217+ Data: json.RawMessage([]byte(dataString)),
3218+ }, testServer)
3219+ request.ContentLength = -1
3220+
3221+ response, err := s.client.Do(request)
3222+ c.Assert(err, IsNil)
3223+ checkError(c, response, ErrNoContentLengthProvided)
3224+}
3225+
3226+func (s *handlersSuite) TestCannotBroadcastEmptyMessages(c *C) {
3227+ testServer := httptest.NewServer(&BroadcastHandler{})
3228+ defer testServer.Close()
3229+
3230+ packedMessage := make([]byte, 0)
3231+ reader := bytes.NewReader(packedMessage)
3232+
3233+ request, err := http.NewRequest("POST", testServer.URL, reader)
3234+ c.Assert(err, IsNil)
3235+ request.ContentLength = int64(len(packedMessage))
3236+ request.Header.Set("Content-Type", "application/json")
3237+
3238+ response, err := s.client.Do(request)
3239+ c.Assert(err, IsNil)
3240+ checkError(c, response, ErrRequestBodyEmpty)
3241+}
3242+
3243+func (s *handlersSuite) TestCannotBroadcastNonJSONMessages(c *C) {
3244+ testServer := httptest.NewServer(&BroadcastHandler{})
3245+ defer testServer.Close()
3246+
3247+ dataString := `{"foo":"bar"}`
3248+
3249+ request := newPostRequest("/", &Broadcast{
3250+ Channel: "some-channel",
3251+ ExpireAfter: 60,
3252+ Data: json.RawMessage([]byte(dataString)),
3253+ }, testServer)
3254+ request.Header.Set("Content-Type", "text/plain")
3255+
3256+ response, err := s.client.Do(request)
3257+ c.Assert(err, IsNil)
3258+ checkError(c, response, ErrWrongContentType)
3259+}
3260+
3261+func (s *handlersSuite) TestCannotBroadcastNonPostMessages(c *C) {
3262+ testServer := httptest.NewServer(&BroadcastHandler{})
3263+ defer testServer.Close()
3264+
3265+ dataString := `{"foo":"bar"}`
3266+ packedMessage, err := json.Marshal(&Broadcast{
3267+ Channel: "some-channel",
3268+ ExpireAfter: 60,
3269+ Data: json.RawMessage([]byte(dataString)),
3270+ })
3271+ c.Assert(err, IsNil)
3272+ reader := bytes.NewReader(packedMessage)
3273+
3274+ request, err := http.NewRequest("GET", testServer.URL, reader)
3275+ c.Assert(err, IsNil)
3276+ request.ContentLength = int64(len(packedMessage))
3277+ request.Header.Set("Content-Type", "application/json")
3278+
3279+ response, err := s.client.Do(request)
3280+ c.Assert(err, IsNil)
3281+
3282+ checkError(c, response, ErrWrongRequestMethod)
3283+}
3284
3285=== added directory 'server/broker'
3286=== added file 'server/broker/broker.go'
3287--- server/broker/broker.go 1970-01-01 00:00:00 +0000
3288+++ server/broker/broker.go 2014-01-14 15:09:46 +0000
3289@@ -0,0 +1,71 @@
3290+/*
3291+ Copyright 2013-2014 Canonical Ltd.
3292+
3293+ This program is free software: you can redistribute it and/or modify it
3294+ under the terms of the GNU General Public License version 3, as published
3295+ by the Free Software Foundation.
3296+
3297+ This program is distributed in the hope that it will be useful, but
3298+ WITHOUT ANY WARRANTY; without even the implied warranties of
3299+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
3300+ PURPOSE. See the GNU General Public License for more details.
3301+
3302+ You should have received a copy of the GNU General Public License along
3303+ with this program. If not, see <http://www.gnu.org/licenses/>.
3304+*/
3305+
3306+// Package broker handles session registrations and delivery of messages
3307+// through sessions.
3308+package broker
3309+
3310+import (
3311+ "fmt"
3312+ "launchpad.net/ubuntu-push/protocol"
3313+ "launchpad.net/ubuntu-push/server/store"
3314+)
3315+
3316+// Broker is responsible for registering sessions and delivering messages through them.
3317+type Broker interface {
3318+ // Register the session.
3319+ Register(*protocol.ConnectMsg) (BrokerSession, error)
3320+ // Unregister the session.
3321+ Unregister(BrokerSession)
3322+}
3323+
3324+// BrokerSending is the notification sending facet of the broker.
3325+type BrokerSending interface {
3326+ // Broadcast channel.
3327+ Broadcast(chanId store.InternalChannelId)
3328+}
3329+
3330+// Exchange guides the session through performing an exchange, typically delivery.
3331+type Exchange interface {
3332+ Prepare(BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error)
3333+ Acked(BrokerSession) error
3334+}
3335+
3336+// BrokerSession holds broker session state.
3337+type BrokerSession interface {
3338+ // SessionChannel returns the session control channel
3339+ // on which the session gets exchanges to perform.
3340+ SessionChannel() <-chan Exchange
3341+ // DeviceId returns the device id string.
3342+ DeviceId() string
3343+}
3344+
3345+// ErrAbort is the error for an aborted session.
3346+type ErrAbort struct {
3347+ Reason string
3348+}
3349+
3350+func (ea *ErrAbort) Error() string {
3351+ return fmt.Sprintf("session aborted (%s)", ea.Reason)
3352+}
3353+
3354+// BrokerConfig gives access to the typical broker configuration.
3355+type BrokerConfig interface {
3356+ // SessionQueueSize gives the session queue size.
3357+ SessionQueueSize() uint
3358+ // BrokerQueueSize gives the internal broker queue size.
3359+ BrokerQueueSize() uint
3360+}
3361
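A minimal sketch of how a caller might tell a session abort apart from other errors. ErrAbort is copied from broker.go above; abortReason is a hypothetical helper that is not part of this branch, and errors.As needs a Go release newer than the one this code was written against (Go 1.13 or later).

```go
package main

import (
	"errors"
	"fmt"
)

// ErrAbort mirrors the error type declared in server/broker/broker.go.
type ErrAbort struct {
	Reason string
}

func (ea *ErrAbort) Error() string {
	return fmt.Sprintf("session aborted (%s)", ea.Reason)
}

// abortReason is a hypothetical helper (an assumption, not in the branch).
// It reports whether err is, or wraps, an *ErrAbort and returns its Reason.
func abortReason(err error) (string, bool) {
	var ea *ErrAbort
	if errors.As(err, &ea) {
		return ea.Reason, true
	}
	return "", false
}

func main() {
	err := error(&ErrAbort{Reason: "expected ACK message"})
	if reason, ok := abortReason(err); ok {
		fmt.Println("aborted:", reason)
	}
	_, ok := abortReason(errors.New("network down"))
	fmt.Println("abort?", ok)
}
```

Because Error has a pointer receiver, only *ErrAbort satisfies the error interface, which is why the target passed to errors.As is a **ErrAbort.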
3362=== added file 'server/broker/simple.go'
3363--- server/broker/simple.go 1970-01-01 00:00:00 +0000
3364+++ server/broker/simple.go 2014-01-14 15:09:46 +0000
3365@@ -0,0 +1,260 @@
3366+/*
3367+ Copyright 2013-2014 Canonical Ltd.
3368+
3369+ This program is free software: you can redistribute it and/or modify it
3370+ under the terms of the GNU General Public License version 3, as published
3371+ by the Free Software Foundation.
3372+
3373+ This program is distributed in the hope that it will be useful, but
3374+ WITHOUT ANY WARRANTY; without even the implied warranties of
3375+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
3376+ PURPOSE. See the GNU General Public License for more details.
3377+
3378+ You should have received a copy of the GNU General Public License along
3379+ with this program. If not, see <http://www.gnu.org/licenses/>.
3380+*/
3381+
3382+package broker
3383+
3384+import (
3385+ "encoding/json"
3386+ "launchpad.net/ubuntu-push/logger"
3387+ "launchpad.net/ubuntu-push/protocol"
3388+ "launchpad.net/ubuntu-push/server/store"
3389+ // "log"
3390+ "sync"
3391+)
3392+
3393+ // SimpleBroker implements broker.Broker for everything in just one process.
3394+type SimpleBroker struct {
3395+ sto store.PendingStore
3396+ logger logger.Logger
3397+ // running state
3398+ runMutex sync.Mutex
3399+ running bool
3400+ stop chan bool
3401+ stopped chan bool
3402+ // sessions
3403+ sessionCh chan *simpleBrokerSession
3404+ registry map[string]*simpleBrokerSession
3405+ sessionQueueSize uint
3406+ // delivery
3407+ deliveryCh chan *delivery
3408+}
3409+
3410+// simpleBrokerSession represents a session in the broker.
3411+type simpleBrokerSession struct {
3412+ registered bool
3413+ deviceId string
3414+ done chan bool
3415+ exchanges chan Exchange
3416+ levels map[store.InternalChannelId]int64
3417+ // for exchanges
3418+ broadcastMsg protocol.BroadcastMsg
3419+ ackMsg protocol.AckMsg
3420+}
3421+
3422+type deliveryKind int
3423+
3424+const (
3425+ broadcastDelivery deliveryKind = iota
3426+)
3427+
3428+// delivery holds all the information to request a delivery
3429+type delivery struct {
3430+ kind deliveryKind
3431+ chanId store.InternalChannelId
3432+}
3433+
3434+func (sess *simpleBrokerSession) SessionChannel() <-chan Exchange {
3435+ return sess.exchanges
3436+}
3437+
3438+func (sess *simpleBrokerSession) DeviceId() string {
3439+ return sess.deviceId
3440+}
3441+
3442+// NewSimpleBroker makes a new SimpleBroker.
3443+func NewSimpleBroker(sto store.PendingStore, cfg BrokerConfig, logger logger.Logger) *SimpleBroker {
3444+ sessionCh := make(chan *simpleBrokerSession, cfg.BrokerQueueSize())
3445+ deliveryCh := make(chan *delivery, cfg.BrokerQueueSize())
3446+ registry := make(map[string]*simpleBrokerSession)
3447+ return &SimpleBroker{
3448+ logger: logger,
3449+ sto: sto,
3450+ stop: make(chan bool),
3451+ stopped: make(chan bool),
3452+ registry: registry,
3453+ sessionCh: sessionCh,
3454+ deliveryCh: deliveryCh,
3455+ sessionQueueSize: cfg.SessionQueueSize(),
3456+ }
3457+}
3458+
3459+// Start starts the broker.
3460+func (b *SimpleBroker) Start() {
3461+ b.runMutex.Lock()
3462+ defer b.runMutex.Unlock()
3463+ if b.running {
3464+ return
3465+ }
3466+ b.running = true
3467+ go b.run()
3468+}
3469+
3470+// Stop stops the broker.
3471+func (b *SimpleBroker) Stop() {
3472+ b.runMutex.Lock()
3473+ defer b.runMutex.Unlock()
3474+ if !b.running {
3475+ return
3476+ }
3477+ b.stop <- true
3478+ <-b.stopped
3479+ b.running = false
3480+}
3481+
3482+func (b *SimpleBroker) feedPending(sess *simpleBrokerSession) error {
3483+ // find relevant channels, for now only system
3484+ channels := []store.InternalChannelId{store.SystemInternalChannelId}
3485+ for _, chanId := range channels {
3486+ topLevel, payloads, err := b.sto.GetChannelSnapshot(chanId)
3487+ if err != nil {
3488+ // next broadcast will try again
3489+ b.logger.Errorf("unsuccessful feed pending, get channel snapshot for %v: %v", chanId, err)
3490+ continue
3491+ }
3492+ clientLevel := sess.levels[chanId]
3493+ if clientLevel != topLevel {
3494+ broadcastExchg := &simpleBroadcastExchange{
3495+ chanId: chanId,
3496+ topLevel: topLevel,
3497+ notificationPayloads: payloads,
3498+ }
3499+ sess.exchanges <- broadcastExchg
3500+ }
3501+ }
3502+ return nil
3503+}
3504+
3505+// Register registers a session with the broker. It feeds the session
3506+// pending notifications as well.
3507+func (b *SimpleBroker) Register(connect *protocol.ConnectMsg) (BrokerSession, error) {
3508+ // xxx sanity check DeviceId
3509+ levels := map[store.InternalChannelId]int64{}
3510+ for hexId, v := range connect.Levels {
3511+ id, err := store.HexToInternalChannelId(hexId)
3512+ if err != nil {
3513+ return nil, &ErrAbort{err.Error()}
3514+ }
3515+ levels[id] = v
3516+ }
3517+ sess := &simpleBrokerSession{
3518+ deviceId: connect.DeviceId,
3519+ done: make(chan bool),
3520+ exchanges: make(chan Exchange, b.sessionQueueSize),
3521+ levels: levels,
3522+ }
3523+ b.sessionCh <- sess
3524+ <-sess.done
3525+ err := b.feedPending(sess)
3526+ if err != nil {
3527+ return nil, err
3528+ }
3529+ return sess, nil
3530+}
3531+
3532+// Unregister unregisters a session with the broker. Doesn't wait.
3533+func (b *SimpleBroker) Unregister(s BrokerSession) {
3534+ sess := s.(*simpleBrokerSession)
3535+ b.sessionCh <- sess
3536+}
3537+
3538+// run runs the agent logic of the broker.
3539+func (b *SimpleBroker) run() {
3540+Loop:
3541+ for {
3542+ select {
3543+ case <-b.stop:
3544+ b.stopped <- true
3545+ break Loop
3546+ case sess := <-b.sessionCh:
3547+ if sess.registered { // unregister
3548+ // unregister only current
3549+ if b.registry[sess.deviceId] == sess {
3550+ delete(b.registry, sess.deviceId)
3551+ }
3552+ } else { // register
3553+ b.registry[sess.deviceId] = sess
3554+ sess.registered = true
3555+ sess.done <- true
3556+ }
3557+ case delivery := <-b.deliveryCh:
3558+ switch delivery.kind {
3559+ case broadcastDelivery:
3560+ topLevel, payloads, err := b.sto.GetChannelSnapshot(delivery.chanId)
3561+ if err != nil {
3562+ // next broadcast will try again
3563+ b.logger.Errorf("unsuccessful broadcast, get channel snapshot for %v: %v", delivery.chanId, err)
3564+ continue Loop
3565+ }
3566+ broadcastExchg := &simpleBroadcastExchange{
3567+ chanId: delivery.chanId,
3568+ topLevel: topLevel,
3569+ notificationPayloads: payloads,
3570+ }
3571+ for _, sess := range b.registry {
3572+ sess.exchanges <- broadcastExchg
3573+ }
3574+ }
3575+ }
3576+ }
3577+}
3578+
3579+// Broadcast requests the broadcast for a channel.
3580+func (b *SimpleBroker) Broadcast(chanId store.InternalChannelId) {
3581+ b.deliveryCh <- &delivery{
3582+ kind: broadcastDelivery,
3583+ chanId: chanId,
3584+ }
3585+}
3586+
3587+// Exchanges
3588+
3589+type simpleBroadcastExchange struct {
3590+ chanId store.InternalChannelId
3591+ topLevel int64
3592+ notificationPayloads []json.RawMessage
3593+}
3594+
3595+func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
3596+ c := int64(len(payloads))
3597+ delta := topLevel - clientLevel
3598+ if delta < c {
3599+ return payloads[c-delta:]
3600+ } else {
3601+ return payloads
3602+ }
3603+}
3604+
3605+func (sbe *simpleBroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
3606+ simpleSess := sess.(*simpleBrokerSession)
3607+ simpleSess.broadcastMsg.Type = "broadcast"
3608+ clientLevel := simpleSess.levels[sbe.chanId]
3609+ payloads := filterByLevel(clientLevel, sbe.topLevel, sbe.notificationPayloads)
3610+ // xxx need an AppId as well, later
3611+ simpleSess.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.chanId)
3612+ simpleSess.broadcastMsg.TopLevel = sbe.topLevel
3613+ simpleSess.broadcastMsg.Payloads = payloads
3614+ return &simpleSess.broadcastMsg, &simpleSess.ackMsg, nil
3615+}
3616+
3617+func (sbe *simpleBroadcastExchange) Acked(sess BrokerSession) error {
3618+ simpleSess := sess.(*simpleBrokerSession)
3619+ if simpleSess.ackMsg.Type != "ack" {
3620+ return &ErrAbort{"expected ACK message"}
3621+ }
3622+ // update levels
3623+ simpleSess.levels[sbe.chanId] = sbe.topLevel
3624+ return nil
3625+}
3626
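A worked sketch of the level arithmetic in filterByLevel above, using strings in place of json.RawMessage. tailByLevel is a hypothetical stand-in with the same logic. It assumes clientLevel <= topLevel; with a negative delta the slice expression would go out of range, exactly as in the original.

```go
package main

import "fmt"

// tailByLevel is a hypothetical string-based stand-in for filterByLevel.
// It keeps the last (topLevel - clientLevel) payloads, or all of them
// when the client is further behind than the stored history reaches.
func tailByLevel(clientLevel, topLevel int64, payloads []string) []string {
	c := int64(len(payloads))
	delta := topLevel - clientLevel
	if delta < c {
		return payloads[c-delta:]
	}
	return payloads
}

func main() {
	// Three stored notifications, the newest one at level 5.
	payloads := []string{"n3", "n4", "n5"}
	for client := int64(5); client >= 1; client-- {
		fmt.Println(client, tailByLevel(client, 5, payloads))
	}
}
```

A client already at level 5 gets nothing, a client at level 4 gets only the newest payload, and a client at level 2 or below gets the whole stored history.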
3627=== added file 'server/broker/simple_test.go'
3628--- server/broker/simple_test.go 1970-01-01 00:00:00 +0000
3629+++ server/broker/simple_test.go 2014-01-14 15:09:46 +0000
3630@@ -0,0 +1,327 @@
3631+/*
3632+ Copyright 2013-2014 Canonical Ltd.
3633+
3634+ This program is free software: you can redistribute it and/or modify it
3635+ under the terms of the GNU General Public License version 3, as published
3636+ by the Free Software Foundation.
3637+
3638+ This program is distributed in the hope that it will be useful, but
3639+ WITHOUT ANY WARRANTY; without even the implied warranties of
3640+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
3641+ PURPOSE. See the GNU General Public License for more details.
3642+
3643+ You should have received a copy of the GNU General Public License along
3644+ with this program. If not, see <http://www.gnu.org/licenses/>.
3645+*/
3646+
3647+package broker
3648+
3649+import (
3650+ "encoding/json"
3651+ "errors"
3652+ . "launchpad.net/gocheck"
3653+ "launchpad.net/ubuntu-push/logger"
3654+ "launchpad.net/ubuntu-push/protocol"
3655+ "launchpad.net/ubuntu-push/server/store"
3656+ helpers "launchpad.net/ubuntu-push/testing"
3657+ // "log"
3658+ "testing"
3659+ "time"
3660+)
3661+
3662+func TestSimple(t *testing.T) { TestingT(t) }
3663+
3664+type simpleSuite struct{}
3665+
3666+var _ = Suite(&simpleSuite{})
3667+
3668+type testBrokerConfig struct{}
3669+
3670+func (tbc *testBrokerConfig) SessionQueueSize() uint {
3671+ return 10
3672+}
3673+
3674+func (tbc *testBrokerConfig) BrokerQueueSize() uint {
3675+ return 5
3676+}
3677+
3678+func (s *simpleSuite) TestNew(c *C) {
3679+ sto := store.NewInMemoryPendingStore()
3680+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3681+ c.Check(cap(b.sessionCh), Equals, 5)
3682+ c.Check(len(b.registry), Equals, 0)
3683+ c.Check(b.sto, Equals, sto)
3684+}
3685+
3686+func (s *simpleSuite) TestStartStop(c *C) {
3687+ b := NewSimpleBroker(nil, &testBrokerConfig{}, nil)
3688+ b.Start()
3689+ c.Check(b.running, Equals, true)
3690+ b.Start()
3691+ b.Stop()
3692+ c.Check(b.running, Equals, false)
3693+ b.Stop()
3694+}
3695+
3696+func (s *simpleSuite) TestRegistration(c *C) {
3697+ sto := store.NewInMemoryPendingStore()
3698+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3699+ b.Start()
3700+ defer b.Stop()
3701+ sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"0": 5}})
3702+ c.Assert(err, IsNil)
3703+ c.Assert(b.registry["dev-1"], Equals, sess)
3704+ c.Assert(sess.DeviceId(), Equals, "dev-1")
3705+ c.Check(sess.(*simpleBrokerSession).levels, DeepEquals, map[store.InternalChannelId]int64{
3706+ store.SystemInternalChannelId: 5,
3707+ })
3708+ b.Unregister(sess)
3709+ // just to make sure the unregister was processed
3710+ _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""})
3711+ c.Assert(err, IsNil)
3712+ c.Check(b.registry["dev-1"], IsNil)
3713+}
3714+
3715+func (s *simpleSuite) TestRegistrationBrokenLevels(c *C) {
3716+ sto := store.NewInMemoryPendingStore()
3717+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3718+ b.Start()
3719+ defer b.Stop()
3720+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1", Levels: map[string]int64{"z": 5}})
3721+ c.Check(err, FitsTypeOf, &ErrAbort{})
3722+}
3723+
3724+func (s *simpleSuite) TestFeedPending(c *C) {
3725+ sto := store.NewInMemoryPendingStore()
3726+ notification1 := json.RawMessage(`{"m": "M"}`)
3727+ sto.AppendToChannel(store.SystemInternalChannelId, notification1)
3728+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3729+ sess := &simpleBrokerSession{
3730+ exchanges: make(chan Exchange, 1),
3731+ }
3732+ b.feedPending(sess)
3733+ c.Assert(len(sess.exchanges), Equals, 1)
3734+ exchg1 := <-sess.exchanges
3735+ c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{
3736+ chanId: store.SystemInternalChannelId,
3737+ topLevel: 1,
3738+ notificationPayloads: []json.RawMessage{notification1},
3739+ })
3740+}
3741+
3742+func (s *simpleSuite) TestFeedPendingNop(c *C) {
3743+ sto := store.NewInMemoryPendingStore()
3744+ notification1 := json.RawMessage(`{"m": "M"}`)
3745+ sto.AppendToChannel(store.SystemInternalChannelId, notification1)
3746+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3747+ sess := &simpleBrokerSession{
3748+ exchanges: make(chan Exchange, 1),
3749+ levels: map[store.InternalChannelId]int64{
3750+ store.SystemInternalChannelId: 1,
3751+ },
3752+ }
3753+ b.feedPending(sess)
3754+ c.Assert(len(sess.exchanges), Equals, 0)
3755+}
3756+
3757+func (s *simpleSuite) TestRegistrationFeedPending(c *C) {
3758+ sto := store.NewInMemoryPendingStore()
3759+ notification1 := json.RawMessage(`{"m": "M"}`)
3760+ sto.AppendToChannel(store.SystemInternalChannelId, notification1)
3761+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3762+ b.Start()
3763+ defer b.Stop()
3764+ sess, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
3765+ c.Assert(err, IsNil)
3766+ c.Check(len(sess.(*simpleBrokerSession).exchanges), Equals, 1)
3767+}
3768+
3769+func (s *simpleSuite) TestRegistrationFeedPendingError(c *C) {
3770+ buf := &helpers.SyncedLogBuffer{}
3771+ logger := logger.NewSimpleLogger(buf, "error")
3772+ sto := &testFailingStore{}
3773+ b := NewSimpleBroker(sto, &testBrokerConfig{}, logger)
3774+ b.Start()
3775+ defer b.Stop()
3776+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
3777+ c.Assert(err, IsNil)
3778+ // but
3779+ c.Check(buf.String(), Matches, ".*ERROR unsuccessful feed pending, get channel snapshot for 0: get channel snapshot fail\n")
3780+}
3781+
3782+func (s *simpleSuite) TestRegistrationLastWins(c *C) {
3783+ sto := store.NewInMemoryPendingStore()
3784+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3785+ b.Start()
3786+ defer b.Stop()
3787+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
3788+ c.Assert(err, IsNil)
3789+ sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
3790+ c.Assert(err, IsNil)
3791+ c.Assert(b.registry["dev-1"], Equals, sess2)
3792+ b.Unregister(sess1)
3793+ // just to make sure the unregister was processed
3794+ _, err = b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: ""})
3795+ c.Assert(err, IsNil)
3796+ c.Check(b.registry["dev-1"], Equals, sess2)
3797+}
3798+
3799+func (s *simpleSuite) TestBroadcastExchange(c *C) {
3800+ sess := &simpleBrokerSession{
3801+ levels: map[store.InternalChannelId]int64{},
3802+ }
3803+ exchg := &simpleBroadcastExchange{
3804+ chanId: store.SystemInternalChannelId,
3805+ topLevel: 3,
3806+ notificationPayloads: []json.RawMessage{
3807+ json.RawMessage(`{"a":"x"}`),
3808+ json.RawMessage(`{"a":"y"}`),
3809+ },
3810+ }
3811+ outMsg, inMsg, err := exchg.Prepare(sess)
3812+ c.Assert(err, IsNil)
3813+ // check
3814+ marshalled, err := json.Marshal(outMsg)
3815+ c.Assert(err, IsNil)
3816+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
3817+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
3818+ c.Assert(err, IsNil)
3819+ err = exchg.Acked(sess)
3820+ c.Assert(err, IsNil)
3821+ c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(3))
3822+}
3823+
3824+func (s *simpleSuite) TestBroadcastExchangeAckMismatch(c *C) {
3825+ sess := &simpleBrokerSession{
3826+ levels: map[store.InternalChannelId]int64{},
3827+ }
3828+ exchg := &simpleBroadcastExchange{
3829+ chanId: store.SystemInternalChannelId,
3830+ topLevel: 3,
3831+ notificationPayloads: []json.RawMessage{
3832+ json.RawMessage(`{"a":"y"}`),
3833+ },
3834+ }
3835+ outMsg, inMsg, err := exchg.Prepare(sess)
3836+ c.Assert(err, IsNil)
3837+ // check
3838+ marshalled, err := json.Marshal(outMsg)
3839+ c.Assert(err, IsNil)
3840+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
3841+ err = json.Unmarshal([]byte(`{}`), inMsg)
3842+ c.Assert(err, IsNil)
3843+ err = exchg.Acked(sess)
3844+ c.Assert(err, Not(IsNil))
3845+ c.Check(sess.levels[store.SystemInternalChannelId], Equals, int64(0))
3846+}
3847+
3848+func (s *simpleSuite) TestFilterByLevel(c *C) {
3849+ payloads := []json.RawMessage{
3850+ json.RawMessage(`{"a": 3}`),
3851+ json.RawMessage(`{"a": 4}`),
3852+ json.RawMessage(`{"a": 5}`),
3853+ }
3854+ res := filterByLevel(5, 5, payloads)
3855+ c.Check(len(res), Equals, 0)
3856+ res = filterByLevel(4, 5, payloads)
3857+ c.Check(len(res), Equals, 1)
3858+ c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 5}`))
3859+ res = filterByLevel(3, 5, payloads)
3860+ c.Check(len(res), Equals, 2)
3861+ c.Check(res[0], DeepEquals, json.RawMessage(`{"a": 4}`))
3862+ res = filterByLevel(2, 5, payloads)
3863+ c.Check(len(res), Equals, 3)
3864+ res = filterByLevel(1, 5, payloads)
3865+ c.Check(len(res), Equals, 3)
3866+}
3867+
3868+func (s *simpleSuite) TestBroadcastExchangeFilterByLevel(c *C) {
3869+ sess := &simpleBrokerSession{
3870+ levels: map[store.InternalChannelId]int64{
3871+ store.SystemInternalChannelId: 2,
3872+ },
3873+ }
3874+ exchg := &simpleBroadcastExchange{
3875+ chanId: store.SystemInternalChannelId,
3876+ topLevel: 3,
3877+ notificationPayloads: []json.RawMessage{
3878+ json.RawMessage(`{"a":"x"}`),
3879+ json.RawMessage(`{"a":"y"}`),
3880+ },
3881+ }
3882+ outMsg, inMsg, err := exchg.Prepare(sess)
3883+ c.Assert(err, IsNil)
3884+ // check
3885+ marshalled, err := json.Marshal(outMsg)
3886+ c.Assert(err, IsNil)
3887+ c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
3888+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
3889+ c.Assert(err, IsNil)
3890+ err = exchg.Acked(sess)
3891+ c.Assert(err, IsNil)
3892+}
3893+
3894+func (s *simpleSuite) TestBroadcast(c *C) {
3895+ sto := store.NewInMemoryPendingStore()
3896+ notification1 := json.RawMessage(`{"m": "M"}`)
3897+ sto.AppendToChannel(store.SystemInternalChannelId, notification1)
3898+ b := NewSimpleBroker(sto, &testBrokerConfig{}, nil)
3899+ b.Start()
3900+ defer b.Stop()
3901+ sess1, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
3902+ c.Assert(err, IsNil)
3903+ sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-2"})
3904+ c.Assert(err, IsNil)
3905+ b.Broadcast(store.SystemInternalChannelId)
3906+ select {
3907+ case <-time.After(5 * time.Second):
3908+ c.Fatal("taking too long to get broadcast exchange")
3909+ case exchg1 := <-sess1.SessionChannel():
3910+ c.Check(exchg1, DeepEquals, &simpleBroadcastExchange{
3911+ chanId: store.SystemInternalChannelId,
3912+ topLevel: 1,
3913+ notificationPayloads: []json.RawMessage{notification1},
3914+ })
3915+ }
3916+ select {
3917+ case <-time.After(5 * time.Second):
3918+ c.Fatal("taking too long to get broadcast exchange")
3919+ case exchg2 := <-sess2.SessionChannel():
3920+ c.Check(exchg2, DeepEquals, &simpleBroadcastExchange{
3921+ chanId: store.SystemInternalChannelId,
3922+ topLevel: 1,
3923+ notificationPayloads: []json.RawMessage{notification1},
3924+ })
3925+ }
3926+}
3927+
3928+type testFailingStore struct {
3929+ store.InMemoryPendingStore
3930+ countdownToFail int
3931+}
3932+
3933+func (sto *testFailingStore) GetChannelSnapshot(chanId store.InternalChannelId) (int64, []json.RawMessage, error) {
3934+ if sto.countdownToFail == 0 {
3935+ return 0, nil, errors.New("get channel snapshot fail")
3936+ }
3937+ sto.countdownToFail--
3938+ return 0, nil, nil
3939+}
3940+
3941+func (s *simpleSuite) TestBroadcastFail(c *C) {
3942+ buf := &helpers.SyncedLogBuffer{Written: make(chan bool, 1)}
3943+ logger := logger.NewSimpleLogger(buf, "error")
3944+ sto := &testFailingStore{countdownToFail: 1}
3945+ b := NewSimpleBroker(sto, &testBrokerConfig{}, logger)
3946+ b.Start()
3947+ defer b.Stop()
3948+ _, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
3949+ c.Assert(err, IsNil)
3950+ b.Broadcast(store.SystemInternalChannelId)
3951+ select {
3952+ case <-time.After(5 * time.Second):
3953+ c.Fatal("taking too long to log error")
3954+ case <-buf.Written:
3955+ }
3956+ c.Check(buf.String(), Matches, ".*ERROR unsuccessful broadcast, get channel snapshot for 0: get channel snapshot fail\n")
3957+}
3958
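The "last registration wins" behaviour checked by TestRegistrationLastWins can be sketched in isolation. The types below (session, query, registry) are hypothetical simplifications, not the branch's code. Unlike SimpleBroker, the channels here are unbuffered and lookups go through the agent goroutine, so no extra Register call is needed to flush a pending unregister.

```go
package main

import "fmt"

// session is a hypothetical, trimmed-down stand-in for simpleBrokerSession.
type session struct {
	deviceID   string
	registered bool // only touched by the agent goroutine
	done       chan bool
}

// query asks the agent which session is current for a device.
type query struct {
	deviceID string
	reply    chan *session
}

type registry struct {
	sessionCh chan *session
	queryCh   chan query
	stop      chan bool
	sessions  map[string]*session
}

func newRegistry() *registry {
	return &registry{
		sessionCh: make(chan *session),
		queryCh:   make(chan query),
		stop:      make(chan bool),
		sessions:  make(map[string]*session),
	}
}

// run is the agent loop: it alone reads and writes the sessions map.
func (r *registry) run() {
	for {
		select {
		case <-r.stop:
			return
		case sess := <-r.sessionCh:
			if sess.registered {
				// Unregister only if sess is still the current one.
				if r.sessions[sess.deviceID] == sess {
					delete(r.sessions, sess.deviceID)
				}
			} else {
				r.sessions[sess.deviceID] = sess
				sess.registered = true
				sess.done <- true
			}
		case q := <-r.queryCh:
			q.reply <- r.sessions[q.deviceID]
		}
	}
}

func (r *registry) register(deviceID string) *session {
	sess := &session{deviceID: deviceID, done: make(chan bool)}
	r.sessionCh <- sess
	<-sess.done
	return sess
}

func (r *registry) unregister(sess *session) { r.sessionCh <- sess }

func (r *registry) current(deviceID string) *session {
	q := query{deviceID: deviceID, reply: make(chan *session)}
	r.queryCh <- q
	return <-q.reply
}

func main() {
	r := newRegistry()
	go r.run()
	defer close(r.stop)
	s1 := r.register("dev-1")
	s2 := r.register("dev-1") // replaces s1 as the current session
	r.unregister(s1)          // stale: must not evict s2
	fmt.Println(r.current("dev-1") == s2)
	r.unregister(s2)
	fmt.Println(r.current("dev-1") == nil)
}
```

Keeping all map access inside one goroutine is what lets the registry work without a mutex; the identity check on unregister is what stops a late disconnect of an old session from evicting its replacement.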
3959=== added directory 'server/dev'
3960=== added file 'server/dev/config.go'
3961--- server/dev/config.go 1970-01-01 00:00:00 +0000
3962+++ server/dev/config.go 2014-01-14 15:09:46 +0000
3963@@ -0,0 +1,110 @@
3964+/*
3965+ Copyright 2013-2014 Canonical Ltd.
3966+
3967+ This program is free software: you can redistribute it and/or modify it
3968+ under the terms of the GNU General Public License version 3, as published
3969+ by the Free Software Foundation.
3970+
3971+ This program is distributed in the hope that it will be useful, but
3972+ WITHOUT ANY WARRANTY; without even the implied warranties of
3973+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
3974+ PURPOSE. See the GNU General Public License for more details.
3975+
3976+ You should have received a copy of the GNU General Public License along
3977+ with this program. If not, see <http://www.gnu.org/licenses/>.
3978+*/
3979+
3980+package main
3981+
3982+import (
3983+ "fmt"
3984+ "io"
3985+ "launchpad.net/ubuntu-push/config"
3986+ "time"
3987+)
3988+
3989+// expectedConfiguration is used as target for JSON parsing the configuration.
3990+type expectedConfiguration struct {
3991+ // session configuration
3992+ PingInterval config.ConfigTimeDuration `json:"ping_interval"`
3993+ ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`
3994+ // broker configuration
3995+ SessionQueueSize config.ConfigQueueSize `json:"session_queue_size"`
3996+ BrokerQueueSize config.ConfigQueueSize `json:"broker_queue_size"`
3997+ // device listener configuration
3998+ Addr config.ConfigHostPort `json:"addr"`
3999+ KeyPEMFile string `json:"key_pem_file"`
4000+ CertPEMFile string `json:"cert_pem_file"`
4001+ // api http server configuration
4002+ HTTPAddr config.ConfigHostPort `json:"http_addr"`
4003+ HTTPReadTimeout config.ConfigTimeDuration `json:"http_read_timeout"`
4004+ HTTPWriteTimeout config.ConfigTimeDuration `json:"http_write_timeout"`
4005+}
4006+
4007+ // configuration holds the server configuration and gives access to it
4008+ // through the various component config interfaces.
4009+type configuration struct {
4010+ parsed expectedConfiguration
4011+ certPEMBlock []byte
4012+ keyPEMBlock []byte
4013+}
4014+
4015+func (cfg *configuration) PingInterval() time.Duration {
4016+ return cfg.parsed.PingInterval.TimeDuration()
4017+}
4018+
4019+func (cfg *configuration) ExchangeTimeout() time.Duration {
4020+ return cfg.parsed.ExchangeTimeout.TimeDuration()
4021+}
4022+
4023+func (cfg *configuration) SessionQueueSize() uint {
4024+ return cfg.parsed.SessionQueueSize.QueueSize()
4025+}
4026+
4027+func (cfg *configuration) BrokerQueueSize() uint {
4028+ return cfg.parsed.BrokerQueueSize.QueueSize()
4029+}
4030+
4031+func (cfg *configuration) Addr() string {
4032+ return cfg.parsed.Addr.HostPort()
4033+}
4034+
4035+func (cfg *configuration) KeyPEMBlock() []byte {
4036+ return cfg.keyPEMBlock
4037+}
4038+
4039+func (cfg *configuration) CertPEMBlock() []byte {
4040+ return cfg.certPEMBlock
4041+}
4042+
4043+func (cfg *configuration) HTTPAddr() string {
4044+ return cfg.parsed.HTTPAddr.HostPort()
4045+}
4046+
4047+func (cfg *configuration) HTTPReadTimeout() time.Duration {
4048+ return cfg.parsed.HTTPReadTimeout.TimeDuration()
4049+}
4050+
4051+func (cfg *configuration) HTTPWriteTimeout() time.Duration {
4052+ return cfg.parsed.HTTPWriteTimeout.TimeDuration()
4053+}
4054+
4055+ // read reads and parses the configuration from the reader. It uses baseDir
4056+ // to load the files mentioned in the configuration.
4057+func (cfg *configuration) read(r io.Reader, baseDir string) error {
4058+ err := config.ReadConfig(r, &cfg.parsed)
4059+ if err != nil {
4060+ return err
4061+ }
4062+ keyPEMBlock, err := config.LoadFile(cfg.parsed.KeyPEMFile, baseDir)
4063+ if err != nil {
4064+ return fmt.Errorf("reading key_pem_file: %v", err)
4065+ }
4066+ certPEMBlock, err := config.LoadFile(cfg.parsed.CertPEMFile, baseDir)
4067+ if err != nil {
4068+ return fmt.Errorf("reading cert_pem_file: %v", err)
4069+ }
4070+ cfg.keyPEMBlock = keyPEMBlock
4071+ cfg.certPEMBlock = certPEMBlock
4072+ return nil
4073+}
4074
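The config package itself is outside this part of the diff, so the following is only a guess at how a field such as ping_interval ("5m") could be decoded. The duration type, the settings struct, and parseSettings are all hypothetical names; the actual config.ConfigTimeDuration may be implemented differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// duration is a hypothetical wrapper that decodes JSON strings such as
// "5m" or "10s" with time.ParseDuration.
type duration time.Duration

func (d *duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	v, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	*d = duration(v)
	return nil
}

// TimeDuration returns the wrapped value as a time.Duration.
func (d duration) TimeDuration() time.Duration { return time.Duration(d) }

// settings is a hypothetical two-field subset of expectedConfiguration.
type settings struct {
	PingInterval    duration `json:"ping_interval"`
	ExchangeTimeout duration `json:"exchange_timeout"`
}

// parseSettings decodes a JSON document into settings.
func parseSettings(doc string) (settings, error) {
	var s settings
	err := json.Unmarshal([]byte(doc), &s)
	return s, err
}

func main() {
	s, err := parseSettings(`{"ping_interval": "5m", "exchange_timeout": "10s"}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.PingInterval.TimeDuration(), s.ExchangeTimeout.TimeDuration())
}
```

Decoding into a dedicated type keeps the parsed struct declarative, with accessor methods converting to plain time.Duration values in the style of the configuration methods above.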
4075=== added file 'server/dev/http.go'
4076--- server/dev/http.go 1970-01-01 00:00:00 +0000
4077+++ server/dev/http.go 2014-01-14 15:09:46 +0000
4078@@ -0,0 +1,40 @@
4079+/*
4080+ Copyright 2013-2014 Canonical Ltd.
4081+
4082+ This program is free software: you can redistribute it and/or modify it
4083+ under the terms of the GNU General Public License version 3, as published
4084+ by the Free Software Foundation.
4085+
4086+ This program is distributed in the hope that it will be useful, but
4087+ WITHOUT ANY WARRANTY; without even the implied warranties of
4088+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4089+ PURPOSE. See the GNU General Public License for more details.
4090+
4091+ You should have received a copy of the GNU General Public License along
4092+ with this program. If not, see <http://www.gnu.org/licenses/>.
4093+*/
4094+
4095+package main
4096+
4097+import (
4098+ "net"
4099+ "net/http"
4100+ "time"
4101+)
4102+
4103+ // HTTPServeConfig holds the HTTP server config.
4104+type HTTPServeConfig interface {
4105+ HTTPAddr() string
4106+ HTTPReadTimeout() time.Duration
4107+ HTTPWriteTimeout() time.Duration
4108+}
4109+
4110+// RunHTTPServe serves HTTP requests.
4111+func RunHTTPServe(lst net.Listener, h http.Handler, cfg HTTPServeConfig) error {
4112+ srv := &http.Server{
4113+ Handler: h,
4114+ ReadTimeout: cfg.HTTPReadTimeout(),
4115+ WriteTimeout: cfg.HTTPWriteTimeout(),
4116+ }
4117+ return srv.Serve(lst)
4118+}
4119
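A small sketch of building an http.Server from a config interface shaped like HTTPServeConfig, with ReadTimeout and WriteTimeout each taken from its matching accessor. serveConfig, fixedConfig, and newServer are hypothetical names used only for illustration. Using different values for the two timeouts makes a mix-up between them visible.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// serveConfig is a hypothetical subset of HTTPServeConfig.
type serveConfig interface {
	HTTPReadTimeout() time.Duration
	HTTPWriteTimeout() time.Duration
}

// fixedConfig is a hypothetical config with deliberately distinct timeouts.
type fixedConfig struct {
	read, write time.Duration
}

func (c fixedConfig) HTTPReadTimeout() time.Duration  { return c.read }
func (c fixedConfig) HTTPWriteTimeout() time.Duration { return c.write }

// newServer wires each timeout from its own accessor.
func newServer(h http.Handler, cfg serveConfig) *http.Server {
	return &http.Server{
		Handler:      h,
		ReadTimeout:  cfg.HTTPReadTimeout(),
		WriteTimeout: cfg.HTTPWriteTimeout(),
	}
}

func main() {
	cfg := fixedConfig{read: 5 * time.Second, write: 10 * time.Second}
	srv := newServer(http.NotFoundHandler(), cfg)
	fmt.Println(srv.ReadTimeout, srv.WriteTimeout)
}
```

A test config that returns the same value for both timeouts, as testHTTPServeConfig does, cannot detect the two fields being fed from the same accessor.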
4120=== added file 'server/dev/http_test.go'
4121--- server/dev/http_test.go 1970-01-01 00:00:00 +0000
4122+++ server/dev/http_test.go 2014-01-14 15:09:46 +0000
4123@@ -0,0 +1,70 @@
4124+/*
4125+ Copyright 2013-2014 Canonical Ltd.
4126+
4127+ This program is free software: you can redistribute it and/or modify it
4128+ under the terms of the GNU General Public License version 3, as published
4129+ by the Free Software Foundation.
4130+
4131+ This program is distributed in the hope that it will be useful, but
4132+ WITHOUT ANY WARRANTY; without even the implied warranties of
4133+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4134+ PURPOSE. See the GNU General Public License for more details.
4135+
4136+ You should have received a copy of the GNU General Public License along
4137+ with this program. If not, see <http://www.gnu.org/licenses/>.
4138+*/
4139+
4140+package main
4141+
4142+import (
4143+ "fmt"
4144+ "io/ioutil"
4145+ . "launchpad.net/gocheck"
4146+ // "log"
4147+ "net"
4148+ "net/http"
4149+ "time"
4150+)
4151+
4152+type httpSuite struct{}
4153+
4154+var _ = Suite(&httpSuite{})
4155+
4156+type testHTTPServeConfig struct{}
4157+
4158+func (cfg testHTTPServeConfig) HTTPAddr() string {
4159+ return "127.0.0.1:0"
4160+}
4161+
4162+func (cfg testHTTPServeConfig) HTTPReadTimeout() time.Duration {
4163+ return 5 * time.Second
4164+}
4165+
4166+func (cfg testHTTPServeConfig) HTTPWriteTimeout() time.Duration {
4167+ return 5 * time.Second
4168+}
4169+
4170+func testHandle(w http.ResponseWriter, r *http.Request) {
4171+ fmt.Fprintf(w, "yay!\n")
4172+}
4173+
4174+func (s *httpSuite) TestRunHTTPServe(c *C) {
4175+ cfg := testHTTPServeConfig{}
4176+ lst, err := net.Listen("tcp", cfg.HTTPAddr())
4177+ c.Assert(err, IsNil)
4178+ defer lst.Close()
4179+ errCh := make(chan error, 1)
4180+ h := http.HandlerFunc(testHandle)
4181+ go func() {
4182+ errCh <- RunHTTPServe(lst, h, cfg)
4183+ }()
4184+ resp, err := http.Get(fmt.Sprintf("http://%s/", lst.Addr()))
4185+ c.Assert(err, IsNil)
4186+ defer resp.Body.Close()
4187+ c.Assert(resp.StatusCode, Equals, 200)
4188+ body, err := ioutil.ReadAll(resp.Body)
4189+ c.Assert(err, IsNil)
4190+ c.Check(string(body), Equals, "yay!\n")
4191+ lst.Close()
4192+ c.Check(<-errCh, ErrorMatches, ".*closed.*")
4193+}
4194
4195=== added file 'server/dev/server.go'
4196--- server/dev/server.go 1970-01-01 00:00:00 +0000
4197+++ server/dev/server.go 2014-01-14 15:09:46 +0000
4198@@ -0,0 +1,77 @@
4199+/*
4200+ Copyright 2013-2014 Canonical Ltd.
4201+
4202+ This program is free software: you can redistribute it and/or modify it
4203+ under the terms of the GNU General Public License version 3, as published
4204+ by the Free Software Foundation.
4205+
4206+ This program is distributed in the hope that it will be useful, but
4207+ WITHOUT ANY WARRANTY; without even the implied warranties of
4208+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4209+ PURPOSE. See the GNU General Public License for more details.
4210+
4211+ You should have received a copy of the GNU General Public License along
4212+ with this program. If not, see <http://www.gnu.org/licenses/>.
4213+*/
4214+
4215+// simple development server.
4216+package main
4217+
4218+import (
4219+ "launchpad.net/ubuntu-push/logger"
4220+ "launchpad.net/ubuntu-push/server/api"
4221+ "launchpad.net/ubuntu-push/server/broker"
4222+ "launchpad.net/ubuntu-push/server/listener"
4223+ "launchpad.net/ubuntu-push/server/session"
4224+ "launchpad.net/ubuntu-push/server/store"
4225+ "net"
4226+ "os"
4227+ "path/filepath"
4228+)
4229+
4230+func main() {
4231+ logger := logger.NewSimpleLogger(os.Stderr, "debug")
4232+ if len(os.Args) < 2 { // xxx use flag
4233+ logger.Fatalf("missing config file")
4234+ }
4235+ configFName := os.Args[1]
4236+ f, err := os.Open(configFName)
4237+ if err != nil {
4238+ logger.Fatalf("reading config: %v", err)
4239+ }
4240+ cfg := &configuration{}
4241+ err = cfg.read(f, filepath.Dir(configFName))
4242+ if err != nil {
4243+ logger.Fatalf("reading config: %v", err)
4244+ }
4245+ // setup a pending store and start the broker
4246+ sto := store.NewInMemoryPendingStore()
4247+ broker := broker.NewSimpleBroker(sto, cfg, logger)
4248+ broker.Start()
4249+ defer broker.Stop()
4250+ // serve the http api
4251+ httpLst, err := net.Listen("tcp", cfg.HTTPAddr())
4252+ if err != nil {
4253+ logger.Fatalf("start http listening: %v", err)
4254+ }
4255+ handler := api.MakeHandlersMux(sto, broker, logger)
4256+ logger.Infof("listening for http on %v", httpLst.Addr())
4257+ go func() {
4258+ err := RunHTTPServe(httpLst, handler, cfg)
4259+ if err != nil {
4260+ logger.Fatalf("accepting http connections: %v", err)
4261+ }
4262+ }()
4263+ // listen for device connections
4264+ lst, err := listener.DeviceListen(cfg)
4265+ if err != nil {
4266+ logger.Fatalf("start device listening: %v", err)
4267+ }
4268+ logger.Infof("listening for devices on %v", lst.Addr())
4269+ err = lst.AcceptLoop(func(conn net.Conn) error {
4270+ return session.Session(conn, broker, cfg, logger)
4271+ }, logger)
4272+ if err != nil {
4273+ logger.Fatalf("accepting device connections: %v", err)
4274+ }
4275+}
4276
4277=== added file 'server/dev/server_test.go'
4278--- server/dev/server_test.go 1970-01-01 00:00:00 +0000
4279+++ server/dev/server_test.go 2014-01-14 15:09:46 +0000
4280@@ -0,0 +1,105 @@
4281+/*
4282+ Copyright 2013-2014 Canonical Ltd.
4283+
4284+ This program is free software: you can redistribute it and/or modify it
4285+ under the terms of the GNU General Public License version 3, as published
4286+ by the Free Software Foundation.
4287+
4288+ This program is distributed in the hope that it will be useful, but
4289+ WITHOUT ANY WARRANTY; without even the implied warranties of
4290+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4291+ PURPOSE. See the GNU General Public License for more details.
4292+
4293+ You should have received a copy of the GNU General Public License along
4294+ with this program. If not, see <http://www.gnu.org/licenses/>.
4295+*/
4296+
4297+package main
4298+
4299+import (
4300+ // "fmt"
4301+ "bytes"
4302+ "io/ioutil"
4303+ . "launchpad.net/gocheck"
4304+ "os"
4305+ "path/filepath"
4306+ "testing"
4307+ "time"
4308+)
4309+
4310+func TestDevserver(t *testing.T) { TestingT(t) }
4311+
4312+type devserverSuite struct{}
4313+
4314+var _ = Suite(&devserverSuite{})
4315+
4316+func (s *devserverSuite) TestConfigRead(c *C) {
4317+ tmpDir := c.MkDir()
4318+ err := ioutil.WriteFile(filepath.Join(tmpDir, "key.key"), []byte("KeY"), os.ModePerm)
4319+ c.Assert(err, IsNil)
4320+ err = ioutil.WriteFile(filepath.Join(tmpDir, "cert.cert"), []byte("CeRt"), os.ModePerm)
4321+ c.Assert(err, IsNil)
4322+ buf := bytes.NewBufferString(`{
4323+"ping_interval": "5m",
4324+"exchange_timeout": "10s",
4325+"session_queue_size": 10,
4326+"broker_queue_size": 100,
4327+"addr": "127.0.0.1:9999",
4328+"key_pem_file": "key.key",
4329+"cert_pem_file": "cert.cert",
4330+"http_addr": "127.0.0.1:8080",
4331+"http_read_timeout": "5s",
4332+"http_write_timeout": "10s"
4333+}`)
4334+ cfg := &configuration{}
4335+ err = cfg.read(buf, tmpDir)
4336+ c.Assert(err, IsNil)
4337+ c.Check(cfg.PingInterval(), Equals, 5*time.Minute)
4338+ c.Check(cfg.ExchangeTimeout(), Equals, 10*time.Second)
4339+ c.Check(cfg.BrokerQueueSize(), Equals, uint(100))
4340+ c.Check(cfg.SessionQueueSize(), Equals, uint(10))
4341+ c.Check(cfg.Addr(), Equals, "127.0.0.1:9999")
4342+ c.Check(string(cfg.KeyPEMBlock()), Equals, "KeY")
4343+ c.Check(string(cfg.CertPEMBlock()), Equals, "CeRt")
4344+ c.Check(cfg.HTTPAddr(), Equals, "127.0.0.1:8080")
4345+ c.Check(cfg.HTTPReadTimeout(), Equals, 5*time.Second)
4346+ c.Check(cfg.HTTPWriteTimeout(), Equals, 10*time.Second)
4347+}
4348+
4349+func (s *devserverSuite) TestConfigReadErrors(c *C) {
4350+ tmpDir := c.MkDir()
4351+ checkError := func(config, expectedErr string) {
4352+ cfg := &configuration{}
4353+ err := cfg.read(bytes.NewBufferString(config), tmpDir)
4354+ c.Check(err, ErrorMatches, expectedErr)
4355+ }
4356+ checkError("", "EOF")
4357+ checkError(`{"ping_interval": "1m"}`, "missing exchange_timeout")
4358+ checkError(`{"ping_interval": "1m", "exchange_timeout": "5s", "session_queue_size": "foo"}`, "session_queue_size:.*type uint")
4359+ checkError(`{
4360+"exchange_timeout": "10s",
4361+"ping_interval": "5m",
4362+"broker_queue_size": 100,
4363+"session_queue_size": 10,
4364+"addr": ":9000",
4365+"key_pem_file": "doesntexist",
4366+"cert_pem_file": "doesntexist",
4367+"http_addr": ":8080",
4368+"http_read_timeout": "5s",
4369+"http_write_timeout": "10s"
4370+}`, "reading key_pem_file:.*no such file.*")
4371+ err := ioutil.WriteFile(filepath.Join(tmpDir, "key.key"), []byte("KeY"), os.ModePerm)
4372+ c.Assert(err, IsNil)
4373+ checkError(`{
4374+"exchange_timeout": "10s",
4375+"ping_interval": "5m",
4376+"broker_queue_size": 100,
4377+"session_queue_size": 10,
4378+"addr": ":9000",
4379+"key_pem_file": "key.key",
4380+"cert_pem_file": "doesntexist",
4381+"http_addr": ":8080",
4382+"http_read_timeout": "5s",
4383+"http_write_timeout": "10s"
4384+}`, "reading cert_pem_file:.*no such file.*")
4385+}
4386
4387=== added directory 'server/listener'
4388=== added file 'server/listener/listener.go'
4389--- server/listener/listener.go 1970-01-01 00:00:00 +0000
4390+++ server/listener/listener.go 2014-01-14 15:09:46 +0000
4391@@ -0,0 +1,86 @@
4392+/*
4393+ Copyright 2013-2014 Canonical Ltd.
4394+
4395+ This program is free software: you can redistribute it and/or modify it
4396+ under the terms of the GNU General Public License version 3, as published
4397+ by the Free Software Foundation.
4398+
4399+ This program is distributed in the hope that it will be useful, but
4400+ WITHOUT ANY WARRANTY; without even the implied warranties of
4401+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4402+ PURPOSE. See the GNU General Public License for more details.
4403+
4404+ You should have received a copy of the GNU General Public License along
4405+ with this program. If not, see <http://www.gnu.org/licenses/>.
4406+*/
4407+
4408+// Package listener has code to listen for device connections and set up sessions for them.
4409+package listener
4410+
4411+import (
4412+ "crypto/tls"
4413+ "launchpad.net/ubuntu-push/logger"
4414+ "net"
4415+ "time"
4416+)
4417+
4418+// A DeviceListenerConfig offers the DeviceListener configuration.
4419+type DeviceListenerConfig interface {
4420+ // Addr to listen on.
4421+ Addr() string
4422+ // TLS key
4423+ KeyPEMBlock() []byte
4424+ // TLS cert
4425+ CertPEMBlock() []byte
4426+}
4427+
4428+// DeviceListener listens for device connections and sets up sessions for them.
4429+type DeviceListener struct {
4430+ net.Listener
4431+}
4432+
4433+// DeviceListen creates a DeviceListener for device connections based on config.
4434+func DeviceListen(cfg DeviceListenerConfig) (*DeviceListener, error) {
4435+ cert, err := tls.X509KeyPair(cfg.CertPEMBlock(), cfg.KeyPEMBlock())
4436+ if err != nil {
4437+ return nil, err
4438+ }
4439+ tlsCfg := &tls.Config{
4440+ Certificates: []tls.Certificate{cert},
4441+ SessionTicketsDisabled: true,
4442+ }
4443+ lst, err := tls.Listen("tcp", cfg.Addr(), tlsCfg)
4444+ return &DeviceListener{lst}, err
4445+}
4446+
4447+// handleTemporary reports whether err is just a temporary network
4448+// error; if so, it sleeps briefly before returning true.
4449+func handleTemporary(err error) bool {
4450+ if netError, isNetError := err.(net.Error); isNetError {
4451+ if netError.Temporary() {
4452+ // wait, xxx exponential backoff?
4453+ time.Sleep(100 * time.Millisecond)
4454+ return true
4455+ }
4456+ }
4457+ return false
4458+}
4459+
4460+// AcceptLoop accepts connections and starts sessions for them.
4461+func (dl *DeviceListener) AcceptLoop(session func(net.Conn) error, logger logger.Logger) error {
4462+ for {
4463+ // xxx enforce a connection limit
4464+ conn, err := dl.Listener.Accept()
4465+ if err != nil {
4466+ if handleTemporary(err) {
4467+ logger.Errorf("device listener: %s -- retrying", err)
4468+ continue
4469+ }
4470+ return err
4471+ }
4472+ go func() {
4473+ defer logger.Recoverf("terminating device connection")
4474+ session(conn)
4475+ }()
4476+ }
4477+}
4478
4479=== added file 'server/listener/listener_test.go'
4480--- server/listener/listener_test.go 1970-01-01 00:00:00 +0000
4481+++ server/listener/listener_test.go 2014-01-14 15:09:46 +0000
4482@@ -0,0 +1,239 @@
4483+/*
4484+ Copyright 2013-2014 Canonical Ltd.
4485+
4486+ This program is free software: you can redistribute it and/or modify it
4487+ under the terms of the GNU General Public License version 3, as published
4488+ by the Free Software Foundation.
4489+
4490+ This program is distributed in the hope that it will be useful, but
4491+ WITHOUT ANY WARRANTY; without even the implied warranties of
4492+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4493+ PURPOSE. See the GNU General Public License for more details.
4494+
4495+ You should have received a copy of the GNU General Public License along
4496+ with this program. If not, see <http://www.gnu.org/licenses/>.
4497+*/
4498+
4499+package listener
4500+
4501+import (
4502+ "crypto/tls"
4503+ "crypto/x509"
4504+ . "launchpad.net/gocheck"
4505+ "launchpad.net/ubuntu-push/logger"
4506+ helpers "launchpad.net/ubuntu-push/testing"
4507+ "net"
4508+ "syscall"
4509+ "testing"
4510+ "time"
4511+)
4512+
4513+func TestListener(t *testing.T) { TestingT(t) }
4514+
4515+type listenerSuite struct{}
4516+
4517+var _ = Suite(&listenerSuite{})
4518+
4519+const NofileMax = 500
4520+
4521+func (s *listenerSuite) SetUpSuite(*C) {
4522+ // make it easier to get a too many open files error
4523+ var nofileLimit syscall.Rlimit
4524+ err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &nofileLimit)
4525+ if err != nil {
4526+ panic(err)
4527+ }
4528+ nofileLimit.Cur = NofileMax
4529+ err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &nofileLimit)
4530+ if err != nil {
4531+ panic(err)
4532+ }
4533+}
4534+
4535+type testDevListenerCfg struct {
4536+ addr string
4537+}
4538+
4539+func (cfg *testDevListenerCfg) Addr() string {
4540+ return cfg.addr
4541+}
4542+
4543+// key&cert generated with go run /usr/lib/go/src/pkg/crypto/tls/generate_cert.go -ca -host localhost -rsa-bits 512 -duration 87600h
4544+
4545+func (cfg *testDevListenerCfg) KeyPEMBlock() []byte {
4546+ return []byte(`-----BEGIN RSA PRIVATE KEY-----
4547+MIIBPAIBAAJBAPw+niki17X2qALE2A2AzE1q5dvK9CI4OduRtT9IgbFLC6psqAT2
4548+1NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEAAQJAKwXbIBULScP6QA6m8xam
4549+wgWbkvN41GVWqPafPV32kPBvKwSc+M1e+JR7g3/xPZE7TCELcfYi4yXEHZZI3Pbh
4550+oQIhAP/UsgJbsfH1GFv8Y8qGl5l/kmwwkwHhuKvEC87Yur9FAiEA/GlQv3ZfaXnT
4551+lcCFT0aL02O0RDiRYyMUG/JAZQJs6CUCIQCHO5SZYIUwxIGK5mCNxxXOAzyQSiD7
4552+hqkKywf+4FvfDQIhALa0TLyqJFom0t7c4iIGAIRc8UlIYQSPiajI64+x9775AiEA
4553+0v4fgSK/Rq059zW1753JjuB6aR0Uh+3RqJII4dUR1Wg=
4554+-----END RSA PRIVATE KEY-----`)
4555+}
4556+
4557+func (cfg *testDevListenerCfg) CertPEMBlock() []byte {
4558+ return []byte(`-----BEGIN CERTIFICATE-----
4559+MIIBYzCCAQ+gAwIBAgIBADALBgkqhkiG9w0BAQUwEjEQMA4GA1UEChMHQWNtZSBD
4560+bzAeFw0xMzEyMTkyMDU1NDNaFw0yMzEyMTcyMDU1NDNaMBIxEDAOBgNVBAoTB0Fj
4561+bWUgQ28wWjALBgkqhkiG9w0BAQEDSwAwSAJBAPw+niki17X2qALE2A2AzE1q5dvK
4562+9CI4OduRtT9IgbFLC6psqAT21NA+QbY17nWSSpyP65zkMkwKXrbDzstwLPkCAwEA
4563+AaNUMFIwDgYDVR0PAQH/BAQDAgCkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1Ud
4564+EwEB/wQFMAMBAf8wGgYDVR0RBBMwEYIJbG9jYWxob3N0hwR/AAABMAsGCSqGSIb3
4565+DQEBBQNBAFqiVI+Km2XPSO+pxITaPvhmuzg+XG3l1+2di3gL+HlDobocjBqRctRU
4566+YySO32W07acjGJmCHUKpCJuq9X8hpmk=
4567+-----END CERTIFICATE-----`)
4568+}
4569+
4570+func (s *listenerSuite) TestDeviceListen(c *C) {
4571+ lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"})
4572+ c.Check(err, IsNil)
4573+ defer lst.Close()
4574+ c.Check(lst.Addr().String(), Matches, `127.0.0.1:\d{5}`)
4575+}
4576+
4577+func (s *listenerSuite) TestDeviceListenError(c *C) {
4578+ // assume tests are not running as root
4579+ _, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:99"})
4580+ c.Check(err, ErrorMatches, ".*permission denied.*")
4581+}
4582+
4583+type testNetError struct {
4584+ temp bool
4585+}
4586+
4587+func (tne *testNetError) Error() string {
4588+ return "test net error"
4589+}
4590+
4591+func (tne *testNetError) Temporary() bool {
4592+ return tne.temp
4593+}
4594+
4595+func (tne *testNetError) Timeout() bool {
4596+ return false
4597+}
4598+
4599+var _ net.Error = &testNetError{} // sanity check
4600+
4601+func (s *listenerSuite) TestHandleTemporary(c *C) {
4602+ c.Check(handleTemporary(&testNetError{true}), Equals, true)
4603+ c.Check(handleTemporary(&testNetError{false}), Equals, false)
4604+}
4605+
4606+func testSession(conn net.Conn) error {
4607+ defer conn.Close()
4608+ conn.SetDeadline(time.Now().Add(2 * time.Second))
4609+ var buf [1]byte
4610+ _, err := conn.Read(buf[:])
4611+ if err != nil {
4612+ return err
4613+ }
4614+ _, err = conn.Write(buf[:])
4615+ return err
4616+}
4617+
4618+func testTlsDial(c *C, addr string) (net.Conn, error) {
4619+ cp := x509.NewCertPool()
4620+ ok := cp.AppendCertsFromPEM((&testDevListenerCfg{}).CertPEMBlock())
4621+ c.Assert(ok, Equals, true)
4622+ return tls.Dial("tcp", addr, &tls.Config{RootCAs: cp})
4623+}
4624+
4625+func testWriteByte(c *C, conn net.Conn, toWrite uint32) {
4626+ conn.SetDeadline(time.Now().Add(2 * time.Second))
4627+ _, err := conn.Write([]byte{byte(toWrite)})
4628+ c.Assert(err, IsNil)
4629+}
4630+
4631+func testReadByte(c *C, conn net.Conn, expected uint32) {
4632+ var buf [1]byte
4633+ _, err := conn.Read(buf[:])
4634+ c.Check(err, IsNil)
4635+ c.Check(buf[0], Equals, byte(expected))
4636+}
4637+
4638+func (s *listenerSuite) TestDeviceAcceptLoop(c *C) {
4639+ buf := &helpers.SyncedLogBuffer{}
4640+ logger := logger.NewSimpleLogger(buf, "error")
4641+ lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"})
4642+ c.Check(err, IsNil)
4643+ defer lst.Close()
4644+ errCh := make(chan error)
4645+ go func() {
4646+ errCh <- lst.AcceptLoop(testSession, logger)
4647+ }()
4648+ listenerAddr := lst.Addr().String()
4649+ conn1, err := testTlsDial(c, listenerAddr)
4650+ c.Assert(err, IsNil)
4651+ defer conn1.Close()
4652+ testWriteByte(c, conn1, '1')
4653+ conn2, err := testTlsDial(c, listenerAddr)
4654+ c.Assert(err, IsNil)
4655+ defer conn2.Close()
4656+ testWriteByte(c, conn2, '2')
4657+ testReadByte(c, conn1, '1')
4658+ testReadByte(c, conn2, '2')
4659+ lst.Close()
4660+ c.Check(<-errCh, ErrorMatches, ".*use of closed.*")
4661+ c.Check(buf.String(), Equals, "")
4662+}
4663+
4664+func (s *listenerSuite) TestDeviceAcceptLoopTemporaryError(c *C) {
4665+ buf := &helpers.SyncedLogBuffer{}
4666+ logger := logger.NewSimpleLogger(buf, "error")
4667+ // EMFILE is not the temporary network error we want to handle this way,
4668+ // but it is relatively easy to generate in a controlled way
4669+ var err error
4670+ lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"})
4671+ c.Check(err, IsNil)
4672+ defer lst.Close()
4673+ errCh := make(chan error)
4674+ go func() {
4675+ errCh <- lst.AcceptLoop(testSession, logger)
4676+ }()
4677+ listenerAddr := lst.Addr().String()
4678+ conns := make([]net.Conn, 0, NofileMax)
4679+ for i := 0; i < NofileMax; i++ {
4680+ var conn1 net.Conn
4681+ conn1, err = net.Dial("tcp", listenerAddr)
4682+ if err != nil {
4683+ break
4684+ }
4685+ defer conn1.Close()
4686+ conns = append(conns, conn1)
4687+ }
4688+ c.Assert(err, ErrorMatches, ".*too many open.*")
4689+ for _, conn := range conns {
4690+ conn.Close()
4691+ }
4692+ conn2, err := testTlsDial(c, listenerAddr)
4693+ c.Assert(err, IsNil)
4694+ defer conn2.Close()
4695+ testWriteByte(c, conn2, '2')
4696+ testReadByte(c, conn2, '2')
4697+ lst.Close()
4698+ c.Check(<-errCh, ErrorMatches, ".*use of closed.*")
4699+ c.Check(buf.String(), Matches, ".*device listener:.*accept.*too many open.*-- retrying\n")
4700+}
4701+
4702+func (s *listenerSuite) TestDeviceAcceptLoopPanic(c *C) {
4703+ buf := &helpers.SyncedLogBuffer{}
4704+ logger := logger.NewSimpleLogger(buf, "error")
4705+ lst, err := DeviceListen(&testDevListenerCfg{"127.0.0.1:0"})
4706+ c.Check(err, IsNil)
4707+ defer lst.Close()
4708+ errCh := make(chan error)
4709+ go func() {
4710+ errCh <- lst.AcceptLoop(func(conn net.Conn) error {
4711+ defer conn.Close()
4712+ panic("session panic")
4713+ }, logger)
4714+ }()
4715+ listenerAddr := lst.Addr().String()
4716+ _, err = testTlsDial(c, listenerAddr)
4717+ c.Assert(err, Not(IsNil))
4718+ lst.Close()
4719+ c.Check(<-errCh, ErrorMatches, ".*use of closed.*")
4720+ c.Check(buf.String(), Matches, "(?s).*session panic!! terminating device connection:\n.*AcceptLoop.*")
4721+}
4722
4723=== added directory 'server/session'
4724=== added file 'server/session/session.go'
4725--- server/session/session.go 1970-01-01 00:00:00 +0000
4726+++ server/session/session.go 2014-01-14 15:09:46 +0000
4727@@ -0,0 +1,157 @@
4728+/*
4729+ Copyright 2013-2014 Canonical Ltd.
4730+
4731+ This program is free software: you can redistribute it and/or modify it
4732+ under the terms of the GNU General Public License version 3, as published
4733+ by the Free Software Foundation.
4734+
4735+ This program is distributed in the hope that it will be useful, but
4736+ WITHOUT ANY WARRANTY; without even the implied warranties of
4737+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4738+ PURPOSE. See the GNU General Public License for more details.
4739+
4740+ You should have received a copy of the GNU General Public License along
4741+ with this program. If not, see <http://www.gnu.org/licenses/>.
4742+*/
4743+
4744+// Package session has code handling long-lived connections from devices.
4745+package session
4746+
4747+import (
4748+ "launchpad.net/ubuntu-push/logger"
4749+ "launchpad.net/ubuntu-push/protocol"
4750+ "launchpad.net/ubuntu-push/server/broker"
4751+ "net"
4752+ "time"
4753+)
4754+
4755+// SessionConfig is for carrying the session configuration.
4756+type SessionConfig interface {
4757+ // pings are emitted each ping interval
4758+ PingInterval() time.Duration
4759+ // sending and waiting for a response shouldn't take more than the
4760+ // exchange timeout
4761+ ExchangeTimeout() time.Duration
4762+}
4763+
4764+// sessionStart manages the start of the protocol session.
4765+func sessionStart(proto protocol.Protocol, brkr broker.Broker, cfg SessionConfig) (broker.BrokerSession, error) {
4766+ var connMsg protocol.ConnectMsg
4767+ proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout()))
4768+ err := proto.ReadMessage(&connMsg)
4769+ if err != nil {
4770+ return nil, err
4771+ }
4772+ if connMsg.Type != "connect" {
4773+ return nil, &broker.ErrAbort{"expected CONNECT message"}
4774+ }
4775+ return brkr.Register(&connMsg)
4776+}
4777+
4778+// exchange writes the outMsg message and reads the answer into inMsg.
4779+func exchange(proto protocol.Protocol, outMsg, inMsg interface{}, exchangeTimeout time.Duration) error {
4780+ proto.SetDeadline(time.Now().Add(exchangeTimeout))
4781+ err := proto.WriteMessage(outMsg)
4782+ if err != nil {
4783+ return err
4784+ }
4785+ err = proto.ReadMessage(inMsg)
4786+ if err != nil {
4787+ return err
4788+ }
4789+ return nil
4790+}
4791+
4792+// sessionLoop manages the exchanges of the protocol session.
4793+func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig) error {
4794+ pingInterval := cfg.PingInterval()
4795+ exchangeTimeout := cfg.ExchangeTimeout()
4796+ pingTimer := time.NewTimer(pingInterval)
4797+ ch := sess.SessionChannel()
4798+ for {
4799+ select {
4800+ case <-pingTimer.C:
4801+ pingMsg := &protocol.PingPongMsg{"ping"}
4802+ var pongMsg protocol.PingPongMsg
4803+ err := exchange(proto, pingMsg, &pongMsg, exchangeTimeout)
4804+ if err != nil {
4805+ return err
4806+ }
4807+ if pongMsg.Type != "pong" {
4808+ return &broker.ErrAbort{"expected PONG message"}
4809+ }
4810+ pingTimer.Reset(pingInterval)
4811+ case exchg := <-ch:
4812+ // xxx later can use ch closing for shutdown/reset
4813+ pingTimer.Stop()
4814+ outMsg, inMsg, err := exchg.Prepare(sess)
4815+ if err != nil {
4816+ return err
4817+ }
4818+ for {
4819+ done := outMsg.Split()
4820+ err = exchange(proto, outMsg, inMsg, exchangeTimeout)
4821+ if err != nil {
4822+ return err
4823+ }
4824+ if done {
4825+ pingTimer.Reset(pingInterval)
4826+ }
4827+ err = exchg.Acked(sess)
4828+ if err != nil {
4829+ return err
4830+ }
4831+ if done {
4832+ break
4833+ }
4834+ }
4835+ }
4836+ }
4837+ return nil
4838+}
4839+
4840+var sessionsEpoch = time.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()
4841+
4842+// sessionTracker logs session events.
4843+type sessionTracker struct {
4844+ logger.Logger
4845+ sessionId int64 // xxx use timeuuid later
4846+}
4847+
4848+func (trk *sessionTracker) start(conn interface {
4849+ RemoteAddr() net.Addr
4850+}) {
4851+ trk.sessionId = time.Now().UnixNano() - sessionsEpoch
4852+ trk.Debugf("session(%x) connected %v", trk.sessionId, conn.RemoteAddr())
4853+}
4854+
4855+func (trk *sessionTracker) registered(sess broker.BrokerSession) {
4856+ trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceId())
4857+}
4858+
4859+func (trk *sessionTracker) end(err error) error {
4860+ trk.Debugf("session(%x) ended with: %v", trk.sessionId, err)
4861+ return err
4862+}
4863+
4864+// Session manages the session with a client.
4865+func Session(conn net.Conn, brkr broker.Broker, cfg SessionConfig, logger logger.Logger) error {
4866+ defer conn.Close()
4867+ track := sessionTracker{Logger: logger}
4868+ track.start(conn)
4869+ v, err := protocol.ReadWireFormatVersion(conn, cfg.ExchangeTimeout())
4870+ if err != nil {
4871+ return track.end(err)
4872+ }
4873+ if v != protocol.ProtocolWireVersion {
4874+ return track.end(&broker.ErrAbort{"unexpected wire format version"})
4875+ }
4876+ proto := protocol.NewProtocol0(conn)
4877+ sess, err := sessionStart(proto, brkr, cfg)
4878+ if err != nil {
4879+ return track.end(err)
4880+ }
4881+ track.registered(sess)
4882+ defer brkr.Unregister(sess)
4883+ return track.end(sessionLoop(proto, sess, cfg))
4884+}
4885
4886=== added file 'server/session/session_test.go'
4887--- server/session/session_test.go 1970-01-01 00:00:00 +0000
4888+++ server/session/session_test.go 2014-01-14 15:09:46 +0000
4889@@ -0,0 +1,619 @@
4890+/*
4891+ Copyright 2013-2014 Canonical Ltd.
4892+
4893+ This program is free software: you can redistribute it and/or modify it
4894+ under the terms of the GNU General Public License version 3, as published
4895+ by the Free Software Foundation.
4896+
4897+ This program is distributed in the hope that it will be useful, but
4898+ WITHOUT ANY WARRANTY; without even the implied warranties of
4899+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
4900+ PURPOSE. See the GNU General Public License for more details.
4901+
4902+ You should have received a copy of the GNU General Public License along
4903+ with this program. If not, see <http://www.gnu.org/licenses/>.
4904+*/
4905+
4906+package session
4907+
4908+import (
4909+ "bufio"
4910+ "bytes"
4911+ "encoding/json"
4912+ "errors"
4913+ "fmt"
4914+ "io"
4915+ "io/ioutil"
4916+ . "launchpad.net/gocheck"
4917+ "launchpad.net/ubuntu-push/logger"
4918+ "launchpad.net/ubuntu-push/protocol"
4919+ "launchpad.net/ubuntu-push/server/broker"
4920+ "net"
4921+ "reflect"
4922+ "testing"
4923+ "time"
4924+)
4925+
4926+func TestSession(t *testing.T) { TestingT(t) }
4927+
4928+type sessionSuite struct{}
4929+
4930+var _ = Suite(&sessionSuite{})
4931+
4932+type testProtocol struct {
4933+ up chan interface{}
4934+ down chan interface{}
4935+}
4936+
4937+// takeNext takes a value from given channel with a 5s timeout
4938+func takeNext(ch <-chan interface{}) interface{} {
4939+ select {
4940+ case <-time.After(5 * time.Second):
4941+ panic("test protocol exchange stuck: too long waiting")
4942+ case v := <-ch:
4943+ return v
4944+ }
4945+ return nil
4946+}
4947+
4948+func (c *testProtocol) SetDeadline(t time.Time) {
4949+ deadAfter := t.Sub(time.Now())
4950+ deadAfter = (deadAfter + time.Millisecond/2) / time.Millisecond * time.Millisecond
4951+ c.down <- fmt.Sprintf("deadline %v", deadAfter)
4952+}
4953+
4954+func (c *testProtocol) ReadMessage(dest interface{}) error {
4955+ switch v := takeNext(c.up).(type) {
4956+ case error:
4957+ return v
4958+ default:
4959+ // make sure json.Unmarshal works with dest
4960+ var marshalledMsg []byte
4961+ marshalledMsg, err := json.Marshal(v)
4962+ if err != nil {
4963+ return fmt.Errorf("can't jsonify test value: %v", v)
4964+ }
4965+ return json.Unmarshal(marshalledMsg, dest)
4966+ }
4967+ return nil
4968+}
4969+
4970+func (c *testProtocol) WriteMessage(src interface{}) error {
4971+ // make sure json.Marshal works with src
4972+ _, err := json.Marshal(src)
4973+ if err != nil {
4974+ return err
4975+ }
4976+ val := reflect.ValueOf(src)
4977+ if val.Kind() == reflect.Ptr {
4978+ src = val.Elem().Interface()
4979+ }
4980+ c.down <- src
4981+ switch v := takeNext(c.up).(type) {
4982+ case error:
4983+ return v
4984+ }
4985+ return nil
4986+}
4987+
4988+type testSessionConfig struct {
4989+ exchangeTimeout time.Duration
4990+ pingInterval time.Duration
4991+}
4992+
4993+func (tsc *testSessionConfig) PingInterval() time.Duration {
4994+ return tsc.pingInterval
4995+}
4996+
4997+func (tsc *testSessionConfig) ExchangeTimeout() time.Duration {
4998+ return tsc.exchangeTimeout
4999+}
5000+
The diff has been truncated for viewing.
