Merge lp:~sidnei/graphite/twistd-plugins into lp:~graphite-dev/graphite/main
- twistd-plugins
- Merge into main
Proposed by
Sidnei da Silva
Status: | Merged |
---|---|
Merged at revision: | 420 |
Proposed branch: | lp:~sidnei/graphite/twistd-plugins |
Merge into: | lp:~graphite-dev/graphite/main |
Prerequisite: | lp:~sidnei/graphite/hardcoded-conf-dir |
Diff against target: |
1445 lines (+621/-567) 15 files modified
carbon/bin/carbon-aggregator.py (+23/-156) carbon/bin/carbon-cache.py (+8/-183) carbon/bin/carbon-relay.py (+8/-161) carbon/lib/carbon/amqp_listener.py (+16/-4) carbon/lib/carbon/conf.py (+220/-20) carbon/lib/carbon/events.py (+1/-0) carbon/lib/carbon/instrumentation.py (+16/-3) carbon/lib/carbon/manhole.py (+7/-4) carbon/lib/carbon/rules.py (+5/-2) carbon/lib/carbon/service.py (+157/-0) carbon/lib/carbon/util.py (+58/-16) carbon/lib/carbon/writer.py (+27/-18) carbon/lib/twisted/plugins/carbon_aggregator_plugin.py (+25/-0) carbon/lib/twisted/plugins/carbon_cache_plugin.py (+25/-0) carbon/lib/twisted/plugins/carbon_relay_plugin.py (+25/-0) |
To merge this branch: | bzr merge lp:~sidnei/graphite/twistd-plugins |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
chrismd | Needs Fixing | ||
Review via email: mp+67391@code.launchpad.net |
Commit message
Description of the change
Refactor carbon startup scripts to use twistd, register each service as a separate twistd plugin.
To post a comment you must log in.
- 405. By Sidnei da Silva
-
- Make it work when running with twistd [plugin] as well.
- 406. By Sidnei da Silva
-
- Too much copy and paste
- 407. By Sidnei da Silva
-
- Move more config stuff to conf module.
- 408. By Sidnei da Silva
-
- Make the legacy scripts just invoke twistd with the right options such that the plugins are called instead.
- 409. By Sidnei da Silva
-
- Handle extra args
Revision history for this message
Sidnei da Silva (sidnei) wrote : | # |
Added support for the legacy arguments, so that the scripts work just as well as before. Very happy with how it turned out!
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'carbon/bin/carbon-aggregator.py' |
2 | --- carbon/bin/carbon-aggregator.py 2011-06-16 14:55:16 +0000 |
3 | +++ carbon/bin/carbon-aggregator.py 2011-07-12 05:59:28 +0000 |
4 | @@ -1,163 +1,30 @@ |
5 | #!/usr/bin/env python |
6 | +"""Copyright 2009 Chris Davis |
7 | + |
8 | +Licensed under the Apache License, Version 2.0 (the "License"); |
9 | +you may not use this file except in compliance with the License. |
10 | +You may obtain a copy of the License at |
11 | + |
12 | + http://www.apache.org/licenses/LICENSE-2.0 |
13 | + |
14 | +Unless required by applicable law or agreed to in writing, software |
15 | +distributed under the License is distributed on an "AS IS" BASIS, |
16 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
17 | +See the License for the specific language governing permissions and |
18 | +limitations under the License.""" |
19 | + |
20 | import sys |
21 | -import os |
22 | -import atexit |
23 | -from os.path import basename, dirname, exists, join, isdir |
24 | - |
25 | - |
26 | -program = basename( sys.argv[0] ).split('.')[0] |
27 | - |
28 | -# Initialize twisted |
29 | -try: |
30 | - from twisted.internet import epollreactor |
31 | - epollreactor.install() |
32 | -except: |
33 | - pass |
34 | -from twisted.internet import reactor |
35 | - |
36 | +from os.path import dirname, join, abspath |
37 | |
38 | # Figure out where we're installed |
39 | -BIN_DIR = dirname( os.path.abspath(__file__) ) |
40 | +BIN_DIR = dirname(abspath(__file__)) |
41 | ROOT_DIR = dirname(BIN_DIR) |
42 | + |
43 | +# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from |
44 | +# source. |
45 | LIB_DIR = join(ROOT_DIR, 'lib') |
46 | - |
47 | sys.path.insert(0, LIB_DIR) |
48 | -os.environ['GRAPHITE_ROOT'] = ROOT_DIR |
49 | - |
50 | -# Capture useful debug info for this commonly reported problem |
51 | -try: |
52 | - import carbon |
53 | -except ImportError: |
54 | - print 'Failed to import carbon, debug information follows.' |
55 | - print 'pwd=%s' % os.getcwd() |
56 | - print 'sys.path=%s' % sys.path |
57 | - print '__file__=%s' % __file__ |
58 | - sys.exit(1) |
59 | - |
60 | - |
61 | -# Read config (we want failures to occur before daemonizing) |
62 | -from carbon.conf import (get_default_parser, parse_options, |
63 | - read_config, settings as global_settings) |
64 | - |
65 | - |
66 | -parser = get_default_parser() |
67 | -parser.add_option( |
68 | - '--rules', |
69 | - default=None, |
70 | - help='Use the give aggregation rules file') |
71 | - |
72 | -(options, args) = parse_options(parser, sys.argv[1:]) |
73 | -settings = read_config(program, options, ROOT_DIR=ROOT_DIR) |
74 | -global_settings.update(settings) |
75 | - |
76 | -if options.rules is None: |
77 | - options.rules = join(settings.CONF_DIR, "aggregation-rules.conf") |
78 | - |
79 | -pidfile = settings.pidfile |
80 | -logdir = settings.LOG_DIR |
81 | - |
82 | -__builtins__.program = program |
83 | -action = args[0] |
84 | - |
85 | - |
86 | -if action == 'stop': |
87 | - if not exists(pidfile): |
88 | - print 'Pidfile %s does not exist' % pidfile |
89 | - raise SystemExit(0) |
90 | - |
91 | - pf = open(pidfile, 'r') |
92 | - try: |
93 | - pid = int( pidfile.read().strip() ) |
94 | - except: |
95 | - print 'Could not read pidfile %s' % pidfile |
96 | - raise SystemExit(1) |
97 | - |
98 | - print 'Deleting %s (contained pid %d)' % (pidfile, pid) |
99 | - os.unlink(pidfile) |
100 | - |
101 | - print 'Sending kill signal to pid %d' % pid |
102 | - os.kill(pid, 15) |
103 | - raise SystemExit(0) |
104 | - |
105 | - |
106 | -elif action == 'status': |
107 | - if not exists(pidfile): |
108 | - print '%s is not running' % program |
109 | - raise SystemExit(0) |
110 | - |
111 | - pf = open(pidfile, 'r') |
112 | - try: |
113 | - pid = int( pidfile.read().strip() ) |
114 | - except: |
115 | - print 'Failed to read pid from %s' % pidfile |
116 | - raise SystemExit(1) |
117 | - |
118 | - if exists('/proc/%d' % pid): |
119 | - print "%s is running with pid %d" % (program, pid) |
120 | - raise SystemExit(0) |
121 | - else: |
122 | - print "%s is not running" % program |
123 | - raise SystemExit(0) |
124 | - |
125 | -# Import application components |
126 | -from carbon.log import logToStdout, logToDir |
127 | -from carbon.instrumentation import startRecording |
128 | -from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener |
129 | -from carbon.aggregator.rules import RuleManager |
130 | -from carbon.aggregator import receiver |
131 | -from carbon.aggregator import client |
132 | -from carbon.rewrite import RewriteRuleManager |
133 | -from carbon.events import metricReceived |
134 | -from carbon.util import daemonize |
135 | - |
136 | -RuleManager.read_from(options.rules) |
137 | - |
138 | -rewrite_rules_conf = join(settings.CONF_DIR, 'rewrite-rules.conf') |
139 | -if exists(rewrite_rules_conf): |
140 | - RewriteRuleManager.read_from(rewrite_rules_conf) |
141 | - |
142 | -# --debug |
143 | -if options.debug: |
144 | - logToStdout() |
145 | - |
146 | -else: |
147 | - if not isdir(logdir): |
148 | - os.makedirs(logdir) |
149 | - |
150 | - daemonize() |
151 | - |
152 | - pf = open(pidfile, 'w') |
153 | - pf.write( str(os.getpid()) ) |
154 | - pf.close() |
155 | - |
156 | - def shutdown(): |
157 | - if os.path.exists(pidfile): |
158 | - os.unlink(pidfile) |
159 | - |
160 | - atexit.register(shutdown) |
161 | - |
162 | - logToDir(logdir) |
163 | - |
164 | - |
165 | -# Configure application components |
166 | -metricReceived.installHandler(receiver.process) |
167 | -startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver) |
168 | -startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver) |
169 | - |
170 | -client.connect(settings.DESTINATION_HOST, settings.DESTINATION_PORT) |
171 | -startRecording() |
172 | - |
173 | - |
174 | -# Run the twisted reactor |
175 | -print "%s running with pid %d" % (program, os.getpid()) |
176 | - |
177 | -if options.profile: |
178 | - import cProfile |
179 | - |
180 | - if exists(options.profile): |
181 | - os.unlink(options.profile) |
182 | - |
183 | - cProfile.run('reactor.run()', options.profile) |
184 | - |
185 | -else: |
186 | - reactor.run() |
187 | + |
188 | +from carbon.util import run_twistd_plugin |
189 | + |
190 | +run_twistd_plugin(__file__) |
191 | |
192 | === modified file 'carbon/bin/carbon-cache.py' |
193 | --- carbon/bin/carbon-cache.py 2011-06-16 14:55:16 +0000 |
194 | +++ carbon/bin/carbon-cache.py 2011-07-12 05:59:28 +0000 |
195 | @@ -14,192 +14,17 @@ |
196 | limitations under the License.""" |
197 | |
198 | import sys |
199 | -import os |
200 | -import socket |
201 | -import pwd |
202 | -import atexit |
203 | -from os.path import basename, dirname, exists, join, isdir |
204 | - |
205 | -program = basename( sys.argv[0] ).split('.')[0] |
206 | -hostname = socket.gethostname().split('.')[0] |
207 | -os.umask(022) |
208 | - |
209 | -# Initialize twisted |
210 | -try: |
211 | - from twisted.internet import epollreactor |
212 | - epollreactor.install() |
213 | -except: |
214 | - pass |
215 | -from twisted.internet import reactor |
216 | - |
217 | +from os.path import dirname, join, abspath |
218 | |
219 | # Figure out where we're installed |
220 | -BIN_DIR = dirname( os.path.abspath(__file__) ) |
221 | +BIN_DIR = dirname(abspath(__file__)) |
222 | ROOT_DIR = dirname(BIN_DIR) |
223 | + |
224 | +# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from |
225 | +# source. |
226 | LIB_DIR = join(ROOT_DIR, 'lib') |
227 | sys.path.insert(0, LIB_DIR) |
228 | |
229 | - |
230 | -# Capture useful debug info for this commonly reported problem |
231 | -try: |
232 | - import carbon |
233 | -except ImportError: |
234 | - print 'Failed to import carbon, debug information follows.' |
235 | - print 'pwd=%s' % os.getcwd() |
236 | - print 'sys.path=%s' % sys.path |
237 | - print '__file__=%s' % __file__ |
238 | - sys.exit(1) |
239 | - |
240 | - |
241 | -# Read config (we want failures to occur before daemonizing) |
242 | -from carbon.conf import (get_default_parser, parse_options, |
243 | - read_config, settings as global_settings) |
244 | - |
245 | - |
246 | -(options, args) = parse_options(get_default_parser(), sys.argv[1:]) |
247 | -settings = read_config(program, options, ROOT_DIR=ROOT_DIR) |
248 | -global_settings.update(settings) |
249 | - |
250 | -instance = options.instance |
251 | -pidfile = settings.pidfile |
252 | -logdir = settings.LOG_DIR |
253 | - |
254 | - |
255 | -__builtins__.instance = instance # This isn't as evil as you might think |
256 | -__builtins__.program = program |
257 | -action = args[0] |
258 | - |
259 | - |
260 | -if action == 'stop': |
261 | - if not exists(pidfile): |
262 | - print 'Pidfile %s does not exist' % pidfile |
263 | - raise SystemExit(0) |
264 | - |
265 | - pf = open(pidfile, 'r') |
266 | - try: |
267 | - pid = int( pf.read().strip() ) |
268 | - except: |
269 | - print 'Could not read pidfile %s' % pidfile |
270 | - raise SystemExit(1) |
271 | - |
272 | - print 'Deleting %s (contained pid %d)' % (pidfile, pid) |
273 | - os.unlink(pidfile) |
274 | - |
275 | - print 'Sending kill signal to pid %d' % pid |
276 | - os.kill(pid, 15) |
277 | - raise SystemExit(0) |
278 | - |
279 | - |
280 | -elif action == 'status': |
281 | - if not exists(pidfile): |
282 | - print '%s (instance %s) is not running' % (program, instance) |
283 | - raise SystemExit(0) |
284 | - |
285 | - pf = open(pidfile, 'r') |
286 | - try: |
287 | - pid = int( pf.read().strip() ) |
288 | - except: |
289 | - print 'Failed to read pid from %s' % pidfile |
290 | - raise SystemExit(1) |
291 | - |
292 | - if exists('/proc/%d' % pid): |
293 | - print "%s (instance %s) is running with pid %d" % (program, instance, pid) |
294 | - raise SystemExit(0) |
295 | - else: |
296 | - print "%s (instance %s) is not running" % (program, instance) |
297 | - raise SystemExit(0) |
298 | - |
299 | -if exists(pidfile): |
300 | - print "Pidfile %s already exists, is %s already running?" % (pidfile, program) |
301 | - raise SystemExit(1) |
302 | - |
303 | -# Import application components |
304 | -from carbon.log import logToStdout, logToDir |
305 | -from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, CacheQueryHandler, startListener |
306 | -from carbon.cache import MetricCache |
307 | -from carbon.instrumentation import startRecording |
308 | -from carbon.events import metricReceived |
309 | - |
310 | -storage_schemas = join(settings.CONF_DIR, 'storage-schemas.conf') |
311 | -if not exists(storage_schemas): |
312 | - print "Error: missing required config %s" % storage_schemas |
313 | - sys.exit(1) |
314 | - |
315 | -use_amqp = settings.get("ENABLE_AMQP", False) |
316 | -if use_amqp: |
317 | - from carbon import amqp_listener |
318 | - amqp_host = settings.get("AMQP_HOST", "localhost") |
319 | - amqp_port = settings.get("AMQP_PORT", 5672) |
320 | - amqp_user = settings.get("AMQP_USER", "guest") |
321 | - amqp_password = settings.get("AMQP_PASSWORD", "guest") |
322 | - amqp_verbose = settings.get("AMQP_VERBOSE", False) |
323 | - amqp_vhost = settings.get("AMQP_VHOST", "/") |
324 | - amqp_spec = settings.get("AMQP_SPEC", None) |
325 | - amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite") |
326 | - |
327 | - |
328 | -# --debug |
329 | -if options.debug: |
330 | - logToStdout() |
331 | - |
332 | -else: |
333 | - if not isdir(logdir): |
334 | - os.makedirs(logdir) |
335 | - |
336 | - if settings.USER: |
337 | - print "Dropping privileges to become the user %s" % settings.USER |
338 | - |
339 | - from carbon.util import daemonize, dropprivs |
340 | - daemonize() |
341 | - |
342 | - pf = open(pidfile, 'w') |
343 | - pf.write( str(os.getpid()) ) |
344 | - pf.close() |
345 | - |
346 | - def shutdown(): |
347 | - if os.path.exists(pidfile): |
348 | - os.unlink(pidfile) |
349 | - |
350 | - atexit.register(shutdown) |
351 | - |
352 | - if settings.USER: |
353 | - pwent = pwd.getpwnam(settings.USER) |
354 | - os.chown(pidfile, pwent.pw_uid, pwent.pw_gid) |
355 | - dropprivs(settings.USER) |
356 | - |
357 | - logToDir(logdir) |
358 | - |
359 | -# Configure application components |
360 | -metricReceived.installHandler(MetricCache.store) |
361 | -startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver) |
362 | -startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver) |
363 | -startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler) |
364 | - |
365 | -if use_amqp: |
366 | - amqp_listener.startReceiver(amqp_host, amqp_port, amqp_user, amqp_password, |
367 | - vhost=amqp_vhost, spec=amqp_spec, |
368 | - exchange_name=amqp_exchange_name, |
369 | - verbose=amqp_verbose) |
370 | - |
371 | -if settings.ENABLE_MANHOLE: |
372 | - from carbon import manhole |
373 | - manhole.start() |
374 | - |
375 | -from carbon.writer import startWriter # have to import this *after* settings are defined |
376 | -startWriter() |
377 | -startRecording() |
378 | - |
379 | - |
380 | -# Run the twisted reactor |
381 | -print "%s running [instance %s]" % (program, instance) |
382 | - |
383 | -if options.profile: |
384 | - import cProfile |
385 | - |
386 | - if exists(options.profile): |
387 | - os.unlink(options.profile) |
388 | - |
389 | - cProfile.run('reactor.run()', options.profile) |
390 | - |
391 | -else: |
392 | - reactor.run() |
393 | +from carbon.util import run_twistd_plugin |
394 | + |
395 | +run_twistd_plugin(__file__) |
396 | |
397 | === modified file 'carbon/bin/carbon-relay.py' |
398 | --- carbon/bin/carbon-relay.py 2011-06-16 14:55:16 +0000 |
399 | +++ carbon/bin/carbon-relay.py 2011-07-12 05:59:28 +0000 |
400 | @@ -14,170 +14,17 @@ |
401 | limitations under the License.""" |
402 | |
403 | import sys |
404 | -import os |
405 | -import atexit |
406 | -from os.path import basename, dirname, exists, join, isdir |
407 | - |
408 | -program = basename( sys.argv[0] ).split('.')[0] |
409 | -os.umask(022) |
410 | - |
411 | - |
412 | -# Initialize twisted |
413 | -try: |
414 | - from twisted.internet import epollreactor |
415 | - epollreactor.install() |
416 | -except: |
417 | - pass |
418 | -from twisted.internet import reactor |
419 | - |
420 | +from os.path import dirname, join, abspath |
421 | |
422 | # Figure out where we're installed |
423 | -BIN_DIR = dirname(__file__) |
424 | +BIN_DIR = dirname(abspath(__file__)) |
425 | ROOT_DIR = dirname(BIN_DIR) |
426 | + |
427 | +# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from |
428 | +# source. |
429 | LIB_DIR = join(ROOT_DIR, 'lib') |
430 | sys.path.insert(0, LIB_DIR) |
431 | |
432 | - |
433 | -# Capture useful debug info for this commonly reported problem |
434 | -try: |
435 | - import carbon |
436 | -except ImportError: |
437 | - print 'Failed to import carbon, debug information follows.' |
438 | - print 'pwd=%s' % os.getcwd() |
439 | - print 'sys.path=%s' % sys.path |
440 | - print '__file__=%s' % __file__ |
441 | - sys.exit(1) |
442 | - |
443 | - |
444 | -# Read config (we want failures to occur before daemonizing) |
445 | -from carbon.conf import (get_default_parser, parse_options, |
446 | - read_config, settings as global_settings) |
447 | - |
448 | - |
449 | -parser = get_default_parser() |
450 | -parser.add_option( |
451 | - '--rules', |
452 | - default=None, |
453 | - help='Use the given relay rules file') |
454 | - |
455 | -(options, args) = parse_options(parser, sys.argv[1:]) |
456 | -settings = read_config(program, options, ROOT_DIR=ROOT_DIR) |
457 | -global_settings.update(settings) |
458 | - |
459 | -if options.rules is None: |
460 | - options.rules = join(settings.CONF_DIR, "relay-rules.conf") |
461 | - |
462 | -pidfile = settings.pidfile |
463 | -logdir = settings.LOG_DIR |
464 | - |
465 | -__builtins__.program = program |
466 | -action = args[0] |
467 | - |
468 | - |
469 | -if action == 'stop': |
470 | - if not exists(pidfile): |
471 | - print 'Pidfile %s does not exist' % pidfile |
472 | - raise SystemExit(0) |
473 | - |
474 | - pf = open(pidfile, 'r') |
475 | - try: |
476 | - pid = int( pf.read().strip() ) |
477 | - except: |
478 | - print 'Could not read pidfile %s' % pidfile |
479 | - raise SystemExit(1) |
480 | - |
481 | - print 'Deleting %s (contained pid %d)' % (pidfile, pid) |
482 | - os.unlink(pidfile) |
483 | - |
484 | - print 'Sending kill signal to pid %d' % pid |
485 | - os.kill(pid, 15) |
486 | - raise SystemExit(0) |
487 | - |
488 | - |
489 | -elif action == 'status': |
490 | - if not exists(pidfile): |
491 | - print '%s is not running' % program |
492 | - raise SystemExit(0) |
493 | - |
494 | - pf = open(pidfile, 'r') |
495 | - try: |
496 | - pid = int( pf.read().strip() ) |
497 | - except: |
498 | - print 'Failed to read pid from %s' % pidfile |
499 | - raise SystemExit(1) |
500 | - |
501 | - if exists('/proc/%d' % pid): |
502 | - print "%s is running with pid %d" % (program, pid) |
503 | - raise SystemExit(0) |
504 | - else: |
505 | - print "%s is not running" % program |
506 | - raise SystemExit(0) |
507 | - |
508 | -if exists(pidfile): |
509 | - print "Pidfile %s already exists, is %s already running?" % (pidfile, program) |
510 | - raise SystemExit(1) |
511 | - |
512 | -# Quick validation |
513 | -if settings.RELAY_METHOD not in ('rules', 'consistent-hashing'): |
514 | - print "In carbon.conf, RELAY_METHOD must be either 'rules' or 'consistent-hashing'. Invalid value: '%s'" % settings.RELAY_METHOD |
515 | - sys.exit(1) |
516 | - |
517 | -# Import application components |
518 | -from carbon.log import logToStdout, logToDir, msg |
519 | -from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener |
520 | -from carbon.relay import createClientConnections, relay |
521 | -from carbon.events import metricReceived |
522 | -from carbon.instrumentation import startRecording |
523 | -from carbon.rules import loadRules, allDestinationServers, parseHostList |
524 | -from carbon.hashing import setDestinationHosts |
525 | - |
526 | -# --debug |
527 | -if options.debug: |
528 | - logToStdout() |
529 | -else: |
530 | - if not isdir(logdir): |
531 | - os.makedirs(logdir) |
532 | - |
533 | - from carbon.util import daemonize |
534 | - daemonize() |
535 | - logToDir(logdir) |
536 | - |
537 | - pidfile = open(pidfile, 'w') |
538 | - pidfile.write( str(os.getpid()) ) |
539 | - pidfile.close() |
540 | - |
541 | - def shutdown(): |
542 | - if os.path.exists(pidfile): |
543 | - os.unlink(pidfile) |
544 | - |
545 | - atexit.register(shutdown) |
546 | - |
547 | - |
548 | -# Configure application components |
549 | -metricReceived.installHandler(relay) |
550 | -startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver) |
551 | -startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver) |
552 | - |
553 | -if settings.RELAY_METHOD == 'rules': |
554 | - loadRules(options.rules) |
555 | - createClientConnections( allDestinationServers() ) |
556 | -elif settings.RELAY_METHOD == 'consistent-hashing': |
557 | - hosts = parseHostList(settings.CH_HOST_LIST) |
558 | - msg('consistent-hashing hosts = %s' % str(hosts)) |
559 | - setDestinationHosts(hosts) |
560 | - createClientConnections(hosts) |
561 | - |
562 | -startRecording() |
563 | - |
564 | - |
565 | -# Run the twisted reactor |
566 | -if options.profile: |
567 | - import cProfile |
568 | - |
569 | - if exists(options.profile): |
570 | - os.unlink(options.profile) |
571 | - |
572 | - cProfile.run('reactor.run()', options.profile) |
573 | - |
574 | -else: |
575 | - reactor.run() |
576 | +from carbon.util import run_twistd_plugin |
577 | + |
578 | +run_twistd_plugin(__file__) |
579 | |
580 | === modified file 'carbon/lib/carbon/amqp_listener.py' |
581 | --- carbon/lib/carbon/amqp_listener.py 2011-04-25 15:50:10 +0000 |
582 | +++ carbon/lib/carbon/amqp_listener.py 2011-07-12 05:59:28 +0000 |
583 | @@ -147,11 +147,12 @@ |
584 | p.factory = self |
585 | return p |
586 | |
587 | -def startReceiver(host, port, username, password, vhost, exchange_name, |
588 | - spec=None, channel=1, verbose=False): |
589 | - """Starts a twisted process that will read messages on the amqp broker |
590 | - and post them as metrics.""" |
591 | |
592 | +def createAMQPListener(username, password, vhost, exchange_name, |
593 | + spec=None, channel=1, verbose=False): |
594 | + """ |
595 | + Create an C{AMQPReconnectingFactory} configured with the specified options. |
596 | + """ |
597 | # use provided spec if not specified |
598 | if not spec: |
599 | spec = txamqp.spec.load(os.path.normpath( |
600 | @@ -161,6 +162,17 @@ |
601 | factory = AMQPReconnectingFactory(username, password, delegate, vhost, |
602 | spec, channel, exchange_name, |
603 | verbose=verbose) |
604 | + return factory |
605 | + |
606 | + |
607 | +def startReceiver(host, port, username, password, vhost, exchange_name, |
608 | + spec=None, channel=1, verbose=False): |
609 | + """ |
610 | + Starts a twisted process that will read messages on the amqp broker and |
611 | + post them as metrics. |
612 | + """ |
613 | + factory = createAMQPListener(username, password, vhost, exchange_name, |
614 | + spec=spec, channel=channel, verbose=verbose) |
615 | reactor.connectTCP(host, port, factory) |
616 | |
617 | |
618 | |
619 | === modified file 'carbon/lib/carbon/conf.py' |
620 | --- carbon/lib/carbon/conf.py 2011-07-10 22:10:06 +0000 |
621 | +++ carbon/lib/carbon/conf.py 2011-07-12 05:59:28 +0000 |
622 | @@ -13,10 +13,19 @@ |
623 | limitations under the License.""" |
624 | |
625 | import os |
626 | -from os.path import join, dirname, normpath |
627 | +import sys |
628 | +import pwd |
629 | + |
630 | +from os.path import join, dirname, normpath, abspath, basename, exists, isdir |
631 | from optparse import OptionParser |
632 | from ConfigParser import ConfigParser |
633 | |
634 | +import whisper |
635 | +from carbon import log |
636 | + |
637 | +from twisted.python import usage |
638 | +from twisted.scripts import twistd |
639 | + |
640 | |
641 | defaults = dict( |
642 | LOCAL_DATA_DIR="/opt/graphite/storage/whisper/", |
643 | @@ -47,6 +56,10 @@ |
644 | ) |
645 | |
646 | |
647 | +def _umask(value): |
648 | + return int(value, 8) |
649 | + |
650 | + |
651 | class OrderedConfigParser(ConfigParser): |
652 | """Hacky workaround to ensure sections are always returned in the order |
653 | they are defined in. Note that this does *not* make any guarantees about |
654 | @@ -119,6 +132,170 @@ |
655 | settings.update(defaults) |
656 | |
657 | |
658 | +class CarbonCacheOptions(usage.Options): |
659 | + |
660 | + optFlags = [ |
661 | + ["debug", "", "Run in debug mode."], |
662 | + ] |
663 | + |
664 | + optParameters = [ |
665 | + ["config", "c", None, "Use the given config file."], |
666 | + ["instance", "", None, "Manage a specific carbon instance."], |
667 | + ["logdir", "", None, "Write logs to the given directory."], |
668 | + ] |
669 | + |
670 | + def postOptions(self): |
671 | + global settings |
672 | + |
673 | + ROOT_DIR = os.getcwd() |
674 | + program = self.parent.subCommand |
675 | + |
676 | + # Use provided pidfile (if any) as default for configuration. If it's |
677 | + # set to 'twistd.pid', that means no value was provided and the default |
678 | + # was used. |
679 | + pidfile = self.parent["pidfile"] |
680 | + if pidfile.endswith("twistd.pid"): |
681 | + pidfile = None |
682 | + self["pidfile"] = pidfile |
683 | + |
684 | + # Enforce a default umask of '022' if none was set. |
685 | + if self.parent["umask"] is None: |
686 | + self.parent["umask"] = 022 |
687 | + |
688 | + # Read extra settings from the configuration file. |
689 | + program_settings = read_config(program, self, ROOT_DIR=ROOT_DIR) |
690 | + settings.update(program_settings) |
691 | + settings["program"] = program |
692 | + |
693 | + # Set process uid/gid by changing the parent config, if a user was |
694 | + # provided in the configuration file. |
695 | + if settings.USER: |
696 | + self.parent["uid"], self.parent["gid"] = ( |
697 | + pwd.getpwnam(settings.USER)[2:4]) |
698 | + |
699 | + # Set the pidfile in parent config to the value that was computed by |
700 | + # C{read_config}. |
701 | + self.parent["pidfile"] = settings["pidfile"] |
702 | + |
703 | + storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf") |
704 | + if not exists(storage_schemas): |
705 | + print "Error: missing required config %s" % storage_schemas |
706 | + sys.exit(1) |
707 | + |
708 | + if settings.WHISPER_AUTOFLUSH: |
709 | + log.msg("enabling whisper autoflush") |
710 | + whisper.AUTOFLUSH = True |
711 | + |
712 | + # If we are not running in debug mode or non-daemon mode, then log to a |
713 | + # directory, otherwise log output will go to stdout. |
714 | + if not (self["debug"] or self.parent["nodaemon"]): |
715 | + logdir = settings.LOG_DIR |
716 | + if not isdir(logdir): |
717 | + os.makedirs(logdir) |
718 | + log.logToDir(logdir) |
719 | + |
720 | + if not "action" in self: |
721 | + self["action"] = "start" |
722 | + self.handleAction() |
723 | + |
724 | + def parseArgs(self, action): |
725 | + """If an action was provided, store it for further processing.""" |
726 | + self["action"] = action |
727 | + |
728 | + def handleAction(self): |
729 | + """Handle extra argument for backwards-compatibility. |
730 | + |
731 | + * C{start} will simply do minimal pid checking and otherwise let twistd |
732 | + take over. |
733 | + * C{stop} will kill an existing running process if it matches the |
734 | + C{pidfile} contents. |
735 | + * C{status} will simply report if the process is up or not. |
736 | + """ |
737 | + action = self["action"] |
738 | + pidfile = self.parent["pidfile"] |
739 | + program = settings["program"] |
740 | + instance = self["instance"] |
741 | + |
742 | + if action == "stop": |
743 | + if not exists(pidfile): |
744 | + print "Pidfile %s does not exist" % pidfile |
745 | + raise SystemExit(0) |
746 | + pf = open(pidfile, 'r') |
747 | + try: |
748 | + pid = int(pf.read().strip()) |
749 | + except: |
750 | + print "Could not read pidfile %s" % pidfile |
751 | + raise SystemExit(1) |
752 | + print "Sending kill signal to pid %d" % pid |
753 | + os.kill(pid, 15) |
754 | + |
755 | + print "Deleting %s (contained pid %d)" % (pidfile, pid) |
756 | + os.unlink(pidfile) |
757 | + raise SystemExit(0) |
758 | + |
759 | + elif action == "status": |
760 | + if not exists(pidfile): |
761 | + print "%s (instance %s) is not running" % (program, instance) |
762 | + raise SystemExit(0) |
763 | + pf = open(pidfile, "r") |
764 | + try: |
765 | + pid = int(pf.read().strip()) |
766 | + except: |
767 | + print "Failed to read pid from %s" % pidfile |
768 | + raise SystemExit(1) |
769 | + |
770 | + if exists("/proc/%d" % pid): |
771 | + print ("%s (instance %s) is running with pid %d" % |
772 | + (program, instance, pid)) |
773 | + raise SystemExit(0) |
774 | + else: |
775 | + print "%s (instance %s) is not running" % (program, instance) |
776 | + raise SystemExit(0) |
777 | + elif action == "start": |
778 | + if exists(pidfile): |
779 | + print ("Pidfile %s already exists, is %s already running?" % |
780 | + (pidfile, program)) |
781 | + raise SystemExit(1) |
782 | + |
783 | + |
784 | +class CarbonAggregatorOptions(CarbonCacheOptions): |
785 | + |
786 | + optParameters = [ |
787 | + ["rules", "", None, "Use the given aggregation rules file."], |
788 | + ["rewrite-rules", "", None, "Use the given rewrite rules file."], |
789 | + ] + CarbonCacheOptions.optParameters |
790 | + |
791 | + def postOptions(self): |
792 | + CarbonCacheOptions.postOptions(self) |
793 | + if self["rules"] is None: |
794 | + self["rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf") |
795 | + settings["aggregation-rules"] = self["rules"] |
796 | + |
797 | + if self["rewrite-rules"] is None: |
798 | + self["rewrite-rules"] = join(settings["CONF_DIR"], |
799 | + "rewrite-rules.conf") |
800 | + settings["rewrite-rules"] = self["rewrite-rules"] |
801 | + |
802 | + |
803 | +class CarbonRelayOptions(CarbonCacheOptions): |
804 | + |
805 | + optParameters = [ |
806 | + ["rules", "", None, "Use the given relay rules file."], |
807 | + ] + CarbonCacheOptions.optParameters |
808 | + |
809 | + def postOptions(self): |
810 | + CarbonCacheOptions.postOptions(self) |
811 | + if self["rules"] is None: |
812 | + self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf") |
813 | + settings["relay-rules"] = self["rules"] |
814 | + |
815 | + if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing"): |
816 | + print ("In carbon.conf, RELAY_METHOD must be either 'rules' or " |
817 | + "'consistent-hashing'. Invalid value: '%s'" % |
818 | + settings.RELAY_METHOD) |
819 | + sys.exit(1) |
820 | + |
821 | + |
822 | def get_default_parser(usage="%prog [options] <start|stop|status>"): |
823 | """Create a parser for command line options.""" |
824 | parser = OptionParser(usage=usage) |
825 | @@ -147,6 +324,25 @@ |
826 | return parser |
827 | |
828 | |
829 | +def get_parser(name): |
830 | + parser = get_default_parser() |
831 | + if name == "carbon-aggregator": |
832 | + parser.add_option( |
833 | + "--rules", |
834 | + default=None, |
835 | + help="Use the given aggregation rules file.") |
836 | + parser.add_option( |
837 | + "--rewrite-rules", |
838 | + default=None, |
839 | + help="Use the given rewrite rules file.") |
840 | + elif name == "carbon-relay": |
841 | + parser.add_option( |
842 | + "--rules", |
843 | + default=None, |
844 | + help="Use the given relay rules file.") |
845 | + return parser |
846 | + |
847 | + |
848 | def parse_options(parser, args): |
849 | """ |
850 | Parse command line options and print usage message if no arguments were |
851 | @@ -168,7 +364,7 @@ |
852 | def read_config(program, options, **kwargs): |
853 | """ |
854 | Read settings for 'program' from configuration file specified by |
855 | - 'options.config', with missing values provided by 'defaults'. |
856 | + 'options["config"]', with missing values provided by 'defaults'. |
857 | """ |
858 | settings = Settings() |
859 | settings.update(defaults) |
860 | @@ -185,40 +381,44 @@ |
861 | # 'GRAPHITE_CONF_DIR' environment variable. |
862 | settings.setdefault("CONF_DIR", |
863 | os.environ.get("GRAPHITE_CONF_DIR", |
864 | - join(settings.ROOT_DIR, "conf"))) |
865 | - if options.config is None: |
866 | - options.config = join(settings.CONF_DIR, "carbon.conf") |
867 | + join(settings["ROOT_DIR"], "conf"))) |
868 | + if options["config"] is None: |
869 | + options["config"] = join(settings["CONF_DIR"], "carbon.conf") |
870 | else: |
871 | # Set 'CONF_DIR' to the parent directory of the 'carbon.conf' config |
872 | # file. |
873 | - settings["CONF_DIR"] = dirname(normpath(options.config)) |
874 | + settings["CONF_DIR"] = dirname(normpath(options["config"])) |
875 | |
876 | # Storage directory can be overriden by the 'GRAPHITE_STORAGE_DIR' |
877 | # environment variable. It defaults to a path relative to 'ROOT_DIR' for |
878 | # backwards compatibility though. |
879 | settings.setdefault("STORAGE_DIR", |
880 | os.environ.get("GRAPHITE_STORAGE_DIR", |
881 | - join(settings.ROOT_DIR, "storage"))) |
882 | - settings.setdefault("LOG_DIR", join(settings.STORAGE_DIR, "log", program)) |
883 | + join(settings["ROOT_DIR"], "storage"))) |
884 | + settings.setdefault( |
885 | + "LOG_DIR", join(settings["STORAGE_DIR"], "log", program)) |
886 | |
887 | # Read configuration options from program-specific section. |
888 | section = program[len("carbon-"):] |
889 | - settings.readFrom(options.config, section) |
890 | + settings.readFrom(options["config"], section) |
891 | |
892 | # If a specific instance of the program is specified, augment the settings |
893 | # with the instance-specific settings and provide sane defaults for |
894 | # optional settings. |
895 | - if options.instance: |
896 | - settings.readFrom(options.config, "%s:%s" % (section, options.instance)) |
897 | - settings["pidfile"] = (options.pidfile or |
898 | - join(settings.STORAGE_DIR, |
899 | - "%s-%s.pid" % (program, options.instance))) |
900 | - settings["LOG_DIR"] = (options.logdir or |
901 | - "%s-%s" % (settings.LOG_DIR.rstrip('/'), |
902 | - options.instance)) |
903 | + if options["instance"]: |
904 | + settings.readFrom(options["config"], |
905 | + "%s:%s" % (section, options["instance"])) |
906 | + settings["pidfile"] = ( |
907 | + options["pidfile"] or |
908 | + join(settings["STORAGE_DIR"], "%s-%s.pid" % |
909 | + (program, options["instance"]))) |
910 | + settings["LOG_DIR"] = (options["logdir"] or |
911 | + "%s-%s" % (settings["LOG_DIR"].rstrip('/'), |
912 | + options["instance"])) |
913 | else: |
914 | - settings["pidfile"] = (options.pidfile or |
915 | - join(settings.STORAGE_DIR, '%s.pid' % program)) |
916 | - settings["LOG_DIR"] = (options.logdir or settings.LOG_DIR) |
917 | + settings["pidfile"] = ( |
918 | + options["pidfile"] or |
919 | + join(settings["STORAGE_DIR"], '%s.pid' % program)) |
920 | + settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"]) |
921 | |
922 | return settings |
923 | |
924 | === modified file 'carbon/lib/carbon/events.py' |
925 | --- carbon/lib/carbon/events.py 2009-09-10 19:28:51 +0000 |
926 | +++ carbon/lib/carbon/events.py 2011-07-12 05:59:28 +0000 |
927 | @@ -2,6 +2,7 @@ |
928 | |
929 | |
930 | class EventHandler: |
931 | + |
932 | def __init__(self, defaultHandler=None): |
933 | self.handler = defaultHandler |
934 | |
935 | |
936 | === modified file 'carbon/lib/carbon/instrumentation.py' |
937 | --- carbon/lib/carbon/instrumentation.py 2011-07-10 20:46:16 +0000 |
938 | +++ carbon/lib/carbon/instrumentation.py 2011-07-12 05:59:28 +0000 |
939 | @@ -2,13 +2,14 @@ |
940 | import time |
941 | import socket |
942 | from resource import getrusage, RUSAGE_SELF |
943 | + |
944 | +from twisted.application.service import Service |
945 | from twisted.internet.task import LoopingCall |
946 | |
947 | |
948 | stats = {} |
949 | HOSTNAME = socket.gethostname().replace('.','_') |
950 | PAGESIZE = os.sysconf('SC_PAGESIZE') |
951 | -recordTask = None |
952 | rusage = getrusage(RUSAGE_SELF) |
953 | lastUsage = rusage.ru_utime + rusage.ru_stime |
954 | lastUsageTime = time.time() |
955 | @@ -56,8 +57,6 @@ |
956 | |
957 | def startRecording(): |
958 | global recordTask |
959 | - recordTask = LoopingCall(recordMetrics) |
960 | - recordTask.start(60, now=False) |
961 | |
962 | |
963 | def recordMetrics(): |
964 | @@ -140,6 +139,20 @@ |
965 | send_metric(fullMetric, datapoint) |
966 | |
967 | |
968 | +class InstrumentationService(Service): |
969 | + |
970 | + def __init__(self): |
971 | + self.record_task = LoopingCall(recordMetrics) |
972 | + |
973 | + def startService(self): |
974 | + self.record_task.start(60, False) |
975 | + Service.startService(self) |
976 | + |
977 | + def stopService(self): |
978 | + self.record_task.stop() |
979 | + Service.stopService(self) |
980 | + |
981 | + |
982 | # Avoid import circularity |
983 | from carbon.aggregator.buffers import BufferManager |
984 | from carbon.aggregator.client import send_metric |
985 | |
986 | === modified file 'carbon/lib/carbon/manhole.py' |
987 | --- carbon/lib/carbon/manhole.py 2011-04-02 00:44:19 +0000 |
988 | +++ carbon/lib/carbon/manhole.py 2011-07-12 05:59:28 +0000 |
989 | @@ -1,4 +1,4 @@ |
990 | -from twisted.cred import portal, checkers |
991 | +from twisted.cred import portal |
992 | from twisted.conch.ssh import keys |
993 | from twisted.conch.checkers import SSHPublicKeyDatabase |
994 | from twisted.conch.manhole import Manhole |
995 | @@ -21,8 +21,7 @@ |
996 | keyBlob = self.userKeys[credentials.username] |
997 | return keyBlob == credentials.blob |
998 | |
999 | - |
1000 | -def start(): |
1001 | +def createManholeListener(): |
1002 | sshRealm = TerminalRealm() |
1003 | sshRealm.chainedProtocolFactory.protocolFactory = lambda _: Manhole(namespace) |
1004 | |
1005 | @@ -37,4 +36,8 @@ |
1006 | sshPortal = portal.Portal(sshRealm) |
1007 | sshPortal.registerChecker(credChecker) |
1008 | sessionFactory = ConchFactory(sshPortal) |
1009 | - reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE) |
1010 | + return sessionFactory |
1011 | + |
1012 | +def start(): |
1013 | + sessionFactory = createManholeListener() |
1014 | + reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE) |
1015 | |
1016 | === modified file 'carbon/lib/carbon/rules.py' |
1017 | --- carbon/lib/carbon/rules.py 2011-04-05 20:30:50 +0000 |
1018 | +++ carbon/lib/carbon/rules.py 2011-07-12 05:59:28 +0000 |
1019 | @@ -21,13 +21,16 @@ |
1020 | for host_string in host_list: |
1021 | parts = host_string.strip().split(':') |
1022 | server = parts[0] |
1023 | - port = int( parts[1] ) |
1024 | + if len(parts) > 1: |
1025 | + port = int(parts[1]) |
1026 | + else: |
1027 | + port = DEFAULT_CARBON_PORT |
1028 | if len(parts) > 2: |
1029 | instance = parts[2] |
1030 | else: |
1031 | instance = None |
1032 | |
1033 | - hosts.append( (server, port, instance) ) |
1034 | + hosts.append((server, port, instance)) |
1035 | |
1036 | return hosts |
1037 | |
1038 | |
1039 | === added file 'carbon/lib/carbon/service.py' |
1040 | --- carbon/lib/carbon/service.py 1970-01-01 00:00:00 +0000 |
1041 | +++ carbon/lib/carbon/service.py 2011-07-12 05:59:28 +0000 |
1042 | @@ -0,0 +1,157 @@ |
1043 | +#!/usr/bin/env python |
1044 | +"""Copyright 2009 Chris Davis |
1045 | + |
1046 | +Licensed under the Apache License, Version 2.0 (the "License"); |
1047 | +you may not use this file except in compliance with the License. |
1048 | +You may obtain a copy of the License at |
1049 | + |
1050 | + http://www.apache.org/licenses/LICENSE-2.0 |
1051 | + |
1052 | +Unless required by applicable law or agreed to in writing, software |
1053 | +distributed under the License is distributed on an "AS IS" BASIS, |
1054 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
1055 | +See the License for the specific language governing permissions and |
1056 | +limitations under the License.""" |
1057 | + |
1058 | +from os.path import exists |
1059 | + |
1060 | +from twisted.application.service import MultiService |
1061 | +from twisted.application.internet import TCPServer, TCPClient |
1062 | +from twisted.internet.protocol import ServerFactory |
1063 | + |
1064 | + |
1065 | +def createBaseService(config): |
1066 | + from carbon.conf import settings |
1067 | + from carbon.listeners import MetricLineReceiver, MetricPickleReceiver |
1068 | + |
1069 | + root_service = MultiService() |
1070 | + root_service.setName(settings.program) |
1071 | + |
1072 | + use_amqp = settings.get("ENABLE_AMQP", False) |
1073 | + if use_amqp: |
1074 | + from carbon import amqp_listener |
1075 | + |
1076 | + amqp_host = settings.get("AMQP_HOST", "localhost") |
1077 | + amqp_port = settings.get("AMQP_PORT", 5672) |
1078 | + amqp_user = settings.get("AMQP_USER", "guest") |
1079 | + amqp_password = settings.get("AMQP_PASSWORD", "guest") |
1080 | + amqp_verbose = settings.get("AMQP_VERBOSE", False) |
1081 | + amqp_vhost = settings.get("AMQP_VHOST", "/") |
1082 | + amqp_spec = settings.get("AMQP_SPEC", None) |
1083 | + amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite") |
1084 | + |
1085 | + |
1086 | + for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE, |
1087 | + settings.LINE_RECEIVER_PORT, |
1088 | + MetricLineReceiver), |
1089 | + (settings.PICKLE_RECEIVER_INTERFACE, |
1090 | + settings.PICKLE_RECEIVER_PORT, |
1091 | + MetricPickleReceiver)): |
1092 | + factory = ServerFactory() |
1093 | + factory.protocol = protocol |
1094 | + service = TCPServer(int(port), factory, interface=interface) |
1095 | + service.setServiceParent(root_service) |
1096 | + |
1097 | + if use_amqp: |
1098 | + factory = amqp_listener.createAMQPListener( |
1099 | + amqp_user, amqp_password, |
1100 | + vhost=amqp_vhost, spec=amqp_spec, |
1101 | + exchange_name=amqp_exchange_name, |
1102 | + verbose=amqp_verbose) |
1103 | + service = TCPClient(amqp_host, int(amqp_port), factory) |
1104 | + service.setServiceParent(root_service) |
1105 | + |
1106 | + if settings.ENABLE_MANHOLE: |
1107 | + from carbon import manhole |
1108 | + |
1109 | + factory = manhole.createManholeListener() |
1110 | + service = TCPServer(int(settings.MANHOLE_PORT), factory, |
1111 | + interface=settings.MANHOLE_INTERFACE) |
1112 | + service.setServiceParent(root_service) |
1113 | + |
1114 | + # have to import this *after* settings are defined |
1115 | + from carbon.writer import WriterService |
1116 | + |
1117 | + service = WriterService() |
1118 | + service.setServiceParent(root_service) |
1119 | + |
1120 | + # Instantiate an instrumentation service that will record metrics about |
1121 | + # this service. |
1122 | + from carbon.instrumentation import InstrumentationService |
1123 | + |
1124 | + service = InstrumentationService() |
1125 | + service.setServiceParent(root_service) |
1126 | + |
1127 | + return root_service |
1128 | + |
1129 | + |
1130 | +def createCacheService(config): |
1131 | + from carbon.cache import MetricCache |
1132 | + from carbon.conf import settings |
1133 | + from carbon.events import metricReceived |
1134 | + from carbon.listeners import CacheQueryHandler |
1135 | + |
1136 | + # Configure application components |
1137 | + metricReceived.installHandler(MetricCache.store) |
1138 | + |
1139 | + root_service = createBaseService(config) |
1140 | + factory = ServerFactory() |
1141 | + factory.protocol = CacheQueryHandler |
1142 | + service = TCPServer(int(settings.CACHE_QUERY_PORT), factory, |
1143 | + interface=settings.CACHE_QUERY_INTERFACE) |
1144 | + service.setServiceParent(root_service) |
1145 | + |
1146 | + # have to import this *after* settings are defined |
1147 | + from carbon.writer import WriterService |
1148 | + |
1149 | + service = WriterService() |
1150 | + service.setServiceParent(root_service) |
1151 | + |
1152 | + return root_service |
1153 | + |
1154 | + |
1155 | +def createAggregatorService(config): |
1156 | + from carbon.events import metricReceived |
1157 | + from carbon.aggregator import receiver |
1158 | + from carbon.aggregator.rules import RuleManager |
1159 | + from carbon.aggregator import client |
1160 | + from carbon.rewrite import RewriteRuleManager |
1161 | + from carbon.conf import settings |
1162 | + |
1163 | + root_service = createBaseService(config) |
1164 | + |
1165 | + # Configure application components |
1166 | + metricReceived.installHandler(receiver.process) |
1167 | + RuleManager.read_from(settings["aggregation-rules"]) |
1168 | + if exists(settings["rewrite-rules"]): |
1169 | + RewriteRuleManager.read_from(settings["rewrite-rules"]) |
1170 | + |
1171 | + client.connect(settings["DESTINATION_HOST"], |
1172 | + int(settings["DESTINATION_PORT"])) |
1173 | + |
1174 | + return root_service |
1175 | + |
1176 | + |
1177 | +def createRelayService(config): |
1178 | + from carbon.log import msg |
1179 | + from carbon.conf import settings |
1180 | + from carbon.events import metricReceived |
1181 | + from carbon.hashing import setDestinationHosts |
1182 | + from carbon.relay import createClientConnections, relay |
1183 | + from carbon.rules import loadRules, allDestinationServers, parseHostList |
1184 | + |
1185 | + root_service = createBaseService(config) |
1186 | + |
1187 | + # Configure application components |
1188 | + metricReceived.installHandler(relay) |
1189 | + |
1190 | + if settings["RELAY_METHOD"] == "rules": |
1191 | + loadRules(settings["relay-rules"]) |
1192 | + createClientConnections(allDestinationServers()) |
1193 | + elif settings["RELAY_METHOD"] == "consistent-hashing": |
1194 | + hosts = parseHostList(settings["CH_HOST_LIST"]) |
1195 | + msg('consistent-hashing hosts = %s' % str(hosts)) |
1196 | + setDestinationHosts(hosts) |
1197 | + createClientConnections(hosts) |
1198 | + |
1199 | + return root_service |
1200 | |
1201 | === modified file 'carbon/lib/carbon/util.py' |
1202 | --- carbon/lib/carbon/util.py 2009-12-13 01:35:28 +0000 |
1203 | +++ carbon/lib/carbon/util.py 2011-07-12 05:59:28 +0000 |
1204 | @@ -1,22 +1,64 @@ |
1205 | -import sys |
1206 | import os |
1207 | import pwd |
1208 | |
1209 | - |
1210 | -def daemonize(): |
1211 | - if os.fork() > 0: sys.exit(0) |
1212 | - os.setsid() |
1213 | - if os.fork() > 0: sys.exit(0) |
1214 | - si = open('/dev/null', 'r') |
1215 | - so = open('/dev/null', 'a+') |
1216 | - se = open('/dev/null', 'a+', 0) |
1217 | - os.dup2(si.fileno(), sys.stdin.fileno()) |
1218 | - os.dup2(so.fileno(), sys.stdout.fileno()) |
1219 | - os.dup2(se.fileno(), sys.stderr.fileno()) |
1220 | +from os.path import basename |
1221 | + |
1222 | +from twisted.python.util import initgroups |
1223 | +from twisted.scripts.twistd import runApp |
1224 | +from twisted.scripts._twistd_unix import daemonize |
1225 | + |
1226 | + |
1227 | +daemonize = daemonize # Backwards compatibility |
1228 | |
1229 | |
1230 | def dropprivs(user): |
1231 | - uid,gid = pwd.getpwnam(user)[2:4] |
1232 | - os.setregid(gid,gid) |
1233 | - os.setreuid(uid,uid) |
1234 | - return (uid,gid) |
1235 | + uid, gid = pwd.getpwnam(user)[2:4] |
1236 | + initgroups(uid, gid) |
1237 | + os.setregid(gid, gid) |
1238 | + os.setreuid(uid, uid) |
1239 | + return (uid, gid) |
1240 | + |
1241 | + |
1242 | +def run_twistd_plugin(filename): |
1243 | + from carbon.conf import get_parser |
1244 | + from twisted.scripts.twistd import ServerOptions |
1245 | + |
1246 | + program = basename(filename).split('.')[0] |
1247 | + |
1248 | + # First, parse command line options as the legacy carbon scripts used to |
1249 | + # do. |
1250 | + parser = get_parser(program) |
1251 | + (options, args) = parser.parse_args() |
1252 | + |
1253 | + # This isn't as evil as you might think |
1254 | + __builtins__["instance"] = options.instance |
1255 | + __builtins__["program"] = program |
1256 | + |
1257 | + # Then forward applicable options to either twistd or to the plugin itself. |
1258 | + twistd_options = [] |
1259 | + if options.debug: |
1260 | + twistd_options.extend(["-n", "--logfile", "-"]) |
1261 | + if options.profile: |
1262 | + twistd_options.append("--profile") |
1263 | + if options.pidfile: |
1264 | + twistd_options.extend(["--pidfile", options.pidfile]) |
1265 | + |
1266 | + # Now for the plugin-specific options. |
1267 | + twistd_options.append(program) |
1268 | + |
1269 | + if options.debug: |
1270 | + twistd_options.append("--debug") |
1271 | + |
1272 | + for option_name, option_value in vars(options).items(): |
1273 | + if (option_value is not None and |
1274 | + option_name not in ("debug", "profile", "pidfile")): |
1275 | + twistd_options.extend(["--%s" % option_name.replace("_", "-"), |
1276 | + option_value]) |
1277 | + |
1278 | + # Finally, append extra args so that twistd has a chance to process them. |
1279 | + twistd_options.extend(args) |
1280 | + |
1281 | + config = ServerOptions() |
1282 | + config.parseOptions(twistd_options) |
1283 | + |
1284 | + runApp(config) |
1285 | |
1286 | === modified file 'carbon/lib/carbon/writer.py' |
1287 | --- carbon/lib/carbon/writer.py 2011-04-02 19:54:28 +0000 |
1288 | +++ carbon/lib/carbon/writer.py 2011-07-12 05:59:28 +0000 |
1289 | @@ -16,26 +16,29 @@ |
1290 | import os |
1291 | import time |
1292 | from os.path import join, exists, dirname, basename |
1293 | -from threading import Thread |
1294 | -from twisted.internet import reactor |
1295 | -from twisted.internet.task import LoopingCall |
1296 | + |
1297 | +try: |
1298 | + import cPickle as pickle |
1299 | +except ImportError: |
1300 | + import pickle |
1301 | + |
1302 | import whisper |
1303 | + |
1304 | from carbon.cache import MetricCache |
1305 | from carbon.storage import getFilesystemPath, loadStorageSchemas |
1306 | from carbon.conf import settings |
1307 | from carbon.instrumentation import increment, append |
1308 | from carbon import log |
1309 | -try: |
1310 | - import cPickle as pickle |
1311 | -except ImportError: |
1312 | - import pickle |
1313 | - |
1314 | -if settings.WHISPER_AUTOFLUSH: |
1315 | - log.msg("enabling whisper autoflush") |
1316 | - whisper.AUTOFLUSH = True |
1317 | + |
1318 | +from twisted.internet import reactor |
1319 | +from twisted.internet.task import LoopingCall |
1320 | +from twisted.application.service import Service |
1321 | + |
1322 | |
1323 | lastCreateInterval = 0 |
1324 | createCount = 0 |
1325 | +schemas = loadStorageSchemas() |
1326 | + |
1327 | |
1328 | def optimalWriteOrder(): |
1329 | "Generates metrics with the most cached values first and applies a soft rate limit on new metrics" |
1330 | @@ -177,10 +180,16 @@ |
1331 | log.err() |
1332 | |
1333 | |
1334 | -schemaReloadTask = LoopingCall(reloadStorageSchemas) |
1335 | -schemas = loadStorageSchemas() |
1336 | - |
1337 | - |
1338 | -def startWriter(): |
1339 | - schemaReloadTask.start(60) |
1340 | - reactor.callInThread(writeForever) |
1341 | +class WriterService(Service): |
1342 | + |
1343 | + def __init__(self): |
1344 | + self.reload_task = LoopingCall(reloadStorageSchemas) |
1345 | + |
1346 | + def startService(self): |
1347 | + self.reload_task.start(60, False) |
1348 | + reactor.callInThread(writeForever) |
1349 | + Service.startService(self) |
1350 | + |
1351 | + def stopService(self): |
1352 | + self.reload_task.stop() |
1353 | + Service.stopService(self) |
1354 | |
1355 | === added directory 'carbon/lib/twisted' |
1356 | === added directory 'carbon/lib/twisted/plugins' |
1357 | === added file 'carbon/lib/twisted/plugins/carbon_aggregator_plugin.py' |
1358 | --- carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 1970-01-01 00:00:00 +0000 |
1359 | +++ carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 2011-07-12 05:59:28 +0000 |
1360 | @@ -0,0 +1,25 @@ |
1361 | +from zope.interface import implements |
1362 | + |
1363 | +from twisted.plugin import IPlugin |
1364 | +from twisted.application.service import IServiceMaker |
1365 | + |
1366 | +from carbon import service |
1367 | +from carbon import conf |
1368 | + |
1369 | + |
1370 | +class CarbonAggregatorServiceMaker(object): |
1371 | + |
1372 | + implements(IServiceMaker, IPlugin) |
1373 | + tapname = "carbon-aggregator" |
1374 | + description = "Aggregate stats for graphite." |
1375 | + options = conf.CarbonAggregatorOptions |
1376 | + |
1377 | + def makeService(self, options): |
1378 | + """ |
1379 | + Construct a C{carbon-aggregator} service. |
1380 | + """ |
1381 | + return service.createAggregatorService(options) |
1382 | + |
1383 | + |
1384 | +# Now construct an object which *provides* the relevant interfaces |
1385 | +serviceMaker = CarbonAggregatorServiceMaker() |
1386 | |
1387 | === added file 'carbon/lib/twisted/plugins/carbon_cache_plugin.py' |
1388 | --- carbon/lib/twisted/plugins/carbon_cache_plugin.py 1970-01-01 00:00:00 +0000 |
1389 | +++ carbon/lib/twisted/plugins/carbon_cache_plugin.py 2011-07-12 05:59:28 +0000 |
1390 | @@ -0,0 +1,25 @@ |
1391 | +from zope.interface import implements |
1392 | + |
1393 | +from twisted.plugin import IPlugin |
1394 | +from twisted.application.service import IServiceMaker |
1395 | + |
1396 | +from carbon import service |
1397 | +from carbon import conf |
1398 | + |
1399 | + |
1400 | +class CarbonCacheServiceMaker(object): |
1401 | + |
1402 | + implements(IServiceMaker, IPlugin) |
1403 | + tapname = "carbon-cache" |
1404 | + description = "Collect stats for graphite." |
1405 | + options = conf.CarbonCacheOptions |
1406 | + |
1407 | + def makeService(self, options): |
1408 | + """ |
1409 | + Construct a C{carbon-cache} service. |
1410 | + """ |
1411 | + return service.createCacheService(options) |
1412 | + |
1413 | + |
1414 | +# Now construct an object which *provides* the relevant interfaces |
1415 | +serviceMaker = CarbonCacheServiceMaker() |
1416 | |
1417 | === added file 'carbon/lib/twisted/plugins/carbon_relay_plugin.py' |
1418 | --- carbon/lib/twisted/plugins/carbon_relay_plugin.py 1970-01-01 00:00:00 +0000 |
1419 | +++ carbon/lib/twisted/plugins/carbon_relay_plugin.py 2011-07-12 05:59:28 +0000 |
1420 | @@ -0,0 +1,25 @@ |
1421 | +from zope.interface import implements |
1422 | + |
1423 | +from twisted.plugin import IPlugin |
1424 | +from twisted.application.service import IServiceMaker |
1425 | + |
1426 | +from carbon import service |
1427 | +from carbon import conf |
1428 | + |
1429 | + |
1430 | +class CarbonRelayServiceMaker(object): |
1431 | + |
1432 | + implements(IServiceMaker, IPlugin) |
1433 | + tapname = "carbon-relay" |
1434 | + description = "Relay stats for graphite." |
1435 | + options = conf.CarbonRelayOptions |
1436 | + |
1437 | + def makeService(self, options): |
1438 | + """ |
1439 | + Construct a C{carbon-relay} service. |
1440 | + """ |
1441 | + return service.createRelayService(options) |
1442 | + |
1443 | + |
1444 | +# Now construct an object which *provides* the relevant interfaces |
1445 | +serviceMaker = CarbonRelayServiceMaker() |
Sidnei, I think leveraging twistd is a great idea and your implementation looks nice and clean. The only issue I have is that the carbon-*.py scripts have always had a simple 'start|stop|status' interface that makes managing the daemons very easy. Of course twistd can be started/ stopped/ checked, but it is much less user friendly.
I think a simple solution would be to to take the carbon-*.py scripts you've written and just rename them to be actual .tac files, then the carbon-*.py scripts can all be a single simple wrapper script that provides the traditional carbon CLI and invokes twistd appropriately for you. This way anyone who wants to use twistd's advanced features is free to do so with the tac files, while everyone who is happy with the current CLI is not forced to change.
Let me know what you think, and thanks again for these awesome contributions!