Merge lp:~diogobaeder/txstatsd/fix-host-lookup into lp:txstatsd
Status: | Merged |
---|---|
Approved by: | Diogo Baeder |
Approved revision: | 117 |
Merged at revision: | 103 |
Proposed branch: | lp:~diogobaeder/txstatsd/fix-host-lookup |
Merge into: | lp:txstatsd |
Diff against target: | 533 lines (+325/-57), 3 files modified: txstatsd/protocol.py (+109/-38), txstatsd/server/router.py (+1/-1), txstatsd/tests/test_client.py (+215/-18) |
To merge this branch: | bzr merge lp:~diogobaeder/txstatsd/fix-host-lookup |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Sidnei da Silva | Approve | ||
Review via email: mp+141177@code.launchpad.net |
Commit message
Messages are now sent to a 'queue' (actually a list, to keep it simple) while the host is not resolved, and are flushed as soon as the host gets resolved. I didn't use Queue.Queue here because it doesn't seem that we'll have to deal with concurrency issues with this queue, since each client instance has its own queue; if we feel the need, this can be changed later.
Description of the change
Messages are now sent to a 'queue' (actually a list, to keep it simple) while the host is not resolved, and are flushed as soon as the host gets resolved. I didn't use Queue.Queue here because it doesn't seem that we'll have to deal with concurrency issues with this queue, since each client instance has its own queue; if we feel the need, this can be changed later.
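To make the description concrete, here is a minimal sketch of the queue-then-flush idea, independent of Twisted. `PendingSender`, its `send` callable and the sample metric strings are hypothetical names used only for illustration; they are not the classes in this branch.

```python
class PendingSender(object):
    """Hypothetical stand-in for a client whose destination is not yet known."""

    def __init__(self, send, limit=1000):
        self._send = send        # assumed callable taking (data, address)
        self._limit = limit      # maximum number of messages kept while waiting
        self._pending = []       # plain list; single-threaded use is assumed
        self._address = None

    def write(self, data):
        if self._address is None:
            # Host not resolved yet: keep the message, or drop it if full.
            if len(self._pending) < self._limit:
                self._pending.append(data)
            return
        self._send(data, self._address)

    def host_resolved(self, address):
        # Remember the address, then deliver everything queued so far, in order.
        self._address = address
        pending, self._pending = self._pending, []
        for data in pending:
            self._send(data, address)


sent = []
sender = PendingSender(lambda data, address: sent.append((data, address)))
sender.write("a:1|c")                      # queued
sender.write("b:2|c")                      # queued
sender.host_resolved(("127.0.0.1", 8125))  # flushes both queued messages
sender.write("c:3|c")                      # sent immediately
```

Messages beyond the limit are silently dropped in this sketch, which mirrors the trade-off described above: bounded memory in exchange for possibly losing early metrics.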
Diogo Baeder (diogobaeder) wrote:
Fixed. Can you check again?
On 26-12-2012 17:21, Sidnei da Silva wrote:
> Review: Needs Fixing
>
> A few comments:
>
> 1. I dislike class attributes holding parameters that cannot be set through the constructor. How are you supposed to change LIMIT? By subclassing? By monkey patching?
>
> 2. Along the same lines, because you can't easily change LIMIT, the tests iterate calling write() up to 'LIMIT' times, which is useless. If LIMIT were configurable, you could set it to '1' in the tests and make them more efficient.
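As an illustration of the reviewer's point, the sketch below takes the limit as a constructor argument with a default, so a test can shrink it without subclassing or monkey patching. `BoundedBuffer` and its boolean return value are assumptions made for this example only.

```python
class BoundedBuffer(object):
    """Hypothetical buffer whose capacity is a constructor argument."""

    def __init__(self, limit=1000):
        self._limit = limit
        self._items = []

    def write(self, item):
        # Return True if the item was stored, False if it was dropped.
        if len(self._items) >= self._limit:
            return False
        self._items.append(item)
        return True

    def __len__(self):
        return len(self._items)


# A test can now use a tiny limit instead of looping up to the default.
tiny = BoundedBuffer(limit=1)
accepted = [tiny.write("first"), tiny.write("second")]
```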
Sidnei da Silva (sidnei) wrote:
You still haven't changed the test to use limit=1. :)
Diogo Baeder (diogobaeder) wrote:
I changed it to use limit=2 instead of limit=1 because it's more
expressive to me that the limit is actually used/respected when queueing
the calls (if I use only 1, it's not clear that it respects the number
given as a limit or if it closes right after the first message sent to
the queue). This shouldn't have any noticeable impact on the test duration.
Would this be OK for you?
On 26-12-2012 18:08, Sidnei da Silva wrote:
> You still haven't changed the test to use limit=1. :)
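The argument for limit=2 can be checked with a small experiment. The sketch below compares a correct bounded queue with a deliberately broken one that keeps only the first message regardless of the limit; both classes and the `stored_count` helper are hypothetical and exist only for this comparison.

```python
class CorrectQueue(object):
    """Stores up to `limit` items, then drops the rest."""

    def __init__(self, limit):
        self._limit = limit
        self._queue = []

    def write(self, data):
        if len(self._queue) < self._limit:
            self._queue.append(data)

    def flush(self):
        items, self._queue = self._queue, []
        return items


class ClosesAfterFirst(CorrectQueue):
    """Deliberately broken: ignores the limit and keeps only one item."""

    def write(self, data):
        if not self._queue:
            self._queue.append(data)


def stored_count(queue_class, limit):
    # Write one more item than the limit allows and count what survived.
    queue = queue_class(limit)
    for n in range(limit + 1):
        queue.write(n)
    return len(queue.flush())


results = {
    "limit=1": (stored_count(CorrectQueue, 1), stored_count(ClosesAfterFirst, 1)),
    "limit=2": (stored_count(CorrectQueue, 2), stored_count(ClosesAfterFirst, 2)),
}
```

With limit=1 both implementations store one item, so a test cannot tell them apart; with limit=2 the broken one stores one item where two are expected.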
Diogo Baeder (diogobaeder) wrote:
Changed again to use limit=2 in the default instance for tests, instead
of in specific tests, as requested.
On 26-12-2012 18:39, Diogo Baeder wrote:
> I changed it to use limit=2 instead of limit=1 because it's more
> expressive to me that the limit is actually used/respected when queueing
> the calls (if I use only 1, it's not clear that it respects the number
> given as a limit or if it closes right after the first message sent to
> the queue). This shouldn't have any noticeable impact on the test duration.
>
> Would this be OK for you?
>
> On 26-12-2012 18:08, Sidnei da Silva wrote:
>> You still haven't changed the test to use limit=1. :)
>
Sidnei da Silva (sidnei) wrote:
I'm fairly confident that calling transport.connect() does not do what you want. That sets the transport to 'connected udp' which is not what we want. See: http://
Instead, you should pass host, port to the transport gateway and use them in transport.
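One way to read this suggestion is sketched below: the gateway stores the host and port once and supplies them on every write. `RecordingTransport` is a hypothetical fake that only mimics a `write(data, addr)` method, and `Gateway` is an illustrative name, not the class from the diff.

```python
class RecordingTransport(object):
    """Hypothetical fake transport that records each write call."""

    def __init__(self):
        self.calls = []

    def write(self, data, addr=None):
        self.calls.append((data, addr))
        return len(data)


class Gateway(object):
    """Keeps the destination and passes it explicitly on each write."""

    def __init__(self, transport, host, port):
        self.transport = transport
        self.host = host
        self.port = port

    def write(self, data):
        # The address travels with every datagram; the transport stays unconnected.
        return self.transport.write(data, (self.host, self.port))


transport = RecordingTransport()
gateway = Gateway(transport, "127.0.0.1", 8125)
bytes_sent = gateway.write(b"gauge:1|g")
```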
Diogo Baeder (diogobaeder) wrote:
Well, as far as I understood it, it doesn't create a "real connection",
only makes sure that all datagrams are delivered (when they are) to the
same host. As what we want is to send datagrams to the same host (as we
even resolve the host before sending the packets), this makes perfect
sense - and makes it cleaner to send messages, as we only have to pass
the data as an argument, instead of providing (host, port) for every
write() call.
Here, take a look at the paragraphs of the documentation that make it clear:
"A connected UDP socket is slightly different from a standard one - it
can only send and receive datagrams to/from a single address, but this
does not in any way imply a connection"
"Unlike a regular UDP protocol, we do not need to specify where to send
datagrams and are not told where they came from since they can only come
from the address to which the socket is 'connected'."
On 27-12-2012 13:59, Sidnei da Silva wrote:
> I'm fairly confident that calling transport.connect() does not do what you want. That sets the transport to 'connected udp' which is not what we want. See: http://
>
> Instead, you should pass host, port to the transport gateway and use them in transport.
>
>
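For readers unfamiliar with the distinction being debated, here is a small sketch using Python's standard `socket` module rather than Twisted. It is only an analogy for the two styles, assuming a loopback interface is available; it does not show how Twisted implements connected UDP.

```python
import socket

# A local receiver on an OS-assigned port, so the example is self-contained.
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))
receiver.settimeout(2.0)
address = receiver.getsockname()

# Unconnected style: the destination is given on every call.
plain = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
plain.sendto(b"explicit", address)
first, _ = receiver.recvfrom(1024)

# "Connected" style: the destination is fixed once, then omitted.
# No handshake takes place; the socket just remembers the peer address.
connected = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
connected.connect(address)
connected.send(b"connected")
second, _ = receiver.recvfrom(1024)

for sock in (plain, connected, receiver):
    sock.close()
```

Both styles deliver the same datagrams here; the difference is only where the destination address is supplied.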
Sidnei da Silva (sidnei) wrote:
Yes, but we tried that before and it didn't work as intended. We don't care about making sure that the datagrams are delivered so it's only adding overhead to it.
Diogo Baeder (diogobaeder) wrote:
Ah, ok, good to know that, I'll avoid trying to fix the host in the
future, then. Now I changed it to use explicit (host, port) on every call.
Diff updated, can you take a look again?
On 27-12-2012 15:24, Sidnei da Silva wrote:
> Yes, but we tried that before and it didn't work as intended. We don't care about making sure that the datagrams are delivered so it's only adding overhead to it.
Sidnei da Silva (sidnei) : | # |
Ubuntu One Server Tarmac Bot (ubuntuone-server-tarmac) wrote:
The attempt to merge lp:~diogobaeder/txstatsd/fix-host-lookup into lp:txstatsd failed. Below is the output from the failed tests.
txstatsd.
TestDistinct
test_all ... [OK]
TestDistinctM
test_reports ... [OK]
TestHash
test_chi_square ... [SKIPPED]
test_hash_chars ... [OK]
TestPlugin
test_factory ... [OK]
TestZeros
test_zeros ... [OK]
txstatsd.
TestHistogram
test_
test_
test_
txstatsd.
TestDeriveMet
test_fastpoll ... [OK]
test_interface ... [OK]
txstatsd.
TestConditions
test_above ... [OK]
test_
test_below ... [OK]
test_
test_between ... [OK]
TestFactory
test_configure ... [OK]
test_
TestMetric
test_clear ... [OK]
test_count_all ... [OK]
test_
test_
test_reports ... [OK]
TestMetricLinear
test_
TestParsing
test_parse ... [OK]
txstatsd.
TestBlankTime
test_count ... [OK]
test_max ... [OK]
test_mean ... [OK]
test_min ... [OK]
test_no_values ... [OK]
test_
test_rate ... [OK]
test_std_dev ... [OK]...
- 117. By Diogo Baeder
Fixing issues with the connection established after the host resolves and the gateway is instantiated
Preview Diff
1 | === modified file 'txstatsd/protocol.py' |
2 | --- txstatsd/protocol.py 2012-12-18 19:53:43 +0000 |
3 | +++ txstatsd/protocol.py 2012-12-27 20:51:21 +0000 |
4 | @@ -26,6 +26,9 @@ |
5 | from twisted.python import log |
6 | |
7 | |
8 | +__all__ = ('StatsDClientProtocol', 'TwistedStatsDClient') |
9 | + |
10 | + |
11 | class StatsDClientProtocol(DatagramProtocol): |
12 | """A Twisted-based implementation of the StatsD client protocol. |
13 | |
14 | @@ -44,18 +47,82 @@ |
15 | self.client.disconnect() |
16 | |
17 | |
18 | +class DataQueue(object): |
19 | + """Manages the queue of sent data, so that it can be really sent later when |
20 | + the host is resolved.""" |
21 | + |
22 | + def __init__(self, limit=1000): |
23 | + self._limit = limit |
24 | + self._queue = [] |
25 | + |
26 | + def write(self, data, callback): |
27 | + """Queue the given data, so that it's sent later. |
28 | + |
29 | + @param data: The data to be queued. |
30 | + @param callback: The callback to use when the data is flushed. |
31 | + """ |
32 | + if len(self._queue) < self._limit: |
33 | + self._queue.append((data, callback)) |
34 | + |
35 | + def flush(self): |
36 | + """Flush the queue, returning its items.""" |
37 | + items = self._queue |
38 | + self._queue = [] |
39 | + return items |
40 | + |
41 | + |
42 | +class TransportGateway(object): |
43 | + """Responsible for sending datagrams to the actual transport.""" |
44 | + |
45 | + def __init__(self, transport, reactor, host, port): |
46 | + """ |
47 | + @param transport: DatagramProtocol().transport . |
48 | + @param reactor: The Twisted reactor in use. |
49 | + """ |
50 | + self.transport = transport |
51 | + self.reactor = reactor |
52 | + self.host = host |
53 | + self.port = port |
54 | + |
55 | + def write(self, data, callback): |
56 | + """Send the metric to the StatsD server. |
57 | + |
58 | + @param data: The data to be sent. |
59 | + @param callback: The callback to which the result should be sent. |
60 | + B{Note}: The C{callback} will be called in the C{reactor} |
61 | + thread, and not in the thread of the original caller. |
62 | + """ |
63 | + self.reactor.callFromThread(self._write, data, callback) |
64 | + |
65 | + def _write(self, data, callback): |
66 | + """Send the metric to the StatsD server. |
67 | + |
68 | + @param data: The data to be sent. |
69 | + @param callback: The callback to which the result should be sent. |
70 | + @raise twisted.internet.error.MessageLengthError: If the size of data |
71 | + is too large. |
72 | + """ |
73 | + try: |
74 | + bytes_sent = self.transport.write(data, (self.host, self.port)) |
75 | + if callback is not None: |
76 | + callback(bytes_sent) |
77 | + except (OverflowError, TypeError, socket.error, socket.gaierror): |
78 | + if callback is not None: |
79 | + callback(None) |
80 | + |
81 | + |
82 | class TwistedStatsDClient(object): |
83 | |
84 | def __init__(self, host, port, connect_callback=None, |
85 | - disconnect_callback=None, resolver_errback=None): |
86 | - """ |
87 | - Build a connection that reports to the endpoint (on C{host} and |
88 | - C{port}) using UDP. |
89 | + disconnect_callback=None): |
90 | + """Avoid using this initializer directly; Instead, use the create() |
91 | + static method, otherwise the messages won't be really delivered. |
92 | + |
93 | + If you still need to use this directly and want to resolve the host |
94 | + yourself, remember to call host_resolved() as soon as it's resolved. |
95 | |
96 | @param host: The StatsD server host. |
97 | @param port: The StatsD server port. |
98 | - @param resolver_errback: Deprecated parameter, unused. |
99 | - Please avoid using it. |
100 | @param connect_callback: The callback to invoke on connection. |
101 | @param disconnect_callback: The callback to invoke on disconnection. |
102 | """ |
103 | @@ -64,12 +131,13 @@ |
104 | self.reactor = reactor |
105 | |
106 | self.host = host |
107 | - |
108 | self.port = port |
109 | self.connect_callback = connect_callback |
110 | self.disconnect_callback = disconnect_callback |
111 | + self.data_queue = DataQueue() |
112 | |
113 | self.transport = None |
114 | + self.transport_gateway = None |
115 | |
116 | def __str__(self): |
117 | return "%s:%d" % (self.host, self.port) |
118 | @@ -77,7 +145,9 @@ |
119 | @staticmethod |
120 | def create(host, port, connect_callback=None, disconnect_callback=None, |
121 | resolver_errback=None): |
122 | - """Resolve the host and return a Deferred for the instance. |
123 | + """Create an instance that resolves the host to an IP asynchronously. |
124 | + |
125 | + Will queue all messages while the host is not yet resolved. |
126 | |
127 | Build a connection that reports to the endpoint (on C{host} and |
128 | C{port}) using UDP. |
129 | @@ -90,28 +160,26 @@ |
130 | @param disconnect_callback: The callback to invoke on disconnection.""" |
131 | from twisted.internet import reactor |
132 | |
133 | - deferred_instance = Deferred() |
134 | - |
135 | - def create_instance(ip): |
136 | - instance = TwistedStatsDClient( |
137 | - host=ip, port=port, connect_callback=connect_callback, |
138 | - disconnect_callback=disconnect_callback) |
139 | - deferred_instance.callback(instance) |
140 | + instance = TwistedStatsDClient( |
141 | + host=host, port=port, connect_callback=connect_callback, |
142 | + disconnect_callback=disconnect_callback) |
143 | |
144 | if resolver_errback is None: |
145 | resolver_errback = log.err |
146 | |
147 | - resolver = reactor.resolve(host) |
148 | - resolver.addCallbacks(create_instance, resolver_errback) |
149 | + instance.resolve_later = reactor.resolve(host) |
150 | + instance.resolve_later.addCallbacks(instance.host_resolved, |
151 | + resolver_errback) |
152 | |
153 | - return deferred_instance |
154 | + return instance |
155 | |
156 | def connect(self, transport=None): |
157 | """Connect to the StatsD server.""" |
158 | if transport is not None: |
159 | self.transport = transport |
160 | - if self.connect_callback is not None: |
161 | - self.connect_callback() |
162 | + if self.transport_gateway is not None: |
163 | + self.transport_gateway.transport = transport |
164 | + self._flush_items() |
165 | |
166 | def disconnect(self): |
167 | """Disconnect from the StatsD server.""" |
168 | @@ -127,21 +195,24 @@ |
169 | B{Note}: The C{callback} will be called in the C{reactor} |
170 | thread, and not in the thread of the original caller. |
171 | """ |
172 | - self.reactor.callFromThread(self._write, data, callback) |
173 | - |
174 | - def _write(self, data, callback): |
175 | - """Send the metric to the StatsD server. |
176 | - |
177 | - @param data: The data to be sent. |
178 | - @param callback: The callback to which the result should be sent. |
179 | - @raise twisted.internet.error.MessageLengthError: If the size of data |
180 | - is too large. |
181 | - """ |
182 | - if self.host is not None and self.transport is not None: |
183 | - try: |
184 | - bytes_sent = self.transport.write(data, (self.host, self.port)) |
185 | - if callback is not None: |
186 | - callback(bytes_sent) |
187 | - except (OverflowError, TypeError, socket.error, socket.gaierror): |
188 | - if callback is not None: |
189 | - callback(None) |
190 | + if self.transport_gateway is not None and self.transport is not None: |
191 | + return self.transport_gateway.write(data, callback) |
192 | + return self.data_queue.write(data, callback) |
193 | + |
194 | + def host_resolved(self, ip): |
195 | + """Callback used when the host is resolved to an IP address.""" |
196 | + self.host = ip |
197 | + self.transport_gateway = TransportGateway(self.transport, self.reactor, |
198 | + self.host, self.port) |
199 | + |
200 | + if self.connect_callback is not None: |
201 | + self.connect_callback() |
202 | + |
203 | + self._flush_items() |
204 | + |
205 | + def _flush_items(self): |
206 | + """Flush all items (data, callback) from the DataQueue to the |
207 | + TransportGateway.""" |
208 | + for item in self.data_queue.flush(): |
209 | + data, callback = item |
210 | + self.write(data, callback) |
211 | |
212 | === modified file 'txstatsd/server/router.py' |
213 | --- txstatsd/server/router.py 2012-06-28 17:29:26 +0000 |
214 | +++ txstatsd/server/router.py 2012-12-27 20:51:21 +0000 |
215 | @@ -264,7 +264,7 @@ |
216 | d = defer.Deferred() |
217 | self.ready.addCallback(lambda _: d) |
218 | |
219 | - client = TwistedStatsDClient( |
220 | + client = TwistedStatsDClient.create( |
221 | host, port, connect_callback=lambda: d.callback(None)) |
222 | protocol = StatsDClientProtocol(client) |
223 | |
224 | |
225 | === modified file 'txstatsd/tests/test_client.py' |
226 | --- txstatsd/tests/test_client.py 2012-12-18 19:53:43 +0000 |
227 | +++ txstatsd/tests/test_client.py 2012-12-27 20:51:21 +0000 |
228 | @@ -21,6 +21,8 @@ |
229 | """Tests for the various client classes.""" |
230 | |
231 | import sys |
232 | + |
233 | +from mocker import Mocker, expect, ANY |
234 | from twisted.internet import reactor |
235 | from twisted.internet.defer import inlineCallbacks, Deferred |
236 | from twisted.python import log |
237 | @@ -32,7 +34,9 @@ |
238 | from txstatsd.metrics.metric import Metric |
239 | from txstatsd.client import ( |
240 | StatsDClientProtocol, TwistedStatsDClient, UdpStatsDClient, |
241 | - ConsistentHashingClient) |
242 | + ConsistentHashingClient |
243 | +) |
244 | +from txstatsd.protocol import DataQueue, TransportGateway |
245 | |
246 | |
247 | class FakeClient(object): |
248 | @@ -63,16 +67,21 @@ |
249 | super(TestClient, self).setUp() |
250 | self.client = None |
251 | self.exception = None |
252 | + self.mocker = Mocker() |
253 | |
254 | def tearDown(self): |
255 | if self.client: |
256 | self.client.transport.stopListening() |
257 | super(TestClient, self).tearDown() |
258 | |
259 | + def build_protocol(self): |
260 | + protocol = StatsDClientProtocol(self.client) |
261 | + reactor.listenUDP(0, protocol) |
262 | + |
263 | def test_twistedstatsd_write(self): |
264 | self.client = TwistedStatsDClient('127.0.0.1', 8000) |
265 | - protocol = StatsDClientProtocol(self.client) |
266 | - reactor.listenUDP(0, protocol) |
267 | + self.build_protocol() |
268 | + self.client.host_resolved('127.0.0.1') |
269 | |
270 | def ensure_bytes_sent(bytes_sent): |
271 | self.assertEqual(bytes_sent, len('message')) |
272 | @@ -87,10 +96,10 @@ |
273 | |
274 | @inlineCallbacks |
275 | def test_twistedstatsd_write_with_host_resolved(self): |
276 | - self.client = yield TwistedStatsDClient.create( |
277 | + self.client = TwistedStatsDClient.create( |
278 | 'localhost', 8000) |
279 | - protocol = StatsDClientProtocol(self.client) |
280 | - reactor.listenUDP(0, protocol) |
281 | + self.build_protocol() |
282 | + yield self.client.resolve_later |
283 | |
284 | def ensure_bytes_sent(bytes_sent): |
285 | self.assertEqual(bytes_sent, len('message')) |
286 | @@ -106,36 +115,38 @@ |
287 | |
288 | @inlineCallbacks |
289 | def test_twistedstatsd_with_malformed_address_and_errback(self): |
290 | - def ensure_exception_raised(exception): |
291 | - self.assertTrue(exception.startswith("DNS lookup failed")) |
292 | + exceptions_captured = [] |
293 | |
294 | def capture_exception_raised(failure): |
295 | exception = failure.getErrorMessage() |
296 | - self.deferred_instance.callback(exception) |
297 | + self.assertTrue(exception.startswith("DNS lookup failed")) |
298 | + exceptions_captured.append(exception) |
299 | |
300 | - self.deferred_instance = TwistedStatsDClient.create( |
301 | + self.client = TwistedStatsDClient.create( |
302 | '256.0.0.0', 1, |
303 | resolver_errback=capture_exception_raised) |
304 | + self.build_protocol() |
305 | + yield self.client.resolve_later |
306 | |
307 | - self.deferred_instance.addCallback(ensure_exception_raised) |
308 | - yield self.deferred_instance |
309 | + self.assertEqual(len(exceptions_captured), 1) |
310 | |
311 | @inlineCallbacks |
312 | def test_twistedstatsd_with_malformed_address_and_no_errback(self): |
313 | - def ensure_exception_raised(exception): |
314 | - self.assertTrue(exception.startswith("DNS lookup failed")) |
315 | + exceptions_captured = [] |
316 | |
317 | def capture_exception_raised(failure): |
318 | exception = failure.getErrorMessage() |
319 | - self.deferred_instance.callback(exception) |
320 | + self.assertTrue(exception.startswith("DNS lookup failed")) |
321 | + exceptions_captured.append(exception) |
322 | |
323 | self.patch(log, "err", capture_exception_raised) |
324 | |
325 | - self.deferred_instance = TwistedStatsDClient.create( |
326 | + self.client = TwistedStatsDClient.create( |
327 | '256.0.0.0', 1) |
328 | + self.build_protocol() |
329 | + yield self.client.resolve_later |
330 | |
331 | - self.deferred_instance.addCallback(ensure_exception_raised) |
332 | - yield self.deferred_instance |
333 | + self.assertEqual(len(exceptions_captured), 1) |
334 | |
335 | def test_udpstatsd_wellformed_address(self): |
336 | client = UdpStatsDClient('localhost', 8000) |
337 | @@ -181,6 +192,187 @@ |
338 | if 'twisted' in mod: |
339 | self.assertTrue(sys.modules[mod] is None) |
340 | |
341 | + def test_starts_with_data_queue(self): |
342 | + """The client starts with a DataQueue.""" |
343 | + self.client = TwistedStatsDClient('127.0.0.1', 8000) |
344 | + self.build_protocol() |
345 | + |
346 | + self.assertIsInstance(self.client.data_queue, DataQueue) |
347 | + |
348 | + def test_starts_without_transport_gateway(self): |
349 | + """The client starts without a TransportGateway.""" |
350 | + self.client = TwistedStatsDClient('127.0.0.1', 8000) |
351 | + self.build_protocol() |
352 | + |
353 | + self.assertTrue(self.client.transport_gateway is None) |
354 | + |
355 | + def test_passes_transport_to_gateway(self): |
356 | + """The client passes the transport to the gateway as soon as the client |
357 | + is connected.""" |
358 | + self.client = TwistedStatsDClient('127.0.0.1', 8000) |
359 | + self.build_protocol() |
360 | + self.client.host_resolved('127.0.0.1') |
361 | + |
362 | + self.assertEqual(self.client.transport_gateway.transport, |
363 | + self.client.transport) |
364 | + |
365 | + def test_passes_reactor_to_gateway(self): |
366 | + """The client passes the reactor to the gateway as soon as the client |
367 | + is connected.""" |
368 | + self.client = TwistedStatsDClient('127.0.0.1', 8000) |
369 | + self.build_protocol() |
370 | + self.client.host_resolved('127.0.0.1') |
371 | + |
372 | + self.assertEqual(self.client.transport_gateway.reactor, |
373 | + self.client.reactor) |
374 | + |
375 | + def test_sets_ip_when_host_resolves(self): |
376 | + """As soon as the host is resolved, set the IP as the host.""" |
377 | + self.client = TwistedStatsDClient('localhost', 8000) |
378 | + self.build_protocol() |
379 | + self.assertEqual(self.client.host, 'localhost') |
380 | + |
381 | + self.client.host_resolved('127.0.0.1') |
382 | + self.assertEqual(self.client.host, '127.0.0.1') |
383 | + |
384 | + def test_sets_transport_gateway_when_host_resolves(self): |
385 | + """As soon as the host is resolved, set the transport gateway.""" |
386 | + self.client = TwistedStatsDClient('localhost', 8000) |
387 | + self.build_protocol() |
388 | + |
389 | + self.client.transport_gateway = None |
390 | + |
391 | + self.client.host_resolved('127.0.0.1') |
392 | + self.assertIsInstance(self.client.transport_gateway, TransportGateway) |
393 | + |
394 | + def test_calls_connect_callback_when_host_resolves(self): |
395 | + """As soon as the host is resolved, call back the connect_callback.""" |
396 | + self.client = TwistedStatsDClient('localhost', 8000) |
397 | + self.build_protocol() |
398 | + |
399 | + self.client.connect_callback = self.mocker.mock() |
400 | + expect(self.client.connect_callback()) |
401 | + |
402 | + with self.mocker: |
403 | + self.client.host_resolved('127.0.0.1') |
404 | + |
405 | + def test_sends_messages_to_gateway_after_host_resolves(self): |
406 | + """After the host is resolved, send messages to the |
407 | + TransportGateway.""" |
408 | + self.client = TwistedStatsDClient('localhost', 8000) |
409 | + self.build_protocol() |
410 | + self.client.host_resolved('127.0.0.1') |
411 | + |
412 | + message = 'some data' |
413 | + bytes_sent = len(message) |
414 | + self.client.data_queue = self.mocker.mock(spec=DataQueue) # not called |
415 | + self.client.transport_gateway = self.mocker.mock(spec=TransportGateway) |
416 | + callback = self.mocker.mock() |
417 | + expect(self.client.transport_gateway.write(message, callback)).result( |
418 | + bytes_sent) |
419 | + |
420 | + with self.mocker: |
421 | + self.assertEqual(self.client.write(message, callback), bytes_sent) |
422 | + |
423 | + def test_sends_messages_to_queue_before_host_resolves(self): |
424 | + """Before the host is resolved, send messages to the DataQueue.""" |
425 | + self.client = TwistedStatsDClient('localhost', 8000) |
426 | + self.build_protocol() |
427 | + |
428 | + message = 'some data' |
429 | + bytes_sent = len(message) |
430 | + self.client.data_queue = self.mocker.mock(spec=DataQueue) |
431 | + callback = self.mocker.mock() |
432 | + expect(self.client.data_queue.write(message, callback)).result(None) |
433 | + |
434 | + with self.mocker: |
435 | + self.assertEqual(self.client.write(message, callback), None) |
436 | + |
437 | + def test_flushes_queued_messages_to_the_gateway_when_host_resolves(self): |
438 | + """As soon as the host is resolved, flush all messages to the |
439 | + TransportGateway.""" |
440 | + self.client = TwistedStatsDClient('localhost', 8000) |
441 | + self.build_protocol() |
442 | + |
443 | + self.client.data_queue.write('data 1', 'callback 1') |
444 | + self.client.data_queue.write('data 2', 'callback 2') |
445 | + self.client.data_queue.write('data 3', 'callback 3') |
446 | + |
447 | + mock_gateway_write = self.mocker.mock() |
448 | + self.patch(TransportGateway, 'write', mock_gateway_write) |
449 | + expect(mock_gateway_write('data 1', 'callback 1')) |
450 | + expect(mock_gateway_write('data 2', 'callback 2')) |
451 | + expect(mock_gateway_write('data 3', 'callback 3')) |
452 | + |
453 | + with self.mocker: |
454 | + self.client.host_resolved('127.0.0.1') |
455 | + |
456 | + def test_sets_client_transport_when_connected(self): |
457 | + """Set the transport as an attribute of the client.""" |
458 | + self.client = TwistedStatsDClient('localhost', 8000) |
459 | + transport = DummyTransport() |
460 | + self.client.connect(transport) |
461 | + |
462 | + self.assertEqual(self.client.transport, transport) |
463 | + |
464 | + def test_sets_gateway_transport_when_connected(self): |
465 | + """Set the transport as an attribute of the TransportGateway.""" |
466 | + self.client = TwistedStatsDClient('localhost', 8000) |
467 | + self.client.host_resolved('127.0.0.1') |
468 | + transport = DummyTransport() |
469 | + self.client.connect(transport) |
470 | + |
471 | + self.assertEqual(self.client.transport_gateway.transport, transport) |
472 | + |
473 | + |
474 | +class DataQueueTest(TestCase): |
475 | + """Tests for the DataQueue class.""" |
476 | + |
477 | + def setUp(self): |
478 | + super(DataQueueTest, self).setUp() |
479 | + self.queue = DataQueue(limit=2) |
480 | + |
481 | + def test_queues_messages_and_callbacks(self): |
482 | + """All messages are queued with their respective callbacks.""" |
483 | + self.queue.write(data=1, callback='1') |
484 | + self.queue.write(data=2, callback='2') |
485 | + |
486 | + self.assertEqual(self.queue.flush(), [ |
487 | + (1, '1'), |
488 | + (2, '2'), |
489 | + ]) |
490 | + |
491 | + def test_flushes_the_queue(self): |
492 | + """All messages are queued with their respective callbacks.""" |
493 | + self.queue.write(data=1, callback='1') |
494 | + self.queue.write(data=2, callback='2') |
495 | + |
496 | + self.queue.flush() |
497 | + self.assertEqual(self.queue.flush(), []) |
498 | + |
499 | + def test_limits_number_of_messages(self): |
500 | + """Cannot save more messages than the defined limit.""" |
501 | + self.queue.write('saved data', 'saved callback') |
502 | + self.queue.write('saved data', 'saved callback') |
503 | + self.queue.write('discarded data', 'discarded message') |
504 | + |
505 | + self.assertEqual(len(self.queue.flush()), 2) |
506 | + |
507 | + def test_discards_messages_after_limit(self): |
508 | + """Cannot save more messages than the defined limit.""" |
509 | + self.queue.write('saved data', 'saved callback') |
510 | + self.queue.write('saved data', 'saved callback') |
511 | + self.queue.write('discarded data', 'discarded message') |
512 | + |
513 | + self.assertEqual(set(self.queue.flush()), |
514 | + set([('saved data', 'saved callback')])) |
515 | + |
516 | + def test_makes_limit_optional(self): |
517 | + """Use the default limit when not given.""" |
518 | + queue = DataQueue() |
519 | + |
520 | + self.assertTrue(queue._limit > 0) |
521 | + |
522 | |
523 | class TestConsistentHashingClient(TestCase): |
524 | |
525 | @@ -251,3 +443,8 @@ |
526 | client.disconnect() |
527 | self.assertTrue(clients[0].disconnect_called) |
528 | self.assertTrue(clients[1].disconnect_called) |
529 | + |
530 | + |
531 | +class DummyTransport(object): |
532 | + def stopListening(self): |
533 | + pass |