Merge lp:~lucio.torre/graphite/graphite-add-rabbitmq into lp:~graphite-dev/graphite/main

Proposed by Lucio Torre
Status: Merged
Approved by: chrismd
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~lucio.torre/graphite/graphite-add-rabbitmq
Merge into: lp:~graphite-dev/graphite/main
Diff against target: 1151 lines (+1105/-1)
5 files modified
carbon/bin/carbon-cache.py (+14/-1)
carbon/conf/carbon.amqp.conf.example (+73/-0)
carbon/lib/carbon/amqp0-8.xml (+759/-0)
carbon/lib/carbon/amqp_listener.py (+178/-0)
carbon/lib/carbon/amqp_publisher.py (+81/-0)
To merge this branch: bzr merge lp:~lucio.torre/graphite/graphite-add-rabbitmq
Reviewer | Review Type | Date Requested | Status
chrismd  |             |                | Approve
Review via email: mp+16816@code.launchpad.net
Lucio Torre (lucio.torre) wrote :

Adds a listener for AMQP.

Includes sample configurations, a sample producer, and a way to start the listener for testing without installing everything.
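
As a rough illustration of the sample configuration, the sketch below reads the AMQP options from a `[cache]` section and applies the same defaults that carbon-cache.py passes to `settings.get` in the diff. It uses the standard-library `configparser` as a stand-in for carbon's own settings object, which is an assumption; `read_amqp_settings` and `SAMPLE` are hypothetical names, and the code is written for Python 3 rather than the Python 2 of the branch.

```python
import configparser

# A cut-down version of the [cache] section from carbon.amqp.conf.example.
SAMPLE = """
[cache]
ENABLE_AMQP = True
AMQP_VERBOSE = True
# AMQP_USER = guest
# AMQP_PASSWORD = guest
# AMQP_HOST = localhost
# AMQP_PORT = 5672
"""


def read_amqp_settings(text):
    """Return the AMQP options from a [cache] section, applying defaults."""
    parser = configparser.ConfigParser()
    parser.optionxform = str  # keep option names upper-case, as in the file
    parser.read_string(text)
    cache = parser["cache"]
    return {
        "enabled": cache.getboolean("ENABLE_AMQP", fallback=False),
        "host": cache.get("AMQP_HOST", fallback="localhost"),
        "port": cache.getint("AMQP_PORT", fallback=5672),
        "user": cache.get("AMQP_USER", fallback="guest"),
        "password": cache.get("AMQP_PASSWORD", fallback="guest"),
        "verbose": cache.getboolean("AMQP_VERBOSE", fallback=False),
    }


settings = read_amqp_settings(SAMPLE)
print(settings)
```

Because the credential and network options are commented out in the sample, the listener would connect to localhost:5672 as guest/guest unless they are set explicitly.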

Elliot Murphy (statik) wrote :

I just realized this includes the amqp xml file. I recommend not including this file, as it has a non-free license and will cause a problem for carbon being packaged for debian/ubuntu.

chrismd (chrismd) wrote :

I just looked through the code and it looks good, with only a couple of caveats. First, the rest of the graphite codebase uses 2-space indentation whereas amqp_listener.py uses 4. I know that's the PEP-8 recommendation; I just prefer 2 spaces myself. I don't care too much if you want to leave it at 4.

Also, it uses the @inlineCallbacks decorator which, while incredibly useful, is not compatible with Python 2.4. I've been considering raising the Python requirement to 2.5 simply because 2.4 is so old and I often find myself breaking compatibility by accident by using more recent features out of habit. So don't worry about changing that yet; I'll probably make the next release require 2.5.
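
The 2.5 requirement comes from the language feature that @inlineCallbacks is built on: generators that can receive a value through a `yield` expression (PEP 342), which Python 2.4 lacks. The toy driver below illustrates that mechanism only. It is not Twisted's implementation: `run` and `add_remote_values` are made-up names, the "requests" are plain callables resolved synchronously rather than Deferreds, and it is written for Python 3.

```python
def run(generator_function):
    """Drive a generator, sending the result of each yielded request back in."""
    def wrapper(*args):
        gen = generator_function(*args)
        try:
            request = next(gen)
            while True:
                # Resolve the request and hand its result back to the
                # generator; the `yield` expression evaluates to this value.
                # Sending values into a generator is the PEP 342 feature
                # that arrived in Python 2.5.
                request = gen.send(request())
        except StopIteration as stop:
            # Python 3 lets a generator `return` a value; Twisted on
            # Python 2 used returnValue() for the same purpose.
            return stop.value
    return wrapper


@run
def add_remote_values():
    a = yield (lambda: 2)   # stands in for waiting on one asynchronous result
    b = yield (lambda: 3)   # stands in for waiting on another
    return a + b


result = add_remote_values()
print(result)
```

Rewriting such code with plain Deferreds means splitting the function at every `yield` into a chain of callbacks, which is the extra work referred to later in the thread.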

As for the xml file, is there not a free version of the AMQP spec? What do other txamqp apps use? I just checked out rabbitmq and it appears they have a JSON file describing the spec with a permissive license so we could use it. The only question is, can txamqp read it?

chrismd (chrismd) wrote :

Looks like we have a way around the spec license issue. There are stripped down versions of the spec available here http://www.amqp.org/confluence/display/AMQP/AMQP+Specification which are under a BSD license. I confirmed with the txamqp devs that those spec files will work fine.

Lucio, could you replace the spec file in your branch with the stripped down one?
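
To show what a client library gets out of the stripped-down spec, here is a small sketch that builds a lookup from (class index, method index) to a method name. It uses only the standard-library XML parser on a fragment copied from the stripped file; txamqp has its own spec loader, which this does not reproduce. `method_table` and `SPEC_FRAGMENT` are hypothetical names.

```python
import xml.etree.ElementTree as ET

# A fragment in the same shape as the stripped amqp0-8.xml in the diff.
SPEC_FRAGMENT = """
<amqp major="8" minor="0" port="5672">
  <class name="queue" handler="channel" index="50">
    <method name="declare" synchronous="1" index="10">
      <field name="queue" domain="queue name"/>
    </method>
    <method name="bind" synchronous="1" index="20"/>
  </class>
  <class name="basic" handler="channel" index="60">
    <method name="publish" content="1" index="40"/>
    <method name="deliver" content="1" index="60"/>
  </class>
</amqp>
"""


def method_table(spec_xml):
    """Map (class index, method index) to 'class.method' names."""
    root = ET.fromstring(spec_xml.strip())
    table = {}
    for cls in root.findall("class"):
        for method in cls.findall("method"):
            key = (int(cls.get("index")), int(method.get("index")))
            table[key] = "%s.%s" % (cls.get("name"), method.get("name"))
    return table


table = method_table(SPEC_FRAGMENT)
print(table[(60, 40)])
```

The stripped file keeps exactly this machine-readable structure and drops the prose documentation, which is why it can stand in for the full spec.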

chrismd (chrismd) wrote :

One other issue I just noticed: in amqp_listener.py, around line 90, you catch a ValueError exception to handle malformed lines, but the except block needs a continue statement to avoid causing a NameError in the lines below it.
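
The pattern being described can be sketched as follows. This is not the code from amqp_listener.py, which is not shown in this part of the diff: `parse_lines` is a hypothetical function, and the "metric value timestamp" line format is assumed from carbon's plain-text protocol.

```python
def parse_lines(body):
    """Parse 'metric value timestamp' lines, skipping malformed ones."""
    datapoints = []
    for line in body.splitlines():
        try:
            metric, value, timestamp = line.strip().split()
            datapoint = (float(timestamp), float(value))
        except ValueError:
            # Without this `continue`, execution would fall through to the
            # append below. On the first line that raises NameError because
            # `datapoint` was never assigned; on later lines it would
            # silently reuse values from the previous iteration.
            continue
        datapoints.append((metric, datapoint))
    return datapoints


parsed = parse_lines("a.b 1.5 100\nbroken\nc.d x 200\ne.f 2 300")
print(parsed)
```

Both kinds of malformed input end up as ValueError here: a line with the wrong number of fields fails tuple unpacking, and a non-numeric value or timestamp fails `float()`.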

Lucio Torre (lucio.torre) wrote :

1) I'll leave indentation at 4 if that's OK. Thanks for giving me the option :)

2) I'll rewrite to use plain old Deferreds. This will take some work.

3) I'll put in the BSD version of the XML file.

4) I'll fix the ValueError/continue issue.

I'll let you know when I've pushed the new branch.

chrismd (chrismd) wrote :

Awesome, thanks! Don't worry about the @inlineCallbacks though because I looked and txAMQP also uses @inlineCallbacks and thus it requires 2.5 anyways.

Initially I had only thought AMQP support would be an optional feature for getting data into Graphite. However the more I have been thinking about it the more I've realized there are some really cool things we could do by further using AMQP (and Thrift) in carbon, so it is quite possibly going to become a required dependency. I'll post some of my ideas to the graphite-dev mailing list once they solidify a little more.

204. By Lucio Torre

merged with trunk

205. By Lucio Torre

fixed proposal requests

Lucio Torre (lucio.torre) wrote :

Replaced the XML file with: http://www.amqp.org/confluence/download/attachments/720900/amqp0-8.stripped.xml?version=1&modificationDate=1250523131000

Added a continue after validation.

Please re-review.

Thanks.

chrismd (chrismd) wrote :

Looks good to merge into trunk.

review: Approve

Preview Diff

1=== modified file 'carbon/bin/carbon-cache.py'
2--- carbon/bin/carbon-cache.py 2009-10-30 12:32:49 +0000
3+++ carbon/bin/carbon-cache.py 2010-01-11 15:35:21 +0000
4@@ -27,7 +27,7 @@
5 try:
6   from twisted.internet import epollreactor
7   epollreactor.install()
8-except:
9+except:
10   pass
11 from twisted.internet import reactor
12
13@@ -112,6 +112,14 @@
14 # Read config (we want failures to occur before daemonizing)
15 settings.readFrom(options.config, 'cache')
16
17+use_amqp = settings.get("ENABLE_AMQP", False)
18+if use_amqp:
19+  from carbon import amqp_listener
20+  amqp_host = settings.get("AMQP_HOST", "localhost")
21+  amqp_port = settings.get("AMQP_PORT", 5672)
22+  amqp_user = settings.get("AMQP_USER", "guest")
23+  amqp_password = settings.get("AMQP_PASSWORD", "guest")
24+  amqp_verbose = settings.get("AMQP_VERBOSE", False)
25
26 # --debug
27 if options.debug:
28@@ -146,6 +154,11 @@
29 startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
30 startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
31 startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler)
32+
33+if use_amqp:
34+  amqp_listener.startReceiver(amqp_host, amqp_port,
35+                              amqp_user, amqp_password, verbose=amqp_verbose)
36+#
37 startWriter()
38 startRecordingCacheMetrics()
39
40
41=== added file 'carbon/conf/carbon.amqp.conf.example'
42--- carbon/conf/carbon.amqp.conf.example 1970-01-01 00:00:00 +0000
43+++ carbon/conf/carbon.amqp.conf.example 2010-01-11 15:35:21 +0000
44@@ -0,0 +1,73 @@
45+# This is a configuration file with AMQP enabled
46+
47+[cache]
48+LOCAL_DATA_DIR =
49+
50+# Specify the user to drop privileges to
51+# If this is blank carbon runs as the user that invokes it
52+# This user must have write access to the local data directory
53+USER =
54+
55+# Require at least this many seconds to pass after creating a new database file
56+# before creating another. This is intended to help avoid the performance impact
57+# caused by a flood of new metrics.
58+CREATION_DELAY = 1.0
59+
60+# Limit the size of the cache to avoid swapping or becoming CPU bound.
61+# Sorts and serving cache queries gets more expensive as the cache grows.
62+# Use the value "inf" (infinity) for an unlimited cache size.
63+MAX_CACHE_SIZE = inf
64+
65+# Limits the number of whisper update_many() calls per second, which effectively
66+# means the number of write requests sent to the disk. This is intended to
67+# prevent over-utilizing the disk and thus starving the rest of the system.
68+# When the rate of required updates exceeds this, then carbon's caching will
69+# take effect and increase the overall throughput accordingly.
70+MAX_UPDATES_PER_SECOND = 1000
71+
72+# Softly limits the number of whisper files that get created each minute.
73+# Setting this value low (like at 50) is a good way to ensure your graphite
74+# system will not be adversely impacted when a bunch of new metrics are
75+# sent to it. The trade off is that it will take much longer for those metrics'
76+# database files to all get created and thus longer until the data becomes usable.
77+# Setting this value high (like "inf" for infinity) will cause graphite to create
78+# the files quickly but at the risk of slowing I/O down considerably for a while.
79+MAX_CREATES_PER_MINUTE = inf
80+
81+LINE_RECEIVER_INTERFACE = 0.0.0.0
82+LINE_RECEIVER_PORT = 2003
83+
84+PICKLE_RECEIVER_INTERFACE = 0.0.0.0
85+PICKLE_RECEIVER_PORT = 2004
86+
87+CACHE_QUERY_INTERFACE = 0.0.0.0
88+CACHE_QUERY_PORT = 7002
89+
90+# Enable AMQP if you want to receive metrics using your AMQP broker
91+ENABLE_AMQP = True
92+
93+# Verbose means a line will be logged for every metric received
94+# useful for testing
95+AMQP_VERBOSE = True
96+
97+# your credentials for the amqp server
98+# AMQP_USER = guest
99+# AMQP_PASSWORD = guest
100+
101+# the network settings for the amqp server
102+# AMQP_HOST = localhost
103+# AMQP_PORT = 5672
104+
105+# NOTE: you cannot run both a cache and a relay on the same server
106+# with the default configuration; you have to specify distinct
107+# interfaces and ports for the listeners.
108+
109+[relay]
110+LINE_RECEIVER_INTERFACE = 0.0.0.0
111+LINE_RECEIVER_PORT = 2003
112+
113+PICKLE_RECEIVER_INTERFACE = 0.0.0.0
114+PICKLE_RECEIVER_PORT = 2004
115+
116+CACHE_SERVERS = server1, server2, server3
117+MAX_QUEUE_SIZE = 10000
118
119=== added file 'carbon/lib/carbon/amqp0-8.xml'
120--- carbon/lib/carbon/amqp0-8.xml 1970-01-01 00:00:00 +0000
121+++ carbon/lib/carbon/amqp0-8.xml 2010-01-11 15:35:21 +0000
122@@ -0,0 +1,759 @@
123+<?xml version="1.0" encoding="UTF-8"?>
124+<!--
125+Copyright (c) 2009 AMQP Working Group.
126+All rights reserved.
127+
128+Redistribution and use in source and binary forms, with or without
129+modification, are permitted provided that the following conditions
130+are met:
131+1. Redistributions of source code must retain the above copyright
132+notice, this list of conditions and the following disclaimer.
133+2. Redistributions in binary form must reproduce the above copyright
134+notice, this list of conditions and the following disclaimer in the
135+documentation and/or other materials provided with the distribution.
136+3. The name of the author may not be used to endorse or promote products
137+derived from this software without specific prior written permission.
138+
139+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
140+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
141+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
142+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
143+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
144+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
145+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
146+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
147+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
148+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
149+-->
150+<amqp major="8" minor="0" port="5672">
151+ <constant name="frame method" value="1"/>
152+ <constant name="frame header" value="2"/>
153+ <constant name="frame body" value="3"/>
154+ <constant name="frame oob method" value="4"/>
155+ <constant name="frame oob header" value="5"/>
156+ <constant name="frame oob body" value="6"/>
157+ <constant name="frame trace" value="7"/>
158+ <constant name="frame heartbeat" value="8"/>
159+ <constant name="frame min size" value="4096"/>
160+ <constant name="frame end" value="206"/>
161+ <constant name="reply success" value="200"/>
162+ <constant name="not delivered" value="310" class="soft error"/>
163+ <constant name="content too large" value="311" class="soft error"/>
164+ <constant name="connection forced" value="320" class="hard error"/>
165+ <constant name="invalid path" value="402" class="hard error"/>
166+ <constant name="access refused" value="403" class="soft error"/>
167+ <constant name="not found" value="404" class="soft error"/>
168+ <constant name="resource locked" value="405" class="soft error"/>
169+ <constant name="frame error" value="501" class="hard error"/>
170+ <constant name="syntax error" value="502" class="hard error"/>
171+ <constant name="command invalid" value="503" class="hard error"/>
172+ <constant name="channel error" value="504" class="hard error"/>
173+ <constant name="resource error" value="506" class="hard error"/>
174+ <constant name="not allowed" value="530" class="hard error"/>
175+ <constant name="not implemented" value="540" class="hard error"/>
176+ <constant name="internal error" value="541" class="hard error"/>
177+ <domain name="access ticket" type="short">
178+ <assert check="ne" value="0"/>
179+ </domain>
180+ <domain name="class id" type="short"/>
181+ <domain name="consumer tag" type="shortstr"/>
182+ <domain name="delivery tag" type="longlong"/>
183+ <domain name="exchange name" type="shortstr">
184+ <assert check="length" value="127"/>
185+ </domain>
186+ <domain name="known hosts" type="shortstr"/>
187+ <domain name="method id" type="short"/>
188+ <domain name="no ack" type="bit"/>
189+ <domain name="no local" type="bit"/>
190+ <domain name="path" type="shortstr">
191+ <assert check="notnull"/>
192+ <assert check="syntax" rule="path"/>
193+ <assert check="length" value="127"/>
194+ </domain>
195+ <domain name="peer properties" type="table"/>
196+ <domain name="queue name" type="shortstr">
197+ <assert check="length" value="127"/>
198+ </domain>
199+ <domain name="redelivered" type="bit"/>
200+ <domain name="reply code" type="short">
201+ <assert check="notnull"/>
202+ </domain>
203+ <domain name="reply text" type="shortstr">
204+ <assert check="notnull"/>
205+ </domain>
206+ <class name="connection" handler="connection" index="10">
207+ <chassis name="server" implement="MUST"/>
208+ <chassis name="client" implement="MUST"/>
209+ <method name="start" synchronous="1" index="10">
210+ <chassis name="client" implement="MUST"/>
211+ <response name="start-ok"/>
212+ <field name="version major" type="octet"/>
213+ <field name="version minor" type="octet"/>
214+ <field name="server properties" domain="peer properties"/>
215+ <field name="mechanisms" type="longstr">
216+ <see name="security mechanisms"/>
217+ <assert check="notnull"/>
218+ </field>
219+ <field name="locales" type="longstr">
220+ <assert check="notnull"/>
221+ </field>
222+ </method>
223+ <method name="start-ok" synchronous="1" index="11">
224+ <chassis name="server" implement="MUST"/>
225+ <field name="client properties" domain="peer properties"/>
226+ <field name="mechanism" type="shortstr">
227+ <assert check="notnull"/>
228+ </field>
229+ <field name="response" type="longstr">
230+ <assert check="notnull"/>
231+ </field>
232+ <field name="locale" type="shortstr">
233+ <assert check="notnull"/>
234+ </field>
235+ </method>
236+ <method name="secure" synchronous="1" index="20">
237+ <chassis name="client" implement="MUST"/>
238+ <response name="secure-ok"/>
239+ <field name="challenge" type="longstr">
240+ <see name="security mechanisms"/>
241+ </field>
242+ </method>
243+ <method name="secure-ok" synchronous="1" index="21">
244+ <chassis name="server" implement="MUST"/>
245+ <field name="response" type="longstr">
246+ <assert check="notnull"/>
247+ </field>
248+ </method>
249+ <method name="tune" synchronous="1" index="30">
250+ <chassis name="client" implement="MUST"/>
251+ <response name="tune-ok"/>
252+ <field name="channel max" type="short"/>
253+ <field name="frame max" type="long"/>
254+ <field name="heartbeat" type="short"/>
255+ </method>
256+ <method name="tune-ok" synchronous="1" index="31">
257+ <chassis name="server" implement="MUST"/>
258+ <field name="channel max" type="short">
259+ <assert check="notnull"/>
260+ <assert check="le" method="tune" field="channel max"/>
261+ </field>
262+ <field name="frame max" type="long"/>
263+ <field name="heartbeat" type="short"/>
264+ </method>
265+ <method name="open" synchronous="1" index="40">
266+ <chassis name="server" implement="MUST"/>
267+ <response name="open-ok"/>
268+ <response name="redirect"/>
269+ <field name="virtual host" domain="path">
270+ <assert check="regexp" value="^[a-zA-Z0-9/-_]+$"/>
271+ </field>
272+ <field name="capabilities" type="shortstr"/>
273+ <field name="insist" type="bit"/>
274+ </method>
275+ <method name="open-ok" synchronous="1" index="41">
276+ <chassis name="client" implement="MUST"/>
277+ <field name="known hosts" domain="known hosts"/>
278+ </method>
279+ <method name="redirect" synchronous="1" index="50">
280+ <chassis name="client" implement="MAY"/>
281+ <field name="host" type="shortstr">
282+ <assert check="notnull"/>
283+ </field>
284+ <field name="known hosts" domain="known hosts"/>
285+ </method>
286+ <method name="close" synchronous="1" index="60">
287+ <chassis name="client" implement="MUST"/>
288+ <chassis name="server" implement="MUST"/>
289+ <response name="close-ok"/>
290+ <field name="reply code" domain="reply code"/>
291+ <field name="reply text" domain="reply text"/>
292+ <field name="class id" domain="class id"/>
293+ <field name="method id" domain="class id"/>
294+ </method>
295+ <method name="close-ok" synchronous="1" index="61">
296+ <chassis name="client" implement="MUST"/>
297+ <chassis name="server" implement="MUST"/>
298+ </method>
299+ </class>
300+ <class name="channel" handler="channel" index="20">
301+ <chassis name="server" implement="MUST"/>
302+ <chassis name="client" implement="MUST"/>
303+ <method name="open" synchronous="1" index="10">
304+ <chassis name="server" implement="MUST"/>
305+ <response name="open-ok"/>
306+ <field name="out of band" type="shortstr">
307+ <assert check="null"/>
308+ </field>
309+ </method>
310+ <method name="open-ok" synchronous="1" index="11">
311+ <chassis name="client" implement="MUST"/>
312+ </method>
313+ <method name="flow" synchronous="1" index="20">
314+ <chassis name="server" implement="MUST"/>
315+ <chassis name="client" implement="MUST"/>
316+ <response name="flow-ok"/>
317+ <field name="active" type="bit"/>
318+ </method>
319+ <method name="flow-ok" index="21">
320+ <chassis name="server" implement="MUST"/>
321+ <chassis name="client" implement="MUST"/>
322+ <field name="active" type="bit"/>
323+ </method>
324+ <method name="alert" index="30">
325+ <chassis name="client" implement="MUST"/>
326+ <field name="reply code" domain="reply code"/>
327+ <field name="reply text" domain="reply text"/>
328+ <field name="details" type="table"/>
329+ </method>
330+ <method name="close" synchronous="1" index="40">
331+ <chassis name="client" implement="MUST"/>
332+ <chassis name="server" implement="MUST"/>
333+ <response name="close-ok"/>
334+ <field name="reply code" domain="reply code"/>
335+ <field name="reply text" domain="reply text"/>
336+ <field name="class id" domain="class id"/>
337+ <field name="method id" domain="method id"/>
338+ </method>
339+ <method name="close-ok" synchronous="1" index="41">
340+ <chassis name="client" implement="MUST"/>
341+ <chassis name="server" implement="MUST"/>
342+ </method>
343+ </class>
344+ <class name="access" handler="connection" index="30">
345+ <chassis name="server" implement="MUST"/>
346+ <chassis name="client" implement="MUST"/>
347+ <method name="request" synchronous="1" index="10">
348+ <chassis name="server" implement="MUST"/>
349+ <response name="request-ok"/>
350+ <field name="realm" domain="path"/>
351+ <field name="exclusive" type="bit"/>
352+ <field name="passive" type="bit"/>
353+ <field name="active" type="bit"/>
354+ <field name="write" type="bit"/>
355+ <field name="read" type="bit"/>
356+ </method>
357+ <method name="request-ok" synchronous="1" index="11">
358+ <chassis name="client" implement="MUST"/>
359+ <field name="ticket" domain="access ticket"/>
360+ </method>
361+ </class>
362+ <class name="exchange" handler="channel" index="40">
363+ <chassis name="server" implement="MUST"/>
364+ <chassis name="client" implement="MUST"/>
365+ <method name="declare" synchronous="1" index="10">
366+ <chassis name="server" implement="MUST"/>
367+ <response name="declare-ok"/>
368+ <field name="ticket" domain="access ticket"/>
369+ <field name="exchange" domain="exchange name">
370+ <assert check="regexp" value="^[a-zA-Z0-9-_.:]+$"/>
371+ </field>
372+ <field name="type" type="shortstr">
373+ <assert check="regexp" value="^[a-zA-Z0-9-_.:]+$"/>
374+ </field>
375+ <field name="passive" type="bit"/>
376+ <field name="durable" type="bit"/>
377+ <field name="auto delete" type="bit"/>
378+ <field name="internal" type="bit"/>
379+ <field name="nowait" type="bit"/>
380+ <field name="arguments" type="table"/>
381+ </method>
382+ <method name="declare-ok" synchronous="1" index="11">
383+ <chassis name="client" implement="MUST"/>
384+ </method>
385+ <method name="delete" synchronous="1" index="20">
386+ <chassis name="server" implement="MUST"/>
387+ <response name="delete-ok"/>
388+ <field name="ticket" domain="access ticket"/>
389+ <field name="exchange" domain="exchange name">
390+ <assert check="notnull"/>
391+ </field>
392+ <field name="if unused" type="bit"/>
393+ <field name="nowait" type="bit"/>
394+ </method>
395+ <method name="delete-ok" synchronous="1" index="21">
396+ <chassis name="client" implement="MUST"/>
397+ </method>
398+ </class>
399+ <class name="queue" handler="channel" index="50">
400+ <chassis name="server" implement="MUST"/>
401+ <chassis name="client" implement="MUST"/>
402+ <method name="declare" synchronous="1" index="10">
403+ <chassis name="server" implement="MUST"/>
404+ <response name="declare-ok"/>
405+ <field name="ticket" domain="access ticket"/>
406+ <field name="queue" domain="queue name">
407+ <assert check="regexp" value="^[a-zA-Z0-9-_.:]*$"/>
408+ </field>
409+ <field name="passive" type="bit"/>
410+ <field name="durable" type="bit"/>
411+ <field name="exclusive" type="bit"/>
412+ <field name="auto delete" type="bit"/>
413+ <field name="nowait" type="bit"/>
414+ <field name="arguments" type="table"/>
415+ </method>
416+ <method name="declare-ok" synchronous="1" index="11">
417+ <chassis name="client" implement="MUST"/>
418+ <field name="queue" domain="queue name">
419+ <assert check="notnull"/>
420+ </field>
421+ <field name="message count" type="long"/>
422+ <field name="consumer count" type="long"/>
423+ </method>
424+ <method name="bind" synchronous="1" index="20">
425+ <chassis name="server" implement="MUST"/>
426+ <response name="bind-ok"/>
427+ <field name="ticket" domain="access ticket"/>
428+ <field name="queue" domain="queue name"/>
429+ <field name="exchange" domain="exchange name"/>
430+ <field name="routing key" type="shortstr"/>
431+ <field name="nowait" type="bit"/>
432+ <field name="arguments" type="table"/>
433+ </method>
434+ <method name="bind-ok" synchronous="1" index="21">
435+ <chassis name="client" implement="MUST"/>
436+ </method>
437+ <method name="purge" synchronous="1" index="30">
438+ <chassis name="server" implement="MUST"/>
439+ <response name="purge-ok"/>
440+ <field name="ticket" domain="access ticket"/>
441+ <field name="queue" domain="queue name"/>
442+ <field name="nowait" type="bit"/>
443+ </method>
444+ <method name="purge-ok" synchronous="1" index="31">
445+ <chassis name="client" implement="MUST"/>
446+ <field name="message count" type="long"/>
447+ </method>
448+ <method name="delete" synchronous="1" index="40">
449+ <chassis name="server" implement="MUST"/>
450+ <response name="delete-ok"/>
451+ <field name="ticket" domain="access ticket"/>
452+ <field name="queue" domain="queue name"/>
453+ <field name="if unused" type="bit"/>
454+ <field name="if empty" type="bit">
455+ <test/>
456+ </field>
457+ <field name="nowait" type="bit"/>
458+ </method>
459+ <method name="delete-ok" synchronous="1" index="41">
460+ <chassis name="client" implement="MUST"/>
461+ <field name="message count" type="long"/>
462+ </method>
463+ </class>
464+ <class name="basic" handler="channel" index="60">
465+ <chassis name="server" implement="MUST"/>
466+ <chassis name="client" implement="MAY"/>
467+ <field name="content type" type="shortstr"/>
468+ <field name="content encoding" type="shortstr"/>
469+ <field name="headers" type="table"/>
470+ <field name="delivery mode" type="octet"/>
471+ <field name="priority" type="octet"/>
472+ <field name="correlation id" type="shortstr"/>
473+ <field name="reply to" type="shortstr"/>
474+ <field name="expiration" type="shortstr"/>
475+ <field name="message id" type="shortstr"/>
476+ <field name="timestamp" type="timestamp"/>
477+ <field name="type" type="shortstr"/>
478+ <field name="user id" type="shortstr"/>
479+ <field name="app id" type="shortstr"/>
480+ <field name="cluster id" type="shortstr"/>
481+ <method name="qos" synchronous="1" index="10">
482+ <chassis name="server" implement="MUST"/>
483+ <response name="qos-ok"/>
484+ <field name="prefetch size" type="long"/>
485+ <field name="prefetch count" type="short"/>
486+ <field name="global" type="bit"/>
487+ </method>
488+ <method name="qos-ok" synchronous="1" index="11">
489+ <chassis name="client" implement="MUST"/>
490+ </method>
491+ <method name="consume" synchronous="1" index="20">
492+ <chassis name="server" implement="MUST"/>
493+ <response name="consume-ok"/>
494+ <field name="ticket" domain="access ticket"/>
495+ <field name="queue" domain="queue name"/>
496+ <field name="consumer tag" domain="consumer tag"/>
497+ <field name="no local" domain="no local"/>
498+ <field name="no ack" domain="no ack"/>
499+ <field name="exclusive" type="bit"/>
500+ <field name="nowait" type="bit"/>
501+ </method>
502+ <method name="consume-ok" synchronous="1" index="21">
503+ <chassis name="client" implement="MUST"/>
504+ <field name="consumer tag" domain="consumer tag"/>
505+ </method>
506+ <method name="cancel" synchronous="1" index="30">
507+ <chassis name="server" implement="MUST"/>
508+ <response name="cancel-ok"/>
509+ <field name="consumer tag" domain="consumer tag"/>
510+ <field name="nowait" type="bit"/>
511+ </method>
512+ <method name="cancel-ok" synchronous="1" index="31">
513+ <chassis name="client" implement="MUST"/>
514+ <field name="consumer tag" domain="consumer tag"/>
515+ </method>
516+ <method name="publish" content="1" index="40">
517+ <chassis name="server" implement="MUST"/>
518+ <field name="ticket" domain="access ticket"/>
519+ <field name="exchange" domain="exchange name"/>
520+ <field name="routing key" type="shortstr"/>
521+ <field name="mandatory" type="bit"/>
522+ <field name="immediate" type="bit"/>
523+ </method>
524+ <method name="return" content="1" index="50">
525+ <chassis name="client" implement="MUST"/>
526+ <field name="reply code" domain="reply code"/>
527+ <field name="reply text" domain="reply text"/>
528+ <field name="exchange" domain="exchange name"/>
529+ <field name="routing key" type="shortstr"/>
530+ </method>
531+ <method name="deliver" content="1" index="60">
532+ <chassis name="client" implement="MUST"/>
533+ <field name="consumer tag" domain="consumer tag"/>
534+ <field name="delivery tag" domain="delivery tag"/>
535+ <field name="redelivered" domain="redelivered"/>
536+ <field name="exchange" domain="exchange name"/>
537+ <field name="routing key" type="shortstr"/>
538+ </method>
539+ <method name="get" synchronous="1" index="70">
540+ <response name="get-ok"/>
541+ <response name="get-empty"/>
542+ <chassis name="server" implement="MUST"/>
543+ <field name="ticket" domain="access ticket"/>
544+ <field name="queue" domain="queue name"/>
545+ <field name="no ack" domain="no ack"/>
546+ </method>
547+ <method name="get-ok" synchronous="1" content="1" index="71">
548+ <chassis name="client" implement="MAY"/>
549+ <field name="delivery tag" domain="delivery tag"/>
550+ <field name="redelivered" domain="redelivered"/>
551+ <field name="exchange" domain="exchange name"/>
552+ <field name="routing key" type="shortstr"/>
553+ <field name="message count" type="long"/>
554+ </method>
555+ <method name="get-empty" synchronous="1" index="72">
556+ <chassis name="client" implement="MAY"/>
557+ <field name="cluster id" type="shortstr"/>
558+ </method>
559+ <method name="ack" index="80">
560+ <chassis name="server" implement="MUST"/>
561+ <field name="delivery tag" domain="delivery tag"/>
562+ <field name="multiple" type="bit"/>
563+ </method>
564+ <method name="reject" index="90">
565+ <chassis name="server" implement="MUST"/>
566+ <field name="delivery tag" domain="delivery tag"/>
567+ <field name="requeue" type="bit"/>
568+ </method>
569+ <method name="recover" index="100">
570+ <chassis name="server" implement="MUST"/>
571+ <field name="requeue" type="bit"/>
572+ </method>
573+ </class>
574+ <class name="file" handler="channel" index="70">
575+ <chassis name="server" implement="MAY"/>
576+ <chassis name="client" implement="MAY"/>
577+ <field name="content type" type="shortstr"/>
578+ <field name="content encoding" type="shortstr"/>
579+ <field name="headers" type="table"/>
580+ <field name="priority" type="octet"/>
581+ <field name="reply to" type="shortstr"/>
582+ <field name="message id" type="shortstr"/>
583+ <field name="filename" type="shortstr"/>
584+ <field name="timestamp" type="timestamp"/>
585+ <field name="cluster id" type="shortstr"/>
586+ <method name="qos" synchronous="1" index="10">
587+ <chassis name="server" implement="MUST"/>
588+ <response name="qos-ok"/>
589+ <field name="prefetch size" type="long"/>
590+ <field name="prefetch count" type="short"/>
591+ <field name="global" type="bit"/>
592+ </method>
593+ <method name="qos-ok" synchronous="1" index="11">
594+ <chassis name="client" implement="MUST"/>
595+ </method>
596+ <method name="consume" synchronous="1" index="20">
597+ <chassis name="server" implement="MUST"/>
598+ <response name="consume-ok"/>
599+ <field name="ticket" domain="access ticket"/>
600+ <field name="queue" domain="queue name"/>
601+ <field name="consumer tag" domain="consumer tag"/>
602+ <field name="no local" domain="no local"/>
603+ <field name="no ack" domain="no ack"/>
604+ <field name="exclusive" type="bit"/>
605+ <field name="nowait" type="bit"/>
606+ </method>
607+ <method name="consume-ok" synchronous="1" index="21">
608+ <chassis name="client" implement="MUST"/>
609+ <field name="consumer tag" domain="consumer tag"/>
610+ </method>
611+ <method name="cancel" synchronous="1" index="30">
612+ <chassis name="server" implement="MUST"/>
613+ <response name="cancel-ok"/>
614+ <field name="consumer tag" domain="consumer tag"/>
615+ <field name="nowait" type="bit"/>
616+ </method>
617+ <method name="cancel-ok" synchronous="1" index="31">
618+ <chassis name="client" implement="MUST"/>
619+ <field name="consumer tag" domain="consumer tag"/>
620+ </method>
621+ <method name="open" synchronous="1" index="40">
622+ <response name="open-ok"/>
623+ <chassis name="server" implement="MUST"/>
624+ <chassis name="client" implement="MUST"/>
625+ <field name="identifier" type="shortstr"/>
626+ <field name="content size" type="longlong"/>
627+ </method>
628+ <method name="open-ok" synchronous="1" index="41">
629+ <response name="stage"/>
630+ <chassis name="server" implement="MUST"/>
631+ <chassis name="client" implement="MUST"/>
632+ <field name="staged size" type="longlong"/>
633+ </method>
634+ <method name="stage" content="1" index="50">
635+ <chassis name="server" implement="MUST"/>
636+ <chassis name="client" implement="MUST"/>
637+ </method>
638+ <method name="publish" index="60">
639+ <chassis name="server" implement="MUST"/>
640+ <field name="ticket" domain="access ticket"/>
641+ <field name="exchange" domain="exchange name"/>
642+ <field name="routing key" type="shortstr"/>
643+ <field name="mandatory" type="bit"/>
644+ <field name="immediate" type="bit"/>
645+ <field name="identifier" type="shortstr"/>
646+ </method>
647+ <method name="return" content="1" index="70">
648+ <chassis name="client" implement="MUST"/>
649+ <field name="reply code" domain="reply code"/>
650+ <field name="reply text" domain="reply text"/>
651+ <field name="exchange" domain="exchange name"/>
652+ <field name="routing key" type="shortstr"/>
653+ </method>
654+ <method name="deliver" index="80">
655+ <chassis name="client" implement="MUST"/>
656+ <field name="consumer tag" domain="consumer tag"/>
657+ <field name="delivery tag" domain="delivery tag"/>
658+ <field name="redelivered" domain="redelivered"/>
659+ <field name="exchange" domain="exchange name"/>
660+ <field name="routing key" type="shortstr"/>
661+ <field name="identifier" type="shortstr"/>
662+ </method>
663+ <method name="ack" index="90">
664+ <chassis name="server" implement="MUST"/>
665+ <field name="delivery tag" domain="delivery tag"/>
666+ <field name="multiple" type="bit"/>
667+ </method>
668+ <method name="reject" index="100">
669+ <chassis name="server" implement="MUST"/>
670+ <field name="delivery tag" domain="delivery tag"/>
671+ <field name="requeue" type="bit"/>
672+ </method>
673+ </class>
674+ <class name="stream" handler="channel" index="80">
675+ <chassis name="server" implement="MAY"/>
676+ <chassis name="client" implement="MAY"/>
677+ <field name="content type" type="shortstr"/>
678+ <field name="content encoding" type="shortstr"/>
679+ <field name="headers" type="table"/>
680+ <field name="priority" type="octet"/>
681+ <field name="timestamp" type="timestamp"/>
682+ <method name="qos" synchronous="1" index="10">
683+ <chassis name="server" implement="MUST"/>
684+ <response name="qos-ok"/>
685+ <field name="prefetch size" type="long"/>
686+ <field name="prefetch count" type="short"/>
687+ <field name="consume rate" type="long"/>
688+ <field name="global" type="bit"/>
689+ </method>
690+ <method name="qos-ok" synchronous="1" index="11">
691+ <chassis name="client" implement="MUST"/>
692+ </method>
693+ <method name="consume" synchronous="1" index="20">
694+ <chassis name="server" implement="MUST"/>
695+ <response name="consume-ok"/>
696+ <field name="ticket" domain="access ticket"/>
697+ <field name="queue" domain="queue name"/>
698+ <field name="consumer tag" domain="consumer tag"/>
699+ <field name="no local" domain="no local"/>
700+ <field name="exclusive" type="bit"/>
701+ <field name="nowait" type="bit"/>
702+ </method>
703+ <method name="consume-ok" synchronous="1" index="21">
704+ <chassis name="client" implement="MUST"/>
705+ <field name="consumer tag" domain="consumer tag"/>
706+ </method>
707+ <method name="cancel" synchronous="1" index="30">
708+ <chassis name="server" implement="MUST"/>
709+ <response name="cancel-ok"/>
710+ <field name="consumer tag" domain="consumer tag"/>
711+ <field name="nowait" type="bit"/>
712+ </method>
713+ <method name="cancel-ok" synchronous="1" index="31">
714+ <chassis name="client" implement="MUST"/>
715+ <field name="consumer tag" domain="consumer tag"/>
716+ </method>
717+ <method name="publish" content="1" index="40">
718+ <chassis name="server" implement="MUST"/>
719+ <field name="ticket" domain="access ticket"/>
720+ <field name="exchange" domain="exchange name"/>
721+ <field name="routing key" type="shortstr"/>
722+ <field name="mandatory" type="bit"/>
723+ <field name="immediate" type="bit"/>
724+ </method>
725+ <method name="return" content="1" index="50">
726+ <chassis name="client" implement="MUST"/>
727+ <field name="reply code" domain="reply code"/>
728+ <field name="reply text" domain="reply text"/>
729+ <field name="exchange" domain="exchange name"/>
730+ <field name="routing key" type="shortstr"/>
731+ </method>
732+ <method name="deliver" content="1" index="60">
733+ <chassis name="client" implement="MUST"/>
734+ <field name="consumer tag" domain="consumer tag"/>
735+ <field name="delivery tag" domain="delivery tag"/>
736+ <field name="exchange" domain="exchange name"/>
737+ <field name="queue" domain="queue name">
738+ <assert check="notnull"/>
739+ </field>
740+ </method>
741+ </class>
742+ <class name="tx" handler="channel" index="90">
743+ <chassis name="server" implement="SHOULD"/>
744+ <chassis name="client" implement="MAY"/>
745+ <method name="select" synchronous="1" index="10">
746+ <chassis name="server" implement="MUST"/>
747+ <response name="select-ok"/>
748+ </method>
749+ <method name="select-ok" synchronous="1" index="11">
750+ <chassis name="client" implement="MUST"/>
751+ </method>
752+ <method name="commit" synchronous="1" index="20">
753+ <chassis name="server" implement="MUST"/>
754+ <response name="commit-ok"/>
755+ </method>
756+ <method name="commit-ok" synchronous="1" index="21">
757+ <chassis name="client" implement="MUST"/>
758+ </method>
759+ <method name="rollback" synchronous="1" index="30">
760+ <chassis name="server" implement="MUST"/>
761+ <response name="rollback-ok"/>
762+ </method>
763+ <method name="rollback-ok" synchronous="1" index="31">
764+ <chassis name="client" implement="MUST"/>
765+ </method>
766+ </class>
767+ <class name="dtx" handler="channel" index="100">
768+ <chassis name="server" implement="MAY"/>
769+ <chassis name="client" implement="MAY"/>
770+ <method name="select" synchronous="1" index="10">
771+ <chassis name="server" implement="MUST"/>
772+ <response name="select-ok"/>
773+ </method>
774+ <method name="select-ok" synchronous="1" index="11">
775+ <chassis name="client" implement="MUST"/>
776+ </method>
777+ <method name="start" synchronous="1" index="20">
778+ <chassis name="server" implement="MAY"/>
779+ <response name="start-ok"/>
780+ <field name="dtx identifier" type="shortstr">
781+ <assert check="notnull"/>
782+ </field>
783+ </method>
784+ <method name="start-ok" synchronous="1" index="21">
785+ <chassis name="client" implement="MUST"/>
786+ </method>
787+ </class>
788+ <class name="tunnel" handler="tunnel" index="110">
789+ <chassis name="server" implement="MAY"/>
790+ <chassis name="client" implement="MAY"/>
791+ <field name="headers" type="table"/>
792+ <field name="proxy name" type="shortstr"/>
793+ <field name="data name" type="shortstr"/>
794+ <field name="durable" type="octet"/>
795+ <field name="broadcast" type="octet"/>
796+ <method name="request" content="1" index="10">
797+ <chassis name="server" implement="MUST"/>
798+ <field name="meta data" type="table"/>
799+ </method>
800+ </class>
801+ <class name="test" handler="channel" index="120">
802+ <chassis name="server" implement="MUST"/>
803+ <chassis name="client" implement="SHOULD"/>
804+ <method name="integer" synchronous="1" index="10">
805+ <chassis name="client" implement="MUST"/>
806+ <chassis name="server" implement="MUST"/>
807+ <response name="integer-ok"/>
808+ <field name="integer 1" type="octet"/>
809+ <field name="integer 2" type="short"/>
810+ <field name="integer 3" type="long"/>
811+ <field name="integer 4" type="longlong"/>
812+ <field name="operation" type="octet">
813+ <assert check="enum">
814+ <value name="add"/>
815+ <value name="min"/>
816+ <value name="max"/>
817+ </assert>
818+ </field>
819+ </method>
820+ <method name="integer-ok" synchronous="1" index="11">
821+ <chassis name="client" implement="MUST"/>
822+ <chassis name="server" implement="MUST"/>
823+ <field name="result" type="longlong"/>
824+ </method>
825+ <method name="string" synchronous="1" index="20">
826+ <chassis name="client" implement="MUST"/>
827+ <chassis name="server" implement="MUST"/>
828+ <response name="string-ok"/>
829+ <field name="string 1" type="shortstr"/>
830+ <field name="string 2" type="longstr"/>
831+ <field name="operation" type="octet">
832+ <assert check="enum">
833+ <value name="add"/>
834+ <value name="min"/>
835+ <value name="max"/>
836+ </assert>
837+ </field>
838+ </method>
839+ <method name="string-ok" synchronous="1" index="21">
840+ <chassis name="client" implement="MUST"/>
841+ <chassis name="server" implement="MUST"/>
842+ <field name="result" type="longstr"/>
843+ </method>
844+ <method name="table" synchronous="1" index="30">
845+ <chassis name="client" implement="MUST"/>
846+ <chassis name="server" implement="MUST"/>
847+ <response name="table-ok"/>
848+ <field name="table" type="table"/>
849+ <field name="integer op" type="octet">
850+ <assert check="enum">
851+ <value name="add"/>
852+ <value name="min"/>
853+ <value name="max"/>
854+ </assert>
855+ </field>
856+ <field name="string op" type="octet">
857+ <assert check="enum">
858+ <value name="add"/>
859+ <value name="min"/>
860+ <value name="max"/>
861+ </assert>
862+ </field>
863+ </method>
864+ <method name="table-ok" synchronous="1" index="31">
865+ <chassis name="client" implement="MUST"/>
866+ <chassis name="server" implement="MUST"/>
867+ <field name="integer result" type="longlong"/>
868+ <field name="string result" type="longstr"/>
869+ </method>
870+ <method name="content" synchronous="1" content="1" index="40">
871+ <chassis name="client" implement="MUST"/>
872+ <chassis name="server" implement="MUST"/>
873+ <response name="content-ok"/>
874+ </method>
875+ <method name="content-ok" synchronous="1" content="1" index="41">
876+ <chassis name="client" implement="MUST"/>
877+ <chassis name="server" implement="MUST"/>
878+ <field name="content checksum" type="long"/>
879+ </method>
880+ </class>
881+</amqp>
882\ No newline at end of file
883
884=== added file 'carbon/lib/carbon/amqp_listener.py'
885--- carbon/lib/carbon/amqp_listener.py 1970-01-01 00:00:00 +0000
886+++ carbon/lib/carbon/amqp_listener.py 2010-01-11 15:35:21 +0000
887@@ -0,0 +1,178 @@
888+#!/usr/bin/env python
889+"""
890+Copyright 2009 Lucio Torre <lucio.torre@canonical.com>
891+
892+This is an AMQP client that will connect to the specified broker and read
893+messages, parse them, and post them as metrics.
894+
895+The message format is the same as in the TCP line protocol
896+(METRIC, VALUE, TIMESTAMP) with the added possibility of putting multiple "\n"
897+separated lines in one message.
898+
899+Can be started standalone for testing or using carbon-cache.py (see example
900+config file provided)
901+
902+"""
903+import sys
904+import os
905+from optparse import OptionParser
906+
907+from twisted.internet.defer import inlineCallbacks
908+from twisted.internet import reactor
909+from twisted.internet.protocol import ClientCreator, Protocol, \
910+ ReconnectingClientFactory
911+from txamqp.protocol import AMQClient
912+from txamqp.client import TwistedDelegate
913+import txamqp.spec
914+
915+try:
916+ import carbon
917+except ImportError:
918+ # this is being run directly, carbon is not installed
919+ LIB_DIR = os.path.dirname(os.path.dirname(__file__))
920+ sys.path.insert(0, LIB_DIR)
921+
922+import carbon.listeners #satisfy import order requirements
923+from carbon.instrumentation import increment
924+from carbon.events import metricReceived
925+from carbon import log
926+
927+
928+
929+class AMQPGraphiteProtocol(AMQClient):
930+ """This is the protocol instance that will receive and post metrics."""
931+
932+ consumer_tag = "graphite_consumer"
933+
934+ @inlineCallbacks
935+ def connectionMade(self):
936+ yield AMQClient.connectionMade(self)
937+ log.listener("New AMQP connection made")
938+ yield self.setup()
939+ yield self.receive_loop()
940+
941+ @inlineCallbacks
942+ def setup(self):
943+ yield self.authenticate(self.factory.username, self.factory.password)
944+ chan = yield self.channel(1)
945+ yield chan.channel_open()
946+
947+ yield chan.queue_declare(queue=self.factory.queue_name, durable=False,
948+ exclusive=False, auto_delete=True)
949+ yield chan.exchange_declare(
950+ exchange=self.factory.exchange_name, type="fanout", durable=False,
951+ auto_delete=True)
952+
953+ yield chan.queue_bind(queue=self.factory.queue_name,
954+ exchange=self.factory.exchange_name)
955+
956+ yield chan.basic_consume(queue=self.factory.queue_name, no_ack=True,
957+ consumer_tag=self.consumer_tag)
958+ @inlineCallbacks
959+ def receive_loop(self):
960+ queue = yield self.queue(self.consumer_tag)
961+
962+ while True:
963+ msg = yield queue.get()
964+ self.processMessage(msg)
965+
966+ def processMessage(self, message):
967+ """Parse a message and post it as a metric."""
968+
969+ if self.factory.verbose:
970+ log.listener("Message received: %s" % (message,))
971+
972+ for line in message.content.body.split("\n"):
973+ try:
974+ metric, value, timestamp = line.strip().split()
975+ datapoint = ( float(timestamp), float(value) )
976+ except ValueError:
977+ log.listener("wrong value in line: %s" % (line,))
978+ continue
979+
980+ increment('metricsReceived')
981+ metricReceived(metric, datapoint)
982+ if self.factory.verbose:
983+ log.listener("Metric posted: %s %s %s" %
984+ (metric, value, timestamp,))
985+
986+
987+class AMQPReconnectingFactory(ReconnectingClientFactory):
988+ """The reconnecting factory.
989+
990+ Knows how to create the extended client and how to keep trying to
991+ connect in case of errors."""
992+
993+ protocol = AMQPGraphiteProtocol
994+
995+ def __init__(self, username, password, delegate, vhost, spec, channel,
996+ exchange_name, queue_name, verbose):
997+ self.username = username
998+ self.password = password
999+ self.delegate = delegate
1000+ self.vhost = vhost
1001+ self.spec = spec
1002+ self.channel = channel
1003+ self.exchange_name = exchange_name
1004+ self.queue_name = queue_name
1005+ self.verbose = verbose
1006+
1007+ def buildProtocol(self, addr):
1008+ p = self.protocol(self.delegate, self.vhost, self.spec)
1009+ p.factory = self
1010+ return p
1011+
1012+def startReceiver(host, port, username, password, vhost="/", spec=None,
1013+ channel=1, exchange_name="graphite_exchange",
1014+ queue_name="graphite_queue", verbose=False):
1015+ """Starts a twisted process that will read messages on the amqp broker
1016+ and post them as metrics."""
1017+
1018+ # fall back to the bundled spec if none was provided
1019+ if not spec:
1020+ spec = txamqp.spec.load(os.path.normpath(
1021+ os.path.join(os.path.dirname(__file__), 'amqp0-8.xml')))
1022+
1023+ delegate = TwistedDelegate()
1024+ factory = AMQPReconnectingFactory(username, password, delegate, vhost,
1025+ spec, channel, exchange_name, queue_name,
1026+ verbose=verbose)
1027+ reactor.connectTCP(host, port, factory)
1028+
1029+
1030+def main():
1031+ parser = OptionParser()
1032+ parser.add_option("-t", "--host", dest="host",
1033+ help="host name", metavar="HOST", default="localhost")
1034+
1035+ parser.add_option("-p", "--port", dest="port", type=int,
1036+ help="port number", metavar="PORT",
1037+ default=5672)
1038+
1039+ parser.add_option("-u", "--user", dest="username",
1040+ help="username", metavar="USERNAME",
1041+ default="guest")
1042+
1043+ parser.add_option("-w", "--password", dest="password",
1044+ help="password", metavar="PASSWORD",
1045+ default="guest")
1046+
1047+ parser.add_option("-V", "--vhost", dest="vhost",
1048+ help="vhost", metavar="VHOST",
1049+ default="/")
1050+
1051+ parser.add_option("-v", "--verbose", dest="verbose",
1052+ help="verbose",
1053+ default=False, action="store_true")
1054+
1055+ (options, args) = parser.parse_args()
1056+
1057+
1058+ log.logToStdout()
1059+ startReceiver(options.host, options.port, options.username,
1060+ options.password, vhost=options.vhost,
1061+ verbose=options.verbose)
1062+ reactor.run()
1063+
1064+if __name__ == "__main__":
1065+ main()
1066
1067=== added file 'carbon/lib/carbon/amqp_publisher.py'
1068--- carbon/lib/carbon/amqp_publisher.py 1970-01-01 00:00:00 +0000
1069+++ carbon/lib/carbon/amqp_publisher.py 2010-01-11 15:35:21 +0000
1070@@ -0,0 +1,81 @@
1071+#!/usr/bin/env python
1072+"""
1073+Copyright 2009 Lucio Torre <lucio.torre@canonical.com>
1074+
1075+Will publish metrics over AMQP
1076+"""
1077+
1078+import os
1079+from optparse import OptionParser
1080+
1081+from twisted.internet.defer import inlineCallbacks
1082+from twisted.internet import reactor, task
1083+from twisted.internet.protocol import ClientCreator
1084+from txamqp.protocol import AMQClient
1085+from txamqp.client import TwistedDelegate
1086+from txamqp.content import Content
1087+import txamqp.spec
1088+
1089+@inlineCallbacks
1090+def gotConnection(conn, message, username, password,
1091+ channel_number, exchange_name):
1092+ yield conn.authenticate(username, password)
1093+ channel = yield conn.channel(channel_number)
1094+ yield channel.channel_open()
1095+
1096+ msg = Content(message)
1097+ msg["delivery mode"] = 2
1098+ channel.basic_publish(exchange=exchange_name, content=msg)
1099+ yield channel.channel_close()
1100+
1101+def writeMetric(message, host, port, username, password, vhost,
1102+ spec=None, channel=1, exchange_name="graphite_exchange"):
1103+
1104+ if not spec:
1105+ spec = txamqp.spec.load(os.path.normpath(
1106+ os.path.join(os.path.dirname(__file__), 'amqp0-8.xml')))
1107+
1108+ delegate = TwistedDelegate()
1109+
1110+ d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
1111+ spec=spec).connectTCP(host, port)
1112+
1113+ d.addCallback(gotConnection, message, username, password,
1114+ channel, exchange_name)
1115+
1116+ return d
1117+
1118+
1119+def main():
1120+ parser = OptionParser()
1121+ parser.add_option("-t", "--host", dest="host",
1122+ help="host name", metavar="HOST", default="localhost")
1123+
1124+ parser.add_option("-p", "--port", dest="port", type=int,
1125+ help="port number", metavar="PORT",
1126+ default=5672)
1127+
1128+ parser.add_option("-u", "--user", dest="username",
1129+ help="username", metavar="USERNAME",
1130+ default="guest")
1131+
1132+ parser.add_option("-w", "--password", dest="password",
1133+ help="password", metavar="PASSWORD",
1134+ default="guest")
1135+
1136+ parser.add_option("-v", "--vhost", dest="vhost",
1137+ help="vhost", metavar="VHOST",
1138+ default="/")
1139+
1140+ (options, args) = parser.parse_args()
1141+
1142+
1143+ message = " ".join(args)
1144+ d = writeMetric(message, options.host, options.port, options.username,
1145+ options.password, vhost=options.vhost)
1146+ d.addErrback(lambda f: f.printTraceback())
1147+ d.addBoth(lambda _: reactor.stop())
1148+ reactor.run()
1149+
1150+if __name__ == "__main__":
1151+ main()
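
The message format described in the amqp_listener.py docstring (METRIC, VALUE, TIMESTAMP, with several "\n"-separated lines allowed per message) can be tried out without a broker. The following is a minimal standalone sketch, not code from this branch: parse_metric_lines is a hypothetical helper name, and it assumes the message body is already available as a string. It mirrors the parsing in processMessage, except that it skips blank lines silently instead of logging them as errors.

```python
def parse_metric_lines(body):
    """Parse a message body into a list of (metric, (timestamp, value)) tuples.

    Hypothetical helper for illustration only; malformed lines are dropped.
    """
    datapoints = []
    for line in body.split("\n"):
        line = line.strip()
        if not line:
            # Assumption: blank lines (e.g. a trailing newline) are ignored.
            continue
        try:
            metric, value, timestamp = line.split()
            datapoint = (float(timestamp), float(value))
        except ValueError:
            # Wrong number of fields, or a non-numeric value or timestamp.
            continue
        datapoints.append((metric, datapoint))
    return datapoints


if __name__ == "__main__":
    body = "servers.web1.load 0.42 1263224121\nservers.web2.load 0.37 1263224121\n"
    for metric, (timestamp, value) in parse_metric_lines(body):
        print(metric, timestamp, value)
```

A body in this shape is what the sample publisher would send as a single message; each well-formed line becomes one datapoint with the timestamp first and the value second, matching the tuple order used in processMessage.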