Merge lp:~lucio.torre/graphite/graphite-add-rabbitmq into lp:~graphite-dev/graphite/main
Status: | Merged |
---|---|
Approved by: | chrismd |
Approved revision: | not available |
Merged at revision: | not available |
Proposed branch: | lp:~lucio.torre/graphite/graphite-add-rabbitmq |
Merge into: | lp:~graphite-dev/graphite/main |
Diff against target: | 1151 lines (+1105/-1), 5 files modified: carbon/bin/carbon-cache.py (+14/-1), carbon/conf/carbon.amqp.conf.example (+73/-0), carbon/lib/carbon/amqp0-8.xml (+759/-0), carbon/lib/carbon/amqp_listener.py (+178/-0), carbon/lib/carbon/amqp_publisher.py (+81/-0) |
To merge this branch: | bzr merge lp:~lucio.torre/graphite/graphite-add-rabbitmq |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
chrismd | Approve | ||
Review via email: mp+16816@code.launchpad.net |
Commit message
Description of the change
Lucio Torre (lucio.torre) wrote:
Elliot Murphy (statik) wrote:
I just realized this includes the AMQP XML file. I recommend not including this file, as it has a non-free license and will cause a problem for carbon being packaged for Debian/Ubuntu.
chrismd (chrismd) wrote:
I just looked through the code and it looks good, with only a couple of caveats. First, the rest of the graphite codebase uses 2-space indentation whereas amqp_listener.py uses 4. I know that's the PEP 8 recommendation; I just prefer 2 spaces myself. I don't care too much if you want to leave it at 4.
Also, it uses the @inlineCallbacks decorator which, while incredibly useful, is not compatible with Python 2.4. I've been considering upping the Python requirement to 2.5 simply because 2.4 is so old and I often find myself breaking compatibility by accident by using more recent features out of habit. So don't worry about changing that yet; I'll probably make the next release require 2.5.
As for the xml file, is there not a free version of the AMQP spec? What do other txamqp apps use? I just checked out rabbitmq and it appears they have a JSON file describing the spec with a permissive license so we could use it. The only question is, can txamqp read it?
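On the @inlineCallbacks point above: the decorator relies on generators that can receive values through send(), which arrived in Python 2.5 (PEP 342), and that is why it cannot run on 2.4. What follows is a minimal stdlib-only sketch of that mechanism, not Twisted's implementation. The names `run_inline` and `fetch_and_sum` are hypothetical, zero-argument callables stand in for Deferreds, and the sketch is written for Python 3 so the generator can use a plain `return`.

```python
def run_inline(gen_func):
    """Drive a generator, sending the result of each yielded step back in.

    Hypothetical stand-in for the idea behind @inlineCallbacks: every
    yielded item is a zero-argument callable here, not a real Deferred.
    """
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        try:
            pending = next(gen)
            while True:
                # Resolve the pending step and resume the generator with
                # its result; generator.send() is the PEP 342 feature.
                pending = gen.send(pending())
        except StopIteration as stop:
            # The generator's return value travels on StopIteration.
            return stop.value
    return wrapper


@run_inline
def fetch_and_sum():
    a = yield (lambda: 2)   # resumes with a == 2
    b = yield (lambda: 3)   # resumes with b == 3
    return a + b


print(fetch_and_sum())
```

Rewriting such a function with plain Deferreds means splitting each step after a `yield` into its own callback, which is the extra work mentioned later in the thread.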
chrismd (chrismd) wrote:
Looks like we have a way around the spec license issue. There are stripped down versions of the spec available here http://
Lucio, could you replace the spec file in your branch with the stripped down one?
chrismd (chrismd) wrote:
One other issue I just noticed: in amqp_listener.py, around line 90, you catch a ValueError exception to handle malformed lines, but the except block needs a continue statement to avoid causing a NameError in the lines below it.
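The issue above can be sketched as follows. This is a simplified illustration, not the code in amqp_listener.py: `parse_metric_lines` is a hypothetical helper, and the "metric value timestamp" line layout and the (timestamp, value) tuple order are assumptions based on carbon's plaintext format.

```python
def parse_metric_lines(body):
    """Parse 'metric value timestamp' lines, skipping malformed ones.

    Hypothetical helper for illustration only.
    """
    parsed = []
    for line in body.split("\n"):
        line = line.strip()
        if not line:
            continue
        try:
            metric, value, timestamp = line.split()
            datapoint = (float(timestamp), float(value))
        except ValueError:
            # Without this continue, execution falls through to the
            # append below. If the first line is malformed, metric and
            # datapoint are unbound and a NameError is raised; on later
            # lines the previous line's values would be reused silently.
            continue
        parsed.append((metric, datapoint))
    return parsed


body = "a.b 1.5 100\nbad line\nc.d 2 200"
print(parse_metric_lines(body))
```

Both failure modes are covered by the same except clause: a line with the wrong number of fields fails at unpacking, and a non-numeric value or timestamp fails in float().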
Lucio Torre (lucio.torre) wrote:
1) I'll leave indentation at 4 if that's OK. Thanks for giving me the option :)
2) I'll rewrite to use plain old deferreds. This will take some work.
3) I'll put in the BSD version of the XML.
4) I'll fix the ValueError/continue issue.
I'll let you know when I've pushed the new branch.
chrismd (chrismd) wrote:
Awesome, thanks! Don't worry about the @inlineCallbacks though, because I looked and txAMQP also uses @inlineCallbacks and thus requires 2.5 anyway.
Initially I had thought AMQP support would only be an optional feature for getting data into Graphite. However, the more I have been thinking about it, the more I've realized there are some really cool things we could do by further using AMQP (and Thrift) in carbon, so it is quite possibly going to become a required dependency. I'll post some of my ideas to the graphite-dev mailing list once they solidify a little more.
- 204. By Lucio Torre: merged with trunk
- 205. By Lucio Torre: fixed proposal requests
Lucio Torre (lucio.torre) wrote:
Changed the XML file for: http://
Added continue after validation.
Please re-review.
Thanks.
chrismd (chrismd) wrote:
Looks good to merge into trunk.
Preview Diff
1 | === modified file 'carbon/bin/carbon-cache.py' |
2 | --- carbon/bin/carbon-cache.py 2009-10-30 12:32:49 +0000 |
3 | +++ carbon/bin/carbon-cache.py 2010-01-11 15:35:21 +0000 |
4 | @@ -27,7 +27,7 @@ |
5 | try: |
6 | from twisted.internet import epollreactor |
7 | epollreactor.install() |
8 | -except: |
9 | +except: |
10 | pass |
11 | from twisted.internet import reactor |
12 | |
13 | @@ -112,6 +112,14 @@ |
14 | # Read config (we want failures to occur before daemonizing) |
15 | settings.readFrom(options.config, 'cache') |
16 | |
17 | +use_amqp = settings.get("ENABLE_AMQP", False) |
18 | +if use_amqp: |
19 | + from carbon import amqp_listener |
20 | + amqp_host = settings.get("AMQP_HOST", "localhost") |
21 | + amqp_port = settings.get("AMQP_PORT", 5672) |
22 | + amqp_user = settings.get("AMQP_USER", "guest") |
23 | + amqp_password = settings.get("AMQP_PASSWORD", "guest") |
24 | + amqp_verbose = settings.get("AMQP_VERBOSE", False) |
25 | |
26 | # --debug |
27 | if options.debug: |
28 | @@ -146,6 +154,11 @@ |
29 | startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver) |
30 | startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver) |
31 | startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler) |
32 | + |
33 | +if use_amqp: |
34 | + amqp_listener.startReceiver(amqp_host, amqp_port, |
35 | + amqp_user, amqp_password, verbose=amqp_verbose) |
36 | +# |
37 | startWriter() |
38 | startRecordingCacheMetrics() |
39 | |
40 | |
41 | === added file 'carbon/conf/carbon.amqp.conf.example' |
42 | --- carbon/conf/carbon.amqp.conf.example 1970-01-01 00:00:00 +0000 |
43 | +++ carbon/conf/carbon.amqp.conf.example 2010-01-11 15:35:21 +0000 |
44 | @@ -0,0 +1,73 @@ |
45 | +# This is a configuration file with AMQP enabled |
46 | + |
47 | +[cache] |
48 | +LOCAL_DATA_DIR = |
49 | + |
50 | +# Specify the user to drop privileges to |
51 | +# If this is blank carbon runs as the user that invokes it |
52 | +# This user must have write access to the local data directory |
53 | +USER = |
54 | + |
55 | +# Require at least this many seconds to pass after creating a new database file |
56 | +# before creating another. This is intended to help avoid the performance impact |
57 | +# caused by a flood of new metrics. |
58 | +CREATION_DELAY = 1.0 |
59 | + |
60 | +# Limit the size of the cache to avoid swapping or becoming CPU bound. |
61 | +# Sorts and serving cache queries gets more expensive as the cache grows. |
62 | +# Use the value "inf" (infinity) for an unlimited cache size. |
63 | +MAX_CACHE_SIZE = inf |
64 | + |
65 | +# Limits the number of whisper update_many() calls per second, which effectively |
66 | +# means the number of write requests sent to the disk. This is intended to |
67 | +# prevent over-utilizing the disk and thus starving the rest of the system. |
68 | +# When the rate of required updates exceeds this, then carbon's caching will |
69 | +# take effect and increase the overall throughput accordingly. |
70 | +MAX_UPDATES_PER_SECOND = 1000 |
71 | + |
72 | +# Softly limits the number of whisper files that get created each minute. |
73 | +# Setting this value low (like at 50) is a good way to ensure your graphite |
74 | +# system will not be adversely impacted when a bunch of new metrics are |
75 | +# sent to it. The trade off is that it will take much longer for those metrics' |
76 | +# database files to all get created and thus longer until the data becomes usable. |
77 | +# Setting this value high (like "inf" for infinity) will cause graphite to create |
78 | +# the files quickly but at the risk of slowing I/O down considerably for a while. |
79 | +MAX_CREATES_PER_MINUTE = inf |
80 | + |
81 | +LINE_RECEIVER_INTERFACE = 0.0.0.0 |
82 | +LINE_RECEIVER_PORT = 2003 |
83 | + |
84 | +PICKLE_RECEIVER_INTERFACE = 0.0.0.0 |
85 | +PICKLE_RECEIVER_PORT = 2004 |
86 | + |
87 | +CACHE_QUERY_INTERFACE = 0.0.0.0 |
88 | +CACHE_QUERY_PORT = 7002 |
89 | + |
90 | +# Enable AMQP if you want to receve metrics using you amqp broker |
91 | +ENABLE_AMQP = True |
92 | + |
93 | +# Verbose means a line will be logged for every metric received |
94 | +# useful for testing |
95 | +AMQP_VERBOSE = True |
96 | + |
97 | +# your credentials for the amqp server |
98 | +# AMQP_USER = guest |
99 | +# AMQP_PASSWORD = guest |
100 | + |
101 | +# the network settings for the amqp server |
102 | +# AMQP_HOST = localhost |
103 | +# AMQP_PORT = 5672 |
104 | + |
105 | +# NOTE: you cannot run both a cache and a relay on the same server |
106 | +# with the default configuration, you have to specify a distinict |
107 | +# interfaces and ports for the listeners. |
108 | + |
109 | +[relay] |
110 | +LINE_RECEIVER_INTERFACE = 0.0.0.0 |
111 | +LINE_RECEIVER_PORT = 2003 |
112 | + |
113 | +PICKLE_RECEIVER_INTERFACE = 0.0.0.0 |
114 | +PICKLE_RECEIVER_PORT = 2004 |
115 | + |
116 | +CACHE_SERVERS = server1, server2, server3 |
117 | +MAX_QUEUE_SIZE = 10000 |
118 | |
119 | === added file 'carbon/lib/carbon/amqp0-8.xml' |
120 | --- carbon/lib/carbon/amqp0-8.xml 1970-01-01 00:00:00 +0000 |
121 | +++ carbon/lib/carbon/amqp0-8.xml 2010-01-11 15:35:21 +0000 |
122 | @@ -0,0 +1,759 @@ |
123 | +<?xml version="1.0" encoding="UTF-8"?> |
124 | +<!-- |
125 | +Copyright (c) 2009 AMQP Working Group. |
126 | +All rights reserved. |
127 | + |
128 | +Redistribution and use in source and binary forms, with or without |
129 | +modification, are permitted provided that the following conditions |
130 | +are met: |
131 | +1. Redistributions of source code must retain the above copyright |
132 | +notice, this list of conditions and the following disclaimer. |
133 | +2. Redistributions in binary form must reproduce the above copyright |
134 | +notice, this list of conditions and the following disclaimer in the |
135 | +documentation and/or other materials provided with the distribution. |
136 | +3. The name of the author may not be used to endorse or promote products |
137 | +derived from this software without specific prior written permission. |
138 | + |
139 | +THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR |
140 | +IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES |
141 | +OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. |
142 | +IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, |
143 | +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT |
144 | +NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
145 | +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
146 | +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
147 | +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF |
148 | +THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
149 | +--> |
150 | +<amqp major="8" minor="0" port="5672"> |
151 | + <constant name="frame method" value="1"/> |
152 | + <constant name="frame header" value="2"/> |
153 | + <constant name="frame body" value="3"/> |
154 | + <constant name="frame oob method" value="4"/> |
155 | + <constant name="frame oob header" value="5"/> |
156 | + <constant name="frame oob body" value="6"/> |
157 | + <constant name="frame trace" value="7"/> |
158 | + <constant name="frame heartbeat" value="8"/> |
159 | + <constant name="frame min size" value="4096"/> |
160 | + <constant name="frame end" value="206"/> |
161 | + <constant name="reply success" value="200"/> |
162 | + <constant name="not delivered" value="310" class="soft error"/> |
163 | + <constant name="content too large" value="311" class="soft error"/> |
164 | + <constant name="connection forced" value="320" class="hard error"/> |
165 | + <constant name="invalid path" value="402" class="hard error"/> |
166 | + <constant name="access refused" value="403" class="soft error"/> |
167 | + <constant name="not found" value="404" class="soft error"/> |
168 | + <constant name="resource locked" value="405" class="soft error"/> |
169 | + <constant name="frame error" value="501" class="hard error"/> |
170 | + <constant name="syntax error" value="502" class="hard error"/> |
171 | + <constant name="command invalid" value="503" class="hard error"/> |
172 | + <constant name="channel error" value="504" class="hard error"/> |
173 | + <constant name="resource error" value="506" class="hard error"/> |
174 | + <constant name="not allowed" value="530" class="hard error"/> |
175 | + <constant name="not implemented" value="540" class="hard error"/> |
176 | + <constant name="internal error" value="541" class="hard error"/> |
177 | + <domain name="access ticket" type="short"> |
178 | + <assert check="ne" value="0"/> |
179 | + </domain> |
180 | + <domain name="class id" type="short"/> |
181 | + <domain name="consumer tag" type="shortstr"/> |
182 | + <domain name="delivery tag" type="longlong"/> |
183 | + <domain name="exchange name" type="shortstr"> |
184 | + <assert check="length" value="127"/> |
185 | + </domain> |
186 | + <domain name="known hosts" type="shortstr"/> |
187 | + <domain name="method id" type="short"/> |
188 | + <domain name="no ack" type="bit"/> |
189 | + <domain name="no local" type="bit"/> |
190 | + <domain name="path" type="shortstr"> |
191 | + <assert check="notnull"/> |
192 | + <assert check="syntax" rule="path"/> |
193 | + <assert check="length" value="127"/> |
194 | + </domain> |
195 | + <domain name="peer properties" type="table"/> |
196 | + <domain name="queue name" type="shortstr"> |
197 | + <assert check="length" value="127"/> |
198 | + </domain> |
199 | + <domain name="redelivered" type="bit"/> |
200 | + <domain name="reply code" type="short"> |
201 | + <assert check="notnull"/> |
202 | + </domain> |
203 | + <domain name="reply text" type="shortstr"> |
204 | + <assert check="notnull"/> |
205 | + </domain> |
206 | + <class name="connection" handler="connection" index="10"> |
207 | + <chassis name="server" implement="MUST"/> |
208 | + <chassis name="client" implement="MUST"/> |
209 | + <method name="start" synchronous="1" index="10"> |
210 | + <chassis name="client" implement="MUST"/> |
211 | + <response name="start-ok"/> |
212 | + <field name="version major" type="octet"/> |
213 | + <field name="version minor" type="octet"/> |
214 | + <field name="server properties" domain="peer properties"/> |
215 | + <field name="mechanisms" type="longstr"> |
216 | + <see name="security mechanisms"/> |
217 | + <assert check="notnull"/> |
218 | + </field> |
219 | + <field name="locales" type="longstr"> |
220 | + <assert check="notnull"/> |
221 | + </field> |
222 | + </method> |
223 | + <method name="start-ok" synchronous="1" index="11"> |
224 | + <chassis name="server" implement="MUST"/> |
225 | + <field name="client properties" domain="peer properties"/> |
226 | + <field name="mechanism" type="shortstr"> |
227 | + <assert check="notnull"/> |
228 | + </field> |
229 | + <field name="response" type="longstr"> |
230 | + <assert check="notnull"/> |
231 | + </field> |
232 | + <field name="locale" type="shortstr"> |
233 | + <assert check="notnull"/> |
234 | + </field> |
235 | + </method> |
236 | + <method name="secure" synchronous="1" index="20"> |
237 | + <chassis name="client" implement="MUST"/> |
238 | + <response name="secure-ok"/> |
239 | + <field name="challenge" type="longstr"> |
240 | + <see name="security mechanisms"/> |
241 | + </field> |
242 | + </method> |
243 | + <method name="secure-ok" synchronous="1" index="21"> |
244 | + <chassis name="server" implement="MUST"/> |
245 | + <field name="response" type="longstr"> |
246 | + <assert check="notnull"/> |
247 | + </field> |
248 | + </method> |
249 | + <method name="tune" synchronous="1" index="30"> |
250 | + <chassis name="client" implement="MUST"/> |
251 | + <response name="tune-ok"/> |
252 | + <field name="channel max" type="short"/> |
253 | + <field name="frame max" type="long"/> |
254 | + <field name="heartbeat" type="short"/> |
255 | + </method> |
256 | + <method name="tune-ok" synchronous="1" index="31"> |
257 | + <chassis name="server" implement="MUST"/> |
258 | + <field name="channel max" type="short"> |
259 | + <assert check="notnull"/> |
260 | + <assert check="le" method="tune" field="channel max"/> |
261 | + </field> |
262 | + <field name="frame max" type="long"/> |
263 | + <field name="heartbeat" type="short"/> |
264 | + </method> |
265 | + <method name="open" synchronous="1" index="40"> |
266 | + <chassis name="server" implement="MUST"/> |
267 | + <response name="open-ok"/> |
268 | + <response name="redirect"/> |
269 | + <field name="virtual host" domain="path"> |
270 | + <assert check="regexp" value="^[a-zA-Z0-9/-_]+$"/> |
271 | + </field> |
272 | + <field name="capabilities" type="shortstr"/> |
273 | + <field name="insist" type="bit"/> |
274 | + </method> |
275 | + <method name="open-ok" synchronous="1" index="41"> |
276 | + <chassis name="client" implement="MUST"/> |
277 | + <field name="known hosts" domain="known hosts"/> |
278 | + </method> |
279 | + <method name="redirect" synchronous="1" index="50"> |
280 | + <chassis name="client" implement="MAY"/> |
281 | + <field name="host" type="shortstr"> |
282 | + <assert check="notnull"/> |
283 | + </field> |
284 | + <field name="known hosts" domain="known hosts"/> |
285 | + </method> |
286 | + <method name="close" synchronous="1" index="60"> |
287 | + <chassis name="client" implement="MUST"/> |
288 | + <chassis name="server" implement="MUST"/> |
289 | + <response name="close-ok"/> |
290 | + <field name="reply code" domain="reply code"/> |
291 | + <field name="reply text" domain="reply text"/> |
292 | + <field name="class id" domain="class id"/> |
293 | + <field name="method id" domain="class id"/> |
294 | + </method> |
295 | + <method name="close-ok" synchronous="1" index="61"> |
296 | + <chassis name="client" implement="MUST"/> |
297 | + <chassis name="server" implement="MUST"/> |
298 | + </method> |
299 | + </class> |
300 | + <class name="channel" handler="channel" index="20"> |
301 | + <chassis name="server" implement="MUST"/> |
302 | + <chassis name="client" implement="MUST"/> |
303 | + <method name="open" synchronous="1" index="10"> |
304 | + <chassis name="server" implement="MUST"/> |
305 | + <response name="open-ok"/> |
306 | + <field name="out of band" type="shortstr"> |
307 | + <assert check="null"/> |
308 | + </field> |
309 | + </method> |
310 | + <method name="open-ok" synchronous="1" index="11"> |
311 | + <chassis name="client" implement="MUST"/> |
312 | + </method> |
313 | + <method name="flow" synchronous="1" index="20"> |
314 | + <chassis name="server" implement="MUST"/> |
315 | + <chassis name="client" implement="MUST"/> |
316 | + <response name="flow-ok"/> |
317 | + <field name="active" type="bit"/> |
318 | + </method> |
319 | + <method name="flow-ok" index="21"> |
320 | + <chassis name="server" implement="MUST"/> |
321 | + <chassis name="client" implement="MUST"/> |
322 | + <field name="active" type="bit"/> |
323 | + </method> |
324 | + <method name="alert" index="30"> |
325 | + <chassis name="client" implement="MUST"/> |
326 | + <field name="reply code" domain="reply code"/> |
327 | + <field name="reply text" domain="reply text"/> |
328 | + <field name="details" type="table"/> |
329 | + </method> |
330 | + <method name="close" synchronous="1" index="40"> |
331 | + <chassis name="client" implement="MUST"/> |
332 | + <chassis name="server" implement="MUST"/> |
333 | + <response name="close-ok"/> |
334 | + <field name="reply code" domain="reply code"/> |
335 | + <field name="reply text" domain="reply text"/> |
336 | + <field name="class id" domain="class id"/> |
337 | + <field name="method id" domain="method id"/> |
338 | + </method> |
339 | + <method name="close-ok" synchronous="1" index="41"> |
340 | + <chassis name="client" implement="MUST"/> |
341 | + <chassis name="server" implement="MUST"/> |
342 | + </method> |
343 | + </class> |
344 | + <class name="access" handler="connection" index="30"> |
345 | + <chassis name="server" implement="MUST"/> |
346 | + <chassis name="client" implement="MUST"/> |
347 | + <method name="request" synchronous="1" index="10"> |
348 | + <chassis name="server" implement="MUST"/> |
349 | + <response name="request-ok"/> |
350 | + <field name="realm" domain="path"/> |
351 | + <field name="exclusive" type="bit"/> |
352 | + <field name="passive" type="bit"/> |
353 | + <field name="active" type="bit"/> |
354 | + <field name="write" type="bit"/> |
355 | + <field name="read" type="bit"/> |
356 | + </method> |
357 | + <method name="request-ok" synchronous="1" index="11"> |
358 | + <chassis name="client" implement="MUST"/> |
359 | + <field name="ticket" domain="access ticket"/> |
360 | + </method> |
361 | + </class> |
362 | + <class name="exchange" handler="channel" index="40"> |
363 | + <chassis name="server" implement="MUST"/> |
364 | + <chassis name="client" implement="MUST"/> |
365 | + <method name="declare" synchronous="1" index="10"> |
366 | + <chassis name="server" implement="MUST"/> |
367 | + <response name="declare-ok"/> |
368 | + <field name="ticket" domain="access ticket"/> |
369 | + <field name="exchange" domain="exchange name"> |
370 | + <assert check="regexp" value="^[a-zA-Z0-9-_.:]+$"/> |
371 | + </field> |
372 | + <field name="type" type="shortstr"> |
373 | + <assert check="regexp" value="^[a-zA-Z0-9-_.:]+$"/> |
374 | + </field> |
375 | + <field name="passive" type="bit"/> |
376 | + <field name="durable" type="bit"/> |
377 | + <field name="auto delete" type="bit"/> |
378 | + <field name="internal" type="bit"/> |
379 | + <field name="nowait" type="bit"/> |
380 | + <field name="arguments" type="table"/> |
381 | + </method> |
382 | + <method name="declare-ok" synchronous="1" index="11"> |
383 | + <chassis name="client" implement="MUST"/> |
384 | + </method> |
385 | + <method name="delete" synchronous="1" index="20"> |
386 | + <chassis name="server" implement="MUST"/> |
387 | + <response name="delete-ok"/> |
388 | + <field name="ticket" domain="access ticket"/> |
389 | + <field name="exchange" domain="exchange name"> |
390 | + <assert check="notnull"/> |
391 | + </field> |
392 | + <field name="if unused" type="bit"/> |
393 | + <field name="nowait" type="bit"/> |
394 | + </method> |
395 | + <method name="delete-ok" synchronous="1" index="21"> |
396 | + <chassis name="client" implement="MUST"/> |
397 | + </method> |
398 | + </class> |
399 | + <class name="queue" handler="channel" index="50"> |
400 | + <chassis name="server" implement="MUST"/> |
401 | + <chassis name="client" implement="MUST"/> |
402 | + <method name="declare" synchronous="1" index="10"> |
403 | + <chassis name="server" implement="MUST"/> |
404 | + <response name="declare-ok"/> |
405 | + <field name="ticket" domain="access ticket"/> |
406 | + <field name="queue" domain="queue name"> |
407 | + <assert check="regexp" value="^[a-zA-Z0-9-_.:]*$"/> |
408 | + </field> |
409 | + <field name="passive" type="bit"/> |
410 | + <field name="durable" type="bit"/> |
411 | + <field name="exclusive" type="bit"/> |
412 | + <field name="auto delete" type="bit"/> |
413 | + <field name="nowait" type="bit"/> |
414 | + <field name="arguments" type="table"/> |
415 | + </method> |
416 | + <method name="declare-ok" synchronous="1" index="11"> |
417 | + <chassis name="client" implement="MUST"/> |
418 | + <field name="queue" domain="queue name"> |
419 | + <assert check="notnull"/> |
420 | + </field> |
421 | + <field name="message count" type="long"/> |
422 | + <field name="consumer count" type="long"/> |
423 | + </method> |
424 | + <method name="bind" synchronous="1" index="20"> |
425 | + <chassis name="server" implement="MUST"/> |
426 | + <response name="bind-ok"/> |
427 | + <field name="ticket" domain="access ticket"/> |
428 | + <field name="queue" domain="queue name"/> |
429 | + <field name="exchange" domain="exchange name"/> |
430 | + <field name="routing key" type="shortstr"/> |
431 | + <field name="nowait" type="bit"/> |
432 | + <field name="arguments" type="table"/> |
433 | + </method> |
434 | + <method name="bind-ok" synchronous="1" index="21"> |
435 | + <chassis name="client" implement="MUST"/> |
436 | + </method> |
437 | + <method name="purge" synchronous="1" index="30"> |
438 | + <chassis name="server" implement="MUST"/> |
439 | + <response name="purge-ok"/> |
440 | + <field name="ticket" domain="access ticket"/> |
441 | + <field name="queue" domain="queue name"/> |
442 | + <field name="nowait" type="bit"/> |
443 | + </method> |
444 | + <method name="purge-ok" synchronous="1" index="31"> |
445 | + <chassis name="client" implement="MUST"/> |
446 | + <field name="message count" type="long"/> |
447 | + </method> |
448 | + <method name="delete" synchronous="1" index="40"> |
449 | + <chassis name="server" implement="MUST"/> |
450 | + <response name="delete-ok"/> |
451 | + <field name="ticket" domain="access ticket"/> |
452 | + <field name="queue" domain="queue name"/> |
453 | + <field name="if unused" type="bit"/> |
454 | + <field name="if empty" type="bit"> |
455 | + <test/> |
456 | + </field> |
457 | + <field name="nowait" type="bit"/> |
458 | + </method> |
459 | + <method name="delete-ok" synchronous="1" index="41"> |
460 | + <chassis name="client" implement="MUST"/> |
461 | + <field name="message count" type="long"/> |
462 | + </method> |
463 | + </class> |
464 | + <class name="basic" handler="channel" index="60"> |
465 | + <chassis name="server" implement="MUST"/> |
466 | + <chassis name="client" implement="MAY"/> |
467 | + <field name="content type" type="shortstr"/> |
468 | + <field name="content encoding" type="shortstr"/> |
469 | + <field name="headers" type="table"/> |
470 | + <field name="delivery mode" type="octet"/> |
471 | + <field name="priority" type="octet"/> |
472 | + <field name="correlation id" type="shortstr"/> |
473 | + <field name="reply to" type="shortstr"/> |
474 | + <field name="expiration" type="shortstr"/> |
475 | + <field name="message id" type="shortstr"/> |
476 | + <field name="timestamp" type="timestamp"/> |
477 | + <field name="type" type="shortstr"/> |
478 | + <field name="user id" type="shortstr"/> |
479 | + <field name="app id" type="shortstr"/> |
480 | + <field name="cluster id" type="shortstr"/> |
481 | + <method name="qos" synchronous="1" index="10"> |
482 | + <chassis name="server" implement="MUST"/> |
483 | + <response name="qos-ok"/> |
484 | + <field name="prefetch size" type="long"/> |
485 | + <field name="prefetch count" type="short"/> |
486 | + <field name="global" type="bit"/> |
487 | + </method> |
488 | + <method name="qos-ok" synchronous="1" index="11"> |
489 | + <chassis name="client" implement="MUST"/> |
490 | + </method> |
491 | + <method name="consume" synchronous="1" index="20"> |
492 | + <chassis name="server" implement="MUST"/> |
493 | + <response name="consume-ok"/> |
494 | + <field name="ticket" domain="access ticket"/> |
495 | + <field name="queue" domain="queue name"/> |
496 | + <field name="consumer tag" domain="consumer tag"/> |
497 | + <field name="no local" domain="no local"/> |
498 | + <field name="no ack" domain="no ack"/> |
499 | + <field name="exclusive" type="bit"/> |
500 | + <field name="nowait" type="bit"/> |
501 | + </method> |
502 | + <method name="consume-ok" synchronous="1" index="21"> |
503 | + <chassis name="client" implement="MUST"/> |
504 | + <field name="consumer tag" domain="consumer tag"/> |
505 | + </method> |
506 | + <method name="cancel" synchronous="1" index="30"> |
507 | + <chassis name="server" implement="MUST"/> |
508 | + <response name="cancel-ok"/> |
509 | + <field name="consumer tag" domain="consumer tag"/> |
510 | + <field name="nowait" type="bit"/> |
511 | + </method> |
512 | + <method name="cancel-ok" synchronous="1" index="31"> |
513 | + <chassis name="client" implement="MUST"/> |
514 | + <field name="consumer tag" domain="consumer tag"/> |
515 | + </method> |
516 | + <method name="publish" content="1" index="40"> |
517 | + <chassis name="server" implement="MUST"/> |
518 | + <field name="ticket" domain="access ticket"/> |
519 | + <field name="exchange" domain="exchange name"/> |
520 | + <field name="routing key" type="shortstr"/> |
521 | + <field name="mandatory" type="bit"/> |
522 | + <field name="immediate" type="bit"/> |
523 | + </method> |
524 | + <method name="return" content="1" index="50"> |
525 | + <chassis name="client" implement="MUST"/> |
526 | + <field name="reply code" domain="reply code"/> |
527 | + <field name="reply text" domain="reply text"/> |
528 | + <field name="exchange" domain="exchange name"/> |
529 | + <field name="routing key" type="shortstr"/> |
530 | + </method> |
531 | + <method name="deliver" content="1" index="60"> |
532 | + <chassis name="client" implement="MUST"/> |
533 | + <field name="consumer tag" domain="consumer tag"/> |
534 | + <field name="delivery tag" domain="delivery tag"/> |
535 | + <field name="redelivered" domain="redelivered"/> |
536 | + <field name="exchange" domain="exchange name"/> |
537 | + <field name="routing key" type="shortstr"/> |
538 | + </method> |
539 | + <method name="get" synchronous="1" index="70"> |
540 | + <response name="get-ok"/> |
541 | + <response name="get-empty"/> |
542 | + <chassis name="server" implement="MUST"/> |
543 | + <field name="ticket" domain="access ticket"/> |
544 | + <field name="queue" domain="queue name"/> |
545 | + <field name="no ack" domain="no ack"/> |
546 | + </method> |
547 | + <method name="get-ok" synchronous="1" content="1" index="71"> |
548 | + <chassis name="client" implement="MAY"/> |
549 | + <field name="delivery tag" domain="delivery tag"/> |
550 | + <field name="redelivered" domain="redelivered"/> |
551 | + <field name="exchange" domain="exchange name"/> |
552 | + <field name="routing key" type="shortstr"/> |
553 | + <field name="message count" type="long"/> |
554 | + </method> |
555 | + <method name="get-empty" synchronous="1" index="72"> |
556 | + <chassis name="client" implement="MAY"/> |
557 | + <field name="cluster id" type="shortstr"/> |
558 | + </method> |
559 | + <method name="ack" index="80"> |
560 | + <chassis name="server" implement="MUST"/> |
561 | + <field name="delivery tag" domain="delivery tag"/> |
562 | + <field name="multiple" type="bit"/> |
563 | + </method> |
564 | + <method name="reject" index="90"> |
565 | + <chassis name="server" implement="MUST"/> |
566 | + <field name="delivery tag" domain="delivery tag"/> |
567 | + <field name="requeue" type="bit"/> |
568 | + </method> |
569 | + <method name="recover" index="100"> |
570 | + <chassis name="server" implement="MUST"/> |
571 | + <field name="requeue" type="bit"/> |
572 | + </method> |
573 | + </class> |
574 | + <class name="file" handler="channel" index="70"> |
575 | + <chassis name="server" implement="MAY"/> |
576 | + <chassis name="client" implement="MAY"/> |
577 | + <field name="content type" type="shortstr"/> |
578 | + <field name="content encoding" type="shortstr"/> |
579 | + <field name="headers" type="table"/> |
580 | + <field name="priority" type="octet"/> |
581 | + <field name="reply to" type="shortstr"/> |
582 | + <field name="message id" type="shortstr"/> |
583 | + <field name="filename" type="shortstr"/> |
584 | + <field name="timestamp" type="timestamp"/> |
585 | + <field name="cluster id" type="shortstr"/> |
586 | + <method name="qos" synchronous="1" index="10"> |
587 | + <chassis name="server" implement="MUST"/> |
588 | + <response name="qos-ok"/> |
589 | + <field name="prefetch size" type="long"/> |
590 | + <field name="prefetch count" type="short"/> |
591 | + <field name="global" type="bit"/> |
592 | + </method> |
593 | + <method name="qos-ok" synchronous="1" index="11"> |
594 | + <chassis name="client" implement="MUST"/> |
595 | + </method> |
596 | + <method name="consume" synchronous="1" index="20"> |
597 | + <chassis name="server" implement="MUST"/> |
598 | + <response name="consume-ok"/> |
599 | + <field name="ticket" domain="access ticket"/> |
600 | + <field name="queue" domain="queue name"/> |
601 | + <field name="consumer tag" domain="consumer tag"/> |
602 | + <field name="no local" domain="no local"/> |
603 | + <field name="no ack" domain="no ack"/> |
604 | + <field name="exclusive" type="bit"/> |
605 | + <field name="nowait" type="bit"/> |
606 | + </method> |
607 | + <method name="consume-ok" synchronous="1" index="21"> |
608 | + <chassis name="client" implement="MUST"/> |
609 | + <field name="consumer tag" domain="consumer tag"/> |
610 | + </method> |
611 | + <method name="cancel" synchronous="1" index="30"> |
612 | + <chassis name="server" implement="MUST"/> |
613 | + <response name="cancel-ok"/> |
614 | + <field name="consumer tag" domain="consumer tag"/> |
615 | + <field name="nowait" type="bit"/> |
616 | + </method> |
617 | + <method name="cancel-ok" synchronous="1" index="31"> |
618 | + <chassis name="client" implement="MUST"/> |
619 | + <field name="consumer tag" domain="consumer tag"/> |
620 | + </method> |
621 | + <method name="open" synchronous="1" index="40"> |
622 | + <response name="open-ok"/> |
623 | + <chassis name="server" implement="MUST"/> |
624 | + <chassis name="client" implement="MUST"/> |
625 | + <field name="identifier" type="shortstr"/> |
626 | + <field name="content size" type="longlong"/> |
627 | + </method> |
628 | + <method name="open-ok" synchronous="1" index="41"> |
629 | + <response name="stage"/> |
630 | + <chassis name="server" implement="MUST"/> |
631 | + <chassis name="client" implement="MUST"/> |
632 | + <field name="staged size" type="longlong"/> |
633 | + </method> |
634 | + <method name="stage" content="1" index="50"> |
635 | + <chassis name="server" implement="MUST"/> |
636 | + <chassis name="client" implement="MUST"/> |
637 | + </method> |
638 | + <method name="publish" index="60"> |
639 | + <chassis name="server" implement="MUST"/> |
640 | + <field name="ticket" domain="access ticket"/> |
641 | + <field name="exchange" domain="exchange name"/> |
642 | + <field name="routing key" type="shortstr"/> |
643 | + <field name="mandatory" type="bit"/> |
644 | + <field name="immediate" type="bit"/> |
645 | + <field name="identifier" type="shortstr"/> |
646 | + </method> |
647 | + <method name="return" content="1" index="70"> |
648 | + <chassis name="client" implement="MUST"/> |
649 | + <field name="reply code" domain="reply code"/> |
650 | + <field name="reply text" domain="reply text"/> |
651 | + <field name="exchange" domain="exchange name"/> |
652 | + <field name="routing key" type="shortstr"/> |
653 | + </method> |
654 | + <method name="deliver" index="80"> |
655 | + <chassis name="client" implement="MUST"/> |
656 | + <field name="consumer tag" domain="consumer tag"/> |
657 | + <field name="delivery tag" domain="delivery tag"/> |
658 | + <field name="redelivered" domain="redelivered"/> |
659 | + <field name="exchange" domain="exchange name"/> |
660 | + <field name="routing key" type="shortstr"/> |
661 | + <field name="identifier" type="shortstr"/> |
662 | + </method> |
663 | + <method name="ack" index="90"> |
664 | + <chassis name="server" implement="MUST"/> |
665 | + <field name="delivery tag" domain="delivery tag"/> |
666 | + <field name="multiple" type="bit"/> |
667 | + </method> |
668 | + <method name="reject" index="100"> |
669 | + <chassis name="server" implement="MUST"/> |
670 | + <field name="delivery tag" domain="delivery tag"/> |
671 | + <field name="requeue" type="bit"/> |
672 | + </method> |
673 | + </class> |
674 | + <class name="stream" handler="channel" index="80"> |
675 | + <chassis name="server" implement="MAY"/> |
676 | + <chassis name="client" implement="MAY"/> |
677 | + <field name="content type" type="shortstr"/> |
678 | + <field name="content encoding" type="shortstr"/> |
679 | + <field name="headers" type="table"/> |
680 | + <field name="priority" type="octet"/> |
681 | + <field name="timestamp" type="timestamp"/> |
682 | + <method name="qos" synchronous="1" index="10"> |
683 | + <chassis name="server" implement="MUST"/> |
684 | + <response name="qos-ok"/> |
685 | + <field name="prefetch size" type="long"/> |
686 | + <field name="prefetch count" type="short"/> |
687 | + <field name="consume rate" type="long"/> |
688 | + <field name="global" type="bit"/> |
689 | + </method> |
690 | + <method name="qos-ok" synchronous="1" index="11"> |
691 | + <chassis name="client" implement="MUST"/> |
692 | + </method> |
693 | + <method name="consume" synchronous="1" index="20"> |
694 | + <chassis name="server" implement="MUST"/> |
695 | + <response name="consume-ok"/> |
696 | + <field name="ticket" domain="access ticket"/> |
697 | + <field name="queue" domain="queue name"/> |
698 | + <field name="consumer tag" domain="consumer tag"/> |
699 | + <field name="no local" domain="no local"/> |
700 | + <field name="exclusive" type="bit"/> |
701 | + <field name="nowait" type="bit"/> |
702 | + </method> |
703 | + <method name="consume-ok" synchronous="1" index="21"> |
704 | + <chassis name="client" implement="MUST"/> |
705 | + <field name="consumer tag" domain="consumer tag"/> |
706 | + </method> |
707 | + <method name="cancel" synchronous="1" index="30"> |
708 | + <chassis name="server" implement="MUST"/> |
709 | + <response name="cancel-ok"/> |
710 | + <field name="consumer tag" domain="consumer tag"/> |
711 | + <field name="nowait" type="bit"/> |
712 | + </method> |
713 | + <method name="cancel-ok" synchronous="1" index="31"> |
714 | + <chassis name="client" implement="MUST"/> |
715 | + <field name="consumer tag" domain="consumer tag"/> |
716 | + </method> |
717 | + <method name="publish" content="1" index="40"> |
718 | + <chassis name="server" implement="MUST"/> |
719 | + <field name="ticket" domain="access ticket"/> |
720 | + <field name="exchange" domain="exchange name"/> |
721 | + <field name="routing key" type="shortstr"/> |
722 | + <field name="mandatory" type="bit"/> |
723 | + <field name="immediate" type="bit"/> |
724 | + </method> |
725 | + <method name="return" content="1" index="50"> |
726 | + <chassis name="client" implement="MUST"/> |
727 | + <field name="reply code" domain="reply code"/> |
728 | + <field name="reply text" domain="reply text"/> |
729 | + <field name="exchange" domain="exchange name"/> |
730 | + <field name="routing key" type="shortstr"/> |
731 | + </method> |
732 | + <method name="deliver" content="1" index="60"> |
733 | + <chassis name="client" implement="MUST"/> |
734 | + <field name="consumer tag" domain="consumer tag"/> |
735 | + <field name="delivery tag" domain="delivery tag"/> |
736 | + <field name="exchange" domain="exchange name"/> |
737 | + <field name="queue" domain="queue name"> |
738 | + <assert check="notnull"/> |
739 | + </field> |
740 | + </method> |
741 | + </class> |
742 | + <class name="tx" handler="channel" index="90"> |
743 | + <chassis name="server" implement="SHOULD"/> |
744 | + <chassis name="client" implement="MAY"/> |
745 | + <method name="select" synchronous="1" index="10"> |
746 | + <chassis name="server" implement="MUST"/> |
747 | + <response name="select-ok"/> |
748 | + </method> |
749 | + <method name="select-ok" synchronous="1" index="11"> |
750 | + <chassis name="client" implement="MUST"/> |
751 | + </method> |
752 | + <method name="commit" synchronous="1" index="20"> |
753 | + <chassis name="server" implement="MUST"/> |
754 | + <response name="commit-ok"/> |
755 | + </method> |
756 | + <method name="commit-ok" synchronous="1" index="21"> |
757 | + <chassis name="client" implement="MUST"/> |
758 | + </method> |
759 | + <method name="rollback" synchronous="1" index="30"> |
760 | + <chassis name="server" implement="MUST"/> |
761 | + <response name="rollback-ok"/> |
762 | + </method> |
763 | + <method name="rollback-ok" synchronous="1" index="31"> |
764 | + <chassis name="client" implement="MUST"/> |
765 | + </method> |
766 | + </class> |
767 | + <class name="dtx" handler="channel" index="100"> |
768 | + <chassis name="server" implement="MAY"/> |
769 | + <chassis name="client" implement="MAY"/> |
770 | + <method name="select" synchronous="1" index="10"> |
771 | + <chassis name="server" implement="MUST"/> |
772 | + <response name="select-ok"/> |
773 | + </method> |
774 | + <method name="select-ok" synchronous="1" index="11"> |
775 | + <chassis name="client" implement="MUST"/> |
776 | + </method> |
777 | + <method name="start" synchronous="1" index="20"> |
778 | + <chassis name="server" implement="MAY"/> |
779 | + <response name="start-ok"/> |
780 | + <field name="dtx identifier" type="shortstr"> |
781 | + <assert check="notnull"/> |
782 | + </field> |
783 | + </method> |
784 | + <method name="start-ok" synchronous="1" index="21"> |
785 | + <chassis name="client" implement="MUST"/> |
786 | + </method> |
787 | + </class> |
788 | + <class name="tunnel" handler="tunnel" index="110"> |
789 | + <chassis name="server" implement="MAY"/> |
790 | + <chassis name="client" implement="MAY"/> |
791 | + <field name="headers" type="table"/> |
792 | + <field name="proxy name" type="shortstr"/> |
793 | + <field name="data name" type="shortstr"/> |
794 | + <field name="durable" type="octet"/> |
795 | + <field name="broadcast" type="octet"/> |
796 | + <method name="request" content="1" index="10"> |
797 | + <chassis name="server" implement="MUST"/> |
798 | + <field name="meta data" type="table"/> |
799 | + </method> |
800 | + </class> |
801 | + <class name="test" handler="channel" index="120"> |
802 | + <chassis name="server" implement="MUST"/> |
803 | + <chassis name="client" implement="SHOULD"/> |
804 | + <method name="integer" synchronous="1" index="10"> |
805 | + <chassis name="client" implement="MUST"/> |
806 | + <chassis name="server" implement="MUST"/> |
807 | + <response name="integer-ok"/> |
808 | + <field name="integer 1" type="octet"/> |
809 | + <field name="integer 2" type="short"/> |
810 | + <field name="integer 3" type="long"/> |
811 | + <field name="integer 4" type="longlong"/> |
812 | + <field name="operation" type="octet"> |
813 | + <assert check="enum"> |
814 | + <value name="add"/> |
815 | + <value name="min"/> |
816 | + <value name="max"/> |
817 | + </assert> |
818 | + </field> |
819 | + </method> |
820 | + <method name="integer-ok" synchronous="1" index="11"> |
821 | + <chassis name="client" implement="MUST"/> |
822 | + <chassis name="server" implement="MUST"/> |
823 | + <field name="result" type="longlong"/> |
824 | + </method> |
825 | + <method name="string" synchronous="1" index="20"> |
826 | + <chassis name="client" implement="MUST"/> |
827 | + <chassis name="server" implement="MUST"/> |
828 | + <response name="string-ok"/> |
829 | + <field name="string 1" type="shortstr"/> |
830 | + <field name="string 2" type="longstr"/> |
831 | + <field name="operation" type="octet"> |
832 | + <assert check="enum"> |
833 | + <value name="add"/> |
834 | + <value name="min"/> |
835 | + <value name="max"/> |
836 | + </assert> |
837 | + </field> |
838 | + </method> |
839 | + <method name="string-ok" synchronous="1" index="21"> |
840 | + <chassis name="client" implement="MUST"/> |
841 | + <chassis name="server" implement="MUST"/> |
842 | + <field name="result" type="longstr"/> |
843 | + </method> |
844 | + <method name="table" synchronous="1" index="30"> |
845 | + <chassis name="client" implement="MUST"/> |
846 | + <chassis name="server" implement="MUST"/> |
847 | + <response name="table-ok"/> |
848 | + <field name="table" type="table"/> |
849 | + <field name="integer op" type="octet"> |
850 | + <assert check="enum"> |
851 | + <value name="add"/> |
852 | + <value name="min"/> |
853 | + <value name="max"/> |
854 | + </assert> |
855 | + </field> |
856 | + <field name="string op" type="octet"> |
857 | + <assert check="enum"> |
858 | + <value name="add"/> |
859 | + <value name="min"/> |
860 | + <value name="max"/> |
861 | + </assert> |
862 | + </field> |
863 | + </method> |
864 | + <method name="table-ok" synchronous="1" index="31"> |
865 | + <chassis name="client" implement="MUST"/> |
866 | + <chassis name="server" implement="MUST"/> |
867 | + <field name="integer result" type="longlong"/> |
868 | + <field name="string result" type="longstr"/> |
869 | + </method> |
870 | + <method name="content" synchronous="1" content="1" index="40"> |
871 | + <chassis name="client" implement="MUST"/> |
872 | + <chassis name="server" implement="MUST"/> |
873 | + <response name="content-ok"/> |
874 | + </method> |
875 | + <method name="content-ok" synchronous="1" content="1" index="41"> |
876 | + <chassis name="client" implement="MUST"/> |
877 | + <chassis name="server" implement="MUST"/> |
878 | + <field name="content checksum" type="long"/> |
879 | + </method> |
880 | + </class> |
881 | +</amqp> |
882 | \ No newline at end of file |
883 | |
884 | === added file 'carbon/lib/carbon/amqp_listener.py' |
885 | --- carbon/lib/carbon/amqp_listener.py 1970-01-01 00:00:00 +0000 |
886 | +++ carbon/lib/carbon/amqp_listener.py 2010-01-11 15:35:21 +0000 |
887 | @@ -0,0 +1,178 @@ |
888 | +#!/usr/bin/env python |
889 | +""" |
890 | +Copyright 2009 Lucio Torre <lucio.torre@canonical.com> |
891 | + |
892 | +This is an AMQP client that will connect to the specified broker and read |
893 | +messages, parse them, and post them as metrics. |
894 | + |
895 | +The message format is the same as in the TCP line protocol |
896 | +(METRIC, VALUE, TIMESTAMP) with the added possibility of putting multiple "\n" |
897 | +separated lines in one message. |
898 | + |
899 | +Can be started standalone for testing or using carbon-cache.py (see example |
900 | +config file provided) |
901 | + |
902 | +""" |
903 | +import sys |
904 | +import os |
905 | +from optparse import OptionParser |
906 | + |
907 | +from twisted.internet.defer import inlineCallbacks |
908 | +from twisted.internet import reactor |
909 | +from twisted.internet.protocol import ClientCreator, Protocol, \ |
910 | + ReconnectingClientFactory |
911 | +from txamqp.protocol import AMQClient |
912 | +from txamqp.client import TwistedDelegate |
913 | +import txamqp.spec |
914 | + |
915 | +try: |
916 | + import carbon |
917 | +except: |
918 | + # this is being run directly, carbon is not installed |
919 | + LIB_DIR = os.path.dirname(os.path.dirname(__file__)) |
920 | + sys.path.insert(0, LIB_DIR) |
921 | + |
922 | +import carbon.listeners #satisfy import order requirements |
923 | +from carbon.instrumentation import increment |
924 | +from carbon.events import metricReceived |
925 | +from carbon import log |
926 | + |
927 | + |
928 | + |
929 | +class AMQPGraphiteProtocol(AMQClient): |
930 | + """This is the protocol instance that will receive and post metrics.""" |
931 | + |
932 | + consumer_tag = "graphite_consumer" |
933 | + |
934 | + @inlineCallbacks |
935 | + def connectionMade(self): |
936 | + yield AMQClient.connectionMade(self) |
937 | + log.listener("New AMQP connection made") |
938 | + yield self.setup() |
939 | + yield self.receive_loop() |
940 | + |
941 | + @inlineCallbacks |
942 | + def setup(self): |
943 | + yield self.authenticate(self.factory.username, self.factory.password) |
944 | + chan = yield self.channel(1) |
945 | + yield chan.channel_open() |
946 | + |
947 | + yield chan.queue_declare(queue=self.factory.queue_name, durable=False, |
948 | + exclusive=False, auto_delete=True) |
949 | + yield chan.exchange_declare( |
950 | + exchange=self.factory.exchange_name, type="fanout", durable=False, |
951 | + auto_delete=True) |
952 | + |
953 | + yield chan.queue_bind(queue=self.factory.queue_name, |
954 | + exchange=self.factory.exchange_name) |
955 | + |
956 | + yield chan.basic_consume(queue=self.factory.queue_name, no_ack=True, |
957 | + consumer_tag=self.consumer_tag) |
958 | + @inlineCallbacks |
959 | + def receive_loop(self): |
960 | + queue = yield self.queue(self.consumer_tag) |
961 | + |
962 | + while True: |
963 | + msg = yield queue.get() |
964 | + self.processMessage(msg) |
965 | + |
966 | + def processMessage(self, message): |
967 | + """Parse a message and post it as a metric.""" |
968 | + |
969 | + if self.factory.verbose: |
970 | + log.listener("Message received: %s" % (message,)) |
971 | + |
972 | + for line in message.content.body.split("\n"): |
973 | + try: |
974 | + metric, value, timestamp = line.strip().split() |
975 | + datapoint = ( float(timestamp), float(value) ) |
976 | + except ValueError: |
977 | + log.listener("wrong value in line: %s" % (line,)) |
978 | + continue |
979 | + |
980 | + increment('metricsReceived') |
981 | + metricReceived(metric, datapoint) |
982 | + if self.factory.verbose: |
983 | + log.listener("Metric posted: %s %s %s" % |
984 | + (metric, value, timestamp,)) |
985 | + |
986 | + |
987 | +class AMQPReconnectingFactory(ReconnectingClientFactory): |
988 | + """The reconnecting factory. |
989 | + |
990 | + Knows how to create the extended client and how to keep trying to |
991 | + connect in case of errors.""" |
992 | + |
993 | + protocol = AMQPGraphiteProtocol |
994 | + |
995 | + def __init__(self, username, password, delegate, vhost, spec, channel, |
996 | + exchange_name, queue_name, verbose): |
997 | + self.username = username |
998 | + self.password = password |
999 | + self.delegate = delegate |
1000 | + self.vhost = vhost |
1001 | + self.spec = spec |
1002 | + self.channel = channel |
1003 | + self.exchange_name = exchange_name |
1004 | + self.queue_name = queue_name |
1005 | + self.verbose = verbose |
1006 | + |
1007 | + def buildProtocol(self, addr): |
1008 | + p = self.protocol(self.delegate, self.vhost, self.spec) |
1009 | + p.factory = self |
1010 | + return p |
1011 | + |
1012 | +def startReceiver(host, port, username, password, vhost="/", spec=None, |
1013 | + channel=1, exchange_name="graphite_exchange", |
1014 | + queue_name="graphite_queue", verbose=False): |
1015 | + """Starts a twisted process that will read messages on the amqp broker |
1016 | + and post them as metrics.""" |
1017 | + |
1018 | + # use provided spec if not specified |
1019 | + if not spec: |
1020 | + spec = txamqp.spec.load(os.path.normpath( |
1021 | + os.path.join(os.path.dirname(__file__), 'amqp0-8.xml'))) |
1022 | + |
1023 | + delegate = TwistedDelegate() |
1024 | + factory = AMQPReconnectingFactory(username, password, delegate, vhost, |
1025 | + spec, channel, exchange_name, queue_name, |
1026 | + verbose=verbose) |
1027 | + reactor.connectTCP(host, port, factory) |
1028 | + |
1029 | + |
1030 | +def main(): |
1031 | + parser = OptionParser() |
1032 | + parser.add_option("-t", "--host", dest="host", |
1033 | + help="host name", metavar="HOST", default="localhost") |
1034 | + |
1035 | + parser.add_option("-p", "--port", dest="port", type=int, |
1036 | + help="port number", metavar="PORT", |
1037 | + default=5672) |
1038 | + |
1039 | + parser.add_option("-u", "--user", dest="username", |
1040 | + help="username", metavar="USERNAME", |
1041 | + default="guest") |
1042 | + |
1043 | + parser.add_option("-w", "--password", dest="password", |
1044 | + help="password", metavar="PASSWORD", |
1045 | + default="guest") |
1046 | + |
1047 | + parser.add_option("-V", "--vhost", dest="vhost", |
1048 | + help="vhost", metavar="VHOST", |
1049 | + default="/") |
1050 | + |
1051 | + parser.add_option("-v", "--verbose", dest="verbose", |
1052 | + help="verbose", |
1053 | + default=False, action="store_true") |
1054 | + |
1055 | + (options, args) = parser.parse_args() |
1056 | + |
1057 | + |
1058 | + log.logToStdout() |
1059 | + startReceiver(options.host, options.port, options.username, |
1060 | + options.password, vhost=options.vhost, |
1061 | + verbose=options.verbose) |
1062 | + reactor.run() |
1063 | + |
1064 | +if __name__ == "__main__": |
1065 | + main() |
1066 | |
1067 | === added file 'carbon/lib/carbon/amqp_publisher.py' |
1068 | --- carbon/lib/carbon/amqp_publisher.py 1970-01-01 00:00:00 +0000 |
1069 | +++ carbon/lib/carbon/amqp_publisher.py 2010-01-11 15:35:21 +0000 |
1070 | @@ -0,0 +1,81 @@ |
1071 | +#!/usr/bin/env python |
1072 | +""" |
1073 | +Copyright 2009 Lucio Torre <lucio.torre@canonical.com> |
1074 | + |
1075 | +Will publish metrics over AMQP |
1076 | +""" |
1077 | + |
1078 | +import os |
1079 | +from optparse import OptionParser |
1080 | + |
1081 | +from twisted.internet.defer import inlineCallbacks |
1082 | +from twisted.internet import reactor, task |
1083 | +from twisted.internet.protocol import ClientCreator |
1084 | +from txamqp.protocol import AMQClient |
1085 | +from txamqp.client import TwistedDelegate |
1086 | +from txamqp.content import Content |
1087 | +import txamqp.spec |
1088 | + |
1089 | +@inlineCallbacks |
1090 | +def gotConnection(conn, message, username, password, |
1091 | + channel_number, exchange_name): |
1092 | + yield conn.authenticate(username, password) |
1093 | + channel = yield conn.channel(channel_number) |
1094 | + yield channel.channel_open() |
1095 | + |
1096 | + msg = Content(message) |
1097 | + msg["delivery mode"] = 2 |
1098 | + channel.basic_publish(exchange=exchange_name, content=msg) |
1099 | + yield channel.channel_close() |
1100 | + |
1101 | +def writeMetric(message, host, port, username, password, vhost, |
1102 | + spec= None, channel=1, exchange_name="graphite_exchange"): |
1103 | + |
1104 | + if not spec: |
1105 | + spec = txamqp.spec.load(os.path.normpath( |
1106 | + os.path.join(os.path.dirname(__file__), 'amqp0-8.xml'))) |
1107 | + |
1108 | + delegate = TwistedDelegate() |
1109 | + |
1110 | + d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost, |
1111 | + spec=spec).connectTCP(host, port) |
1112 | + |
1113 | + d.addCallback(gotConnection, message, username, password, |
1114 | + channel, exchange_name) |
1115 | + |
1116 | + return d |
1117 | + |
1118 | + |
1119 | +def main(): |
1120 | + parser = OptionParser() |
1121 | + parser.add_option("-t", "--host", dest="host", |
1122 | + help="host name", metavar="HOST", default="localhost") |
1123 | + |
1124 | + parser.add_option("-p", "--port", dest="port", type=int, |
1125 | + help="port number", metavar="PORT", |
1126 | + default=5672) |
1127 | + |
1128 | + parser.add_option("-u", "--user", dest="username", |
1129 | + help="username", metavar="USERNAME", |
1130 | + default="guest") |
1131 | + |
1132 | + parser.add_option("-w", "--password", dest="password", |
1133 | + help="password", metavar="PASSWORD", |
1134 | + default="guest") |
1135 | + |
1136 | + parser.add_option("-v", "--vhost", dest="vhost", |
1137 | + help="vhost", metavar="VHOST", |
1138 | + default="/") |
1139 | + |
1140 | + (options, args) = parser.parse_args() |
1141 | + |
1142 | + |
1143 | + message = " ".join(args) |
1144 | + d = writeMetric(message, options.host, options.port, options.username, |
1145 | + options.password, vhost=options.vhost) |
1146 | + d.addErrback(lambda f: f.printTraceback()) |
1147 | + d.addBoth(lambda _: reactor.stop()) |
1148 | + reactor.run() |
1149 | + |
1150 | +if __name__ == "__main__": |
1151 | + main() |
Add a listener for AMQP.
Includes sample configurations, a sample producer, and a way to start the listener for testing without installing everything.
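
For anyone testing the listener, here is a rough sketch of the message format it accepts, based on the parsing in processMessage in amqp_listener.py: each line of the message body is "METRIC VALUE TIMESTAMP", and several lines may be joined with "\n" in one message. The helper name parse_message_body and the sample metric names are made up for illustration and are not part of this branch; the real listener also logs each rejected line and hands the datapoint to metricReceived.

```python
def parse_message_body(body):
    """Split a message body into (metric, (timestamp, value)) tuples.

    Hypothetical helper, not part of the branch. It mirrors the
    line-by-line parsing done by processMessage in amqp_listener.py.
    """
    datapoints = []
    for line in body.split("\n"):
        try:
            # Each line must have exactly three whitespace-separated fields.
            metric, value, timestamp = line.strip().split()
            datapoint = (float(timestamp), float(value))
        except ValueError:
            # Wrong number of fields or a non-numeric value: skip the line.
            # The listener logs the line before skipping it.
            continue
        datapoints.append((metric, datapoint))
    return datapoints


# Sample body with two valid lines and one malformed line (made-up metrics).
body = (
    "servers.web1.load 0.5 1263224121\n"
    "bad line\n"
    "servers.web2.load 1.25 1263224121"
)
parsed = parse_message_body(body)
```

Note that the datapoint is ordered (timestamp, value), the reverse of the order on the wire, and that a malformed line is dropped without affecting the other lines in the same message.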