Merge lp:~sidnei/txstatsd/push-producer-ftw into lp:txstatsd

Proposed by Sidnei da Silva
Status: Merged
Approved by: Ian Wilkinson
Approved revision: 45
Merged at revision: 42
Proposed branch: lp:~sidnei/txstatsd/push-producer-ftw
Merge into: lp:txstatsd
Diff against target: 321 lines (+150/-26)
5 files modified
txstatsd/server/processor.py (+1/-1)
txstatsd/server/protocol.py (+29/-11)
txstatsd/tests/test_configurableprocessor.py (+6/-5)
txstatsd/tests/test_processor.py (+10/-9)
txstatsd/tests/test_protocol.py (+104/-0)
To merge this branch: bzr merge lp:~sidnei/txstatsd/push-producer-ftw
Reviewer Review Type Date Requested Status
Ian Wilkinson (community) Approve
Facundo Batista (community) Approve
Review via email: mp+78657@code.launchpad.net

Commit message

Register the GraphiteProtocol as a push producer for the connected transport,
so that if the transport's write buffer fills up we pause message production
until the buffer is flushed.

Description of the change

- Register the GraphiteProtocol as a push producer for the connected transport,
  so that if the transport's write buffer fills up we pause message production
  until the buffer is flushed.

To post a comment you must log in.
43. By Sidnei da Silva

- Simplify test a bit.

44. By Sidnei da Silva

- Turns out numStats was missing a newline while everything else already includes one, so let's get rid of the extra one.

45. By Sidnei da Silva

- Fix failures

Revision history for this message
Facundo Batista (facundo) wrote :

Looks ok

review: Approve
Revision history for this message
Ian Wilkinson (theiw) wrote :

