Merge lp:~sidnei/graphite/twistd-plugins into lp:~graphite-dev/graphite/main

Proposed by Sidnei da Silva
Status: Merged
Merged at revision: 420
Proposed branch: lp:~sidnei/graphite/twistd-plugins
Merge into: lp:~graphite-dev/graphite/main
Prerequisite: lp:~sidnei/graphite/hardcoded-conf-dir
Diff against target: 1445 lines (+621/-567)
15 files modified
carbon/bin/carbon-aggregator.py (+23/-156)
carbon/bin/carbon-cache.py (+8/-183)
carbon/bin/carbon-relay.py (+8/-161)
carbon/lib/carbon/amqp_listener.py (+16/-4)
carbon/lib/carbon/conf.py (+220/-20)
carbon/lib/carbon/events.py (+1/-0)
carbon/lib/carbon/instrumentation.py (+16/-3)
carbon/lib/carbon/manhole.py (+7/-4)
carbon/lib/carbon/rules.py (+5/-2)
carbon/lib/carbon/service.py (+157/-0)
carbon/lib/carbon/util.py (+58/-16)
carbon/lib/carbon/writer.py (+27/-18)
carbon/lib/twisted/plugins/carbon_aggregator_plugin.py (+25/-0)
carbon/lib/twisted/plugins/carbon_cache_plugin.py (+25/-0)
carbon/lib/twisted/plugins/carbon_relay_plugin.py (+25/-0)
To merge this branch: bzr merge lp:~sidnei/graphite/twistd-plugins
Reviewer Review Type Date Requested Status
chrismd Needs Fixing
Review via email: mp+67391@code.launchpad.net

Description of the change

Refactor carbon startup scripts to use twistd, register each service as a separate twistd plugin.

To post a comment you must log in.
lp:~sidnei/graphite/twistd-plugins updated
405. By Sidnei da Silva

- Make it work when running with twistd [plugin] as well.

406. By Sidnei da Silva

- Too much copy and paste

407. By Sidnei da Silva

- Move more config stuff to conf module.

Revision history for this message
chrismd (chrismd) wrote :

Sidnei, I think leveraging twistd is a great idea and your implementation looks nice and clean. The only issue I have is that the carbon-*.py scripts have always had a simple 'start|stop|status' interface that makes managing the daemons very easy. Of course twistd can be started/stopped/checked, but it is much less user friendly.

I think a simple solution would be to to take the carbon-*.py scripts you've written and just rename them to be actual .tac files, then the carbon-*.py scripts can all be a single simple wrapper script that provides the traditional carbon CLI and invokes twistd appropriately for you. This way anyone who wants to use twistd's advanced features is free to do so with the tac files, while everyone who is happy with the current CLI is not forced to change.

Let me know what you think, and thanks again for these awesome contributions!

review: Needs Fixing
lp:~sidnei/graphite/twistd-plugins updated
408. By Sidnei da Silva

- Make the legacy scripts just invoke twistd with the right options such that the plugins are called instead.

409. By Sidnei da Silva

- Handle extra args

Revision history for this message
Sidnei da Silva (sidnei) wrote :

