Merge lp:~free.ekanayaka/txlongpoll/notification-source-integration-tests into lp:txlongpoll

Proposed by Free Ekanayaka
Status: Merged
Merged at revision: 105
Proposed branch: lp:~free.ekanayaka/txlongpoll/notification-source-integration-tests
Merge into: lp:txlongpoll
Diff against target: 475 lines (+392/-4)
3 files modified
txlongpoll/testing/integration.py (+97/-2)
txlongpoll/tests/test_client.py (+1/-1)
txlongpoll/tests/test_integration.py (+294/-1)
To merge this branch: bzr merge lp:~free.ekanayaka/txlongpoll/notification-source-integration-tests
Reviewer Review Type Date Requested Status
Alberto Donato (community) Approve
Review via email: mp+298039@code.launchpad.net

Description of the change

This branch adds integration tests for the NotificationSource class.

They basically cover the same ground as the equivalent tests in the legacy txlongpoll.tests.test_integration.DeprecatedQueueManagerTest test class, but also add new failure scenarios, like network disconnects or broker restarts, showing that NotificationSource handles them gracefully.

To post a comment you must log in.
113. By Free Ekanayaka

Drop leftover code

Revision history for this message
Alberto Donato (ack) wrote :

+1, looks good

A few minor comments/nits inline

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== renamed file 'txlongpoll/testing/client.py' => 'txlongpoll/testing/integration.py'
2--- txlongpoll/testing/client.py 2016-06-08 07:48:49 +0000
3+++ txlongpoll/testing/integration.py 2016-06-22 11:06:16 +0000
4@@ -14,11 +14,19 @@
5 AsynchronousDeferredRunTestForBrokenTwisted,
6 )
7 from twisted.internet import reactor
8+from twisted.internet.protocol import (
9+ Protocol,
10+ Factory,
11+)
12 from twisted.internet.defer import (
13 Deferred,
14 DeferredQueue,
15 inlineCallbacks,
16 )
17+from twisted.internet.endpoints import (
18+ TCP4ClientEndpoint,
19+)
20+from twisted.application.service import Service
21 from txamqp.client import Closed
22 from txlongpoll.client import AMQFactory
23
24@@ -61,13 +69,16 @@
25 """
26
27
28-class AMQTest(ResourcedTestCase, TestCase):
29+class IntegrationTest(ResourcedTestCase, TestCase):
30
31 run_tests_with = AsynchronousDeferredRunTestForBrokenTwisted.make_factory(
32- timeout=5)
33+ timeout=10)
34
35 resources = [('rabbit', FixtureResource(RabbitServerWithoutReset()))]
36
37+
38+class AMQTest(IntegrationTest):
39+
40 VHOST = "/"
41 USER = "guest"
42 PASSWORD = "guest"
43@@ -172,3 +183,87 @@
44 """
45 self.exchanges.add(exchange)
46 return self.real_exchange_declare(exchange=exchange, **kwargs)
47+
48+
class ProxyService(Service):
    """A TCP proxy that can be instructed to drop packets on the floor.

    Once started, the proxy listens on a local port (see L{port}) and hands
    every frontend connection to a L{_FrontendProtocol}, which relays data
    to and from the backend C{host}/C{port} given at construction time.
    """

    def __init__(self, host, port):
        """
        @param host: The backend host to proxy.
        @param port: The port on the backend host to proxy.
        """
        self._host = host
        self._port = port
        self._listener = None  # Listening port, set by startService()
        self._factory = None  # Frontend factory, set by startService()

    def startService(self):
        """Start listening for frontend connections.

        The frontend factory carries the state shared by all proxied
        connections: the C{blocked} flag, the C{connections} counter and
        the C{backend} endpoint used to reach the real server.
        """
        super(ProxyService, self).startService()
        self._factory = Factory()
        self._factory.protocol = _FrontendProtocol
        self._factory.blocked = False
        self._factory.connections = 0
        self._factory.backend = TCP4ClientEndpoint(
            reactor, self._host, self._port)

        # Port 0 lets the operating system pick a free port; the actual
        # value is exposed through the 'port' property.
        self._listener = reactor.listenTCP(0, self._factory)

    def stopService(self):
        """Stop listening for frontend connections.

        @return: The result of C{stopListening()} on the listening port, so
            that callers (e.g. test cleanups) can wait for the port to be
            actually released.
        """
        # This used to call startService() on the parent class, which left
        # the service flagged as running after being stopped.
        super(ProxyService, self).stopService()
        return self._listener.stopListening()

    def block(self):
        """Drop all packets on the floor."""
        self._factory.blocked = True

    def unblock(self):
        """Let packets flow again."""
        self._factory.blocked = False

    @property
    def port(self):
        """Get the frontend port of the proxy."""
        return self._listener.getHost().port

    @property
    def connections(self):
        """Get the number of frontend connections created so far."""
        return self._factory.connections
