Merge lp:~free.ekanayaka/txlongpoll/notification-source-integration-tests into lp:txlongpoll
- notification-source-integration-tests
- Merge into trunk
Proposed by
Free Ekanayaka
Status: | Merged |
---|---|
Merged at revision: | 105 |
Proposed branch: | lp:~free.ekanayaka/txlongpoll/notification-source-integration-tests |
Merge into: | lp:txlongpoll |
Diff against target: |
475 lines (+392/-4) 3 files modified
txlongpoll/testing/integration.py (+97/-2) txlongpoll/tests/test_client.py (+1/-1) txlongpoll/tests/test_integration.py (+294/-1) |
To merge this branch: | bzr merge lp:~free.ekanayaka/txlongpoll/notification-source-integration-tests |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Alberto Donato (community) | Approve | ||
Review via email: mp+298039@code.launchpad.net |
Commit message
Description of the change
This branch adds integration tests for the NotificationSource class.
They basically cover the same ground as the equivalent tests in the legacy txlongpoll.
To post a comment you must log in.
- 113. By Free Ekanayaka
-
Drop leftover code
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === renamed file 'txlongpoll/testing/client.py' => 'txlongpoll/testing/integration.py' |
2 | --- txlongpoll/testing/client.py 2016-06-08 07:48:49 +0000 |
3 | +++ txlongpoll/testing/integration.py 2016-06-22 11:06:16 +0000 |
4 | @@ -14,11 +14,19 @@ |
5 | AsynchronousDeferredRunTestForBrokenTwisted, |
6 | ) |
7 | from twisted.internet import reactor |
8 | +from twisted.internet.protocol import ( |
9 | + Protocol, |
10 | + Factory, |
11 | +) |
12 | from twisted.internet.defer import ( |
13 | Deferred, |
14 | DeferredQueue, |
15 | inlineCallbacks, |
16 | ) |
17 | +from twisted.internet.endpoints import ( |
18 | + TCP4ClientEndpoint, |
19 | +) |
20 | +from twisted.application.service import Service |
21 | from txamqp.client import Closed |
22 | from txlongpoll.client import AMQFactory |
23 | |
24 | @@ -61,13 +69,16 @@ |
25 | """ |
26 | |
27 | |
28 | -class AMQTest(ResourcedTestCase, TestCase): |
29 | +class IntegrationTest(ResourcedTestCase, TestCase): |
30 | |
31 | run_tests_with = AsynchronousDeferredRunTestForBrokenTwisted.make_factory( |
32 | - timeout=5) |
33 | + timeout=10) |
34 | |
35 | resources = [('rabbit', FixtureResource(RabbitServerWithoutReset()))] |
36 | |
37 | + |
38 | +class AMQTest(IntegrationTest): |
39 | + |
40 | VHOST = "/" |
41 | USER = "guest" |
42 | PASSWORD = "guest" |
43 | @@ -172,3 +183,87 @@ |
44 | """ |
45 | self.exchanges.add(exchange) |
46 | return self.real_exchange_declare(exchange=exchange, **kwargs) |
47 | + |
48 | + |
49 | +class ProxyService(Service): |
50 | + """A TCP proxy that can be instructed to drop packets on the floor.""" |
51 | + |
52 | + def __init__(self, host, port): |
53 | + """ |
54 | + @param host: The backend host to proxy. |
55 | + @param port: The port on the backend host to proxy. |
56 | + """ |
57 | + self._host = host |
58 | + self._port = port |
59 | + self._listener = None |
60 | + self._factory = None |
61 | + |
62 | + def startService(self): |
63 | + super(ProxyService, self).startService() |
64 | + self._factory = Factory() |
65 | + self._factory.protocol = _FrontendProtocol |
66 | + self._factory.blocked = False |
67 | + self._factory.connections = 0 |
68 | + self._factory.backend = TCP4ClientEndpoint( |
69 | + reactor, self._host, self._port) |
70 | + |
71 | + self._listener = reactor.listenTCP(0, self._factory) |
72 | + |
73 | + def stopService(self): |
74 | + super(ProxyService, self).startService() |
75 | + self._listener.stopListening() |
76 | + |
77 | + def block(self): |
78 | + """Drop all packets on the floor.""" |
79 | + self._factory.blocked = True |
80 | + |
81 | + def unblock(self): |
82 | + """Let packets flow again.""" |
83 | + self._factory.blocked = False |
84 | + |
85 | + @property |
86 | + def port(self): |
87 | + """Get the frontend port of the proxy.""" |
88 | + return self._listener.getHost().port |
89 | + |
90 | + @property |
91 | + def connections(self): |
92 | + """Get the number of frontend connections created so far.""" |
93 | + return self._factory.connections |
94 | + |
95 | + |
96 | +class _FrontendProtocol(Protocol): |
97 | + |
98 | + def connectionMade(self): |
99 | + self.factory.connections += 1 |
100 | + self.buffer = "" # Pending writes |
101 | + self.backend = None # Backend protocol |
102 | + factory = Factory() |
103 | + factory.protocol = _BackendProtocol |
104 | + factory.frontend = self |
105 | + |
106 | + deferred = self.factory.backend.connect(factory) |
107 | + deferred.addCallback(self._backendConnected) |
108 | + |
109 | + def connectionLost(self, reason): |
110 | + self.backend.transport.loseConnection() |
111 | + |
112 | + def dataReceived(self, data): |
113 | + if self.factory.blocked: |
114 | + return |
115 | + if self.backend: |
116 | + self.backend.transport.write(data) |
117 | + else: |
118 | + self.buffer += data |
119 | + |
120 | + def _backendConnected(self, backend): |
121 | + self.backend = backend |
122 | + self.dataReceived(self.buffer) |
123 | + |
124 | + |
125 | +class _BackendProtocol(Protocol): |
126 | + |
127 | + def dataReceived(self, data): |
128 | + if self.factory.frontend.factory.blocked: |
129 | + return |
130 | + self.factory.frontend.transport.write(data) |
131 | |
132 | === modified file 'txlongpoll/tests/test_client.py' |
133 | --- txlongpoll/tests/test_client.py 2011-09-30 09:56:12 +0000 |
134 | +++ txlongpoll/tests/test_client.py 2016-06-22 11:06:16 +0000 |
135 | @@ -16,7 +16,7 @@ |
136 | from txamqp.queue import Closed |
137 | from txamqp.spec import Spec |
138 | from txlongpoll.client import AMQFactory |
139 | -from txlongpoll.testing.client import AMQTest |
140 | +from txlongpoll.testing.integration import AMQTest |
141 | |
142 | |
143 | class AMQFactoryTest(TestCase): |
144 | |
145 | === modified file 'txlongpoll/tests/test_integration.py' |
146 | --- txlongpoll/tests/test_integration.py 2016-05-31 11:28:08 +0000 |
147 | +++ txlongpoll/tests/test_integration.py 2016-06-22 11:06:16 +0000 |
148 | @@ -3,6 +3,8 @@ |
149 | |
150 | """Integration tests running a real RabbitMQ broker.""" |
151 | |
152 | +from rabbitfixture.server import RabbitServerResources |
153 | + |
154 | from twisted.internet import reactor |
155 | from twisted.internet.defer import ( |
156 | inlineCallbacks, |
157 | @@ -12,26 +14,317 @@ |
158 | Clock, |
159 | deferLater, |
160 | ) |
161 | +from twisted.application.internet import ( |
162 | + backoffPolicy, |
163 | + ClientService, |
164 | +) |
165 | |
166 | from txamqp.content import Content |
167 | from txamqp.protocol import ( |
168 | AMQChannel, |
169 | AMQClient, |
170 | ) |
171 | +from txamqp.factory import AMQFactory |
172 | +from txamqp.endpoint import AMQEndpoint |
173 | |
174 | from testtools.deferredruntest import assert_fails_with |
175 | |
176 | from txlongpoll.notification import ( |
177 | + NotificationConnector, |
178 | + NotificationSource, |
179 | NotFound, |
180 | Timeout, |
181 | ) |
182 | from txlongpoll.frontend import DeprecatedQueueManager |
183 | -from txlongpoll.testing.client import ( |
184 | +from txlongpoll.testing.integration import ( |
185 | + IntegrationTest, |
186 | + ProxyService, |
187 | AMQTest, |
188 | QueueWrapper, |
189 | ) |
190 | |
191 | |
192 | +class NotificationSourceIntegrationTest(IntegrationTest): |
193 | + |
194 | + @inlineCallbacks |
195 | + def setUp(self): |
196 | + super(NotificationSourceIntegrationTest, self).setUp() |
197 | + self.endpoint = AMQEndpoint( |
198 | + reactor, self.rabbit.config.hostname, self.rabbit.config.port, |
199 | + username="guest", password="guest", heartbeat=1) |
200 | + self.policy = backoffPolicy(initialDelay=0) |
201 | + self.service = ClientService( |
202 | + self.endpoint, AMQFactory(), retryPolicy=self.policy) |
203 | + self.connector = NotificationConnector(self.service) |
204 | + self.source = NotificationSource(self.connector) |
205 | + |
206 | + self.client = yield self.endpoint.connect(AMQFactory()) |
207 | + self.channel = yield self.client.channel(1) |
208 | + yield self.channel.channel_open() |
209 | + yield self.channel.queue_declare(queue="uuid") |
210 | + |
211 | + self.service.startService() |
212 | + |
213 | + @inlineCallbacks |
214 | + def tearDown(self): |
215 | + self.service.stopService() |
216 | + super(NotificationSourceIntegrationTest, self).tearDown() |
217 | + # Wrap resetting queues and client in a try/except, since the broker |
218 | + # may have been stopped (e.g. when this is the last test being run). |
219 | + try: |
220 | + yield self.channel.queue_delete(queue="uuid") |
221 | + except: |
222 | + pass |
223 | + finally: |
224 | + yield self.client.close() |
225 | + |
226 | + @inlineCallbacks |
227 | + def test_get_after_publish(self): |
228 | + """ |
229 | + Calling get() after a message has been published in the associated |
230 | + queue, returns a Notification for that message. |
231 | + """ |
232 | + yield self.channel.basic_publish( |
233 | + routing_key="uuid", content=Content("hello")) |
234 | + notification = yield self.source.get("uuid", 0) |
235 | + self.assertEqual("hello", notification.payload) |
236 | + |
237 | + @inlineCallbacks |
238 | + def test_get_before_publish(self): |
239 | + """ |
240 | + Calling get() before a message has been published in the associated |
241 | + queue, will wait until publication. |
242 | + """ |
243 | + deferred = self.source.get("uuid", 0) |
244 | + yield self.channel.basic_publish( |
245 | + routing_key="uuid", content=Content("hello")) |
246 | + notification = yield deferred |
247 | + self.assertEqual("hello", notification.payload) |
248 | + |
249 | + @inlineCallbacks |
250 | + def test_get_with_error(self): |
251 | + """ |
252 | + If an error occurs in during get(), the client is closed so |
253 | + we can query messages again. |
254 | + """ |
255 | + yield self.channel.basic_publish( |
256 | + routing_key="uuid", content=Content("hello")) |
257 | + try: |
258 | + yield self.source.get("uuid-unknown", 0) |
259 | + except NotFound: |
260 | + pass |
261 | + else: |
262 | + self.fail("NotFound wasn't raised") |
263 | + notification = yield self.source.get("uuid", 0) |
264 | + self.assertEqual("hello", notification.payload) |
265 | + |
266 | + @inlineCallbacks |
267 | + def test_get_concurrent_with_error(self): |
268 | + """ |
269 | + If an error occurs in a call to get(), other calls don't |
270 | + fail, and are retried on reconnection instead. |
271 | + """ |
272 | + client1 = yield self.service.whenConnected() |
273 | + deferred = self.source.get("uuid", 0) |
274 | + |
275 | + try: |
276 | + yield self.source.get("uuid-unknown", 0) |
277 | + except NotFound: |
278 | + pass |
279 | + else: |
280 | + self.fail("NotFound wasn't raised") |
281 | + |
282 | + yield self.channel.basic_publish( |
283 | + routing_key="uuid", content=Content("hello")) |
284 | + |
285 | + notification = yield deferred |
286 | + self.assertEquals("hello", notification.payload) |
287 | + client2 = yield self.service.whenConnected() |
288 | + # The ClientService has reconnected, yielding a new client. |
289 | + self.assertIsNot(client1, client2) |
290 | + |
291 | + @inlineCallbacks |
292 | + def test_get_timeout(self): |
293 | + """ |
294 | + Calls to get() timeout after a certain amount of time if no message |
295 | + arrived on the queue. |
296 | + """ |
297 | + self.source.timeout = 1 |
298 | + try: |
299 | + yield self.source.get("uuid", 0) |
300 | + except Timeout: |
301 | + pass |
302 | + else: |
303 | + self.fail("Timeout not raised") |
304 | + client = yield self.service.whenConnected() |
305 | + channel = yield client.channel(1) |
306 | + # The channel is still opened |
307 | + self.assertFalse(channel.closed) |
308 | + # The consumer has been deleted |
309 | + self.assertNotIn("uuid.0", client.queues) |
310 | + |
311 | + @inlineCallbacks |
312 | + def test_get_with_broker_shutdown_during_consume(self): |
313 | + """ |
314 | + If rabbitmq gets shutdown during the basic-consume call, we wait |
315 | + for the reconection and retry transparently. |
316 | + """ |
317 | + # This will make the connector setup the channel before we call |
318 | + # get(), so by the time we call it in the next line all |
319 | + # connector-related deferreds will fire synchronously and the |
320 | + # code will block on basic-consume. |
321 | + yield self.connector() |
322 | + |
323 | + d = self.source.get("uuid", 0) |
324 | + |
325 | + # Restart rabbitmq |
326 | + yield self.client.close() |
327 | + yield self.client.disconnected.wait() |
328 | + self.rabbit.cleanUp() |
329 | + self.rabbit.config = RabbitServerResources( |
330 | + port=self.rabbit.config.port) # Ensure that we use the same port |
331 | + self.rabbit.setUp() |
332 | + |
333 | + # Get a new channel and re-declare the queue, since the restart has |
334 | + # destroyed it. |
335 | + self.client = yield self.endpoint.connect(AMQFactory()) |
336 | + self.channel = yield self.client.channel(1) |
337 | + yield self.channel.channel_open() |
338 | + yield self.channel.queue_declare(queue="uuid") |
339 | + |
340 | + # Publish a message in the queue |
341 | + yield self.channel.basic_publish( |
342 | + routing_key="uuid", content=Content("hello")) |
343 | + |
344 | + notification = yield d |
345 | + self.assertEqual("hello", notification.payload) |
346 | + |
347 | + @inlineCallbacks |
348 | + def test_get_with_broker_die_during_consume(self): |
349 | + """ |
350 | + If rabbitmq dies during the basic-consume call, we wait for the |
351 | + reconection and retry transparently. |
352 | + """ |
353 | + # This will make the connector setup the channel before we call |
354 | + # get(), so by the time we call it in the next line all |
355 | + # connector-related deferreds will fire synchronously and the |
356 | + # code will block on basic-consume. |
357 | + yield self.connector() |
358 | + |
359 | + d = self.source.get("uuid", 0) |
360 | + |
361 | + # Kill rabbitmq and start it again |
362 | + yield self.client.close() |
363 | + yield self.client.disconnected.wait() |
364 | + self.rabbit.runner.kill() |
365 | + self.rabbit.cleanUp() |
366 | + self.rabbit.config = RabbitServerResources( |
367 | + port=self.rabbit.config.port) # Ensure that we use the same port |
368 | + self.rabbit.setUp() |
369 | + |
370 | + # Get a new channel and re-declare the queue, since the crash has |
371 | + # destroyed it. |
372 | + self.client = yield self.endpoint.connect(AMQFactory()) |
373 | + self.channel = yield self.client.channel(1) |
374 | + yield self.channel.channel_open() |
375 | + yield self.channel.queue_declare(queue="uuid") |
376 | + |
377 | + # Publish a message in the queue |
378 | + yield self.channel.basic_publish( |
379 | + routing_key="uuid", content=Content("hello")) |
380 | + |
381 | + notification = yield d |
382 | + self.assertEqual("hello", notification.payload) |
383 | + |
384 | + @inlineCallbacks |
385 | + def test_wb_get_with_broker_shutdown_during_message_wait(self): |
386 | + """ |
387 | + If rabbitmq gets shutdown while we wait for messages, we transparently |
388 | + wait for the reconnection and try again. |
389 | + """ |
390 | + # This will make the connector setup the channel before we call |
391 | + # get(), so by the time we call it in the next line all |
392 | + # connector-related deferreds will fire synchronously and the |
393 | + # code will block on basic-consume. |
394 | + yield self.connector() |
395 | + |
396 | + d = self.source.get("uuid", 0) |
397 | + |
398 | + # Acquiring the channel lock makes sure that basic-consume has |
399 | + # succeeded and we started waiting for the message. |
400 | + yield self.source._channel_lock.acquire() |
401 | + self.source._channel_lock.release() |
402 | + |
403 | + # Restart rabbitmq |
404 | + yield self.client.close() |
405 | + yield self.client.disconnected.wait() |
406 | + self.rabbit.cleanUp() |
407 | + self.rabbit.config = RabbitServerResources( |
408 | + port=self.rabbit.config.port) # Ensure that we use the same port |
409 | + self.rabbit.setUp() |
410 | + |
411 | + # Get a new channel and re-declare the queue, since the restart has |
412 | + # destroyed it. |
413 | + self.client = yield self.endpoint.connect(AMQFactory()) |
414 | + self.channel = yield self.client.channel(1) |
415 | + yield self.channel.channel_open() |
416 | + yield self.channel.queue_declare(queue="uuid") |
417 | + |
418 | + # Publish a message in the queue |
419 | + yield self.channel.basic_publish( |
420 | + routing_key="uuid", content=Content("hello")) |
421 | + |
422 | + notification = yield d |
423 | + self.assertEqual("hello", notification.payload) |
424 | + |
425 | + @inlineCallbacks |
426 | + def test_wb_heartbeat(self): |
427 | + """ |
428 | + If heartbeat checks fail due to network issues, we keep re-trying |
429 | + until the network recovers. |
430 | + """ |
431 | + self.service.stopService() |
432 | + |
433 | + # Put a TCP proxy between NotificationSource and RabbitMQ, to simulate |
434 | + # packets getting dropped on the floor. |
435 | + proxy = ProxyService( |
436 | + self.rabbit.config.hostname, self.rabbit.config.port) |
437 | + proxy.startService() |
438 | + self.addCleanup(proxy.stopService) |
439 | + self.endpoint._port = proxy.port |
440 | + self.service = ClientService( |
441 | + self.endpoint, AMQFactory(), retryPolicy=self.policy) |
442 | + self.connector._service = self.service |
443 | + self.service.startService() |
444 | + |
445 | + # This will make the connector setup the channel before we call |
446 | + # get(), so by the time we call it in the next line all |
447 | + # connector-related deferreds will fire synchronously and the |
448 | + # code will block on basic-consume. |
449 | + channel = yield self.connector() |
450 | + |
451 | + deferred = self.source.get("uuid", 0) |
452 | + |
453 | + # Start dropping packets on the floor |
454 | + proxy.block() |
455 | + |
456 | + # Publish a notification, which won't be delivered just yet. |
457 | + yield self.channel.basic_publish( |
458 | + routing_key="uuid", content=Content("hello")) |
459 | + |
460 | + # Wait for the first connection to terminate, because heartbeat |
461 | + # checks will fail. |
462 | + yield channel.client.disconnected.wait() |
463 | + |
464 | + # Now let packets flow again. |
465 | + proxy.unblock() |
466 | + |
467 | + # The situation got recovered. |
468 | + notification = yield deferred |
469 | + self.assertEqual("hello", notification.payload) |
470 | + self.assertEqual(2, proxy.connections) |
471 | + |
472 | + |
473 | class DeprecatedQueueManagerTest(AMQTest): |
474 | |
475 | prefix = None |
+1, looks good
A few minor comments/nits inline