Added support for the legacy arguments, so that the scripts work just as well as before. Very happy with how it turned out!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'carbon/bin/carbon-aggregator.py'
2--- carbon/bin/carbon-aggregator.py 2011-06-16 14:55:16 +0000
3+++ carbon/bin/carbon-aggregator.py 2011-07-12 05:59:28 +0000
4@@ -1,163 +1,30 @@
5 #!/usr/bin/env python
6+"""Copyright 2009 Chris Davis
7+
8+Licensed under the Apache License, Version 2.0 (the "License");
9+you may not use this file except in compliance with the License.
10+You may obtain a copy of the License at
11+
12+ http://www.apache.org/licenses/LICENSE-2.0
13+
14+Unless required by applicable law or agreed to in writing, software
15+distributed under the License is distributed on an "AS IS" BASIS,
16+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+See the License for the specific language governing permissions and
18+limitations under the License."""
19+
20 import sys
21-import os
22-import atexit
23-from os.path import basename, dirname, exists, join, isdir
24-
25-
26-program = basename( sys.argv[0] ).split('.')[0]
27-
28-# Initialize twisted
29-try:
30- from twisted.internet import epollreactor
31- epollreactor.install()
32-except:
33- pass
34-from twisted.internet import reactor
35-
36+from os.path import dirname, join, abspath
37
38 # Figure out where we're installed
39-BIN_DIR = dirname( os.path.abspath(__file__) )
40+BIN_DIR = dirname(abspath(__file__))
41 ROOT_DIR = dirname(BIN_DIR)
42+
43+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
44+# source.
45 LIB_DIR = join(ROOT_DIR, 'lib')
46-
47 sys.path.insert(0, LIB_DIR)
48-os.environ['GRAPHITE_ROOT'] = ROOT_DIR
49-
50-# Capture useful debug info for this commonly reported problem
51-try:
52- import carbon
53-except ImportError:
54- print 'Failed to import carbon, debug information follows.'
55- print 'pwd=%s' % os.getcwd()
56- print 'sys.path=%s' % sys.path
57- print '__file__=%s' % __file__
58- sys.exit(1)
59-
60-
61-# Read config (we want failures to occur before daemonizing)
62-from carbon.conf import (get_default_parser, parse_options,
63- read_config, settings as global_settings)
64-
65-
66-parser = get_default_parser()
67-parser.add_option(
68- '--rules',
69- default=None,
70- help='Use the give aggregation rules file')
71-
72-(options, args) = parse_options(parser, sys.argv[1:])
73-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
74-global_settings.update(settings)
75-
76-if options.rules is None:
77- options.rules = join(settings.CONF_DIR, "aggregation-rules.conf")
78-
79-pidfile = settings.pidfile
80-logdir = settings.LOG_DIR
81-
82-__builtins__.program = program
83-action = args[0]
84-
85-
86-if action == 'stop':
87- if not exists(pidfile):
88- print 'Pidfile %s does not exist' % pidfile
89- raise SystemExit(0)
90-
91- pf = open(pidfile, 'r')
92- try:
93- pid = int( pidfile.read().strip() )
94- except:
95- print 'Could not read pidfile %s' % pidfile
96- raise SystemExit(1)
97-
98- print 'Deleting %s (contained pid %d)' % (pidfile, pid)
99- os.unlink(pidfile)
100-
101- print 'Sending kill signal to pid %d' % pid
102- os.kill(pid, 15)
103- raise SystemExit(0)
104-
105-
106-elif action == 'status':
107- if not exists(pidfile):
108- print '%s is not running' % program
109- raise SystemExit(0)
110-
111- pf = open(pidfile, 'r')
112- try:
113- pid = int( pidfile.read().strip() )
114- except:
115- print 'Failed to read pid from %s' % pidfile
116- raise SystemExit(1)
117-
118- if exists('/proc/%d' % pid):
119- print "%s is running with pid %d" % (program, pid)
120- raise SystemExit(0)
121- else:
122- print "%s is not running" % program
123- raise SystemExit(0)
124-
125-# Import application components
126-from carbon.log import logToStdout, logToDir
127-from carbon.instrumentation import startRecording
128-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener
129-from carbon.aggregator.rules import RuleManager
130-from carbon.aggregator import receiver
131-from carbon.aggregator import client
132-from carbon.rewrite import RewriteRuleManager
133-from carbon.events import metricReceived
134-from carbon.util import daemonize
135-
136-RuleManager.read_from(options.rules)
137-
138-rewrite_rules_conf = join(settings.CONF_DIR, 'rewrite-rules.conf')
139-if exists(rewrite_rules_conf):
140- RewriteRuleManager.read_from(rewrite_rules_conf)
141-
142-# --debug
143-if options.debug:
144- logToStdout()
145-
146-else:
147- if not isdir(logdir):
148- os.makedirs(logdir)
149-
150- daemonize()
151-
152- pf = open(pidfile, 'w')
153- pf.write( str(os.getpid()) )
154- pf.close()
155-
156- def shutdown():
157- if os.path.exists(pidfile):
158- os.unlink(pidfile)
159-
160- atexit.register(shutdown)
161-
162- logToDir(logdir)
163-
164-
165-# Configure application components
166-metricReceived.installHandler(receiver.process)
167-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
168-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
169-
170-client.connect(settings.DESTINATION_HOST, settings.DESTINATION_PORT)
171-startRecording()
172-
173-
174-# Run the twisted reactor
175-print "%s running with pid %d" % (program, os.getpid())
176-
177-if options.profile:
178- import cProfile
179-
180- if exists(options.profile):
181- os.unlink(options.profile)
182-
183- cProfile.run('reactor.run()', options.profile)
184-
185-else:
186- reactor.run()
187+
188+from carbon.util import run_twistd_plugin
189+
190+run_twistd_plugin(__file__)
191
192=== modified file 'carbon/bin/carbon-cache.py'
193--- carbon/bin/carbon-cache.py 2011-06-16 14:55:16 +0000
194+++ carbon/bin/carbon-cache.py 2011-07-12 05:59:28 +0000
195@@ -14,192 +14,17 @@
196 limitations under the License."""
197
198 import sys
199-import os
200-import socket
201-import pwd
202-import atexit
203-from os.path import basename, dirname, exists, join, isdir
204-
205-program = basename( sys.argv[0] ).split('.')[0]
206-hostname = socket.gethostname().split('.')[0]
207-os.umask(022)
208-
209-# Initialize twisted
210-try:
211- from twisted.internet import epollreactor
212- epollreactor.install()
213-except:
214- pass
215-from twisted.internet import reactor
216-
217+from os.path import dirname, join, abspath
218
219 # Figure out where we're installed
220-BIN_DIR = dirname( os.path.abspath(__file__) )
221+BIN_DIR = dirname(abspath(__file__))
222 ROOT_DIR = dirname(BIN_DIR)
223+
224+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
225+# source.
226 LIB_DIR = join(ROOT_DIR, 'lib')
227 sys.path.insert(0, LIB_DIR)
228
229-
230-# Capture useful debug info for this commonly reported problem
231-try:
232- import carbon
233-except ImportError:
234- print 'Failed to import carbon, debug information follows.'
235- print 'pwd=%s' % os.getcwd()
236- print 'sys.path=%s' % sys.path
237- print '__file__=%s' % __file__
238- sys.exit(1)
239-
240-
241-# Read config (we want failures to occur before daemonizing)
242-from carbon.conf import (get_default_parser, parse_options,
243- read_config, settings as global_settings)
244-
245-
246-(options, args) = parse_options(get_default_parser(), sys.argv[1:])
247-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
248-global_settings.update(settings)
249-
250-instance = options.instance
251-pidfile = settings.pidfile
252-logdir = settings.LOG_DIR
253-
254-
255-__builtins__.instance = instance # This isn't as evil as you might think
256-__builtins__.program = program
257-action = args[0]
258-
259-
260-if action == 'stop':
261- if not exists(pidfile):
262- print 'Pidfile %s does not exist' % pidfile
263- raise SystemExit(0)
264-
265- pf = open(pidfile, 'r')
266- try:
267- pid = int( pf.read().strip() )
268- except:
269- print 'Could not read pidfile %s' % pidfile
270- raise SystemExit(1)
271-
272- print 'Deleting %s (contained pid %d)' % (pidfile, pid)
273- os.unlink(pidfile)
274-
275- print 'Sending kill signal to pid %d' % pid
276- os.kill(pid, 15)
277- raise SystemExit(0)
278-
279-
280-elif action == 'status':
281- if not exists(pidfile):
282- print '%s (instance %s) is not running' % (program, instance)
283- raise SystemExit(0)
284-
285- pf = open(pidfile, 'r')
286- try:
287- pid = int( pf.read().strip() )
288- except:
289- print 'Failed to read pid from %s' % pidfile
290- raise SystemExit(1)
291-
292- if exists('/proc/%d' % pid):
293- print "%s (instance %s) is running with pid %d" % (program, instance, pid)
294- raise SystemExit(0)
295- else:
296- print "%s (instance %s) is not running" % (program, instance)
297- raise SystemExit(0)
298-
299-if exists(pidfile):
300- print "Pidfile %s already exists, is %s already running?" % (pidfile, program)
301- raise SystemExit(1)
302-
303-# Import application components
304-from carbon.log import logToStdout, logToDir
305-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, CacheQueryHandler, startListener
306-from carbon.cache import MetricCache
307-from carbon.instrumentation import startRecording
308-from carbon.events import metricReceived
309-
310-storage_schemas = join(settings.CONF_DIR, 'storage-schemas.conf')
311-if not exists(storage_schemas):
312- print "Error: missing required config %s" % storage_schemas
313- sys.exit(1)
314-
315-use_amqp = settings.get("ENABLE_AMQP", False)
316-if use_amqp:
317- from carbon import amqp_listener
318- amqp_host = settings.get("AMQP_HOST", "localhost")
319- amqp_port = settings.get("AMQP_PORT", 5672)
320- amqp_user = settings.get("AMQP_USER", "guest")
321- amqp_password = settings.get("AMQP_PASSWORD", "guest")
322- amqp_verbose = settings.get("AMQP_VERBOSE", False)
323- amqp_vhost = settings.get("AMQP_VHOST", "/")
324- amqp_spec = settings.get("AMQP_SPEC", None)
325- amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
326-
327-
328-# --debug
329-if options.debug:
330- logToStdout()
331-
332-else:
333- if not isdir(logdir):
334- os.makedirs(logdir)
335-
336- if settings.USER:
337- print "Dropping privileges to become the user %s" % settings.USER
338-
339- from carbon.util import daemonize, dropprivs
340- daemonize()
341-
342- pf = open(pidfile, 'w')
343- pf.write( str(os.getpid()) )
344- pf.close()
345-
346- def shutdown():
347- if os.path.exists(pidfile):
348- os.unlink(pidfile)
349-
350- atexit.register(shutdown)
351-
352- if settings.USER:
353- pwent = pwd.getpwnam(settings.USER)
354- os.chown(pidfile, pwent.pw_uid, pwent.pw_gid)
355- dropprivs(settings.USER)
356-
357- logToDir(logdir)
358-
359-# Configure application components
360-metricReceived.installHandler(MetricCache.store)
361-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
362-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
363-startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler)
364-
365-if use_amqp:
366- amqp_listener.startReceiver(amqp_host, amqp_port, amqp_user, amqp_password,
367- vhost=amqp_vhost, spec=amqp_spec,
368- exchange_name=amqp_exchange_name,
369- verbose=amqp_verbose)
370-
371-if settings.ENABLE_MANHOLE:
372- from carbon import manhole
373- manhole.start()
374-
375-from carbon.writer import startWriter # have to import this *after* settings are defined
376-startWriter()
377-startRecording()
378-
379-
380-# Run the twisted reactor
381-print "%s running [instance %s]" % (program, instance)
382-
383-if options.profile:
384- import cProfile
385-
386- if exists(options.profile):
387- os.unlink(options.profile)
388-
389- cProfile.run('reactor.run()', options.profile)
390-
391-else:
392- reactor.run()
393+from carbon.util import run_twistd_plugin
394+
395+run_twistd_plugin(__file__)
396
397=== modified file 'carbon/bin/carbon-relay.py'
398--- carbon/bin/carbon-relay.py 2011-06-16 14:55:16 +0000
399+++ carbon/bin/carbon-relay.py 2011-07-12 05:59:28 +0000
400@@ -14,170 +14,17 @@
401 limitations under the License."""
402
403 import sys
404-import os
405-import atexit
406-from os.path import basename, dirname, exists, join, isdir
407-
408-program = basename( sys.argv[0] ).split('.')[0]
409-os.umask(022)
410-
411-
412-# Initialize twisted
413-try:
414- from twisted.internet import epollreactor
415- epollreactor.install()
416-except:
417- pass
418-from twisted.internet import reactor
419-
420+from os.path import dirname, join, abspath
421
422 # Figure out where we're installed
423-BIN_DIR = dirname(__file__)
424+BIN_DIR = dirname(abspath(__file__))
425 ROOT_DIR = dirname(BIN_DIR)
426+
427+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
428+# source.
429 LIB_DIR = join(ROOT_DIR, 'lib')
430 sys.path.insert(0, LIB_DIR)
431
432-
433-# Capture useful debug info for this commonly reported problem
434-try:
435- import carbon
436-except ImportError:
437- print 'Failed to import carbon, debug information follows.'
438- print 'pwd=%s' % os.getcwd()
439- print 'sys.path=%s' % sys.path
440- print '__file__=%s' % __file__
441- sys.exit(1)
442-
443-
444-# Read config (we want failures to occur before daemonizing)
445-from carbon.conf import (get_default_parser, parse_options,
446- read_config, settings as global_settings)
447-
448-
449-parser = get_default_parser()
450-parser.add_option(
451- '--rules',
452- default=None,
453- help='Use the given relay rules file')
454-
455-(options, args) = parse_options(parser, sys.argv[1:])
456-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
457-global_settings.update(settings)
458-
459-if options.rules is None:
460- options.rules = join(settings.CONF_DIR, "relay-rules.conf")
461-
462-pidfile = settings.pidfile
463-logdir = settings.LOG_DIR
464-
465-__builtins__.program = program
466-action = args[0]
467-
468-
469-if action == 'stop':
470- if not exists(pidfile):
471- print 'Pidfile %s does not exist' % pidfile
472- raise SystemExit(0)
473-
474- pf = open(pidfile, 'r')
475- try:
476- pid = int( pf.read().strip() )
477- except:
478- print 'Could not read pidfile %s' % pidfile
479- raise SystemExit(1)
480-
481- print 'Deleting %s (contained pid %d)' % (pidfile, pid)
482- os.unlink(pidfile)
483-
484- print 'Sending kill signal to pid %d' % pid
485- os.kill(pid, 15)
486- raise SystemExit(0)
487-
488-
489-elif action == 'status':
490- if not exists(pidfile):
491- print '%s is not running' % program
492- raise SystemExit(0)
493-
494- pf = open(pidfile, 'r')
495- try:
496- pid = int( pf.read().strip() )
497- except:
498- print 'Failed to read pid from %s' % pidfile
499- raise SystemExit(1)
500-
501- if exists('/proc/%d' % pid):
502- print "%s is running with pid %d" % (program, pid)
503- raise SystemExit(0)
504- else:
505- print "%s is not running" % program
506- raise SystemExit(0)
507-
508-if exists(pidfile):
509- print "Pidfile %s already exists, is %s already running?" % (pidfile, program)
510- raise SystemExit(1)
511-
512-# Quick validation
513-if settings.RELAY_METHOD not in ('rules', 'consistent-hashing'):
514- print "In carbon.conf, RELAY_METHOD must be either 'rules' or 'consistent-hashing'. Invalid value: '%s'" % settings.RELAY_METHOD
515- sys.exit(1)
516-
517-# Import application components
518-from carbon.log import logToStdout, logToDir, msg
519-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener
520-from carbon.relay import createClientConnections, relay
521-from carbon.events import metricReceived
522-from carbon.instrumentation import startRecording
523-from carbon.rules import loadRules, allDestinationServers, parseHostList
524-from carbon.hashing import setDestinationHosts
525-
526-# --debug
527-if options.debug:
528- logToStdout()
529-else:
530- if not isdir(logdir):
531- os.makedirs(logdir)
532-
533- from carbon.util import daemonize
534- daemonize()
535- logToDir(logdir)
536-
537- pidfile = open(pidfile, 'w')
538- pidfile.write( str(os.getpid()) )
539- pidfile.close()
540-
541- def shutdown():
542- if os.path.exists(pidfile):
543- os.unlink(pidfile)
544-
545- atexit.register(shutdown)
546-
547-
548-# Configure application components
549-metricReceived.installHandler(relay)
550-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
551-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
552-
553-if settings.RELAY_METHOD == 'rules':
554- loadRules(options.rules)
555- createClientConnections( allDestinationServers() )
556-elif settings.RELAY_METHOD == 'consistent-hashing':
557- hosts = parseHostList(settings.CH_HOST_LIST)
558- msg('consistent-hashing hosts = %s' % str(hosts))
559- setDestinationHosts(hosts)
560- createClientConnections(hosts)
561-
562-startRecording()
563-
564-
565-# Run the twisted reactor
566-if options.profile:
567- import cProfile
568-
569- if exists(options.profile):
570- os.unlink(options.profile)
571-
572- cProfile.run('reactor.run()', options.profile)
573-
574-else:
575- reactor.run()
576+from carbon.util import run_twistd_plugin
577+
578+run_twistd_plugin(__file__)
579
580=== modified file 'carbon/lib/carbon/amqp_listener.py'
581--- carbon/lib/carbon/amqp_listener.py 2011-04-25 15:50:10 +0000
582+++ carbon/lib/carbon/amqp_listener.py 2011-07-12 05:59:28 +0000
583@@ -147,11 +147,12 @@
584 p.factory = self
585 return p
586
587-def startReceiver(host, port, username, password, vhost, exchange_name,
588- spec=None, channel=1, verbose=False):
589- """Starts a twisted process that will read messages on the amqp broker
590- and post them as metrics."""
591
592+def createAMQPListener(username, password, vhost, exchange_name,
593+ spec=None, channel=1, verbose=False):
594+ """
595+ Create an C{AMQPReconnectingFactory} configured with the specified options.
596+ """
597 # use provided spec if not specified
598 if not spec:
599 spec = txamqp.spec.load(os.path.normpath(
600@@ -161,6 +162,17 @@
601 factory = AMQPReconnectingFactory(username, password, delegate, vhost,
602 spec, channel, exchange_name,
603 verbose=verbose)
604+ return factory
605+
606+
607+def startReceiver(host, port, username, password, vhost, exchange_name,
608+ spec=None, channel=1, verbose=False):
609+ """
610+ Starts a twisted process that will read messages on the amqp broker and
611+ post them as metrics.
612+ """
613+ factory = createAMQPListener(username, password, vhost, exchange_name,
614+ spec=spec, channel=channel, verbose=verbose)
615 reactor.connectTCP(host, port, factory)
616
617
618
619=== modified file 'carbon/lib/carbon/conf.py'
620--- carbon/lib/carbon/conf.py 2011-07-10 22:10:06 +0000
621+++ carbon/lib/carbon/conf.py 2011-07-12 05:59:28 +0000
622@@ -13,10 +13,19 @@
623 limitations under the License."""
624
625 import os
626-from os.path import join, dirname, normpath
627+import sys
628+import pwd
629+
630+from os.path import join, dirname, normpath, abspath, basename, exists, isdir
631 from optparse import OptionParser
632 from ConfigParser import ConfigParser
633
634+import whisper
635+from carbon import log
636+
637+from twisted.python import usage
638+from twisted.scripts import twistd
639+
640
641 defaults = dict(
642 LOCAL_DATA_DIR="/opt/graphite/storage/whisper/",
643@@ -47,6 +56,10 @@
644 )
645
646
647+def _umask(value):
648+ return int(value, 8)
649+
650+
651 class OrderedConfigParser(ConfigParser):
652 """Hacky workaround to ensure sections are always returned in the order
653 they are defined in. Note that this does *not* make any guarantees about
654@@ -119,6 +132,170 @@
655 settings.update(defaults)
656
657
658+class CarbonCacheOptions(usage.Options):
659+
660+ optFlags = [
661+ ["debug", "", "Run in debug mode."],
662+ ]
663+
664+ optParameters = [
665+ ["config", "c", None, "Use the given config file."],
666+ ["instance", "", None, "Manage a specific carbon instance."],
667+ ["logdir", "", None, "Write logs to the given directory."],
668+ ]
669+
670+ def postOptions(self):
671+ global settings
672+
673+ ROOT_DIR = os.getcwd()
674+ program = self.parent.subCommand
675+
676+ # Use provided pidfile (if any) as default for configuration. If it's
677+ # set to 'twistd.pid', that means no value was provided and the default
678+ # was used.
679+ pidfile = self.parent["pidfile"]
680+ if pidfile.endswith("twistd.pid"):
681+ pidfile = None
682+ self["pidfile"] = pidfile
683+
684+ # Enforce a default umask of '022' if none was set.
685+ if self.parent["umask"] is None:
686+ self.parent["umask"] = 022
687+
688+ # Read extra settings from the configuration file.
689+ program_settings = read_config(program, self, ROOT_DIR=ROOT_DIR)
690+ settings.update(program_settings)
691+ settings["program"] = program
692+
693+ # Set process uid/gid by changing the parent config, if a user was
694+ # provided in the configuration file.
695+ if settings.USER:
696+ self.parent["uid"], self.parent["gid"] = (
697+ pwd.getpwnam(settings.USER)[2:4])
698+
699+ # Set the pidfile in parent config to the value that was computed by
700+ # C{read_config}.
701+ self.parent["pidfile"] = settings["pidfile"]
702+
703+ storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf")
704+ if not exists(storage_schemas):
705+ print "Error: missing required config %s" % storage_schemas
706+ sys.exit(1)
707+
708+ if settings.WHISPER_AUTOFLUSH:
709+ log.msg("enabling whisper autoflush")
710+ whisper.AUTOFLUSH = True
711+
712+ # If we are not running in debug mode or non-daemon mode, then log to a
713+ # directory, otherwise log output will go to stdout.
714+ if not (self["debug"] or self.parent["nodaemon"]):
715+ logdir = settings.LOG_DIR
716+ if not isdir(logdir):
717+ os.makedirs(logdir)
718+ log.logToDir(logdir)
719+
720+ if not "action" in self:
721+ self["action"] = "start"
722+ self.handleAction()
723+
724+ def parseArgs(self, action):
725+ """If an action was provided, store it for further processing."""
726+ self["action"] = action
727+
728+ def handleAction(self):
729+ """Handle extra argument for backwards-compatibility.
730+
731+ * C{start} will simply do minimal pid checking and otherwise let twistd
732+ take over.
733+ * C{stop} will kill an existing running process if it matches the
734+ C{pidfile} contents.
735+ * C{status} will simply report if the process is up or not.
736+ """
737+ action = self["action"]
738+ pidfile = self.parent["pidfile"]
739+ program = settings["program"]
740+ instance = self["instance"]
741+
742+ if action == "stop":
743+ if not exists(pidfile):
744+ print "Pidfile %s does not exist" % pidfile
745+ raise SystemExit(0)
746+ pf = open(pidfile, 'r')
747+ try:
748+ pid = int(pf.read().strip())
749+ except:
750+ print "Could not read pidfile %s" % pidfile
751+ raise SystemExit(1)
752+ print "Sending kill signal to pid %d" % pid
753+ os.kill(pid, 15)
754+
755+ print "Deleting %s (contained pid %d)" % (pidfile, pid)
756+ os.unlink(pidfile)
757+ raise SystemExit(0)
758+
759+ elif action == "status":
760+ if not exists(pidfile):
761+ print "%s (instance %s) is not running" % (program, instance)
762+ raise SystemExit(0)
763+ pf = open(pidfile, "r")
764+ try:
765+ pid = int(pf.read().strip())
766+ except:
767+ print "Failed to read pid from %s" % pidfile
768+ raise SystemExit(1)
769+
770+ if exists("/proc/%d" % pid):
771+ print ("%s (instance %s) is running with pid %d" %
772+ (program, instance, pid))
773+ raise SystemExit(0)
774+ else:
775+ print "%s (instance %s) is not running" % (program, instance)
776+ raise SystemExit(0)
777+ elif action == "start":
778+ if exists(pidfile):
779+ print ("Pidfile %s already exists, is %s already running?" %
780+ (pidfile, program))
781+ raise SystemExit(1)
782+
783+
784+class CarbonAggregatorOptions(CarbonCacheOptions):
785+
786+ optParameters = [
787+ ["rules", "", None, "Use the given aggregation rules file."],
788+ ["rewrite-rules", "", None, "Use the given rewrite rules file."],
789+ ] + CarbonCacheOptions.optParameters
790+
791+ def postOptions(self):
792+ CarbonCacheOptions.postOptions(self)
793+ if self["rules"] is None:
794+ self["rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf")
795+ settings["aggregation-rules"] = self["rules"]
796+
797+ if self["rewrite-rules"] is None:
798+ self["rewrite-rules"] = join(settings["CONF_DIR"],
799+ "rewrite-rules.conf")
800+ settings["rewrite-rules"] = self["rewrite-rules"]
801+
802+
803+class CarbonRelayOptions(CarbonCacheOptions):
804+
805+ optParameters = [
806+ ["rules", "", None, "Use the given relay rules file."],
807+ ] + CarbonCacheOptions.optParameters
808+
809+ def postOptions(self):
810+ CarbonCacheOptions.postOptions(self)
811+ if self["rules"] is None:
812+ self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf")
813+ settings["relay-rules"] = self["rules"]
814+
815+ if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing"):
816+ print ("In carbon.conf, RELAY_METHOD must be either 'rules' or "
817+ "'consistent-hashing'. Invalid value: '%s'" %
818+ settings.RELAY_METHOD)
819+ sys.exit(1)
820+
821+
822 def get_default_parser(usage="%prog [options] <start|stop|status>"):
823 """Create a parser for command line options."""
824 parser = OptionParser(usage=usage)
825@@ -147,6 +324,25 @@
826 return parser
827
828
829+def get_parser(name):
830+ parser = get_default_parser()
831+ if name == "carbon-aggregator":
832+ parser.add_option(
833+ "--rules",
834+ default=None,
835+ help="Use the given aggregation rules file.")
836+ parser.add_option(
837+ "--rewrite-rules",
838+ default=None,
839+ help="Use the given rewrite rules file.")
840+ elif name == "carbon-relay":
841+ parser.add_option(
842+ "--rules",
843+ default=None,
844+ help="Use the given relay rules file.")
845+ return parser
846+
847+
848 def parse_options(parser, args):
849 """
850 Parse command line options and print usage message if no arguments were
851@@ -168,7 +364,7 @@
852 def read_config(program, options, **kwargs):
853 """
854 Read settings for 'program' from configuration file specified by
855- 'options.config', with missing values provided by 'defaults'.
856+ 'options["config"]', with missing values provided by 'defaults'.
857 """
858 settings = Settings()
859 settings.update(defaults)
860@@ -185,40 +381,44 @@
861 # 'GRAPHITE_CONF_DIR' environment variable.
862 settings.setdefault("CONF_DIR",
863 os.environ.get("GRAPHITE_CONF_DIR",
864- join(settings.ROOT_DIR, "conf")))
865- if options.config is None:
866- options.config = join(settings.CONF_DIR, "carbon.conf")
867+ join(settings["ROOT_DIR"], "conf")))
868+ if options["config"] is None:
869+ options["config"] = join(settings["CONF_DIR"], "carbon.conf")
870 else:
871 # Set 'CONF_DIR' to the parent directory of the 'carbon.conf' config
872 # file.
873- settings["CONF_DIR"] = dirname(normpath(options.config))
874+ settings["CONF_DIR"] = dirname(normpath(options["config"]))
875
876 # Storage directory can be overriden by the 'GRAPHITE_STORAGE_DIR'
877 # environment variable. It defaults to a path relative to 'ROOT_DIR' for
878 # backwards compatibility though.
879 settings.setdefault("STORAGE_DIR",
880 os.environ.get("GRAPHITE_STORAGE_DIR",
881- join(settings.ROOT_DIR, "storage")))
882- settings.setdefault("LOG_DIR", join(settings.STORAGE_DIR, "log", program))
883+ join(settings["ROOT_DIR"], "storage")))
884+ settings.setdefault(
885+ "LOG_DIR", join(settings["STORAGE_DIR"], "log", program))
886
887 # Read configuration options from program-specific section.
888 section = program[len("carbon-"):]
889- settings.readFrom(options.config, section)
890+ settings.readFrom(options["config"], section)
891
892 # If a specific instance of the program is specified, augment the settings
893 # with the instance-specific settings and provide sane defaults for
894 # optional settings.
895- if options.instance:
896- settings.readFrom(options.config, "%s:%s" % (section, options.instance))
897- settings["pidfile"] = (options.pidfile or
898- join(settings.STORAGE_DIR,
899- "%s-%s.pid" % (program, options.instance)))
900- settings["LOG_DIR"] = (options.logdir or
901- "%s-%s" % (settings.LOG_DIR.rstrip('/'),
902- options.instance))
903+ if options["instance"]:
904+ settings.readFrom(options["config"],
905+ "%s:%s" % (section, options["instance"]))
906+ settings["pidfile"] = (
907+ options["pidfile"] or
908+ join(settings["STORAGE_DIR"], "%s-%s.pid" %
909+ (program, options["instance"])))
910+ settings["LOG_DIR"] = (options["logdir"] or
911+ "%s-%s" % (settings["LOG_DIR"].rstrip('/'),
912+ options["instance"]))
913 else:
914- settings["pidfile"] = (options.pidfile or
915- join(settings.STORAGE_DIR, '%s.pid' % program))
916- settings["LOG_DIR"] = (options.logdir or settings.LOG_DIR)
917+ settings["pidfile"] = (
918+ options["pidfile"] or
919+ join(settings["STORAGE_DIR"], '%s.pid' % program))
920+ settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"])
921
922 return settings
923
924=== modified file 'carbon/lib/carbon/events.py'
925--- carbon/lib/carbon/events.py 2009-09-10 19:28:51 +0000
926+++ carbon/lib/carbon/events.py 2011-07-12 05:59:28 +0000
927@@ -2,6 +2,7 @@
928
929
930 class EventHandler:
931+
932 def __init__(self, defaultHandler=None):
933 self.handler = defaultHandler
934
935
936=== modified file 'carbon/lib/carbon/instrumentation.py'
937--- carbon/lib/carbon/instrumentation.py 2011-07-10 20:46:16 +0000
938+++ carbon/lib/carbon/instrumentation.py 2011-07-12 05:59:28 +0000
939@@ -2,13 +2,14 @@
940 import time
941 import socket
942 from resource import getrusage, RUSAGE_SELF
943+
944+from twisted.application.service import Service
945 from twisted.internet.task import LoopingCall
946
947
948 stats = {}
949 HOSTNAME = socket.gethostname().replace('.','_')
950 PAGESIZE = os.sysconf('SC_PAGESIZE')
951-recordTask = None
952 rusage = getrusage(RUSAGE_SELF)
953 lastUsage = rusage.ru_utime + rusage.ru_stime
954 lastUsageTime = time.time()
955@@ -56,8 +57,6 @@
956
957 def startRecording():
958 global recordTask
959- recordTask = LoopingCall(recordMetrics)
960- recordTask.start(60, now=False)
961
962
963 def recordMetrics():
964@@ -140,6 +139,20 @@
965 send_metric(fullMetric, datapoint)
966
967
968+class InstrumentationService(Service):
969+
970+ def __init__(self):
971+ self.record_task = LoopingCall(recordMetrics)
972+
973+ def startService(self):
974+ self.record_task.start(60, False)
975+ Service.startService(self)
976+
977+ def stopService(self):
978+ self.record_task.stop()
979+ Service.stopService(self)
980+
981+
982 # Avoid import circularity
983 from carbon.aggregator.buffers import BufferManager
984 from carbon.aggregator.client import send_metric
985
986=== modified file 'carbon/lib/carbon/manhole.py'
987--- carbon/lib/carbon/manhole.py 2011-04-02 00:44:19 +0000
988+++ carbon/lib/carbon/manhole.py 2011-07-12 05:59:28 +0000
989@@ -1,4 +1,4 @@
990-from twisted.cred import portal, checkers
991+from twisted.cred import portal
992 from twisted.conch.ssh import keys
993 from twisted.conch.checkers import SSHPublicKeyDatabase
994 from twisted.conch.manhole import Manhole
995@@ -21,8 +21,7 @@
996 keyBlob = self.userKeys[credentials.username]
997 return keyBlob == credentials.blob
998
999-
1000-def start():
1001+def createManholeListener():
1002 sshRealm = TerminalRealm()
1003 sshRealm.chainedProtocolFactory.protocolFactory = lambda _: Manhole(namespace)
1004
1005@@ -37,4 +36,8 @@
1006 sshPortal = portal.Portal(sshRealm)
1007 sshPortal.registerChecker(credChecker)
1008 sessionFactory = ConchFactory(sshPortal)
1009- reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE)
1010+ return sessionFactory
1011+
1012+def start():
1013+ sessionFactory = createManholeListener()
1014+ reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE)
1015
1016=== modified file 'carbon/lib/carbon/rules.py'
1017--- carbon/lib/carbon/rules.py 2011-04-05 20:30:50 +0000
1018+++ carbon/lib/carbon/rules.py 2011-07-12 05:59:28 +0000
1019@@ -21,13 +21,16 @@
1020 for host_string in host_list:
1021 parts = host_string.strip().split(':')
1022 server = parts[0]
1023- port = int( parts[1] )
1024+ if len(parts) > 1:
1025+ port = int(parts[1])
1026+ else:
1027+ port = DEFAULT_CARBON_PORT
1028 if len(parts) > 2:
1029 instance = parts[2]
1030 else:
1031 instance = None
1032
1033- hosts.append( (server, port, instance) )
1034+ hosts.append((server, port, instance))
1035
1036 return hosts
1037
1038
1039=== added file 'carbon/lib/carbon/service.py'
1040--- carbon/lib/carbon/service.py 1970-01-01 00:00:00 +0000
1041+++ carbon/lib/carbon/service.py 2011-07-12 05:59:28 +0000
1042@@ -0,0 +1,157 @@
1043+#!/usr/bin/env python
1044+"""Copyright 2009 Chris Davis
1045+
1046+Licensed under the Apache License, Version 2.0 (the "License");
1047+you may not use this file except in compliance with the License.
1048+You may obtain a copy of the License at
1049+
1050+ http://www.apache.org/licenses/LICENSE-2.0
1051+
1052+Unless required by applicable law or agreed to in writing, software
1053+distributed under the License is distributed on an "AS IS" BASIS,
1054+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1055+See the License for the specific language governing permissions and
1056+limitations under the License."""
1057+
1058+from os.path import exists
1059+
1060+from twisted.application.service import MultiService
1061+from twisted.application.internet import TCPServer, TCPClient
1062+from twisted.internet.protocol import ServerFactory
1063+
1064+
1065+def createBaseService(config):
1066+ from carbon.conf import settings
1067+ from carbon.listeners import MetricLineReceiver, MetricPickleReceiver
1068+
1069+ root_service = MultiService()
1070+ root_service.setName(settings.program)
1071+
1072+ use_amqp = settings.get("ENABLE_AMQP", False)
1073+ if use_amqp:
1074+ from carbon import amqp_listener
1075+
1076+ amqp_host = settings.get("AMQP_HOST", "localhost")
1077+ amqp_port = settings.get("AMQP_PORT", 5672)
1078+ amqp_user = settings.get("AMQP_USER", "guest")
1079+ amqp_password = settings.get("AMQP_PASSWORD", "guest")
1080+ amqp_verbose = settings.get("AMQP_VERBOSE", False)
1081+ amqp_vhost = settings.get("AMQP_VHOST", "/")
1082+ amqp_spec = settings.get("AMQP_SPEC", None)
1083+ amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
1084+
1085+
1086+ for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
1087+ settings.LINE_RECEIVER_PORT,
1088+ MetricLineReceiver),
1089+ (settings.PICKLE_RECEIVER_INTERFACE,
1090+ settings.PICKLE_RECEIVER_PORT,
1091+ MetricPickleReceiver)):
1092+ factory = ServerFactory()
1093+ factory.protocol = protocol
1094+ service = TCPServer(int(port), factory, interface=interface)
1095+ service.setServiceParent(root_service)
1096+
1097+ if use_amqp:
1098+ factory = amqp_listener.createAMQPListener(
1099+ amqp_user, amqp_password,
1100+ vhost=amqp_vhost, spec=amqp_spec,
1101+ exchange_name=amqp_exchange_name,
1102+ verbose=amqp_verbose)
1103+ service = TCPClient(amqp_host, int(amqp_port), factory)
1104+ service.setServiceParent(root_service)
1105+
1106+ if settings.ENABLE_MANHOLE:
1107+ from carbon import manhole
1108+
1109+ factory = manhole.createManholeListener()
1110+ service = TCPServer(int(settings.MANHOLE_PORT), factory,
1111+ interface=settings.MANHOLE_INTERFACE)
1112+ service.setServiceParent(root_service)
1113+
1114+ # have to import this *after* settings are defined
1115+ from carbon.writer import WriterService
1116+
1117+ service = WriterService()
1118+ service.setServiceParent(root_service)
1119+
1120+ # Instantiate an instrumentation service that will record metrics about
1121+ # this service.
1122+ from carbon.instrumentation import InstrumentationService
1123+
1124+ service = InstrumentationService()
1125+ service.setServiceParent(root_service)
1126+
1127+ return root_service
1128+
1129+
1130+def createCacheService(config):
1131+ from carbon.cache import MetricCache
1132+ from carbon.conf import settings
1133+ from carbon.events import metricReceived
1134+ from carbon.listeners import CacheQueryHandler
1135+
1136+ # Configure application components
1137+ metricReceived.installHandler(MetricCache.store)
1138+
1139+ root_service = createBaseService(config)
1140+ factory = ServerFactory()
1141+ factory.protocol = CacheQueryHandler
1142+ service = TCPServer(int(settings.CACHE_QUERY_PORT), factory,
1143+ interface=settings.CACHE_QUERY_INTERFACE)
1144+ service.setServiceParent(root_service)
1145+
1146+ # have to import this *after* settings are defined
1147+ from carbon.writer import WriterService
1148+
1149+ service = WriterService()
1150+ service.setServiceParent(root_service)
1151+
1152+ return root_service
1153+
1154+
1155+def createAggregatorService(config):
1156+ from carbon.events import metricReceived
1157+ from carbon.aggregator import receiver
1158+ from carbon.aggregator.rules import RuleManager
1159+ from carbon.aggregator import client
1160+ from carbon.rewrite import RewriteRuleManager
1161+ from carbon.conf import settings
1162+
1163+ root_service = createBaseService(config)
1164+
1165+ # Configure application components
1166+ metricReceived.installHandler(receiver.process)
1167+ RuleManager.read_from(settings["aggregation-rules"])
1168+ if exists(settings["rewrite-rules"]):
1169+ RewriteRuleManager.read_from(settings["rewrite-rules"])
1170+
1171+ client.connect(settings["DESTINATION_HOST"],
1172+ int(settings["DESTINATION_PORT"]))
1173+
1174+ return root_service
1175+
1176+
1177+def createRelayService(config):
1178+ from carbon.log import msg
1179+ from carbon.conf import settings
1180+ from carbon.events import metricReceived
1181+ from carbon.hashing import setDestinationHosts
1182+ from carbon.relay import createClientConnections, relay
1183+ from carbon.rules import loadRules, allDestinationServers, parseHostList
1184+
1185+ root_service = createBaseService(config)
1186+
1187+ # Configure application components
1188+ metricReceived.installHandler(relay)
1189+
1190+ if settings["RELAY_METHOD"] == "rules":
1191+ loadRules(settings["relay-rules"])
1192+ createClientConnections(allDestinationServers())
1193+ elif settings["RELAY_METHOD"] == "consistent-hashing":
1194+ hosts = parseHostList(settings["CH_HOST_LIST"])
1195+ msg('consistent-hashing hosts = %s' % str(hosts))
1196+ setDestinationHosts(hosts)
1197+ createClientConnections(hosts)
1198+
1199+ return root_service
1200
1201=== modified file 'carbon/lib/carbon/util.py'
1202--- carbon/lib/carbon/util.py 2009-12-13 01:35:28 +0000
1203+++ carbon/lib/carbon/util.py 2011-07-12 05:59:28 +0000
1204@@ -1,22 +1,64 @@
1205-import sys
1206 import os
1207 import pwd
1208
1209-
1210-def daemonize():
1211- if os.fork() > 0: sys.exit(0)
1212- os.setsid()
1213- if os.fork() > 0: sys.exit(0)
1214- si = open('/dev/null', 'r')
1215- so = open('/dev/null', 'a+')
1216- se = open('/dev/null', 'a+', 0)
1217- os.dup2(si.fileno(), sys.stdin.fileno())
1218- os.dup2(so.fileno(), sys.stdout.fileno())
1219- os.dup2(se.fileno(), sys.stderr.fileno())
1220+from os.path import basename
1221+
1222+from twisted.python.util import initgroups
1223+from twisted.scripts.twistd import runApp
1224+from twisted.scripts._twistd_unix import daemonize
1225+
1226+
1227+daemonize = daemonize # Backwards compatibility
1228
1229
1230 def dropprivs(user):
1231- uid,gid = pwd.getpwnam(user)[2:4]
1232- os.setregid(gid,gid)
1233- os.setreuid(uid,uid)
1234- return (uid,gid)
1235+ uid, gid = pwd.getpwnam(user)[2:4]
1236+ initgroups(uid, gid)
1237+ os.setregid(gid, gid)
1238+ os.setreuid(uid, uid)
1239+ return (uid, gid)
1240+
1241+
1242+def run_twistd_plugin(filename):
1243+ from carbon.conf import get_parser
1244+ from twisted.scripts.twistd import ServerOptions
1245+
1246+ program = basename(filename).split('.')[0]
1247+
1248+ # First, parse command line options as the legacy carbon scripts used to
1249+ # do.
1250+ parser = get_parser(program)
1251+ (options, args) = parser.parse_args()
1252+
1253+ # This isn't as evil as you might think
1254+ __builtins__["instance"] = options.instance
1255+ __builtins__["program"] = program
1256+
1257+ # Then forward applicable options to either twistd or to the plugin itself.
1258+ twistd_options = []
1259+ if options.debug:
1260+ twistd_options.extend(["-n", "--logfile", "-"])
1261+ if options.profile:
1262+ twistd_options.append("--profile")
1263+ if options.pidfile:
1264+ twistd_options.extend(["--pidfile", options.pidfile])
1265+
1266+ # Now for the plugin-specific options.
1267+ twistd_options.append(program)
1268+
1269+ if options.debug:
1270+ twistd_options.append("--debug")
1271+
1272+ for option_name, option_value in vars(options).items():
1273+ if (option_value is not None and
1274+ option_name not in ("debug", "profile", "pidfile")):
1275+ twistd_options.extend(["--%s" % option_name.replace("_", "-"),
1276+ option_value])
1277+
1278+ # Finally, append extra args so that twistd has a chance to process them.
1279+ twistd_options.extend(args)
1280+
1281+ config = ServerOptions()
1282+ config.parseOptions(twistd_options)
1283+
1284+ runApp(config)
1285
1286=== modified file 'carbon/lib/carbon/writer.py'
1287--- carbon/lib/carbon/writer.py 2011-04-02 19:54:28 +0000
1288+++ carbon/lib/carbon/writer.py 2011-07-12 05:59:28 +0000
1289@@ -16,26 +16,29 @@
1290 import os
1291 import time
1292 from os.path import join, exists, dirname, basename
1293-from threading import Thread
1294-from twisted.internet import reactor
1295-from twisted.internet.task import LoopingCall
1296+
1297+try:
1298+ import cPickle as pickle
1299+except ImportError:
1300+ import pickle
1301+
1302 import whisper
1303+
1304 from carbon.cache import MetricCache
1305 from carbon.storage import getFilesystemPath, loadStorageSchemas
1306 from carbon.conf import settings
1307 from carbon.instrumentation import increment, append
1308 from carbon import log
1309-try:
1310- import cPickle as pickle
1311-except ImportError:
1312- import pickle
1313-
1314-if settings.WHISPER_AUTOFLUSH:
1315- log.msg("enabling whisper autoflush")
1316- whisper.AUTOFLUSH = True
1317+
1318+from twisted.internet import reactor
1319+from twisted.internet.task import LoopingCall
1320+from twisted.application.service import Service
1321+
1322
1323 lastCreateInterval = 0
1324 createCount = 0
1325+schemas = loadStorageSchemas()
1326+
1327
1328 def optimalWriteOrder():
1329 "Generates metrics with the most cached values first and applies a soft rate limit on new metrics"
1330@@ -177,10 +180,16 @@
1331 log.err()
1332
1333
1334-schemaReloadTask = LoopingCall(reloadStorageSchemas)
1335-schemas = loadStorageSchemas()
1336-
1337-
1338-def startWriter():
1339- schemaReloadTask.start(60)
1340- reactor.callInThread(writeForever)
1341+class WriterService(Service):
1342+
1343+ def __init__(self):
1344+ self.reload_task = LoopingCall(reloadStorageSchemas)
1345+
1346+ def startService(self):
1347+ self.reload_task.start(60, False)
1348+ reactor.callInThread(writeForever)
1349+ Service.startService(self)
1350+
1351+ def stopService(self):
1352+ self.reload_task.stop()
1353+ Service.stopService(self)
1354
1355=== added directory 'carbon/lib/twisted'
1356=== added directory 'carbon/lib/twisted/plugins'
1357=== added file 'carbon/lib/twisted/plugins/carbon_aggregator_plugin.py'
1358--- carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 1970-01-01 00:00:00 +0000
1359+++ carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 2011-07-12 05:59:28 +0000
1360@@ -0,0 +1,25 @@
1361+from zope.interface import implements
1362+
1363+from twisted.plugin import IPlugin
1364+from twisted.application.service import IServiceMaker
1365+
1366+from carbon import service
1367+from carbon import conf
1368+
1369+
1370+class CarbonAggregatorServiceMaker(object):
1371+
1372+ implements(IServiceMaker, IPlugin)
1373+ tapname = "carbon-aggregator"
1374+ description = "Aggregate stats for graphite."
1375+ options = conf.CarbonAggregatorOptions
1376+
1377+ def makeService(self, options):
1378+ """
1379+ Construct a C{carbon-aggregator} service.
1380+ """
1381+ return service.createAggregatorService(options)
1382+
1383+
1384+# Now construct an object which *provides* the relevant interfaces
1385+serviceMaker = CarbonAggregatorServiceMaker()
1386
1387=== added file 'carbon/lib/twisted/plugins/carbon_cache_plugin.py'
1388--- carbon/lib/twisted/plugins/carbon_cache_plugin.py 1970-01-01 00:00:00 +0000
1389+++ carbon/lib/twisted/plugins/carbon_cache_plugin.py 2011-07-12 05:59:28 +0000
1390@@ -0,0 +1,25 @@
1391+from zope.interface import implements
1392+
1393+from twisted.plugin import IPlugin
1394+from twisted.application.service import IServiceMaker
1395+
1396+from carbon import service
1397+from carbon import conf
1398+
1399+
1400+class CarbonCacheServiceMaker(object):
1401+
1402+ implements(IServiceMaker, IPlugin)
1403+ tapname = "carbon-cache"
1404+ description = "Collect stats for graphite."
1405+ options = conf.CarbonCacheOptions
1406+
1407+ def makeService(self, options):
1408+ """
1409+ Construct a C{carbon-cache} service.
1410+ """
1411+ return service.createCacheService(options)
1412+
1413+
1414+# Now construct an object which *provides* the relevant interfaces
1415+serviceMaker = CarbonCacheServiceMaker()
1416
1417=== added file 'carbon/lib/twisted/plugins/carbon_relay_plugin.py'
1418--- carbon/lib/twisted/plugins/carbon_relay_plugin.py 1970-01-01 00:00:00 +0000
1419+++ carbon/lib/twisted/plugins/carbon_relay_plugin.py 2011-07-12 05:59:28 +0000
1420@@ -0,0 +1,25 @@
1421+from zope.interface import implements
1422+
1423+from twisted.plugin import IPlugin
1424+from twisted.application.service import IServiceMaker
1425+
1426+from carbon import service
1427+from carbon import conf
1428+
1429+
1430+class CarbonRelayServiceMaker(object):
1431+
1432+ implements(IServiceMaker, IPlugin)
1433+ tapname = "carbon-relay"
1434+ description = "Relay stats for graphite."
1435+ options = conf.CarbonRelayOptions
1436+
1437+ def makeService(self, options):
1438+ """
1439+ Construct a C{carbon-relay} service.
1440+ """
1441+ return service.createRelayService(options)
1442+
1443+
1444+# Now construct an object which *provides* the relevant interfaces
1445+serviceMaker = CarbonRelayServiceMaker()