95+
class _FrontendProtocol(Protocol):
    """Frontend half of a proxied connection.

    For each incoming connection a matching backend connection is opened
    through the C{backend} endpoint stored on the factory. Data received
    before the backend is connected is buffered, and everything is dropped
    while the factory's C{blocked} flag is set.
    """

    def connectionMade(self):
        """Count the connection and start connecting to the backend."""
        self.factory.connections += 1
        self.buffer = ""  # Pending writes
        self.backend = None  # Backend protocol
        self.closed = False  # Whether the frontend connection is gone
        factory = Factory()
        factory.protocol = _BackendProtocol
        factory.frontend = self

        deferred = self.factory.backend.connect(factory)
        deferred.addCallback(self._backendConnected)

    def connectionLost(self, reason):
        """Tear down the backend connection, if one was established."""
        self.closed = True
        # The frontend may go away before the backend connection has been
        # established, in which case there is nothing to close yet.
        if self.backend is not None:
            self.backend.transport.loseConnection()

    def dataReceived(self, data):
        """Relay data to the backend, buffering it until it's connected."""
        if self.factory.blocked:
            return
        if self.backend is not None:
            self.backend.transport.write(data)
        else:
            self.buffer += data

    def _backendConnected(self, backend):
        """Remember the backend protocol and flush the pending writes."""
        if self.closed:
            # The frontend disconnected while we were still connecting:
            # don't leave a dangling backend connection around.
            backend.transport.loseConnection()
            return
        self.backend = backend
        self.dataReceived(self.buffer)
124+
class _BackendProtocol(Protocol):
    """Backend half of a proxied connection.

    Whatever the backend sends is written to the frontend transport, unless
    the frontend's factory is currently flagged as blocked.
    """

    def dataReceived(self, data):
        """Forward backend data to the frontend, unless packets are blocked."""
        frontend = self.factory.frontend
        if not frontend.factory.blocked:
            frontend.transport.write(data)