+1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'txstatsd/server/processor.py'
2--- txstatsd/server/processor.py 2011-10-01 04:05:34 +0000
3+++ txstatsd/server/processor.py 2011-10-07 19:53:28 +0000
4@@ -192,7 +192,7 @@
5 messages.extend(meter_metrics)
6 num_stats += events
7
8- messages.append("statsd.numStats %s %s" % (num_stats, timestamp))
9+ messages.append("statsd.numStats %s %s\n" % (num_stats, timestamp))
10 return messages
11
12 def flush_counter_metrics(self, interval, timestamp):
13
14=== modified file 'txstatsd/server/protocol.py'
15--- txstatsd/server/protocol.py 2011-08-31 10:45:16 +0000
16+++ txstatsd/server/protocol.py 2011-10-07 19:53:28 +0000
17@@ -1,7 +1,8 @@
18-from twisted.internet import task, defer
19+from zope.interface import implements
20+
21+from twisted.internet import task, interfaces
22 from twisted.internet.protocol import (
23- DatagramProtocol, ReconnectingClientFactory)
24-from twisted.protocols.basic import LineOnlyReceiver
25+ DatagramProtocol, ReconnectingClientFactory, Protocol)
26
27
28 class StatsDServerProtocol(DatagramProtocol):
29@@ -27,28 +28,45 @@
30 self.processor.process(data)
31
32
33-class GraphiteProtocol(LineOnlyReceiver):
34+class GraphiteProtocol(Protocol):
35 """A client protocol for talking to Graphite.
36
37 Messages to Graphite are line-based and C{\n}-separated.
38 """
39
40- delimiter = "\n"
41+ implements(interfaces.IPushProducer)
42
43- def __init__(self, processor, interval):
44+ def __init__(self, processor, interval, clock=None):
45+ self.paused = False
46 self.processor = processor
47 self.interval = interval
48 self.flush_task = task.LoopingCall(self.flushProcessor)
49+ if clock is not None:
50+ self.flush_task.clock = clock
51 self.flush_task.start(self.interval / 1000, False)
52
53- @defer.inlineCallbacks
54+ def connectionMade(self):
55+ """
56+ A connection has been made, register ourselves as a producer for the
57+ bound transport.
58+ """
59+ self.transport.registerProducer(self, True)
60+
61 def flushProcessor(self):
62 """Flush messages queued in the processor to Graphite."""
63 for message in self.processor.flush(interval=self.interval):
64- for line in message.splitlines():
65- if self.connected:
66- self.sendLine(line)
67- yield
68+ if self.connected and not self.paused:
69+ self.transport.write(message)
70+
71+ def pauseProducing(self):
72+ """Pause producing messages, since the buffer is full."""
73+ self.paused = True
74+
75+ stopProducing = pauseProducing
76+
77+ def resumeProducing(self):
78+ """We can write to the transport again. Yay!."""
79+ self.paused = False
80
81
82 class GraphiteClientFactory(ReconnectingClientFactory):
83
84=== modified file 'txstatsd/tests/test_configurableprocessor.py'
85--- txstatsd/tests/test_configurableprocessor.py 2011-09-14 12:01:10 +0000
86+++ txstatsd/tests/test_configurableprocessor.py 2011-10-07 19:53:28 +0000
87@@ -21,7 +21,7 @@
88 self.assertEqual(2, len(messages))
89 counters = messages[0].splitlines()
90 self.assertEqual("gorets.count 17 42", counters[0])
91- self.assertEqual("statsd.numStats 1 42", messages[1])
92+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
93
94 def test_flush_counter_with_prefix(self):
95 """
96@@ -34,7 +34,7 @@
97 self.assertEqual(2, len(messages))
98 counters = messages[0].splitlines()
99 self.assertEqual("test.metric.gorets.count 17 42", counters[0])
100- self.assertEqual("statsd.numStats 1 42", messages[1])
101+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
102
103 def test_flush_single_timer_single_time(self):
104 """
105@@ -59,7 +59,7 @@
106 self.assertEqual("glork.98percentile 24.0 42", timers[7])
107 self.assertEqual("glork.99percentile 24.0 42", timers[8])
108 self.assertEqual("glork.999percentile 24.0 42", timers[9])
109- self.assertEqual("statsd.numStats 1 42", messages[1])
110+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
111
112 def test_flush_single_timer_multiple_times(self):
113 """
114@@ -94,7 +94,7 @@
115 self.assertEqual("glork.98percentile 42.0 42", timers[7])
116 self.assertEqual("glork.99percentile 42.0 42", timers[8])
117 self.assertEqual("glork.999percentile 42.0 42", timers[9])
118- self.assertEqual("statsd.numStats 1 42", messages[1])
119+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
120
121
122 class FlushMeterMetricMessagesTest(TestCase):
123@@ -138,4 +138,5 @@
124 "test.metric.gorets.15min_rate 0.0 %s" % self.time_now,
125 meter_metric[4])
126 self.assertEqual(
127- "statsd.numStats 1 %s" % self.time_now, messages[1])
128+ "statsd.numStats 1 %s" % self.time_now,
129+ messages[1].splitlines()[0])
130
131=== modified file 'txstatsd/tests/test_processor.py'
132--- txstatsd/tests/test_processor.py 2011-09-14 12:01:10 +0000
133+++ txstatsd/tests/test_processor.py 2011-10-07 19:53:28 +0000
134@@ -117,7 +117,7 @@
135 Flushing the message processor when there are no stats available should
136 still produce one message where C{statsd.numStats} is set to zero.
137 """
138- self.assertEqual(["statsd.numStats 0 42"], self.processor.flush())
139+ self.assertEqual(["statsd.numStats 0 42\n"], self.processor.flush())
140
141 def test_flush_counter(self):
142 """
143@@ -130,7 +130,7 @@
144 counters = messages[0].splitlines()
145 self.assertEqual("stats.gorets 4 42", counters[0])
146 self.assertEqual("stats_counts.gorets 42 42", counters[1])
147- self.assertEqual("statsd.numStats 1 42", messages[1])
148+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
149 self.assertEqual(0, self.processor.counter_metrics["gorets"])
150
151 def test_flush_counter_one_second_interval(self):
152@@ -144,7 +144,7 @@
153 counters = messages[0].splitlines()
154 self.assertEqual("stats.gorets 42 42", counters[0])
155 self.assertEqual("stats_counts.gorets 42 42", counters[1])
156- self.assertEqual("statsd.numStats 1 42", messages[1])
157+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
158 self.assertEqual(0, self.processor.counter_metrics["gorets"])
159
160 def test_flush_single_timer_single_time(self):
161@@ -162,7 +162,7 @@
162 self.assertEqual("stats.timers.glork.upper_90 24 42", timers[2])
163 self.assertEqual("stats.timers.glork.lower 24 42", timers[3])
164 self.assertEqual("stats.timers.glork.count 1 42", timers[4])
165- self.assertEqual("statsd.numStats 1 42", messages[1])
166+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
167 self.assertEqual([], self.processor.timer_metrics["glork"])
168
169 def test_flush_single_timer_multiple_times(self):
170@@ -183,7 +183,7 @@
171 self.assertEqual("stats.timers.glork.upper_90 23 42", timers[2])
172 self.assertEqual("stats.timers.glork.lower 4 42", timers[3])
173 self.assertEqual("stats.timers.glork.count 6 42", timers[4])
174- self.assertEqual("statsd.numStats 1 42", messages[1])
175+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
176 self.assertEqual([], self.processor.timer_metrics["glork"])
177
178 def test_flush_single_timer_50th_percentile(self):
179@@ -207,7 +207,7 @@
180 self.assertEqual("stats.timers.glork.upper_50 15 42", timers[2])
181 self.assertEqual("stats.timers.glork.lower 4 42", timers[3])
182 self.assertEqual("stats.timers.glork.count 6 42", timers[4])
183- self.assertEqual("statsd.numStats 1 42", messages[1])
184+ self.assertEqual("statsd.numStats 1 42", messages[1].splitlines()[0])
185 self.assertEqual([], self.processor.timer_metrics["glork"])
186
187 def test_flush_gauge_metric(self):
188@@ -224,7 +224,7 @@
189 self.assertEqual(
190 "stats.gauge.gorets.value 9.6 42", gauge_metric[0])
191 self.assertEqual(
192- "statsd.numStats 1 42", messages[1])
193+ "statsd.numStats 1 42", messages[1].splitlines()[0])
194 self.assertEqual(0, len(self.processor.gauge_metrics))
195
196
197@@ -268,7 +268,8 @@
198 "stats.meter.gorets.15min_rate 0.0 %s" % self.time_now,
199 meter_metric[4])
200 self.assertEqual(
201- "statsd.numStats 1 %s" % self.time_now, messages[1])
202+ "statsd.numStats 1 %s" % self.time_now,
203+ messages[1].splitlines()[0])
204
205 # As we are employing the expected results from test_ewma.py
206 # we perform the initial tick(), before advancing the clock 60sec.
207@@ -295,4 +296,4 @@
208 meter_metric[4].startswith(
209 "stats.meter.gorets.15min_rate 0.5613041"))
210 self.assertEqual(
211- "statsd.numStats 1 %s" % self.time_now, messages[1])
212+ "statsd.numStats 1 %s" % self.time_now, messages[1].splitlines()[0])
213
214=== added file 'txstatsd/tests/test_protocol.py'
215--- txstatsd/tests/test_protocol.py 1970-01-01 00:00:00 +0000
216+++ txstatsd/tests/test_protocol.py 2011-10-07 19:53:28 +0000
217@@ -0,0 +1,104 @@
218+"""Tests for the Graphite Protocol classes."""
219+
220+from twisted.trial.unittest import TestCase
221+
222+from txstatsd.server.protocol import GraphiteProtocol, GraphiteClientFactory
223+from twisted.internet import task
224+from twisted.test import proto_helpers
225+
226+
227+class FakeProcessor(object):
228+
229+ def __init__(self):
230+ self.sequence = 0
231+
232+ def flush(self, interval):
233+ """Always produce a sequence number followed by 9 lines of output"""
234+ self.sequence += 1
235+ return [str(self.sequence)]
236+
237+
238+class FakeTransport(object):
239+
240+ def __init__(self):
241+ self.messages = []
242+
243+ def write(self, data):
244+ self.messages.append(data)
245+
246+
247+class TestGraphiteProtocol(TestCase):
248+
249+ def setUp(self):
250+ super(TestGraphiteProtocol, self).setUp()
251+ self.processor = FakeProcessor()
252+ self.transport = FakeTransport()
253+ self.clock = task.Clock()
254+ self.protocol = GraphiteProtocol(self.processor, 1000, clock=self.clock)
255+ self.protocol.transport = self.transport
256+ self.protocol.connected = True
257+
258+ def test_write_unless_paused(self):
259+ """
260+ If the producer isn't paused, then write to the transport. Once the
261+ producer is paused, nothing is written to the transport anymore.
262+ """
263+ self.assertEqual(0, len(self.transport.messages))
264+ self.clock.advance(1)
265+ self.assertEqual(1, len(self.transport.messages))
266+ self.clock.advance(1)
267+ self.assertEqual(2, len(self.transport.messages))
268+ self.protocol.pauseProducing()
269+ self.clock.advance(1)
270+ self.assertEqual(2, len(self.transport.messages))
271+
272+ def test_paused_producer_discards_everything_until_resumed(self):
273+ """
274+ If the producer is paused, everything is discarded until the producer
275+ is resumed.
276+ """
277+ self.protocol.pauseProducing()
278+ self.assertEqual(0, len(self.transport.messages))
279+ self.clock.advance(1)
280+ self.assertEqual(0, len(self.transport.messages))
281+ self.clock.advance(1)
282+ self.assertEqual(0, len(self.transport.messages))
283+ self.protocol.resumeProducing()
284+ self.clock.advance(1)
285+ self.assertEqual(1, len(self.transport.messages))
286+ self.assertEqual("3", self.transport.messages[-1])
287+
288+ def test_stopped_producer_discards_everything(self):
289+ """
290+ If the producer is stopped, everything is discarded.
291+ """
292+ self.assertEqual(0, len(self.transport.messages))
293+ self.clock.advance(1)
294+ self.assertEqual(1, len(self.transport.messages))
295+ self.protocol.stopProducing()
296+ self.clock.advance(1)
297+ self.assertEqual(1, len(self.transport.messages))
298+ self.clock.advance(1)
299+ self.assertEqual(1, len(self.transport.messages))
300+
301+
302+class TestProducerRegistration(TestCase):
303+
304+ def test_register_producer(self):
305+ """
306+ The Graphite protocol client registers itself as a producer of the
307+ transport it's connected to.
308+ """
309+ processor = FakeProcessor()
310+
311+ # We don't care about the address argument here.
312+ client = GraphiteClientFactory(processor, 1).buildProtocol(None)
313+ clientTransport = proto_helpers.StringTransport()
314+ client.makeConnection(clientTransport)
315+
316+ # check that the producer is registered
317+ self.assertTrue(clientTransport.producer is client)
318+
319+ # check the streaming attribute
320+ self.assertTrue(clientTransport.streaming)
321+ client.flush_task.stop()

Subscribers

People subscribed via source and target branches