Merge lp:~lucio.torre/graphite/graphite-add-rabbitmq into lp:~graphite-dev/graphite/main

Proposed by Lucio Torre
Status: Merged
Approved by: chrismd
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~lucio.torre/graphite/graphite-add-rabbitmq
Merge into: lp:~graphite-dev/graphite/main
Diff against target: 1151 lines (+1105/-1)
5 files modified
carbon/bin/carbon-cache.py (+14/-1)
carbon/conf/carbon.amqp.conf.example (+73/-0)
carbon/lib/carbon/amqp0-8.xml (+759/-0)
carbon/lib/carbon/amqp_listener.py (+178/-0)
carbon/lib/carbon/amqp_publisher.py (+81/-0)
To merge this branch: bzr merge lp:~lucio.torre/graphite/graphite-add-rabbitmq
Reviewer: chrismd (review status: Approve)
Review via email: mp+16816@code.launchpad.net
Lucio Torre (lucio.torre) wrote :

add a listener for amqp.

includes sample configurations, a sample producer and a way of starting up the listener without installing everything to test it.

Elliot Murphy (statik) wrote :

I just realized this includes the AMQP XML file. I recommend not including this file, as it has a non-free license and will cause a problem when carbon is packaged for Debian/Ubuntu.

chrismd (chrismd) wrote :

I just looked through the code and it looks good, only a couple caveats. First is that the rest of the graphite codebase uses 2 space indentation whereas amqp_listener.py uses 4. I know that's the PEP-8 recommendation, I just prefer 2 spaces myself. I don't care too much if you want to leave it at 4.

Also it uses the @inlineCallbacks decorator which, while incredibly useful, is not compatible with Python 2.4. I've been considering upping the Python requirement to 2.5 simply because 2.4 is so old and I often find myself breaking compatibility by accident by using more recent features out of habit. So don't worry about changing that yet, I'll probably make the next release require 2.5.
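
For context on the 2.4 limitation: @inlineCallbacks relies on generators that can receive values through send(), which arrived with PEP 342 in Python 2.5. The sketch below is a toy, stdlib-only driver written for modern Python 3. The names `drive` and `fetch_and_sum` are hypothetical, and this is not how Twisted implements the decorator, only the generator mechanism it builds on.

```python
def drive(gen):
    """Toy driver: resolve each yielded thunk and send the result back in.

    A real driver (such as Twisted's) would wait on a Deferred instead of
    calling a thunk immediately; this is only an illustration.
    """
    try:
        pending = next(gen)                # run to the first yield
        while True:
            pending = gen.send(pending())  # resume the generator with the result
    except StopIteration as stop:
        return stop.value                  # value passed to `return` in the generator


def fetch_and_sum():
    # Each `yield` is an expression: it hands a pending operation to the
    # driver and evaluates to whatever the driver sends back.
    a = yield (lambda: 2)
    b = yield (lambda: 3)
    return a + b


result = drive(fetch_and_sum())
```

On Python 2.4 `yield` was only a statement and generators had no send() method, so code in this style had to be written as explicit callback chains instead.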

As for the xml file, is there not a free version of the AMQP spec? What do other txamqp apps use? I just checked out rabbitmq and it appears they have a JSON file describing the spec with a permissive license so we could use it. The only question is, can txamqp read it?

chrismd (chrismd) wrote :

Looks like we have a way around the spec license issue. There are stripped down versions of the spec available here http://www.amqp.org/confluence/display/AMQP/AMQP+Specification which are under a BSD license. I confirmed with the txamqp devs that those spec files will work fine.

Lucio, could you replace the spec file in your branch with the stripped down one?

chrismd (chrismd) wrote :

One other issue I just noticed: in amqp_listener.py around line 90 you catch a ValueError to handle malformed lines, but the except block needs a continue statement to avoid causing a NameError in the lines below it.
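
A minimal sketch of the failure mode described above, assuming each message line has the form `metric value timestamp`. `parse_lines` is a hypothetical helper for illustration, not the code in amqp_listener.py.

```python
def parse_lines(body):
    """Parse 'metric value timestamp' lines, skipping malformed ones."""
    datapoints = []
    for line in body.splitlines():
        try:
            # Unpacking raises ValueError if the line does not have
            # exactly three fields; float() raises it for non-numbers.
            metric, value, timestamp = line.strip().split()
            datapoint = (float(timestamp), float(value))
        except ValueError:
            # Without this `continue`, the append below would run with
            # `metric`/`datapoint` unbound on the first line (NameError)
            # or silently reuse the values from the previous line.
            continue
        datapoints.append((metric, datapoint))
    return datapoints


parsed = parse_lines("a.b 1 100\ngarbage\nc.d 2.5 200")
```

The malformed middle line is dropped and the two valid lines are kept in order.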

Lucio Torre (lucio.torre) wrote :

1) I'll leave indentation at 4 if that's ok. Thanks for giving me the option :)

2) I'll rewrite to use plain old deferreds. This will take some work.

3) I'll put in the BSD version of the XML.

4) I'll fix the ValueError/continue issue.

I'll let you know when I've pushed the new branch.

chrismd (chrismd) wrote :

Awesome, thanks! Don't worry about the @inlineCallbacks though because I looked and txAMQP also uses @inlineCallbacks and thus it requires 2.5 anyways.

Initially I had only thought AMQP support would be an optional feature for getting data into Graphite. However the more I have been thinking about it the more I've realized there are some really cool things we could do by further using AMQP (and Thrift) in carbon, so it is quite possibly going to become a required dependency. I'll post some of my ideas to the graphite-dev mailing list once they solidify a little more.

204. By Lucio Torre

merged with trunk

205. By Lucio Torre

fixed proposal requests

Lucio Torre (lucio.torre) wrote :

Replaced the XML file with: http://www.amqp.org/confluence/download/attachments/720900/amqp0-8.stripped.xml?version=1&modificationDate=1250523131000

added continue after validation

please re-review.

thanks.

chrismd (chrismd) wrote :

Looks good to merge into trunk.

review: Approve

Preview Diff

=== modified file 'carbon/bin/carbon-cache.py'
--- carbon/bin/carbon-cache.py 2009-10-30 12:32:49 +0000
+++ carbon/bin/carbon-cache.py 2010-01-11 15:35:21 +0000
@@ -27,7 +27,7 @@
 try:
   from twisted.internet import epollreactor
   epollreactor.install()
-except: 
+except:
   pass
 from twisted.internet import reactor
 
@@ -112,6 +112,14 @@
 # Read config (we want failures to occur before daemonizing)
 settings.readFrom(options.config, 'cache')
 
+use_amqp = settings.get("ENABLE_AMQP", False)
+if use_amqp:
+  from carbon import amqp_listener
+  amqp_host = settings.get("AMQP_HOST", "localhost")
+  amqp_port = settings.get("AMQP_PORT", 5672)
+  amqp_user = settings.get("AMQP_USER", "guest")
+  amqp_password = settings.get("AMQP_PASSWORD", "guest")
+  amqp_verbose = settings.get("AMQP_VERBOSE", False)
 
 # --debug
 if options.debug:
@@ -146,6 +154,11 @@
 startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
 startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
 startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler)
+
+if use_amqp:
+  amqp_listener.startReceiver(amqp_host, amqp_port,
+                              amqp_user, amqp_password, verbose=amqp_verbose)
+#
 startWriter()
 startRecordingCacheMetrics()
 
=== added file 'carbon/conf/carbon.amqp.conf.example'
--- carbon/conf/carbon.amqp.conf.example 1970-01-01 00:00:00 +0000
+++ carbon/conf/carbon.amqp.conf.example 2010-01-11 15:35:21 +0000
@@ -0,0 +1,73 @@
+# This is a configuration file with AMQP enabled
+
+[cache]
+LOCAL_DATA_DIR =
+
+# Specify the user to drop privileges to
+# If this is blank carbon runs as the user that invokes it
+# This user must have write access to the local data directory
+USER =
+
+# Require at least this many seconds to pass after creating a new database file
+# before creating another. This is intended to help avoid the performance impact
+# caused by a flood of new metrics.
+CREATION_DELAY = 1.0
+
+# Limit the size of the cache to avoid swapping or becoming CPU bound.
+# Sorts and serving cache queries gets more expensive as the cache grows.
+# Use the value "inf" (infinity) for an unlimited cache size.
+MAX_CACHE_SIZE = inf
+
+# Limits the number of whisper update_many() calls per second, which effectively
+# means the number of write requests sent to the disk. This is intended to
+# prevent over-utilizing the disk and thus starving the rest of the system.
+# When the rate of required updates exceeds this, then carbon's caching will
+# take effect and increase the overall throughput accordingly.
+MAX_UPDATES_PER_SECOND = 1000
+
+# Softly limits the number of whisper files that get created each minute.
+# Setting this value low (like at 50) is a good way to ensure your graphite
+# system will not be adversely impacted when a bunch of new metrics are
+# sent to it. The trade off is that it will take much longer for those metrics'
+# database files to all get created and thus longer until the data becomes usable.
+# Setting this value high (like "inf" for infinity) will cause graphite to create
+# the files quickly but at the risk of slowing I/O down considerably for a while.
+MAX_CREATES_PER_MINUTE = inf
+
+LINE_RECEIVER_INTERFACE = 0.0.0.0
+LINE_RECEIVER_PORT = 2003
+
+PICKLE_RECEIVER_INTERFACE = 0.0.0.0
+PICKLE_RECEIVER_PORT = 2004
+
+CACHE_QUERY_INTERFACE = 0.0.0.0
+CACHE_QUERY_PORT = 7002
+
+# Enable AMQP if you want to receive metrics using your AMQP broker
+ENABLE_AMQP = True
+
+# Verbose means a line will be logged for every metric received
+# useful for testing
+AMQP_VERBOSE = True
+
+# your credentials for the amqp server
+# AMQP_USER = guest
+# AMQP_PASSWORD = guest
+
+# the network settings for the amqp server
+# AMQP_HOST = localhost
+# AMQP_PORT = 5672
+
+# NOTE: you cannot run both a cache and a relay on the same server
+# with the default configuration, you have to specify distinct
+# interfaces and ports for the listeners.
+
+[relay]
+LINE_RECEIVER_INTERFACE = 0.0.0.0
+LINE_RECEIVER_PORT = 2003
+
+PICKLE_RECEIVER_INTERFACE = 0.0.0.0
+PICKLE_RECEIVER_PORT = 2004
+
+CACHE_SERVERS = server1, server2, server3
+MAX_QUEUE_SIZE = 10000
=== added file 'carbon/lib/carbon/amqp0-8.xml'
--- carbon/lib/carbon/amqp0-8.xml 1970-01-01 00:00:00 +0000
+++ carbon/lib/carbon/amqp0-8.xml 2010-01-11 15:35:21 +0000
@@ -0,0 +1,759 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2009 AMQP Working Group.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+3. The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+<amqp major="8" minor="0" port="5672">
+  <constant name="frame method" value="1"/>
+  <constant name="frame header" value="2"/>
+  <constant name="frame body" value="3"/>
+  <constant name="frame oob method" value="4"/>
+  <constant name="frame oob header" value="5"/>
+  <constant name="frame oob body" value="6"/>
+  <constant name="frame trace" value="7"/>
+  <constant name="frame heartbeat" value="8"/>
+  <constant name="frame min size" value="4096"/>
+  <constant name="frame end" value="206"/>
+  <constant name="reply success" value="200"/>
+  <constant name="not delivered" value="310" class="soft error"/>
+  <constant name="content too large" value="311" class="soft error"/>
+  <constant name="connection forced" value="320" class="hard error"/>
+  <constant name="invalid path" value="402" class="hard error"/>
+  <constant name="access refused" value="403" class="soft error"/>
+  <constant name="not found" value="404" class="soft error"/>
+  <constant name="resource locked" value="405" class="soft error"/>
+  <constant name="frame error" value="501" class="hard error"/>
+  <constant name="syntax error" value="502" class="hard error"/>
+  <constant name="command invalid" value="503" class="hard error"/>
+  <constant name="channel error" value="504" class="hard error"/>
+  <constant name="resource error" value="506" class="hard error"/>
+  <constant name="not allowed" value="530" class="hard error"/>
+  <constant name="not implemented" value="540" class="hard error"/>
+  <constant name="internal error" value="541" class="hard error"/>
+  <domain name="access ticket" type="short">
+    <assert check="ne" value="0"/>
+  </domain>
+  <domain name="class id" type="short"/>
+  <domain name="consumer tag" type="shortstr"/>
+  <domain name="delivery tag" type="longlong"/>
+  <domain name="exchange name" type="shortstr">
+    <assert check="length" value="127"/>
+  </domain>
+  <domain name="known hosts" type="shortstr"/>
+  <domain name="method id" type="short"/>
+  <domain name="no ack" type="bit"/>
+  <domain name="no local" type="bit"/>
+  <domain name="path" type="shortstr">
+    <assert check="notnull"/>
+    <assert check="syntax" rule="path"/>
+    <assert check="length" value="127"/>
+  </domain>
+  <domain name="peer properties" type="table"/>
+  <domain name="queue name" type="shortstr">
+    <assert check="length" value="127"/>
+  </domain>
+  <domain name="redelivered" type="bit"/>
+  <domain name="reply code" type="short">
+    <assert check="notnull"/>
+  </domain>
+  <domain name="reply text" type="shortstr">
+    <assert check="notnull"/>
+  </domain>
+  <class name="connection" handler="connection" index="10">
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MUST"/>
+    <method name="start" synchronous="1" index="10">
+      <chassis name="client" implement="MUST"/>
+      <response name="start-ok"/>
+      <field name="version major" type="octet"/>
+      <field name="version minor" type="octet"/>
+      <field name="server properties" domain="peer properties"/>
+      <field name="mechanisms" type="longstr">
+        <see name="security mechanisms"/>
+        <assert check="notnull"/>
+      </field>
+      <field name="locales" type="longstr">
+        <assert check="notnull"/>
+      </field>
+    </method>
+    <method name="start-ok" synchronous="1" index="11">
+      <chassis name="server" implement="MUST"/>
+      <field name="client properties" domain="peer properties"/>
+      <field name="mechanism" type="shortstr">
+        <assert check="notnull"/>
+      </field>
+      <field name="response" type="longstr">
+        <assert check="notnull"/>
+      </field>
+      <field name="locale" type="shortstr">
+        <assert check="notnull"/>
+      </field>
+    </method>
+    <method name="secure" synchronous="1" index="20">
+      <chassis name="client" implement="MUST"/>
+      <response name="secure-ok"/>
+      <field name="challenge" type="longstr">
+        <see name="security mechanisms"/>
+      </field>
+    </method>
+    <method name="secure-ok" synchronous="1" index="21">
+      <chassis name="server" implement="MUST"/>
+      <field name="response" type="longstr">
+        <assert check="notnull"/>
+      </field>
+    </method>
+    <method name="tune" synchronous="1" index="30">
+      <chassis name="client" implement="MUST"/>
+      <response name="tune-ok"/>
+      <field name="channel max" type="short"/>
+      <field name="frame max" type="long"/>
+      <field name="heartbeat" type="short"/>
+    </method>
+    <method name="tune-ok" synchronous="1" index="31">
+      <chassis name="server" implement="MUST"/>
+      <field name="channel max" type="short">
+        <assert check="notnull"/>
+        <assert check="le" method="tune" field="channel max"/>
+      </field>
+      <field name="frame max" type="long"/>
+      <field name="heartbeat" type="short"/>
+    </method>
+    <method name="open" synchronous="1" index="40">
+      <chassis name="server" implement="MUST"/>
+      <response name="open-ok"/>
+      <response name="redirect"/>
+      <field name="virtual host" domain="path">
+        <assert check="regexp" value="^[a-zA-Z0-9/-_]+$"/>
+      </field>
+      <field name="capabilities" type="shortstr"/>
+      <field name="insist" type="bit"/>
+    </method>
+    <method name="open-ok" synchronous="1" index="41">
+      <chassis name="client" implement="MUST"/>
+      <field name="known hosts" domain="known hosts"/>
+    </method>
+    <method name="redirect" synchronous="1" index="50">
+      <chassis name="client" implement="MAY"/>
+      <field name="host" type="shortstr">
+        <assert check="notnull"/>
+      </field>
+      <field name="known hosts" domain="known hosts"/>
+    </method>
+    <method name="close" synchronous="1" index="60">
+      <chassis name="client" implement="MUST"/>
+      <chassis name="server" implement="MUST"/>
+      <response name="close-ok"/>
+      <field name="reply code" domain="reply code"/>
+      <field name="reply text" domain="reply text"/>
+      <field name="class id" domain="class id"/>
+      <field name="method id" domain="class id"/>
+    </method>
+    <method name="close-ok" synchronous="1" index="61">
+      <chassis name="client" implement="MUST"/>
+      <chassis name="server" implement="MUST"/>
+    </method>
+  </class>
+  <class name="channel" handler="channel" index="20">
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MUST"/>
+    <method name="open" synchronous="1" index="10">
+      <chassis name="server" implement="MUST"/>
+      <response name="open-ok"/>
+      <field name="out of band" type="shortstr">
+        <assert check="null"/>
+      </field>
+    </method>
+    <method name="open-ok" synchronous="1" index="11">
+      <chassis name="client" implement="MUST"/>
+    </method>
+    <method name="flow" synchronous="1" index="20">
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
+      <response name="flow-ok"/>
+      <field name="active" type="bit"/>
+    </method>
+    <method name="flow-ok" index="21">
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
+      <field name="active" type="bit"/>
+    </method>
+    <method name="alert" index="30">
+      <chassis name="client" implement="MUST"/>
+      <field name="reply code" domain="reply code"/>
+      <field name="reply text" domain="reply text"/>
+      <field name="details" type="table"/>
+    </method>
+    <method name="close" synchronous="1" index="40">
+      <chassis name="client" implement="MUST"/>
+      <chassis name="server" implement="MUST"/>
+      <response name="close-ok"/>
+      <field name="reply code" domain="reply code"/>
+      <field name="reply text" domain="reply text"/>
+      <field name="class id" domain="class id"/>
+      <field name="method id" domain="method id"/>
+    </method>
+    <method name="close-ok" synchronous="1" index="41">
+      <chassis name="client" implement="MUST"/>
+      <chassis name="server" implement="MUST"/>
+    </method>
+  </class>
+  <class name="access" handler="connection" index="30">
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MUST"/>
+    <method name="request" synchronous="1" index="10">
+      <chassis name="server" implement="MUST"/>
+      <response name="request-ok"/>
+      <field name="realm" domain="path"/>
+      <field name="exclusive" type="bit"/>
+      <field name="passive" type="bit"/>
+      <field name="active" type="bit"/>
+      <field name="write" type="bit"/>
+      <field name="read" type="bit"/>
+    </method>
+    <method name="request-ok" synchronous="1" index="11">
+      <chassis name="client" implement="MUST"/>
+      <field name="ticket" domain="access ticket"/>
+    </method>
+  </class>
+  <class name="exchange" handler="channel" index="40">
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MUST"/>
+    <method name="declare" synchronous="1" index="10">
+      <chassis name="server" implement="MUST"/>
+      <response name="declare-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="exchange" domain="exchange name">
+        <assert check="regexp" value="^[a-zA-Z0-9-_.:]+$"/>
+      </field>
+      <field name="type" type="shortstr">
+        <assert check="regexp" value="^[a-zA-Z0-9-_.:]+$"/>
+      </field>
+      <field name="passive" type="bit"/>
+      <field name="durable" type="bit"/>
+      <field name="auto delete" type="bit"/>
+      <field name="internal" type="bit"/>
+      <field name="nowait" type="bit"/>
+      <field name="arguments" type="table"/>
+    </method>
+    <method name="declare-ok" synchronous="1" index="11">
+      <chassis name="client" implement="MUST"/>
+    </method>
+    <method name="delete" synchronous="1" index="20">
+      <chassis name="server" implement="MUST"/>
+      <response name="delete-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="exchange" domain="exchange name">
+        <assert check="notnull"/>
+      </field>
+      <field name="if unused" type="bit"/>
+      <field name="nowait" type="bit"/>
+    </method>
+    <method name="delete-ok" synchronous="1" index="21">
+      <chassis name="client" implement="MUST"/>
+    </method>
+  </class>
+  <class name="queue" handler="channel" index="50">
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MUST"/>
+    <method name="declare" synchronous="1" index="10">
+      <chassis name="server" implement="MUST"/>
+      <response name="declare-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="queue" domain="queue name">
+        <assert check="regexp" value="^[a-zA-Z0-9-_.:]*$"/>
+      </field>
+      <field name="passive" type="bit"/>
+      <field name="durable" type="bit"/>
+      <field name="exclusive" type="bit"/>
+      <field name="auto delete" type="bit"/>
+      <field name="nowait" type="bit"/>
+      <field name="arguments" type="table"/>
+    </method>
+    <method name="declare-ok" synchronous="1" index="11">
+      <chassis name="client" implement="MUST"/>
+      <field name="queue" domain="queue name">
+        <assert check="notnull"/>
+      </field>
+      <field name="message count" type="long"/>
+      <field name="consumer count" type="long"/>
+    </method>
+    <method name="bind" synchronous="1" index="20">
+      <chassis name="server" implement="MUST"/>
+      <response name="bind-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="queue" domain="queue name"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+      <field name="nowait" type="bit"/>
+      <field name="arguments" type="table"/>
+    </method>
+    <method name="bind-ok" synchronous="1" index="21">
+      <chassis name="client" implement="MUST"/>
+    </method>
+    <method name="purge" synchronous="1" index="30">
+      <chassis name="server" implement="MUST"/>
+      <response name="purge-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="queue" domain="queue name"/>
+      <field name="nowait" type="bit"/>
+    </method>
+    <method name="purge-ok" synchronous="1" index="31">
+      <chassis name="client" implement="MUST"/>
+      <field name="message count" type="long"/>
+    </method>
+    <method name="delete" synchronous="1" index="40">
+      <chassis name="server" implement="MUST"/>
+      <response name="delete-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="queue" domain="queue name"/>
+      <field name="if unused" type="bit"/>
+      <field name="if empty" type="bit">
+        <test/>
+      </field>
+      <field name="nowait" type="bit"/>
+    </method>
+    <method name="delete-ok" synchronous="1" index="41">
+      <chassis name="client" implement="MUST"/>
+      <field name="message count" type="long"/>
+    </method>
+  </class>
+  <class name="basic" handler="channel" index="60">
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MAY"/>
+    <field name="content type" type="shortstr"/>
+    <field name="content encoding" type="shortstr"/>
+    <field name="headers" type="table"/>
+    <field name="delivery mode" type="octet"/>
+    <field name="priority" type="octet"/>
+    <field name="correlation id" type="shortstr"/>
+    <field name="reply to" type="shortstr"/>
+    <field name="expiration" type="shortstr"/>
+    <field name="message id" type="shortstr"/>
+    <field name="timestamp" type="timestamp"/>
+    <field name="type" type="shortstr"/>
+    <field name="user id" type="shortstr"/>
+    <field name="app id" type="shortstr"/>
+    <field name="cluster id" type="shortstr"/>
+    <method name="qos" synchronous="1" index="10">
+      <chassis name="server" implement="MUST"/>
+      <response name="qos-ok"/>
+      <field name="prefetch size" type="long"/>
+      <field name="prefetch count" type="short"/>
+      <field name="global" type="bit"/>
+    </method>
+    <method name="qos-ok" synchronous="1" index="11">
+      <chassis name="client" implement="MUST"/>
+    </method>
+    <method name="consume" synchronous="1" index="20">
+      <chassis name="server" implement="MUST"/>
+      <response name="consume-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="queue" domain="queue name"/>
+      <field name="consumer tag" domain="consumer tag"/>
+      <field name="no local" domain="no local"/>
+      <field name="no ack" domain="no ack"/>
+      <field name="exclusive" type="bit"/>
+      <field name="nowait" type="bit"/>
+    </method>
+    <method name="consume-ok" synchronous="1" index="21">
+      <chassis name="client" implement="MUST"/>
+      <field name="consumer tag" domain="consumer tag"/>
+    </method>
+    <method name="cancel" synchronous="1" index="30">
+      <chassis name="server" implement="MUST"/>
+      <response name="cancel-ok"/>
+      <field name="consumer tag" domain="consumer tag"/>
+      <field name="nowait" type="bit"/>
+    </method>
+    <method name="cancel-ok" synchronous="1" index="31">
+      <chassis name="client" implement="MUST"/>
+      <field name="consumer tag" domain="consumer tag"/>
+    </method>
+    <method name="publish" content="1" index="40">
+      <chassis name="server" implement="MUST"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+      <field name="mandatory" type="bit"/>
+      <field name="immediate" type="bit"/>
+    </method>
+    <method name="return" content="1" index="50">
+      <chassis name="client" implement="MUST"/>
+      <field name="reply code" domain="reply code"/>
+      <field name="reply text" domain="reply text"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+    </method>
+    <method name="deliver" content="1" index="60">
+      <chassis name="client" implement="MUST"/>
+      <field name="consumer tag" domain="consumer tag"/>
+      <field name="delivery tag" domain="delivery tag"/>
+      <field name="redelivered" domain="redelivered"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+    </method>
+    <method name="get" synchronous="1" index="70">
+      <response name="get-ok"/>
+      <response name="get-empty"/>
+      <chassis name="server" implement="MUST"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="queue" domain="queue name"/>
+      <field name="no ack" domain="no ack"/>
+    </method>
+    <method name="get-ok" synchronous="1" content="1" index="71">
+      <chassis name="client" implement="MAY"/>
+      <field name="delivery tag" domain="delivery tag"/>
+      <field name="redelivered" domain="redelivered"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+      <field name="message count" type="long"/>
+    </method>
+    <method name="get-empty" synchronous="1" index="72">
+      <chassis name="client" implement="MAY"/>
+      <field name="cluster id" type="shortstr"/>
+    </method>
+    <method name="ack" index="80">
+      <chassis name="server" implement="MUST"/>
+      <field name="delivery tag" domain="delivery tag"/>
+      <field name="multiple" type="bit"/>
+    </method>
+    <method name="reject" index="90">
+      <chassis name="server" implement="MUST"/>
+      <field name="delivery tag" domain="delivery tag"/>
+      <field name="requeue" type="bit"/>
+    </method>
+    <method name="recover" index="100">
+      <chassis name="server" implement="MUST"/>
+      <field name="requeue" type="bit"/>
+    </method>
+  </class>
+  <class name="file" handler="channel" index="70">
+    <chassis name="server" implement="MAY"/>
+    <chassis name="client" implement="MAY"/>
+    <field name="content type" type="shortstr"/>
+    <field name="content encoding" type="shortstr"/>
+    <field name="headers" type="table"/>
+    <field name="priority" type="octet"/>
+    <field name="reply to" type="shortstr"/>
+    <field name="message id" type="shortstr"/>
+    <field name="filename" type="shortstr"/>
+    <field name="timestamp" type="timestamp"/>
+    <field name="cluster id" type="shortstr"/>
+    <method name="qos" synchronous="1" index="10">
+      <chassis name="server" implement="MUST"/>
+      <response name="qos-ok"/>
+      <field name="prefetch size" type="long"/>
+      <field name="prefetch count" type="short"/>
+      <field name="global" type="bit"/>
+    </method>
+    <method name="qos-ok" synchronous="1" index="11">
+      <chassis name="client" implement="MUST"/>
+    </method>
+    <method name="consume" synchronous="1" index="20">
+      <chassis name="server" implement="MUST"/>
+      <response name="consume-ok"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="queue" domain="queue name"/>
+      <field name="consumer tag" domain="consumer tag"/>
+      <field name="no local" domain="no local"/>
+      <field name="no ack" domain="no ack"/>
+      <field name="exclusive" type="bit"/>
+      <field name="nowait" type="bit"/>
+    </method>
+    <method name="consume-ok" synchronous="1" index="21">
+      <chassis name="client" implement="MUST"/>
+      <field name="consumer tag" domain="consumer tag"/>
+    </method>
+    <method name="cancel" synchronous="1" index="30">
+      <chassis name="server" implement="MUST"/>
+      <response name="cancel-ok"/>
+      <field name="consumer tag" domain="consumer tag"/>
+      <field name="nowait" type="bit"/>
+    </method>
+    <method name="cancel-ok" synchronous="1" index="31">
+      <chassis name="client" implement="MUST"/>
+      <field name="consumer tag" domain="consumer tag"/>
+    </method>
+    <method name="open" synchronous="1" index="40">
+      <response name="open-ok"/>
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
+      <field name="identifier" type="shortstr"/>
+      <field name="content size" type="longlong"/>
+    </method>
+    <method name="open-ok" synchronous="1" index="41">
+      <response name="stage"/>
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
+      <field name="staged size" type="longlong"/>
+    </method>
+    <method name="stage" content="1" index="50">
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
+    </method>
+    <method name="publish" index="60">
+      <chassis name="server" implement="MUST"/>
+      <field name="ticket" domain="access ticket"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+      <field name="mandatory" type="bit"/>
+      <field name="immediate" type="bit"/>
+      <field name="identifier" type="shortstr"/>
+    </method>
+    <method name="return" content="1" index="70">
+      <chassis name="client" implement="MUST"/>
+      <field name="reply code" domain="reply code"/>
+      <field name="reply text" domain="reply text"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+    </method>
+    <method name="deliver" index="80">
+      <chassis name="client" implement="MUST"/>
+      <field name="consumer tag" domain="consumer tag"/>
+      <field name="delivery tag" domain="delivery tag"/>
+      <field name="redelivered" domain="redelivered"/>
+      <field name="exchange" domain="exchange name"/>
+      <field name="routing key" type="shortstr"/>
+      <field name="identifier" type="shortstr"/>
+    </method>
+    <method name="ack" index="90">
+      <chassis name="server" implement="MUST"/>
+      <field name="delivery tag" domain="delivery tag"/>
+      <field name="multiple" type="bit"/>
+    </method>
+    <method name="reject" index="100">
+      <chassis name="server" implement="MUST"/>
+      <field name="delivery tag" domain="delivery tag"/>
+      <field name="requeue" type="bit"/>
+    </method>
+  </class>
552 <class name="stream" handler="channel" index="80">
553 <chassis name="server" implement="MAY"/>
554 <chassis name="client" implement="MAY"/>
555 <field name="content type" type="shortstr"/>
556 <field name="content encoding" type="shortstr"/>
557 <field name="headers" type="table"/>
558 <field name="priority" type="octet"/>
559 <field name="timestamp" type="timestamp"/>
560 <method name="qos" synchronous="1" index="10">
561 <chassis name="server" implement="MUST"/>
562 <response name="qos-ok"/>
563 <field name="prefetch size" type="long"/>
564 <field name="prefetch count" type="short"/>
565 <field name="consume rate" type="long"/>
566 <field name="global" type="bit"/>
567 </method>
568 <method name="qos-ok" synchronous="1" index="11">
569 <chassis name="client" implement="MUST"/>
570 </method>
571 <method name="consume" synchronous="1" index="20">
572 <chassis name="server" implement="MUST"/>
573 <response name="consume-ok"/>
574 <field name="ticket" domain="access ticket"/>
575 <field name="queue" domain="queue name"/>
576 <field name="consumer tag" domain="consumer tag"/>
577 <field name="no local" domain="no local"/>
578 <field name="exclusive" type="bit"/>
579 <field name="nowait" type="bit"/>
580 </method>
581 <method name="consume-ok" synchronous="1" index="21">
582 <chassis name="client" implement="MUST"/>
583 <field name="consumer tag" domain="consumer tag"/>
584 </method>
585 <method name="cancel" synchronous="1" index="30">
586 <chassis name="server" implement="MUST"/>
587 <response name="cancel-ok"/>
588 <field name="consumer tag" domain="consumer tag"/>
589 <field name="nowait" type="bit"/>
590 </method>
591 <method name="cancel-ok" synchronous="1" index="31">
592 <chassis name="client" implement="MUST"/>
593 <field name="consumer tag" domain="consumer tag"/>
594 </method>
595 <method name="publish" content="1" index="40">
596 <chassis name="server" implement="MUST"/>
597 <field name="ticket" domain="access ticket"/>
598 <field name="exchange" domain="exchange name"/>
599 <field name="routing key" type="shortstr"/>
600 <field name="mandatory" type="bit"/>
601 <field name="immediate" type="bit"/>
602 </method>
603 <method name="return" content="1" index="50">
604 <chassis name="client" implement="MUST"/>
605 <field name="reply code" domain="reply code"/>
606 <field name="reply text" domain="reply text"/>
607 <field name="exchange" domain="exchange name"/>
608 <field name="routing key" type="shortstr"/>
609 </method>
610 <method name="deliver" content="1" index="60">
611 <chassis name="client" implement="MUST"/>
612 <field name="consumer tag" domain="consumer tag"/>
613 <field name="delivery tag" domain="delivery tag"/>
614 <field name="exchange" domain="exchange name"/>
615 <field name="queue" domain="queue name">
616 <assert check="notnull"/>
617 </field>
618 </method>
619 </class>
620 <class name="tx" handler="channel" index="90">
621 <chassis name="server" implement="SHOULD"/>
622 <chassis name="client" implement="MAY"/>
623 <method name="select" synchronous="1" index="10">
624 <chassis name="server" implement="MUST"/>
625 <response name="select-ok"/>
626 </method>
627 <method name="select-ok" synchronous="1" index="11">
628 <chassis name="client" implement="MUST"/>
629 </method>
630 <method name="commit" synchronous="1" index="20">
631 <chassis name="server" implement="MUST"/>
632 <response name="commit-ok"/>
633 </method>
634 <method name="commit-ok" synchronous="1" index="21">
635 <chassis name="client" implement="MUST"/>
636 </method>
637 <method name="rollback" synchronous="1" index="30">
638 <chassis name="server" implement="MUST"/>
639 <response name="rollback-ok"/>
640 </method>
641 <method name="rollback-ok" synchronous="1" index="31">
642 <chassis name="client" implement="MUST"/>
643 </method>
644 </class>
645 <class name="dtx" handler="channel" index="100">
646 <chassis name="server" implement="MAY"/>
647 <chassis name="client" implement="MAY"/>
648 <method name="select" synchronous="1" index="10">
649 <chassis name="server" implement="MUST"/>
650 <response name="select-ok"/>
651 </method>
652 <method name="select-ok" synchronous="1" index="11">
653 <chassis name="client" implement="MUST"/>
654 </method>
655 <method name="start" synchronous="1" index="20">
656 <chassis name="server" implement="MAY"/>
657 <response name="start-ok"/>
658 <field name="dtx identifier" type="shortstr">
659 <assert check="notnull"/>
660 </field>
661 </method>
662 <method name="start-ok" synchronous="1" index="21">
663 <chassis name="client" implement="MUST"/>
664 </method>
665 </class>
666 <class name="tunnel" handler="tunnel" index="110">
667 <chassis name="server" implement="MAY"/>
668 <chassis name="client" implement="MAY"/>
669 <field name="headers" type="table"/>
670 <field name="proxy name" type="shortstr"/>
671 <field name="data name" type="shortstr"/>
672 <field name="durable" type="octet"/>
673 <field name="broadcast" type="octet"/>
674 <method name="request" content="1" index="10">
675 <chassis name="server" implement="MUST"/>
676 <field name="meta data" type="table"/>
677 </method>
678 </class>
679 <class name="test" handler="channel" index="120">
680 <chassis name="server" implement="MUST"/>
681 <chassis name="client" implement="SHOULD"/>
682 <method name="integer" synchronous="1" index="10">
683 <chassis name="client" implement="MUST"/>
684 <chassis name="server" implement="MUST"/>
685 <response name="integer-ok"/>
686 <field name="integer 1" type="octet"/>
687 <field name="integer 2" type="short"/>
688 <field name="integer 3" type="long"/>
689 <field name="integer 4" type="longlong"/>
690 <field name="operation" type="octet">
691 <assert check="enum">
692 <value name="add"/>
693 <value name="min"/>
694 <value name="max"/>
695 </assert>
696 </field>
697 </method>
698 <method name="integer-ok" synchronous="1" index="11">
699 <chassis name="client" implement="MUST"/>
700 <chassis name="server" implement="MUST"/>
701 <field name="result" type="longlong"/>
702 </method>
703 <method name="string" synchronous="1" index="20">
704 <chassis name="client" implement="MUST"/>
705 <chassis name="server" implement="MUST"/>
706 <response name="string-ok"/>
707 <field name="string 1" type="shortstr"/>
708 <field name="string 2" type="longstr"/>
709 <field name="operation" type="octet">
710 <assert check="enum">
711 <value name="add"/>
712 <value name="min"/>
713 <value name="max"/>
714 </assert>
715 </field>
716 </method>
717 <method name="string-ok" synchronous="1" index="21">
718 <chassis name="client" implement="MUST"/>
719 <chassis name="server" implement="MUST"/>
720 <field name="result" type="longstr"/>
721 </method>
722 <method name="table" synchronous="1" index="30">
723 <chassis name="client" implement="MUST"/>
724 <chassis name="server" implement="MUST"/>
725 <response name="table-ok"/>
726 <field name="table" type="table"/>
727 <field name="integer op" type="octet">
728 <assert check="enum">
729 <value name="add"/>
730 <value name="min"/>
731 <value name="max"/>
732 </assert>
733 </field>
734 <field name="string op" type="octet">
735 <assert check="enum">
736 <value name="add"/>
737 <value name="min"/>
738 <value name="max"/>
739 </assert>
740 </field>
741 </method>
742 <method name="table-ok" synchronous="1" index="31">
743 <chassis name="client" implement="MUST"/>
744 <chassis name="server" implement="MUST"/>
745 <field name="integer result" type="longlong"/>
746 <field name="string result" type="longstr"/>
747 </method>
748 <method name="content" synchronous="1" content="1" index="40">
749 <chassis name="client" implement="MUST"/>
750 <chassis name="server" implement="MUST"/>
751 <response name="content-ok"/>
752 </method>
753 <method name="content-ok" synchronous="1" content="1" index="41">
754 <chassis name="client" implement="MUST"/>
755 <chassis name="server" implement="MUST"/>
756 <field name="content checksum" type="long"/>
757 </method>
758 </class>
759</amqp>
\ No newline at end of file

=== added file 'carbon/lib/carbon/amqp_listener.py'
--- carbon/lib/carbon/amqp_listener.py 1970-01-01 00:00:00 +0000
+++ carbon/lib/carbon/amqp_listener.py 2010-01-11 15:35:21 +0000
@@ -0,0 +1,178 @@
+#!/usr/bin/env python
+"""
+Copyright 2009 Lucio Torre <lucio.torre@canonical.com>
+
+This is an AMQP client that will connect to the specified broker and read
+messages, parse them, and post them as metrics.
+
+The message format is the same as in the TCP line protocol
+(METRIC, VALUE, TIMESTAMP) with the added possibility of putting multiple "\n"
+separated lines in one message.
+
+Can be started standalone for testing or using carbon-cache.py (see example
+config file provided)
+
+"""
+import sys
+import os
+from optparse import OptionParser
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet import reactor
+from twisted.internet.protocol import ClientCreator, Protocol, \
+    ReconnectingClientFactory
+from txamqp.protocol import AMQClient
+from txamqp.client import TwistedDelegate
+import txamqp.spec
+
+try:
+    import carbon
+except:
+    # this is being run directly, carbon is not installed
+    LIB_DIR = os.path.dirname(os.path.dirname(__file__))
+    sys.path.insert(0, LIB_DIR)
+
+import carbon.listeners #satisfy import order requirements
+from carbon.instrumentation import increment
+from carbon.events import metricReceived
+from carbon import log
+
+
+
+class AMQPGraphiteProtocol(AMQClient):
+    """This is the protocol instance that will receive and post metrics."""
+
+    consumer_tag = "graphite_consumer"
+
+    @inlineCallbacks
+    def connectionMade(self):
+        yield AMQClient.connectionMade(self)
+        log.listener("New AMQP connection made")
+        yield self.setup()
+        yield self.receive_loop()
+
+    @inlineCallbacks
+    def setup(self):
+        yield self.authenticate(self.factory.username, self.factory.password)
+        chan = yield self.channel(1)
+        yield chan.channel_open()
+
+        yield chan.queue_declare(queue=self.factory.queue_name, durable=False,
+                                 exclusive=False, auto_delete=True)
+        yield chan.exchange_declare(
+            exchange=self.factory.exchange_name, type="fanout", durable=False,
+            auto_delete=True)
+
+        yield chan.queue_bind(queue=self.factory.queue_name,
+                              exchange=self.factory.exchange_name)
+
+        yield chan.basic_consume(queue=self.factory.queue_name, no_ack=True,
+                                 consumer_tag=self.consumer_tag)
+    @inlineCallbacks
+    def receive_loop(self):
+        queue = yield self.queue(self.consumer_tag)
+
+        while True:
+            msg = yield queue.get()
+            self.processMessage(msg)
+
+    def processMessage(self, message):
+        """Parse a message and post it as a metric."""
+
+        if self.factory.verbose:
+            log.listener("Message received: %s" % (message,))
+
+        for line in message.content.body.split("\n"):
+            try:
+                metric, value, timestamp = line.strip().split()
+                datapoint = ( float(timestamp), float(value) )
+            except ValueError:
+                log.listener("wrong value in line: %s" % (line,))
+                continue
+
+            increment('metricsReceived')
+            metricReceived(metric, datapoint)
+            if self.factory.verbose:
+                log.listener("Metric posted: %s %s %s" %
+                             (metric, value, timestamp,))
+
+
+class AMQPReconnectingFactory(ReconnectingClientFactory):
+    """The reconnecting factory.
+
+    Knows how to create the extended client and how to keep trying to
+    connect in case of errors."""
+
+    protocol = AMQPGraphiteProtocol
+
+    def __init__(self, username, password, delegate, vhost, spec, channel,
+                 exchange_name, queue_name, verbose):
+        self.username = username
+        self.password = password
+        self.delegate = delegate
+        self.vhost = vhost
+        self.spec = spec
+        self.channel = channel
+        self.exchange_name = exchange_name
+        self.queue_name = queue_name
+        self.verbose = verbose
+
+    def buildProtocol(self, addr):
+        p = self.protocol(self.delegate, self.vhost, self.spec)
+        p.factory = self
+        return p
+
+def startReceiver(host, port, username, password, vhost="/", spec=None,
+                  channel=1, exchange_name="graphite_exchange",
+                  queue_name="graphite_queue", verbose=False):
+    """Starts a twisted process that will read messages on the amqp broker
+    and post them as metrics."""
+
+    # use provided spec if not specified
+    if not spec:
+        spec = txamqp.spec.load(os.path.normpath(
+            os.path.join(os.path.dirname(__file__), 'amqp0-8.xml')))
+
+    delegate = TwistedDelegate()
+    factory = AMQPReconnectingFactory(username, password, delegate, vhost,
+                                      spec, channel, exchange_name, queue_name,
+                                      verbose=verbose)
+    reactor.connectTCP(host, port, factory)
+
+
+def main():
+    parser = OptionParser()
+    parser.add_option("-t", "--host", dest="host",
+                      help="host name", metavar="HOST", default="localhost")
+
+    parser.add_option("-p", "--port", dest="port", type=int,
+                      help="port number", metavar="PORT",
+                      default=5672)
+
+    parser.add_option("-u", "--user", dest="username",
+                      help="username", metavar="USERNAME",
+                      default="guest")
+
+    parser.add_option("-w", "--password", dest="password",
+                      help="password", metavar="PASSWORD",
+                      default="guest")
+
+    parser.add_option("-V", "--vhost", dest="vhost",
+                      help="vhost", metavar="VHOST",
+                      default="/")
+
+    parser.add_option("-v", "--verbose", dest="verbose",
+                      help="verbose",
+                      default=False, action="store_true")
+
+    (options, args) = parser.parse_args()
+
+
+    log.logToStdout()
+    startReceiver(options.host, options.port, options.username,
+                  options.password, vhost=options.vhost,
+                  verbose=options.verbose)
+    reactor.run()
+
+if __name__ == "__main__":
+    main()

=== added file 'carbon/lib/carbon/amqp_publisher.py'
--- carbon/lib/carbon/amqp_publisher.py 1970-01-01 00:00:00 +0000
+++ carbon/lib/carbon/amqp_publisher.py 2010-01-11 15:35:21 +0000
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+"""
+Copyright 2009 Lucio Torre <lucio.torre@canonical.com>
+
+Will publish metrics over AMQP
+"""
+
+import os
+from optparse import OptionParser
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet import reactor, task
+from twisted.internet.protocol import ClientCreator
+from txamqp.protocol import AMQClient
+from txamqp.client import TwistedDelegate
+from txamqp.content import Content
+import txamqp.spec
+
+@inlineCallbacks
+def gotConnection(conn, message, username, password,
+                  channel_number, exchange_name):
+    yield conn.authenticate(username, password)
+    channel = yield conn.channel(channel_number)
+    yield channel.channel_open()
+
+    msg = Content(message)
+    msg["delivery mode"] = 2
+    channel.basic_publish(exchange=exchange_name, content=msg)
+    yield channel.channel_close()
+
+def writeMetric(message, host, port, username, password, vhost,
+                spec= None, channel=1, exchange_name="graphite_exchange"):
+
+    if not spec:
+        spec = txamqp.spec.load(os.path.normpath(
+            os.path.join(os.path.dirname(__file__), 'amqp0-8.xml')))
+
+    delegate = TwistedDelegate()
+
+    d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
+                      spec=spec).connectTCP(host, port)
+
+    d.addCallback(gotConnection, message, username, password,
+                  channel, exchange_name)
+
+    return d
+
+
+def main():
+    parser = OptionParser()
+    parser.add_option("-t", "--host", dest="host",
+                      help="host name", metavar="HOST", default="localhost")
+
+    parser.add_option("-p", "--port", dest="port", type=int,
+                      help="port number", metavar="PORT",
+                      default=5672)
+
+    parser.add_option("-u", "--user", dest="username",
+                      help="username", metavar="USERNAME",
+                      default="guest")
+
+    parser.add_option("-w", "--password", dest="password",
+                      help="password", metavar="PASSWORD",
+                      default="guest")
+
+    parser.add_option("-v", "--vhost", dest="vhost",
+                      help="vhost", metavar="VHOST",
+                      default="/")
+
+    (options, args) = parser.parse_args()
+
+
+    message = " ".join(args)
+    d = writeMetric(message, options.host, options.port, options.username,
+                    options.password, vhost=options.vhost)
+    d.addErrback(lambda f: f.printTraceback())
+    d.addBoth(lambda _: reactor.stop())
+    reactor.run()
+
+if __name__ == "__main__":
+    main()
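
The listener's docstring describes the message format as the TCP line protocol (METRIC VALUE TIMESTAMP), with several "\n"-separated lines allowed in one message. A minimal sketch of that parsing step, kept independent of Twisted and txamqp so it can be tried on its own, might look like the following. The names `parse_metric_lines` and `format_metric_line` are hypothetical and not part of carbon. The sketch assumes Python 3, whereas the patch targets Python 2. It also skips blank lines silently, where `processMessage` in the patch would log them as bad lines.

```python
def format_metric_line(metric, value, timestamp):
    """Build one "METRIC VALUE TIMESTAMP" line (hypothetical helper)."""
    return "%s %s %s" % (metric, value, timestamp)


def parse_metric_lines(body):
    """Parse newline-separated "METRIC VALUE TIMESTAMP" lines.

    Returns (datapoints, rejected). Each datapoint is
    (metric, (timestamp, value)), the same tuple layout that
    processMessage in the patch hands to metricReceived.
    """
    datapoints = []
    rejected = []
    for line in body.split("\n"):
        line = line.strip()
        if not line:
            # Assumption: ignore blank lines instead of reporting them.
            continue
        try:
            metric, value, timestamp = line.split()
            datapoint = (float(timestamp), float(value))
        except ValueError:
            # Wrong field count or a non-numeric value/timestamp.
            rejected.append(line)
            continue
        datapoints.append((metric, datapoint))
    return datapoints, rejected


if __name__ == "__main__":
    body = "\n".join([
        format_metric_line("servers.web01.load", 0.5, 1263224121),
        format_metric_line("servers.web02.load", 1.25, 1263224121),
    ])
    points, bad = parse_metric_lines(body)
    for metric, (timestamp, value) in points:
        print(metric, timestamp, value)
```

A body built this way is the kind of string that `writeMetric` in `amqp_publisher.py` takes as its `message` argument, though the sketch itself does not talk to a broker.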