131
132=== modified file 'txlongpoll/tests/test_client.py'
133--- txlongpoll/tests/test_client.py 2011-09-30 09:56:12 +0000
134+++ txlongpoll/tests/test_client.py 2016-06-22 11:06:16 +0000
135@@ -16,7 +16,7 @@
136 from txamqp.queue import Closed
137 from txamqp.spec import Spec
138 from txlongpoll.client import AMQFactory
139-from txlongpoll.testing.client import AMQTest
140+from txlongpoll.testing.integration import AMQTest
141
142
143 class AMQFactoryTest(TestCase):
144
145=== modified file 'txlongpoll/tests/test_integration.py'
146--- txlongpoll/tests/test_integration.py 2016-05-31 11:28:08 +0000
147+++ txlongpoll/tests/test_integration.py 2016-06-22 11:06:16 +0000
148@@ -3,6 +3,8 @@
149
150 """Integration tests running a real RabbitMQ broker."""
151
152+from rabbitfixture.server import RabbitServerResources
153+
154 from twisted.internet import reactor
155 from twisted.internet.defer import (
156 inlineCallbacks,
157@@ -12,26 +14,317 @@
158 Clock,
159 deferLater,
160 )
161+from twisted.application.internet import (
162+ backoffPolicy,
163+ ClientService,
164+)
165
166 from txamqp.content import Content
167 from txamqp.protocol import (
168 AMQChannel,
169 AMQClient,
170 )
171+from txamqp.factory import AMQFactory
172+from txamqp.endpoint import AMQEndpoint
173
174 from testtools.deferredruntest import assert_fails_with
175
176 from txlongpoll.notification import (
177+ NotificationConnector,
178+ NotificationSource,
179 NotFound,
180 Timeout,
181 )
182 from txlongpoll.frontend import DeprecatedQueueManager
183-from txlongpoll.testing.client import (
184+from txlongpoll.testing.integration import (
185+ IntegrationTest,
186+ ProxyService,
187 AMQTest,
188 QueueWrapper,
189 )
190
191
class NotificationSourceIntegrationTest(IntegrationTest):
    """Exercise NotificationSource against a real RabbitMQ broker.

    Each test works with two connections to the broker: C{self.client} and
    C{self.channel} are driven directly by the test to declare the "uuid"
    queue and publish messages, while C{self.service} is the ClientService
    that the NotificationSource under test consumes from (through a
    NotificationConnector).
    """

    @inlineCallbacks
    def setUp(self):
        super(NotificationSourceIntegrationTest, self).setUp()
        # NOTE(review): the 1-second heartbeat is presumably there so that
        # dead connections get noticed quickly in the failure tests.
        self.endpoint = AMQEndpoint(
            reactor, self.rabbit.config.hostname, self.rabbit.config.port,
            username="guest", password="guest", heartbeat=1)
        self.policy = backoffPolicy(initialDelay=0)
        self.service = ClientService(
            self.endpoint, AMQFactory(), retryPolicy=self.policy)
        self.connector = NotificationConnector(self.service)
        self.source = NotificationSource(self.connector)

        yield self._connect_test_client()

        self.service.startService()

    @inlineCallbacks
    def tearDown(self):
        self.service.stopService()
        super(NotificationSourceIntegrationTest, self).tearDown()
        # Wrap resetting queues and client in a try/except, since the broker
        # may have been stopped (e.g. when this is the last test being run).
        try:
            yield self.channel.queue_delete(queue="uuid")
        except Exception:
            # Best-effort cleanup. Only regular errors are ignored, so that
            # GeneratorExit and KeyboardInterrupt still propagate.
            pass
        finally:
            yield self.client.close()

    @inlineCallbacks
    def _connect_test_client(self):
        """Connect the test's own client and declare the "uuid" queue.

        Sets C{self.client} and C{self.channel}. Tests call this again after
        a broker restart, since that destroys both connection and queue.
        """
        self.client = yield self.endpoint.connect(AMQFactory())
        self.channel = yield self.client.channel(1)
        yield self.channel.channel_open()
        yield self.channel.queue_declare(queue="uuid")

    @inlineCallbacks
    def _restart_broker(self, kill=False):
        """Restart RabbitMQ, keeping it on the same port.

        The test's own client is closed first, and is not reconnected.

        @param kill: If C{True}, kill the broker process before cleaning up
            the fixture, instead of only shutting it down.
        """
        yield self.client.close()
        yield self.client.disconnected.wait()
        if kill:
            self.rabbit.runner.kill()
        self.rabbit.cleanUp()
        self.rabbit.config = RabbitServerResources(
            port=self.rabbit.config.port)  # Ensure that we use the same port
        self.rabbit.setUp()

    @inlineCallbacks
    def test_get_after_publish(self):
        """
        Calling get() after a message has been published in the associated
        queue, returns a Notification for that message.
        """
        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))
        notification = yield self.source.get("uuid", 0)
        self.assertEqual("hello", notification.payload)

    @inlineCallbacks
    def test_get_before_publish(self):
        """
        Calling get() before a message has been published in the associated
        queue, will wait until publication.
        """
        deferred = self.source.get("uuid", 0)
        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))
        notification = yield deferred
        self.assertEqual("hello", notification.payload)

    @inlineCallbacks
    def test_get_with_error(self):
        """
        If an error occurs during get(), the client is closed so
        we can query messages again.
        """
        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))
        try:
            yield self.source.get("uuid-unknown", 0)
        except NotFound:
            pass
        else:
            self.fail("NotFound wasn't raised")
        notification = yield self.source.get("uuid", 0)
        self.assertEqual("hello", notification.payload)

    @inlineCallbacks
    def test_get_concurrent_with_error(self):
        """
        If an error occurs in a call to get(), other calls don't
        fail, and are retried on reconnection instead.
        """
        client1 = yield self.service.whenConnected()
        deferred = self.source.get("uuid", 0)

        try:
            yield self.source.get("uuid-unknown", 0)
        except NotFound:
            pass
        else:
            self.fail("NotFound wasn't raised")

        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))

        notification = yield deferred
        self.assertEqual("hello", notification.payload)
        client2 = yield self.service.whenConnected()
        # The ClientService has reconnected, yielding a new client.
        self.assertIsNot(client1, client2)

    @inlineCallbacks
    def test_get_timeout(self):
        """
        Calls to get() timeout after a certain amount of time if no message
        arrived on the queue.
        """
        self.source.timeout = 1
        try:
            yield self.source.get("uuid", 0)
        except Timeout:
            pass
        else:
            self.fail("Timeout not raised")
        client = yield self.service.whenConnected()
        channel = yield client.channel(1)
        # The channel is still opened
        self.assertFalse(channel.closed)
        # The consumer has been deleted
        self.assertNotIn("uuid.0", client.queues)

    @inlineCallbacks
    def test_get_with_broker_shutdown_during_consume(self):
        """
        If rabbitmq gets shutdown during the basic-consume call, we wait
        for the reconnection and retry transparently.
        """
        # This will make the connector setup the channel before we call
        # get(), so by the time we call it in the next line all
        # connector-related deferreds will fire synchronously and the
        # code will block on basic-consume.
        yield self.connector()

        d = self.source.get("uuid", 0)

        # Restart rabbitmq
        yield self._restart_broker()

        # Get a new channel and re-declare the queue, since the restart has
        # destroyed it.
        yield self._connect_test_client()

        # Publish a message in the queue
        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))

        notification = yield d
        self.assertEqual("hello", notification.payload)

    @inlineCallbacks
    def test_get_with_broker_die_during_consume(self):
        """
        If rabbitmq dies during the basic-consume call, we wait for the
        reconnection and retry transparently.
        """
        # This will make the connector setup the channel before we call
        # get(), so by the time we call it in the next line all
        # connector-related deferreds will fire synchronously and the
        # code will block on basic-consume.
        yield self.connector()

        d = self.source.get("uuid", 0)

        # Kill rabbitmq and start it again
        yield self._restart_broker(kill=True)

        # Get a new channel and re-declare the queue, since the crash has
        # destroyed it.
        yield self._connect_test_client()

        # Publish a message in the queue
        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))

        notification = yield d
        self.assertEqual("hello", notification.payload)

    @inlineCallbacks
    def test_wb_get_with_broker_shutdown_during_message_wait(self):
        """
        If rabbitmq gets shutdown while we wait for messages, we transparently
        wait for the reconnection and try again.
        """
        # This will make the connector setup the channel before we call
        # get(), so by the time we call it in the next line all
        # connector-related deferreds will fire synchronously and the
        # code will block on basic-consume.
        yield self.connector()

        d = self.source.get("uuid", 0)

        # Acquiring the channel lock makes sure that basic-consume has
        # succeeded and we started waiting for the message.
        yield self.source._channel_lock.acquire()
        self.source._channel_lock.release()

        # Restart rabbitmq
        yield self._restart_broker()

        # Get a new channel and re-declare the queue, since the restart has
        # destroyed it.
        yield self._connect_test_client()

        # Publish a message in the queue
        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))

        notification = yield d
        self.assertEqual("hello", notification.payload)

    @inlineCallbacks
    def test_wb_heartbeat(self):
        """
        If heartbeat checks fail due to network issues, we keep re-trying
        until the network recovers.
        """
        self.service.stopService()

        # Put a TCP proxy between NotificationSource and RabbitMQ, to simulate
        # packets getting dropped on the floor.
        proxy = ProxyService(
            self.rabbit.config.hostname, self.rabbit.config.port)
        proxy.startService()
        self.addCleanup(proxy.stopService)
        self.endpoint._port = proxy.port
        self.service = ClientService(
            self.endpoint, AMQFactory(), retryPolicy=self.policy)
        self.connector._service = self.service
        self.service.startService()

        # This will make the connector setup the channel before we call
        # get(), so by the time we call it in the next line all
        # connector-related deferreds will fire synchronously and the
        # code will block on basic-consume.
        channel = yield self.connector()

        deferred = self.source.get("uuid", 0)

        # Start dropping packets on the floor
        proxy.block()

        # Publish a notification, which won't be delivered just yet.
        yield self.channel.basic_publish(
            routing_key="uuid", content=Content("hello"))

        # Wait for the first connection to terminate, because heartbeat
        # checks will fail.
        yield channel.client.disconnected.wait()

        # Now let packets flow again.
        proxy.unblock()

        # The situation got recovered.
        notification = yield deferred
        self.assertEqual("hello", notification.payload)
        # One connection before the block, one after the reconnection.
        self.assertEqual(2, proxy.connections)
471+
472+
473 class DeprecatedQueueManagerTest(AMQTest):
474
475 prefix = None

Subscribers

People subscribed via source and target branches