Merge lp:~drawks/graphite/graphite-msgpack into lp:~graphite-dev/graphite/main

Proposed by Dave Rawks
Status: Work in progress
Proposed branch: lp:~drawks/graphite/graphite-msgpack
Merge into: lp:~graphite-dev/graphite/main
Diff against target: 173 lines (+65/-9)
6 files modified
carbon/bin/validate-storage-schemas.py (+1/-1)
carbon/conf/carbon.conf.example (+6/-0)
carbon/lib/carbon/conf.py (+5/-0)
carbon/lib/carbon/protocols.py (+30/-1)
carbon/lib/carbon/service.py (+15/-7)
check-dependencies.py (+8/-0)
To merge this branch: bzr merge lp:~drawks/graphite/graphite-msgpack
Reviewer: Sidnei da Silva (community)
Review status: Approve
Review via email: mp+92921@code.launchpad.net

Description of the change

Added msgpack as a receiver protocol. msgpack is very fast at deserializing and much safer than pickle, since it doesn't support serializing things like modules, functions, or class objects. Also added support for enabling and disabling individual listeners in carbon.conf.
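
To illustrate the safety point: a pickle stream can name any importable callable and ask the unpickler to call it, which is the kind of payload msgpack cannot express. The sketch below is written for Python 3 and uses only the standard library. `Payload`, `RefuseGlobals`, and `try_load` are hypothetical names for illustration, not part of carbon; the unpickler subclass refuses every global lookup so that nothing is actually executed.

```python
import io
import pickle


class Payload:
    """Pickles as 'call print("pwned")' rather than as plain data."""

    def __reduce__(self):
        return (print, ("pwned",))


class RefuseGlobals(pickle.Unpickler):
    """Hypothetical helper: reject every global the stream asks for."""

    def find_class(self, module, name):
        raise pickle.UnpicklingError("refused global %s.%s" % (module, name))


def try_load(data):
    """Return the refusal message, or None if the data loaded cleanly."""
    try:
        RefuseGlobals(io.BytesIO(data)).load()
    except pickle.UnpicklingError as exc:
        return str(exc)
    return None


# A stream that references a callable, and one that holds only plain data.
malicious = pickle.dumps(Payload())
plain = pickle.dumps(("foo.bar", (1329348081, 1.0)))
```

Here `try_load(malicious)` reports the refused `builtins.print` lookup, while `try_load(plain)` returns None because a tuple of strings and numbers needs no globals at all.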

The new code path can be tested with a simple metric writer like:

import msgpack, socket, time

# Connect to the msgpack receiver (MSGPACK_RECEIVER_PORT defaults to 2005).
s = socket.create_connection(('localhost', 2005))
fs = s.makefile('wb')
now = int(time.time())
# Send one datapoint per second covering the last 24 hours.
for second in xrange(now - 86400, now):
    msgpack.pack(("foo.bar", (second, 1)), fs)
fs.flush()
fs.close()
s.shutdown(socket.SHUT_RDWR)

Sidnei da Silva (sidnei) wrote :

1. Please move the import of Unpacker from msgpack from module-level to inside connectionMade().
2. As said on IRC, note that setting the port to 0 effectively disables a listener, so the 3 new settings are not needed. If you do think it's better or more explicit to keep them, then it might make sense to remove the 'if port:' check from service.py and require setting the ENABLE_* settings instead.
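
The interaction between the ENABLE_* flags and the port check discussed in point 2 can be sketched as follows. This is a minimal illustration, not the code in service.py: `select_receivers` is a hypothetical helper and `settings` is a plain dict standing in for carbon's settings object.

```python
def select_receivers(settings, names=("LINE", "PICKLE", "MSGPACK")):
    """Return (name, interface, port) for each receiver that should listen.

    `settings` is a plain dict standing in for carbon's settings object.
    """
    selected = []
    for name in names:
        enabled = settings.get("ENABLE_%s_RECEIVER" % name, False)
        port = settings.get("%s_RECEIVER_PORT" % name, 0)
        # A receiver listens only if it is enabled AND has a non-zero port.
        if enabled and port:
            interface = settings.get("%s_RECEIVER_INTERFACE" % name, "0.0.0.0")
            selected.append((name, interface, port))
    return selected


example = {
    "ENABLE_LINE_RECEIVER": True, "LINE_RECEIVER_PORT": 2003,
    "ENABLE_PICKLE_RECEIVER": True, "PICKLE_RECEIVER_PORT": 0,
    "ENABLE_MSGPACK_RECEIVER": False, "MSGPACK_RECEIVER_PORT": 2005,
}
```

With `example`, only the line receiver is selected: the pickle receiver is enabled but has port 0, and the msgpack receiver has a port but is disabled.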

review: Approve
Dave Rawks (drawks) wrote :

1. Delayed imports are generally frowned upon, as you can have a situation where someone sets up the listener and it doesn't break until the first connection to it is made. Also, if the import is inside connectionMade, it will result in a re-import on every new connection to the listener, which seems less than optimal AND doesn't follow the code conventions used in the pickle listener.

2. I do think that making enabling/disabling explicit is clearer from a management point of view. Also, I can imagine a world where we have more modular listeners, and not all of them will follow the convention of listening on a TCP/UDP port. Just for argument's sake, say there is a listener that eats metrics from a serial port... OTOH, removing the check for sane non-zero port numbers in the current code would lead to a situation where we'd have an exception deeper in the code when the bogus value is passed into the actual Twisted protocol code.
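
The fail-at-startup behaviour argued for in point 1 can be sketched like this. It is an illustration under assumptions, not carbon's code: `load_backend` is a hypothetical helper, and the standard-library `json` module stands in for msgpack so the sketch runs without extra packages.

```python
import importlib


def load_backend(enabled, module_name):
    """Import `module_name` at startup, but only if the listener is enabled."""
    if not enabled:
        return None
    # An ImportError here surfaces at startup, not on the first connection.
    return importlib.import_module(module_name)


# "json" stands in for msgpack so the sketch runs without extra packages.
backend = load_backend(True, "json")

# A disabled listener never touches its (possibly missing) dependency.
skipped = load_backend(False, "no_such_module_for_this_sketch")

# An enabled listener with a missing dependency fails immediately.
try:
    load_backend(True, "no_such_module_for_this_sketch")
    failed_early = False
except ImportError:
    failed_early = True
```

For reference, Python keeps loaded modules in `sys.modules`, so a repeated import of an already loaded module is a lookup rather than a fresh load; the startup-failure argument does not depend on that cost.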

lp:~drawks/graphite/graphite-msgpack updated
688. By Dave Rawks

Disable msgpack receiver by default

689. By Dave Rawks

changed shebang line to use python from env instead of hardcoded interpreter

Unmerged revisions

689. By Dave Rawks

changed shebang line to use python from env instead of hardcoded interpreter

688. By Dave Rawks

Disable msgpack receiver by default

687. By Dave Rawks <email address hidden>

Added config options to enable/disable line, pickle, and msgpack receivers.
Made msgpack import conditional on the msgpack receiver being enabled.
Updated check_dependencies to test for msgpack.

686. By Dave Rawks <email address hidden>

Added msgpack protocol linereceiver

Preview Diff

=== modified file 'carbon/bin/validate-storage-schemas.py'
--- carbon/bin/validate-storage-schemas.py 2011-09-11 06:43:44 +0000
+++ carbon/bin/validate-storage-schemas.py 2012-02-15 23:21:21 +0000
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 """Copyright 2009 Chris Davis

 Licensed under the Apache License, Version 2.0 (the "License");

=== modified file 'carbon/conf/carbon.conf.example'
--- carbon/conf/carbon.conf.example 2011-12-14 16:22:17 +0000
+++ carbon/conf/carbon.conf.example 2012-02-15 23:21:21 +0000
@@ -56,6 +56,7 @@
 # the files quickly but at the risk of slowing I/O down considerably for a while.
 MAX_CREATES_PER_MINUTE = 50

+ENABLE_LINE_RECEIVER = True
 LINE_RECEIVER_INTERFACE = 0.0.0.0
 LINE_RECEIVER_PORT = 2003

@@ -66,6 +67,7 @@
 UDP_RECEIVER_INTERFACE = 0.0.0.0
 UDP_RECEIVER_PORT = 2003

+ENABLE_PICKLE_RECEIVER = True
 PICKLE_RECEIVER_INTERFACE = 0.0.0.0
 PICKLE_RECEIVER_PORT = 2004

@@ -74,6 +76,10 @@
 # Set this to True to revert to the old-fashioned insecure unpickler.
 USE_INSECURE_UNPICKLER = False

+ENABLE_MSGPACK_RECEIVER = False
+MSGPACK_RECEIVER_INTERFACE = 0.0.0.0
+MSGPACK_RECEIVER_PORT = 2005
+
 CACHE_QUERY_INTERFACE = 0.0.0.0
 CACHE_QUERY_PORT = 7002


=== modified file 'carbon/lib/carbon/conf.py'
--- carbon/lib/carbon/conf.py 2012-02-09 21:38:55 +0000
+++ carbon/lib/carbon/conf.py 2012-02-15 23:21:21 +0000
@@ -32,11 +32,16 @@
   MAX_CACHE_SIZE=float('inf'),
   MAX_UPDATES_PER_SECOND=500,
   MAX_CREATES_PER_MINUTE=float('inf'),
+  ENABLE_LINE_RECEIVER=True,
   LINE_RECEIVER_INTERFACE='0.0.0.0',
   LINE_RECEIVER_PORT=2003,
   ENABLE_UDP_LISTENER=False,
   UDP_RECEIVER_INTERFACE='0.0.0.0',
   UDP_RECEIVER_PORT=2003,
+  ENABLE_MSGPACK_RECEIVER=False,
+  MSGPACK_RECEIVER_INTERFACE='0.0.0.0',
+  MSGPACK_RECEIVER_PORT=2005,
+  ENABLE_PICKLE_RECEIVER=True,
   PICKLE_RECEIVER_INTERFACE='0.0.0.0',
   PICKLE_RECEIVER_PORT=2004,
   CACHE_QUERY_INTERFACE='0.0.0.0',

=== modified file 'carbon/lib/carbon/protocols.py'
--- carbon/lib/carbon/protocols.py 2012-02-11 06:26:28 +0000
+++ carbon/lib/carbon/protocols.py 2012-02-15 23:21:21 +0000
@@ -1,11 +1,13 @@
 from twisted.internet import reactor
-from twisted.internet.protocol import DatagramProtocol
+from twisted.internet.protocol import DatagramProtocol, Protocol
 from twisted.internet.error import ConnectionDone
 from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
 from carbon import log, events, state, management
 from carbon.conf import settings
 from carbon.regexlist import WhiteList, BlackList
 from carbon.util import pickle, get_unpickler
+if settings.ENABLE_MSGPACK_RECEIVER:
+  from msgpack import Unpacker


 class MetricReceiver:
@@ -106,6 +108,33 @@
       self.metricReceived(metric, datapoint)


+class MetricMessagePackReceiver(MetricReceiver, Protocol):
+
+  def connectionMade(self):
+    MetricReceiver.connectionMade(self)
+    self.unpacker = Unpacker()
+
+  def dataReceived(self, data):
+    if len(data) <= 0:
+      log.listener('msgpack short read from %s' % self.peerName)
+      return
+    try:
+      self.unpacker.feed(data)
+      for (metric, datapoint) in self.unpacker:
+        if not isinstance(metric,str):
+          log.listener('invalid metric name/type %r/%r received from %s' % ( metric, type(metric), self.peerName))
+          continue
+        try:
+          datapoint = ( float(datapoint[0]), float(datapoint[1]) )
+        except:
+          continue
+        self.metricReceived(metric, datapoint)
+    except:
+      log.listener('invalid msgpack received from %s, ignoring' % self.peerName)
+      return
+
+
+
 class CacheManagementHandler(Int32StringReceiver):
   def connectionMade(self):
     peer = self.transport.getPeer()

=== modified file 'carbon/lib/carbon/service.py'
--- carbon/lib/carbon/service.py 2011-12-14 16:22:17 +0000
+++ carbon/lib/carbon/service.py 2012-02-15 23:21:21 +0000
@@ -40,7 +40,7 @@
 def createBaseService(config):
   from carbon.conf import settings
   from carbon.protocols import (MetricLineReceiver, MetricPickleReceiver,
-                                MetricDatagramReceiver)
+                                MetricDatagramReceiver, MetricMessagePackReceiver)

   root_service = CarbonRootService()
   root_service.setName(settings.program)
@@ -59,12 +59,20 @@
     amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")


-  for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
-                                     settings.LINE_RECEIVER_PORT,
-                                     MetricLineReceiver),
-                                    (settings.PICKLE_RECEIVER_INTERFACE,
-                                     settings.PICKLE_RECEIVER_PORT,
-                                     MetricPickleReceiver)):
+  receivers = []
+  if settings.ENABLE_LINE_RECEIVER:
+    receivers.append((settings.LINE_RECEIVER_INTERFACE,
+                      settings.LINE_RECEIVER_PORT,
+                      MetricLineReceiver))
+  if settings.ENABLE_PICKLE_RECEIVER:
+    receivers.append((settings.PICKLE_RECEIVER_INTERFACE,
+                      settings.PICKLE_RECEIVER_PORT,
+                      MetricPickleReceiver))
+  if settings.ENABLE_MSGPACK_RECEIVER:
+    receivers.append((settings.MSGPACK_RECEIVER_INTERFACE,
+                      settings.MSGPACK_RECEIVER_PORT,
+                      MetricMessagePackReceiver))
+  for (interface, port, protocol) in receivers:
     if port:
       factory = ServerFactory()
       factory.protocol = protocol

=== modified file 'check-dependencies.py'
--- check-dependencies.py 2012-02-10 05:14:15 +0000
+++ check-dependencies.py 2012-02-15 23:21:21 +0000
@@ -168,6 +168,14 @@
   print "Note that txamqp requires python 2.5 or greater."
   warning += 1

+# Test for msgpack
+try:
+  import msgpack
+except:
+  print "[WARNING]"
+  print "Unable to import the 'msgpack' module, this is required if you want to use msgpack receiver."
+  warning += 1
+

 if fatal:
   print "%d necessary dependencies not met. Graphite will not function until these dependencies are fulfilled." % fatal
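
The per-item checks that MetricMessagePackReceiver.dataReceived applies to each unpacked message (metric name must be a string, datapoint must convert to a pair of floats) can be sketched as a standalone function. This is an illustrative variant, not the code in the diff: `normalize` is a hypothetical name, and it catches specific exceptions where the diff uses bare `except:` clauses.

```python
def normalize(item):
    """Return (metric, (timestamp, value)) or None if the item is malformed."""
    try:
        metric, datapoint = item
    except (TypeError, ValueError):
        # Not a two-element sequence at all.
        return None
    if not isinstance(metric, str):
        return None
    try:
        return metric, (float(datapoint[0]), float(datapoint[1]))
    except (TypeError, ValueError, IndexError, KeyError):
        # Datapoint is missing a field or holds non-numeric values.
        return None


good = normalize(("foo.bar", (1329348081, 1)))
bad_name = normalize((42, (1329348081, 1)))
bad_value = normalize(("foo.bar", ("not-a-number", 1)))
short_point = normalize(("foo.bar", (1329348081,)))
```

Only `good` yields a result; the other three return None, matching the diff's behaviour of skipping malformed items rather than dropping the connection.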
