Merge lp:~diogobaeder/txstatsd/fix-host-lookup into lp:txstatsd

Proposed by Diogo Baeder
Status: Merged
Approved by: Diogo Baeder
Approved revision: 117
Merged at revision: 103
Proposed branch: lp:~diogobaeder/txstatsd/fix-host-lookup
Merge into: lp:txstatsd
Diff against target: 533 lines (+325/-57)
3 files modified
txstatsd/protocol.py (+109/-38)
txstatsd/server/router.py (+1/-1)
txstatsd/tests/test_client.py (+215/-18)
To merge this branch: bzr merge lp:~diogobaeder/txstatsd/fix-host-lookup
Reviewer          Review Type   Date Requested   Status
Sidnei da Silva                                  Approve
Review via email: mp+141177@code.launchpad.net

Commit message

Queue messages in a plain list while the host is unresolved, and flush them as soon as it resolves. I didn't use Queue.Queue because each client instance has its own queue, so concurrency issues seem unlikely; this can be changed later if we feel the need.

Description of the change

Queue messages in a plain list while the host is unresolved, and flush them as soon as it resolves. I didn't use Queue.Queue because each client instance has its own queue, so concurrency issues seem unlikely; this can be changed later if we feel the need.
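
The buffering described above can be sketched without Twisted. This is only an illustration under stated assumptions, not the branch's code: `BufferingSender`, its `send` hook, and the addresses used are hypothetical names chosen for the example.

```python
class BufferingSender(object):
    """Hypothetical client that buffers writes until its host resolves."""

    def __init__(self, send, limit=1000):
        self._send = send        # callable(data, address); assumed for this sketch
        self._limit = limit      # maximum number of buffered messages
        self._pending = []       # plain list: one buffer per instance
        self._address = None     # set once the host is resolved

    def write(self, data):
        if self._address is not None:
            # Host already resolved: send straight away.
            self._send(data, self._address)
        elif len(self._pending) < self._limit:
            # Host not resolved yet: keep the message for later.
            self._pending.append(data)
        # Messages beyond the limit are dropped silently.

    def host_resolved(self, ip, port):
        self._address = (ip, port)
        pending, self._pending = self._pending, []
        for data in pending:
            self._send(data, self._address)


sent = []
sender = BufferingSender(lambda data, address: sent.append((data, address)),
                         limit=2)
sender.write("a:1|c")
sender.write("b:1|c")
sender.write("c:1|c")                    # over the limit, dropped
sender.host_resolved("127.0.0.1", 8125)  # flushes the two buffered messages
sender.write("d:1|c")                    # sent immediately
```

A plain list is enough here because each instance owns its buffer; a thread-safe queue would only matter if several threads wrote to the same instance.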

Sidnei da Silva (sidnei) wrote :

A few comments:

1. I dislike class attributes with parameters that cannot be set through the constructor. How are you supposed to change LIMIT? By subclassing? By monkey patching?

2. Along the same lines, because you can't easily change LIMIT, the tests iterate calling write() up to 'LIMIT' times, which is useless. If LIMIT was configurable, you could set it to '1' in the tests and make them more efficient.

review: Needs Fixing
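
The two shapes being contrasted in this review can be sketched as follows. Both classes are hypothetical illustrations of the pattern (the pre-review code is not shown on this page), and the names `FixedLimitQueue` and `ConfigurableQueue` are assumptions made for the example.

```python
class FixedLimitQueue(object):
    """Hypothetical 'before' shape: the limit is a class attribute."""
    LIMIT = 1000

    def __init__(self):
        self._items = []

    def write(self, item):
        if len(self._items) < self.LIMIT:
            self._items.append(item)


class ConfigurableQueue(object):
    """Hypothetical 'after' shape: the limit is a constructor argument."""

    def __init__(self, limit=1000):
        self._limit = limit
        self._items = []

    def write(self, item):
        if len(self._items) < self._limit:
            self._items.append(item)


# Exercising the fixed limit takes LIMIT + 1 writes.
fixed = FixedLimitQueue()
for n in range(FixedLimitQueue.LIMIT + 1):
    fixed.write(n)

# With a configurable limit, two writes are enough to hit it.
small = ConfigurableQueue(limit=1)
small.write("kept")
small.write("dropped")
```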
Diogo Baeder (diogobaeder) wrote :

Fixed. Can you check again?

On 26-12-2012 17:21, Sidnei da Silva wrote:
> Review: Needs Fixing
>
> A few comments:
>
> 1. I dislike class attributes with parameters that cannot be set through the constructor. How are you supposed to change LIMIT? By subclassing? By monkey patching?
>
> 2. Along the same lines, because you can't easily change LIMIT, the tests iterate calling write() up to 'LIMIT' times, which is useless. If LIMIT was configurable, you could set it to '1' in the tests and make them more efficient.

Sidnei da Silva (sidnei) wrote :

You still haven't changed the test to use limit=1. :)

Diogo Baeder (diogobaeder) wrote :

I changed it to use limit=2 instead of limit=1 because it's more
expressive to me that the limit is actually used/respected when queueing
the calls (if I use only 1, it's not clear that it respects the number
given as a limit or if it closes right after the first message sent to
the queue). This shouldn't have any noticeable impact on the test duration.

Would this be OK for you?
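
The argument for limit=2 can be made concrete with a small sketch. The classes and the helper below are hypothetical and exist only for this illustration: one queue honors its limit, the other has the bug described above (it stops accepting after the first message, whatever the limit is).

```python
class CorrectQueue(object):
    """Honors the limit it was given."""

    def __init__(self, limit):
        self._limit = limit
        self._items = []

    def write(self, item):
        if len(self._items) < self._limit:
            self._items.append(item)


class ClosesAfterFirstQueue(object):
    """Hypothetical bug: ignores the limit and keeps only the first item."""

    def __init__(self, limit):
        self._limit = limit
        self._items = []

    def write(self, item):
        if not self._items:
            self._items.append(item)


def stored_after_overflow(queue_class, limit):
    """Write limit + 1 items and report how many were stored."""
    queue = queue_class(limit)
    for n in range(limit + 1):
        queue.write(n)
    return len(queue._items)


# With limit=1 both queues store one item, so a test cannot tell them apart.
same_at_one = (stored_after_overflow(CorrectQueue, 1) ==
               stored_after_overflow(ClosesAfterFirstQueue, 1))

# With limit=2 the results differ, which exposes the buggy queue.
differ_at_two = (stored_after_overflow(CorrectQueue, 2) !=
                 stored_after_overflow(ClosesAfterFirstQueue, 2))
```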

On 26-12-2012 18:08, Sidnei da Silva wrote:
> You still haven't changed the test to use limit=1. :)

Diogo Baeder (diogobaeder) wrote :

Changed again to use limit=2 in the default instance for tests, instead
of in specific tests, as requested.

On 26-12-2012 18:39, Diogo Baeder wrote:
> I changed it to use limit=2 instead of limit=1 because it's more
> expressive to me that the limit is actually used/respected when queueing
> the calls (if I use only 1, it's not clear that it respects the number
> given as a limit or if it closes right after the first message sent to
> the queue). This shouldn't have any noticeable impact on the test duration.
>
> Would this be OK for you?
>
> On 26-12-2012 18:08, Sidnei da Silva wrote:
>> You still haven't changed the test to use limit=1. :)
>

Sidnei da Silva (sidnei) wrote :

I'm fairly confident that calling transport.connect() does not do what you want. That sets the transport to 'connected udp' which is not what we want. See: http://twistedmatrix.com/documents/11.0.0/core/howto/udp.html#auto2

Instead, you should pass host, port to the transport gateway and use them in transport.write(data, (host, port)).
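
The two addressing styles have a direct analogue in Python's standard `socket` module. The sketch below uses plain sockets rather than Twisted transports, so it illustrates only the difference in how the destination is supplied and says nothing about Twisted's own behavior or overhead. The local receiver and the message payloads are assumptions made for the example.

```python
import socket

# A local receiver standing in for the StatsD server (hypothetical setup).
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))     # port 0: let the OS pick a free port
receiver.settimeout(2.0)
address = receiver.getsockname()

# Unconnected socket: the destination is passed with every datagram.
plain = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
plain.sendto(b"explicit:1|c", address)
first, _ = receiver.recvfrom(1024)

# "Connected" socket: the destination is fixed once, then omitted.
fixed = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
fixed.connect(address)
fixed.send(b"connected:1|c")
second, _ = receiver.recvfrom(1024)

for sock in (plain, fixed, receiver):
    sock.close()
```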

Diogo Baeder (diogobaeder) wrote :

Well, as far as I understood it, it doesn't create a "real connection",
only makes sure that all datagrams are delivered (when they are) to the
same host. As what we want is to send datagrams to the same host (as we
even resolve the host before sending the packets), this makes perfect
sense - and makes it cleaner to send messages, as we only have to pass
the data as an argument, instead of providing (host, port) for every
write() call.

Here, take a look at the paragraphs of the documentation that make it clear:
"A connected UDP socket is slightly different from a standard one - it
can only send and receive datagrams to/from a single address, but this
does not in any way imply a connection"
"Unlike a regular UDP protocol, we do not need to specify where to send
datagrams and are not told where they came from since they can only come
from the address to which the socket is 'connected'."

On 27-12-2012 13:59, Sidnei da Silva wrote:
> I'm fairly confident that calling transport.connect() does not do what you want. That sets the transport to 'connected udp' which is not what we want. See: http://twistedmatrix.com/documents/11.0.0/core/howto/udp.html#auto2
>
> Instead, you should pass host, port to the transport gateway and use them in transport.write(data, (host, port)).
>
>

Sidnei da Silva (sidnei) wrote :

Yes, but we tried that before and it didn't work as intended. We don't care about making sure that the datagrams are delivered so it's only adding overhead to it.

Diogo Baeder (diogobaeder) wrote :

Ah, ok, good to know that, I'll avoid trying to fix the host in the
future, then. Now I changed it to use explicit (host, port) on every call.

Diff updated, can you take a look again?

On 27-12-2012 15:24, Sidnei da Silva wrote:
> Yes, but we tried that before and it didn't work as intended. We don't care about making sure that the datagrams are delivered so it's only adding overhead to it.

Sidnei da Silva (sidnei) :
review: Approve
Ubuntu One Server Tarmac Bot (ubuntuone-server-tarmac) wrote :

The attempt to merge lp:~diogobaeder/txstatsd/fix-host-lookup into lp:txstatsd failed. Below is the output from the failed tests.

txstatsd.tests.metrics.test_distinct
  TestDistinct
    test_all ... [OK]
  TestDistinctMetricReporter
    test_reports ... [OK]
  TestHash
    test_chi_square ... [SKIPPED]
    test_hash_chars ... [OK]
  TestPlugin
    test_factory ... [OK]
  TestZeros
    test_zeros ... [OK]
txstatsd.tests.metrics.test_histogrammetric
  TestHistogramReporterMetric
    test_histogram_histogram ... [OK]
    test_histogram_of_numbers_1_through_10000 ... [OK]
    test_histogram_with_zero_recorded_values ... [OK]
txstatsd.tests.metrics.test_metermetric
  TestDeriveMetricReporter
    test_fastpoll ... [OK]
    test_interface ... [OK]
txstatsd.tests.metrics.test_sli
  TestConditions
    test_above ... [OK]
    test_above_linear ... [OK]
    test_below ... [OK]
    test_below_linear ... [OK]
    test_between ... [OK]
  TestFactory
    test_configure ... [OK]
    test_configure_linear ... [OK]
  TestMetric
    test_clear ... [OK]
    test_count_all ... [OK]
    test_count_error ... [OK]
    test_count_threshold ... [OK]
    test_reports ... [OK]
  TestMetricLinear
    test_count_threshold ... [OK]
  TestParsing
    test_parse ... [OK]
txstatsd.tests.metrics.test_timermetric
  TestBlankTimerMetric
    test_count ... [OK]
    test_max ... [OK]
    test_mean ... [OK]
    test_min ... [OK]
    test_no_values ... [OK]
    test_percentiles ... [OK]
    test_rate ... [OK]
    test_std_dev ... [OK]...

117. By Diogo Baeder

Fixing issues with connection established after the host resolves and the gateway is instantiated

Preview Diff

=== modified file 'txstatsd/protocol.py'
--- txstatsd/protocol.py 2012-12-18 19:53:43 +0000
+++ txstatsd/protocol.py 2012-12-27 20:51:21 +0000
@@ -26,6 +26,9 @@
 from twisted.python import log


+__all__ = ('StatsDClientProtocol', 'TwistedStatsDClient')
+
+
 class StatsDClientProtocol(DatagramProtocol):
     """A Twisted-based implementation of the StatsD client protocol.

@@ -44,18 +47,82 @@
         self.client.disconnect()


+class DataQueue(object):
+    """Manages the queue of sent data, so that it can be really sent later when
+    the host is resolved."""
+
+    def __init__(self, limit=1000):
+        self._limit = limit
+        self._queue = []
+
+    def write(self, data, callback):
+        """Queue the given data, so that it's sent later.
+
+        @param data: The data to be queued.
+        @param callback: The callback to use when the data is flushed.
+        """
+        if len(self._queue) < self._limit:
+            self._queue.append((data, callback))
+
+    def flush(self):
+        """Flush the queue, returning its items."""
+        items = self._queue
+        self._queue = []
+        return items
+
+
+class TransportGateway(object):
+    """Responsible for sending datagrams to the actual transport."""
+
+    def __init__(self, transport, reactor, host, port):
+        """
+        @param transport: DatagramProtocol().transport .
+        @param reactor: The Twisted reactor in use.
+        """
+        self.transport = transport
+        self.reactor = reactor
+        self.host = host
+        self.port = port
+
+    def write(self, data, callback):
+        """Send the metric to the StatsD server.
+
+        @param data: The data to be sent.
+        @param callback: The callback to which the result should be sent.
+            B{Note}: The C{callback} will be called in the C{reactor}
+            thread, and not in the thread of the original caller.
+        """
+        self.reactor.callFromThread(self._write, data, callback)
+
+    def _write(self, data, callback):
+        """Send the metric to the StatsD server.
+
+        @param data: The data to be sent.
+        @param callback: The callback to which the result should be sent.
+        @raise twisted.internet.error.MessageLengthError: If the size of data
+            is too large.
+        """
+        try:
+            bytes_sent = self.transport.write(data, (self.host, self.port))
+            if callback is not None:
+                callback(bytes_sent)
+        except (OverflowError, TypeError, socket.error, socket.gaierror):
+            if callback is not None:
+                callback(None)
+
+
 class TwistedStatsDClient(object):

     def __init__(self, host, port, connect_callback=None,
-                 disconnect_callback=None, resolver_errback=None):
-        """
-        Build a connection that reports to the endpoint (on C{host} and
-        C{port}) using UDP.
+                 disconnect_callback=None):
+        """Avoid using this initializer directly; Instead, use the create()
+        static method, otherwise the messages won't be really delivered.
+
+        If you still need to use this directly and want to resolve the host
+        yourself, remember to call host_resolved() as soon as it's resolved.

         @param host: The StatsD server host.
         @param port: The StatsD server port.
-        @param resolver_errback: Deprecated parameter, unused.
-            Please avoid using it.
         @param connect_callback: The callback to invoke on connection.
         @param disconnect_callback: The callback to invoke on disconnection.
         """
@@ -64,12 +131,13 @@
         self.reactor = reactor

         self.host = host
-
         self.port = port
         self.connect_callback = connect_callback
         self.disconnect_callback = disconnect_callback
+        self.data_queue = DataQueue()

         self.transport = None
+        self.transport_gateway = None

     def __str__(self):
         return "%s:%d" % (self.host, self.port)
@@ -77,7 +145,9 @@
     @staticmethod
     def create(host, port, connect_callback=None, disconnect_callback=None,
                resolver_errback=None):
-        """Resolve the host and return a Deferred for the instance.
+        """Create an instance that resolves the host to an IP asynchronously.
+
+        Will queue all messages while the host is not yet resolved.

         Build a connection that reports to the endpoint (on C{host} and
         C{port}) using UDP.
@@ -90,28 +160,26 @@
         @param disconnect_callback: The callback to invoke on disconnection."""
         from twisted.internet import reactor

-        deferred_instance = Deferred()
-
-        def create_instance(ip):
-            instance = TwistedStatsDClient(
-                host=ip, port=port, connect_callback=connect_callback,
-                disconnect_callback=disconnect_callback)
-            deferred_instance.callback(instance)
+        instance = TwistedStatsDClient(
+            host=host, port=port, connect_callback=connect_callback,
+            disconnect_callback=disconnect_callback)

         if resolver_errback is None:
             resolver_errback = log.err

-        resolver = reactor.resolve(host)
-        resolver.addCallbacks(create_instance, resolver_errback)
+        instance.resolve_later = reactor.resolve(host)
+        instance.resolve_later.addCallbacks(instance.host_resolved,
+                                            resolver_errback)

-        return deferred_instance
+        return instance

     def connect(self, transport=None):
         """Connect to the StatsD server."""
         if transport is not None:
             self.transport = transport
-        if self.connect_callback is not None:
-            self.connect_callback()
+        if self.transport_gateway is not None:
+            self.transport_gateway.transport = transport
+            self._flush_items()

     def disconnect(self):
         """Disconnect from the StatsD server."""
@@ -127,21 +195,24 @@
             B{Note}: The C{callback} will be called in the C{reactor}
             thread, and not in the thread of the original caller.
         """
-        self.reactor.callFromThread(self._write, data, callback)
-
-    def _write(self, data, callback):
-        """Send the metric to the StatsD server.
-
-        @param data: The data to be sent.
-        @param callback: The callback to which the result should be sent.
-        @raise twisted.internet.error.MessageLengthError: If the size of data
-            is too large.
-        """
-        if self.host is not None and self.transport is not None:
-            try:
-                bytes_sent = self.transport.write(data, (self.host, self.port))
-                if callback is not None:
-                    callback(bytes_sent)
-            except (OverflowError, TypeError, socket.error, socket.gaierror):
-                if callback is not None:
-                    callback(None)
+        if self.transport_gateway is not None and self.transport is not None:
+            return self.transport_gateway.write(data, callback)
+        return self.data_queue.write(data, callback)
+
+    def host_resolved(self, ip):
+        """Callback used when the host is resolved to an IP address."""
+        self.host = ip
+        self.transport_gateway = TransportGateway(self.transport, self.reactor,
+                                                  self.host, self.port)
+
+        if self.connect_callback is not None:
+            self.connect_callback()
+
+        self._flush_items()
+
+    def _flush_items(self):
+        """Flush all items (data, callback) from the DataQueue to the
+        TransportGateway."""
+        for item in self.data_queue.flush():
+            data, callback = item
+            self.write(data, callback)

=== modified file 'txstatsd/server/router.py'
--- txstatsd/server/router.py 2012-06-28 17:29:26 +0000
+++ txstatsd/server/router.py 2012-12-27 20:51:21 +0000
@@ -264,7 +264,7 @@
         d = defer.Deferred()
         self.ready.addCallback(lambda _: d)

-        client = TwistedStatsDClient(
+        client = TwistedStatsDClient.create(
             host, port, connect_callback=lambda: d.callback(None))
         protocol = StatsDClientProtocol(client)


=== modified file 'txstatsd/tests/test_client.py'
--- txstatsd/tests/test_client.py 2012-12-18 19:53:43 +0000
+++ txstatsd/tests/test_client.py 2012-12-27 20:51:21 +0000
@@ -21,6 +21,8 @@
 """Tests for the various client classes."""

 import sys
+
+from mocker import Mocker, expect, ANY
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, Deferred
 from twisted.python import log
@@ -32,7 +34,9 @@
 from txstatsd.metrics.metric import Metric
 from txstatsd.client import (
     StatsDClientProtocol, TwistedStatsDClient, UdpStatsDClient,
-    ConsistentHashingClient)
+    ConsistentHashingClient
+)
+from txstatsd.protocol import DataQueue, TransportGateway


 class FakeClient(object):
@@ -63,16 +67,21 @@
         super(TestClient, self).setUp()
         self.client = None
         self.exception = None
+        self.mocker = Mocker()

     def tearDown(self):
         if self.client:
             self.client.transport.stopListening()
         super(TestClient, self).tearDown()

+    def build_protocol(self):
+        protocol = StatsDClientProtocol(self.client)
+        reactor.listenUDP(0, protocol)
+
     def test_twistedstatsd_write(self):
         self.client = TwistedStatsDClient('127.0.0.1', 8000)
-        protocol = StatsDClientProtocol(self.client)
-        reactor.listenUDP(0, protocol)
+        self.build_protocol()
+        self.client.host_resolved('127.0.0.1')

         def ensure_bytes_sent(bytes_sent):
             self.assertEqual(bytes_sent, len('message'))
@@ -87,10 +96,10 @@

     @inlineCallbacks
     def test_twistedstatsd_write_with_host_resolved(self):
-        self.client = yield TwistedStatsDClient.create(
+        self.client = TwistedStatsDClient.create(
             'localhost', 8000)
-        protocol = StatsDClientProtocol(self.client)
-        reactor.listenUDP(0, protocol)
+        self.build_protocol()
+        yield self.client.resolve_later

         def ensure_bytes_sent(bytes_sent):
             self.assertEqual(bytes_sent, len('message'))
@@ -106,36 +115,38 @@

     @inlineCallbacks
     def test_twistedstatsd_with_malformed_address_and_errback(self):
-        def ensure_exception_raised(exception):
-            self.assertTrue(exception.startswith("DNS lookup failed"))
+        exceptions_captured = []

         def capture_exception_raised(failure):
             exception = failure.getErrorMessage()
-            self.deferred_instance.callback(exception)
+            self.assertTrue(exception.startswith("DNS lookup failed"))
+            exceptions_captured.append(exception)

-        self.deferred_instance = TwistedStatsDClient.create(
+        self.client = TwistedStatsDClient.create(
             '256.0.0.0', 1,
             resolver_errback=capture_exception_raised)
+        self.build_protocol()
+        yield self.client.resolve_later

-        self.deferred_instance.addCallback(ensure_exception_raised)
-        yield self.deferred_instance
+        self.assertEqual(len(exceptions_captured), 1)

     @inlineCallbacks
     def test_twistedstatsd_with_malformed_address_and_no_errback(self):
-        def ensure_exception_raised(exception):
-            self.assertTrue(exception.startswith("DNS lookup failed"))
+        exceptions_captured = []

         def capture_exception_raised(failure):
             exception = failure.getErrorMessage()
-            self.deferred_instance.callback(exception)
+            self.assertTrue(exception.startswith("DNS lookup failed"))
+            exceptions_captured.append(exception)

         self.patch(log, "err", capture_exception_raised)

-        self.deferred_instance = TwistedStatsDClient.create(
+        self.client = TwistedStatsDClient.create(
             '256.0.0.0', 1)
+        self.build_protocol()
+        yield self.client.resolve_later

-        self.deferred_instance.addCallback(ensure_exception_raised)
-        yield self.deferred_instance
+        self.assertEqual(len(exceptions_captured), 1)

     def test_udpstatsd_wellformed_address(self):
         client = UdpStatsDClient('localhost', 8000)
@@ -181,6 +192,187 @@
             if 'twisted' in mod:
                 self.assertTrue(sys.modules[mod] is None)

+    def test_starts_with_data_queue(self):
+        """The client starts with a DataQueue."""
+        self.client = TwistedStatsDClient('127.0.0.1', 8000)
+        self.build_protocol()
+
+        self.assertIsInstance(self.client.data_queue, DataQueue)
+
+    def test_starts_without_transport_gateway(self):
+        """The client starts without a TransportGateway."""
+        self.client = TwistedStatsDClient('127.0.0.1', 8000)
+        self.build_protocol()
+
+        self.assertTrue(self.client.transport_gateway is None)
+
+    def test_passes_transport_to_gateway(self):
+        """The client passes the transport to the gateway as soon as the client
+        is connected."""
+        self.client = TwistedStatsDClient('127.0.0.1', 8000)
+        self.build_protocol()
+        self.client.host_resolved('127.0.0.1')
+
+        self.assertEqual(self.client.transport_gateway.transport,
+                         self.client.transport)
+
+    def test_passes_reactor_to_gateway(self):
+        """The client passes the reactor to the gateway as soon as the client
+        is connected."""
+        self.client = TwistedStatsDClient('127.0.0.1', 8000)
+        self.build_protocol()
+        self.client.host_resolved('127.0.0.1')
+
+        self.assertEqual(self.client.transport_gateway.reactor,
+                         self.client.reactor)
+
+    def test_sets_ip_when_host_resolves(self):
+        """As soon as the host is resolved, set the IP as the host."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        self.build_protocol()
+        self.assertEqual(self.client.host, 'localhost')
+
+        self.client.host_resolved('127.0.0.1')
+        self.assertEqual(self.client.host, '127.0.0.1')
+
+    def test_sets_transport_gateway_when_host_resolves(self):
+        """As soon as the host is resolved, set the transport gateway."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        self.build_protocol()
+
+        self.client.transport_gateway = None
+
+        self.client.host_resolved('127.0.0.1')
+        self.assertIsInstance(self.client.transport_gateway, TransportGateway)
+
+    def test_calls_connect_callback_when_host_resolves(self):
+        """As soon as the host is resolved, call back the connect_callback."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        self.build_protocol()
+
+        self.client.connect_callback = self.mocker.mock()
+        expect(self.client.connect_callback())
+
+        with self.mocker:
+            self.client.host_resolved('127.0.0.1')
+
+    def test_sends_messages_to_gateway_after_host_resolves(self):
+        """After the host is resolved, send messages to the
+        TransportGateway."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        self.build_protocol()
+        self.client.host_resolved('127.0.0.1')
+
+        message = 'some data'
+        bytes_sent = len(message)
+        self.client.data_queue = self.mocker.mock(spec=DataQueue)  # not called
+        self.client.transport_gateway = self.mocker.mock(spec=TransportGateway)
+        callback = self.mocker.mock()
+        expect(self.client.transport_gateway.write(message, callback)).result(
+            bytes_sent)
+
+        with self.mocker:
+            self.assertEqual(self.client.write(message, callback), bytes_sent)
+
+    def test_sends_messages_to_queue_before_host_resolves(self):
+        """Before the host is resolved, send messages to the DataQueue."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        self.build_protocol()
+
+        message = 'some data'
+        bytes_sent = len(message)
+        self.client.data_queue = self.mocker.mock(spec=DataQueue)
+        callback = self.mocker.mock()
+        expect(self.client.data_queue.write(message, callback)).result(None)
+
+        with self.mocker:
+            self.assertEqual(self.client.write(message, callback), None)
+
+    def test_flushes_queued_messages_to_the_gateway_when_host_resolves(self):
+        """As soon as the host is resolved, flush all messages to the
+        TransportGateway."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        self.build_protocol()
+
+        self.client.data_queue.write('data 1', 'callback 1')
+        self.client.data_queue.write('data 2', 'callback 2')
+        self.client.data_queue.write('data 3', 'callback 3')
+
+        mock_gateway_write = self.mocker.mock()
+        self.patch(TransportGateway, 'write', mock_gateway_write)
+        expect(mock_gateway_write('data 1', 'callback 1'))
+        expect(mock_gateway_write('data 2', 'callback 2'))
+        expect(mock_gateway_write('data 3', 'callback 3'))
+
+        with self.mocker:
+            self.client.host_resolved('127.0.0.1')
+
+    def test_sets_client_transport_when_connected(self):
+        """Set the transport as an attribute of the client."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        transport = DummyTransport()
+        self.client.connect(transport)
+
+        self.assertEqual(self.client.transport, transport)
+
+    def test_sets_gateway_transport_when_connected(self):
+        """Set the transport as an attribute of the TransportGateway."""
+        self.client = TwistedStatsDClient('localhost', 8000)
+        self.client.host_resolved('127.0.0.1')
+        transport = DummyTransport()
+        self.client.connect(transport)
+
+        self.assertEqual(self.client.transport_gateway.transport, transport)
+
+
+class DataQueueTest(TestCase):
+    """Tests for the DataQueue class."""
+
+    def setUp(self):
+        super(DataQueueTest, self).setUp()
+        self.queue = DataQueue(limit=2)
+
+    def test_queues_messages_and_callbacks(self):
+        """All messages are queued with their respective callbacks."""
+        self.queue.write(data=1, callback='1')
+        self.queue.write(data=2, callback='2')
+
+        self.assertEqual(self.queue.flush(), [
+            (1, '1'),
+            (2, '2'),
+        ])
+
+    def test_flushes_the_queue(self):
+        """All messages are queued with their respective callbacks."""
+        self.queue.write(data=1, callback='1')
+        self.queue.write(data=2, callback='2')
+
+        self.queue.flush()
+        self.assertEqual(self.queue.flush(), [])
+
+    def test_limits_number_of_messages(self):
+        """Cannot save more messages than the defined limit."""
+        self.queue.write('saved data', 'saved callback')
+        self.queue.write('saved data', 'saved callback')
+        self.queue.write('discarded data', 'discarded message')
+
+        self.assertEqual(len(self.queue.flush()), 2)
+
+    def test_discards_messages_after_limit(self):
+        """Cannot save more messages than the defined limit."""
+        self.queue.write('saved data', 'saved callback')
+        self.queue.write('saved data', 'saved callback')
+        self.queue.write('discarded data', 'discarded message')
+
+        self.assertEqual(set(self.queue.flush()),
+                         set([('saved data', 'saved callback')]))
+
+    def test_makes_limit_optional(self):
+        """Use the default limit when not given."""
+        queue = DataQueue()
+
+        self.assertTrue(queue._limit > 0)
+

 class TestConsistentHashingClient(TestCase):

@@ -251,3 +443,8 @@
         client.disconnect()
         self.assertTrue(clients[0].disconnect_called)
         self.assertTrue(clients[1].disconnect_called)
+
+
+class DummyTransport(object):
+    def stopListening(self):
+        pass
