Merge lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject into lp:txlongpoll

Proposed by Free Ekanayaka
Status: Merged
Merged at revision: 107
Proposed branch: lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject
Merge into: lp:txlongpoll
Diff against target: 279 lines (+184/-3)
3 files modified
txlongpoll/notification.py (+45/-3)
txlongpoll/tests/test_integration.py (+59/-0)
txlongpoll/tests/test_notification.py (+80/-0)
To merge this branch: bzr merge lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject
Reviewer Review Type Date Requested Status
Alberto Donato (community) Approve
Review via email: mp+298216@code.launchpad.net

Description of the change

Add acknowledgement/rejection support to txlongpoll.notification.Notification.

To post a comment you must log in.
Revision history for this message
Alberto Donato (ack) wrote :

LGTM, +1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'txlongpoll/notification.py'
2--- txlongpoll/notification.py 2016-06-06 13:07:31 +0000
3+++ txlongpoll/notification.py 2016-06-23 12:20:23 +0000
4@@ -10,6 +10,7 @@
5
6 See also txlongpoll.frontend.FrontEndAjax.
7 """
8+from functools import partial
9
10 from twisted.internet import reactor
11 from twisted.internet.error import ConnectionClosed as TransportClosed
12@@ -50,6 +51,10 @@
13 """
14
15
16+class Bounced(Exception):
17+ """Raised if a Notification could not be ack'ed or rejected."""
18+
19+
20 class NotificationConnector(object):
21 """Provide ready-to-use AMQP channels."""
22
23@@ -104,19 +109,30 @@
24 class Notification(object):
25 """A single notification from a stream."""
26
27- def __init__(self, source, message):
28+ def __init__(self, source, channel, message):
29 """
30- @param source: The NotificationSource the message was received through.
31+ @param source: The NotificationSource that generated the notification.
32+ @param channel: The AMQChannel the message was received through.
33 @param message: The raw txamqp.message.Message received from the
34 underlying AMQP queue.
35 """
36 self._source = source
37+ self._channel = channel
38 self._message = message
39
40 @property
41 def payload(self):
42+ """Return the content of the notification."""
43 return self._message.content.body
44
45+ def ack(self):
46+ """Confirm that the notification was successfully processed."""
47+ return self._source._done(self, True)
48+
49+ def reject(self):
50+ """Reject the notification; it will be re-queued."""
51+ return self._source._done(self, False)
52+
53
54 class NotificationSource(object):
55 """
56@@ -235,7 +251,7 @@
57 else:
58 raise Timeout()
59
60- returnValue(Notification(self, msg))
61+ returnValue(Notification(self, channel, msg))
62
63 @inlineCallbacks
64 def _check_retriable(self, method, **kwargs):
65@@ -275,6 +291,32 @@
66 finally:
67 self._channel_lock.release()
68
69+ @inlineCallbacks
70+ def _done(self, notification, successful):
71+ """Confirm that a notification has been handled (successfully or not).
72+
73+ @param notification: The Notification to confirm.
74+ @param successful: If True, then the notification has been correctly
75+ processed and will be deleted. If False, it will be re-queued and
76+ be available at the next NotificationSource.get() call for the
77+ same UUID.
78+ """
79+ channel = notification._channel
80+ if successful:
81+ method = channel.basic_ack
82+ else:
83+ method = partial(channel.basic_reject, requeue=True)
84+
85+ yield self._channel_lock.acquire()
86+ try:
87+ yield method(delivery_tag=notification._message.delivery_tag)
88+ except Closed:
89+ # If we hit any channel or connection error, we raise an error
90+ # since there's no way this can be re-tried.
91+ raise Bounced()
92+ finally:
93+ self._channel_lock.release()
94+
95
96 class _Retriable(Exception):
97 """Raised by _check_retriable in case of transient errors."""
98
99=== modified file 'txlongpoll/tests/test_integration.py'
100--- txlongpoll/tests/test_integration.py 2016-06-23 07:56:35 +0000
101+++ txlongpoll/tests/test_integration.py 2016-06-23 12:20:23 +0000
102@@ -34,6 +34,7 @@
103 NotificationSource,
104 NotFound,
105 Timeout,
106+ Bounced,
107 )
108 from txlongpoll.client import AMQP0_8_SPEC_PATH
109 from txlongpoll.frontend import DeprecatedQueueManager
110@@ -315,6 +316,64 @@
111 self.assertEqual("hello", notification.payload)
112 self.assertEqual(2, proxy.connections)
113
114+ @inlineCallbacks
115+ def test_reject_notification(self):
116+ """
117+ Calling reject() on a Notification puts the associated message back in
118+ the queue so that it's available to subsequent get() calls.
119+ """
120+ yield self.channel.basic_publish(
121+ routing_key="uuid", content=Content("hello"))
122+ notification = yield self.source.get("uuid", 0)
123+ yield notification.reject()
124+
125+ notification = yield self.source.get("uuid", 1)
126+ self.assertEqual("hello", notification.payload)
127+
128+ @inlineCallbacks
129+ def test_ack_message(self):
130+ """
131+ Calling ack() on a Notification confirms the removal of the
132+ associated message from the queue, so that subsequent get() calls
133+ wait for another message.
134+ """
135+ yield self.channel.basic_publish(
136+ routing_key="uuid", content=Content("hello"))
137+ notification = yield self.source.get("uuid", 0)
138+ yield notification.ack()
139+
140+ yield self.channel.basic_publish(
141+ routing_key="uuid", content=Content("hello 2"))
142+ notification = yield self.source.get("uuid", 1)
143+ self.assertEqual("hello 2", notification.payload)
144+
145+ @inlineCallbacks
146+ def test_ack_with_broker_shutdown(self):
147+ """
148+ If rabbitmq gets shut down before we ack a Notification, an error is
149+ raised.
150+ """
151+ client = yield self.service.whenConnected()
152+
153+ yield self.channel.basic_publish(
154+ routing_key="uuid", content=Content("hello"))
155+ notification = yield self.source.get("uuid", 0)
156+
157+ self.rabbit.cleanUp()
158+
159+ yield client.disconnected.wait()
160+
161+ try:
162+ yield notification.ack()
163+ except Bounced:
164+ pass
165+ else:
166+ self.fail("Notification not bounced")
167+
168+ self.rabbit.config = RabbitServerResources(
169+ port=self.rabbit.config.port) # Ensure that we use the same port
170+ self.rabbit.setUp()
171+
172
173 class DeprecatedQueueManagerTest(AMQTest):
174
175
176=== modified file 'txlongpoll/tests/test_notification.py'
177--- txlongpoll/tests/test_notification.py 2016-06-23 07:04:53 +0000
178+++ txlongpoll/tests/test_notification.py 2016-06-23 12:20:23 +0000
179@@ -12,6 +12,8 @@
180 succeeded,
181 failed,
182 )
183+# XXX This testtools API should probably be public, see #1589657.
184+from testtools.twistedsupport._deferred import extract_result
185
186 from twisted.internet.task import Clock
187 from twisted.logger import Logger
188@@ -25,6 +27,7 @@
189 NotificationSource,
190 Timeout,
191 NotFound,
192+ Bounced,
193 )
194 from txlongpoll.testing.unit import (
195 FakeConnector,
196@@ -281,6 +284,83 @@
197 channel.basic_cancel_ok(consumer_tag="uuid2.1")
198 self.assertThat(deferred2, fires_with_payload("foo"))
199
200+ def test_notification_ack(self):
201+ """
202+ Calling Notification.ack() acknowledges a notification.
203+ """
204+ deferred = self.source.get("uuid", 1)
205+ channel = self.connector.transport.channel(1)
206+ channel.basic_consume_ok(consumer_tag="uuid.1")
207+ channel.deliver("foo", consumer_tag='uuid.1', delivery_tag=1)
208+ channel.basic_cancel_ok(consumer_tag="uuid.1")
209+ notification = extract_result(deferred)
210+ self.connector.transport.outgoing.clear()
211+ notification.ack()
212+ [frame] = self.connector.transport.outgoing[1]
213+ self.assertEqual("ack", frame.payload.method.name)
214+ self.assertEqual((1, False), frame.payload.args)
215+
216+ def test_notification_ack_is_serialized(self):
217+ """
218+ Calls to Notification.ack() are serialized, so they don't get spurious
219+ errors from unrelated channel commands.
220+ """
221+ deferred = self.source.get("uuid1", 1)
222+ channel = self.connector.transport.channel(1)
223+ channel.basic_consume_ok(consumer_tag="uuid1.1")
224+ channel.deliver("foo", consumer_tag='uuid1.1', delivery_tag=1)
225+ channel.basic_cancel_ok(consumer_tag="uuid1.1")
226+ notification = extract_result(deferred)
227+
228+ # Simulate a concurrent get() call locking the channel during its
229+ # basic-consume.
230+ self.source.get("uuid2", 1)
231+
232+ # Calling Notification.ack() now will not result in any outgoing frame,
233+ # since the call will wait for the basic-consume above to complete.
234+ self.connector.transport.outgoing.clear()
235+ notification.ack()
236+ self.assertEqual({}, self.connector.transport.outgoing)
237+
238+ # As soon as the channel lock is released, the frame is sent.
239+ channel.basic_consume_ok(consumer_tag="uuid2.1")
240+ [frame] = self.connector.transport.outgoing[1]
241+ self.assertEqual("ack", frame.payload.method.name)
242+ self.assertEqual((1, False), frame.payload.args)
243+
244+ def test_notification_reject(self):
245+ """
246+ Calling Notification.reject() rejects a notification.
247+ """
248+ deferred = self.source.get("uuid", 1)
249+ channel = self.connector.transport.channel(1)
250+ channel.basic_consume_ok(consumer_tag="uuid.1")
251+ channel.deliver("foo", consumer_tag='uuid.1', delivery_tag=1)
252+ channel.basic_cancel_ok(consumer_tag="uuid.1")
253+ notification = extract_result(deferred)
254+ self.connector.transport.outgoing.clear()
255+ notification.reject()
256+ [frame] = self.connector.transport.outgoing[1]
257+ self.assertEqual("reject", frame.payload.method.name)
258+ self.assertEqual((1, True), frame.payload.args)
259+
260+ def test_notification_bounced(self):
261+ """
262+ If an error happens while ack'ing a Notification, a Bounced exception
263+ is raised.
264+ """
265+ deferred = self.source.get("uuid1", 1)
266+ channel = self.connector.transport.channel(1)
267+ channel.basic_consume_ok(consumer_tag="uuid1.1")
268+ channel.deliver("foo", consumer_tag='uuid1.1', delivery_tag=1)
269+ channel.basic_cancel_ok(consumer_tag="uuid1.1")
270+ notification = extract_result(deferred)
271+
272+ # Simulate the broker shutting down.
273+ channel.connection_close(reply_code=320, reply_text="shutdown")
274+
275+ self.assertRaises(Bounced, extract_result, notification.ack())
276+
277
278 def fires_with_channel(id):
279 """Assert that a connector fires with the given channel ID."""

Subscribers

People subscribed via source and target branches