Merge ~cjwatson/launchpad:remove-rabbitmq-sessions into launchpad:master

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: 5d7785185f6386257d80b426c8802490a939cd9d
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~cjwatson/launchpad:remove-rabbitmq-sessions
Merge into: launchpad:master
Diff against target: 1024 lines (+5/-858)
8 files modified
dev/null (+0/-447)
lib/lp/services/configure.zcml (+0/-1)
lib/lp/services/messaging/interfaces.py (+0/-82)
lib/lp/services/messaging/rabbit.py (+1/-254)
lib/lp/services/webapp/doc/webapp-publication.rst (+1/-1)
lib/lp/services/webapp/interfaces.py (+0/-17)
lib/lp/services/webapp/publication.py (+2/-4)
lib/lp/services/webapp/tests/test_servers.py (+1/-52)
Reviewer Review Type Date Requested Status
Jürgen Gmach Approve
Review via email: mp+430846@code.launchpad.net

Commit message

Remove RabbitMQ session infrastructure

Description of the change

This was only used by the longpoll mechanism that was removed in commit df2fe07c8766a80affa8467c5491af647f923840.

I started to try to modify this code to support multiple RabbitMQ broker URLs, then realized that would be a lot easier if I removed a big chunk of it first.

To post a comment you must log in.
Revision history for this message
Jürgen Gmach (jugmac00) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
2index 1461879..77dac94 100644
3--- a/lib/lp/services/configure.zcml
4+++ b/lib/lp/services/configure.zcml
5@@ -21,7 +21,6 @@
6 <include package=".mail" />
7 <include package=".memcache" />
8 <include package=".messages" />
9- <include package=".messaging" />
10 <include package=".oauth" />
11 <include package=".openid" />
12 <include package=".profile" />
13diff --git a/lib/lp/services/messaging/configure.zcml b/lib/lp/services/messaging/configure.zcml
14deleted file mode 100644
15index f8d23bd..0000000
16--- a/lib/lp/services/messaging/configure.zcml
17+++ /dev/null
18@@ -1,18 +0,0 @@
19-<!-- Copyright 2011 Canonical Ltd. This software is licensed under the
20- GNU Affero General Public License version 3 (see the file LICENSE).
21--->
22-<configure
23- xmlns="http://namespaces.zope.org/zope"
24- xmlns:browser="http://namespaces.zope.org/browser"
25- xmlns:i18n="http://namespaces.zope.org/i18n"
26- i18n_domain="launchpad">
27- <utility
28- provides=".interfaces.IMessageSession"
29- component=".rabbit.unreliable_session" />
30- <subscriber
31- for="lp.services.webapp.interfaces.IFinishReadOnlyRequestEvent"
32- handler=".rabbit.session_finish_handler" />
33- <subscriber
34- for="lp.services.webapp.interfaces.IFinishReadOnlyRequestEvent"
35- handler=".rabbit.unreliable_session_finish_handler" />
36-</configure>
37diff --git a/lib/lp/services/messaging/interfaces.py b/lib/lp/services/messaging/interfaces.py
38index c2c5dee..c73efac 100644
39--- a/lib/lp/services/messaging/interfaces.py
40+++ b/lib/lp/services/messaging/interfaces.py
41@@ -4,96 +4,14 @@
42 """Messaging interfaces."""
43
44 __all__ = [
45- "IMessageConsumer",
46- "IMessageProducer",
47- "IMessageSession",
48 "MessagingException",
49 "MessagingUnavailable",
50- "QueueEmpty",
51- "QueueNotFound",
52 ]
53
54
55-from zope.interface import Interface
56-from zope.schema import Bool
57-
58-
59 class MessagingException(Exception):
60 """Failure in messaging."""
61
62
63 class MessagingUnavailable(MessagingException):
64 """Messaging systems are not available."""
65-
66-
67-class QueueNotFound(MessagingException):
68- """Raised if the queue was not found."""
69-
70-
71-class QueueEmpty(MessagingException):
72- """Raised if there are no queued messages on a non-blocking read."""
73-
74-
75-class IMessageSession(Interface):
76-
77- is_connected = Bool(
78- "Whether the session is connected to the messaging system."
79- )
80-
81- def connect():
82- """Connect to the messaging system.
83-
84- If the session is already connected this should be a no-op.
85- """
86-
87- def disconnect():
88- """Disconnect from the messaging system.
89-
90- If the session is already disconnected this should be a no-op.
91- """
92-
93- def flush():
94- """Run deferred tasks."""
95-
96- def finish():
97- """Flush the session and reset."""
98-
99- def reset():
100- """Reset the session."""
101-
102- def defer(func, *args, **kwargs):
103- """Schedule something to happen when this session is finished."""
104-
105- def getProducer(name):
106- """Get a `IMessageProducer` associated with this session."""
107-
108- def getConsumer(name):
109- """Get a `IMessageConsumer` associated with this session."""
110-
111-
112-class IMessageConsumer(Interface):
113- def receive(blocking=True):
114- """Receive data from the queue.
115-
116- :raises EmptyQueue: If non-blocking and the queue is empty.
117- """
118-
119-
120-class IMessageProducer(Interface):
121- def send(data):
122- """Serialize `data` into JSON and send it to the queue on commit."""
123-
124- def sendNow(data):
125- """Serialize `data` into JSON and send it to the queue immediately."""
126-
127- def associateConsumer(consumer):
128- """Make the consumer receive messages from this producer on commit.
129-
130- :param consumer: An `IMessageConsumer`
131- """
132-
133- def associateConsumerNow(consumer):
134- """Make the consumer receive messages from this producer.
135-
136- :param consumer: An `IMessageConsumer`
137- """
138diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py
139index 46b3b18..6c8bd34 100644
140--- a/lib/lp/services/messaging/rabbit.py
141+++ b/lib/lp/services/messaging/rabbit.py
142@@ -6,51 +6,12 @@
143 __all__ = [
144 "connect",
145 "is_configured",
146- "session",
147- "unreliable_session",
148 ]
149
150-import json
151-import sys
152-import threading
153-import time
154-from collections import deque
155-from functools import partial
156-
157 import amqp
158-import transaction
159-from transaction._transaction import Status as TransactionStatus
160-from zope.interface import implementer
161
162 from lp.services.config import config
163-from lp.services.messaging.interfaces import (
164- IMessageConsumer,
165- IMessageProducer,
166- IMessageSession,
167- MessagingUnavailable,
168- QueueEmpty,
169- QueueNotFound,
170-)
171-
172-LAUNCHPAD_EXCHANGE = "launchpad-exchange"
173-
174-
175-@implementer(transaction.interfaces.ISynchronizer)
176-class RabbitSessionTransactionSync:
177- def __init__(self, session):
178- self.session = session
179-
180- def newTransaction(self, txn):
181- pass
182-
183- def beforeCompletion(self, txn):
184- pass
185-
186- def afterCompletion(self, txn):
187- if txn.status == TransactionStatus.COMMITTED:
188- self.session.finish()
189- else:
190- self.session.reset()
191+from lp.services.messaging.interfaces import MessagingUnavailable
192
193
194 def is_configured():
195@@ -78,217 +39,3 @@ def connect():
196 )
197 connection.connect()
198 return connection
199-
200-
201-@implementer(IMessageSession)
202-class RabbitSession(threading.local):
203-
204- exchange = LAUNCHPAD_EXCHANGE
205-
206- def __init__(self):
207- super().__init__()
208- self._connection = None
209- self._deferred = deque()
210- # Maintain sessions according to transaction boundaries. Keep a strong
211- # reference to the sync because the transaction manager does not. We
212- # need one per thread (definining it here is enough to ensure that).
213- self._sync = RabbitSessionTransactionSync(self)
214- transaction.manager.registerSynch(self._sync)
215-
216- @property
217- def is_connected(self):
218- """See `IMessageSession`."""
219- return self._connection is not None and self._connection.connected
220-
221- def connect(self):
222- """See `IMessageSession`.
223-
224- Open a connection for this thread if necessary. Connections cannot be
225- shared between threads.
226- """
227- if self._connection is None or not self._connection.connected:
228- self._connection = connect()
229- return self._connection
230-
231- def disconnect(self):
232- """See `IMessageSession`."""
233- if self._connection is not None:
234- try:
235- self._connection.close()
236- except OSError:
237- # Socket error is fine; the connection is still closed.
238- pass
239- finally:
240- self._connection = None
241-
242- def flush(self):
243- """See `IMessageSession`."""
244- tasks = self._deferred
245- while len(tasks) != 0:
246- tasks.popleft()()
247-
248- def finish(self):
249- """See `IMessageSession`."""
250- try:
251- self.flush()
252- finally:
253- self.reset()
254-
255- def reset(self):
256- """See `IMessageSession`."""
257- self._deferred.clear()
258- self.disconnect()
259-
260- def defer(self, func, *args, **kwargs):
261- """See `IMessageSession`."""
262- self._deferred.append(partial(func, *args, **kwargs))
263-
264- def getProducer(self, name):
265- """See `IMessageSession`."""
266- return RabbitRoutingKey(self, name)
267-
268- def getConsumer(self, name):
269- """See `IMessageSession`."""
270- return RabbitQueue(self, name)
271-
272-
273-# Per-thread sessions.
274-session = RabbitSession()
275-session_finish_handler = lambda event: session.finish()
276-
277-
278-class RabbitUnreliableSession(RabbitSession):
279- """An "unreliable" `RabbitSession`.
280-
281- Unreliable in this case means that certain errors in deferred tasks are
282- silently suppressed. This means that services can continue to function
283- even in the absence of a running and fully functional message queue.
284-
285- Other types of errors are also caught because we don't want this
286- subsystem to destabilise other parts of Launchpad but we nonetheless
287- record OOPses for these.
288-
289- XXX: We only suppress MessagingUnavailable for now because we want to
290- monitor this closely before we add more exceptions to the
291- suppressed_errors list. Potential candidates are `MessagingException`,
292- `IOError` or `amqp.AMQPException`.
293- """
294-
295- suppressed_errors = (MessagingUnavailable,)
296-
297- def finish(self):
298- """See `IMessageSession`.
299-
300- Suppresses errors listed in `suppressed_errors`. Also suppresses
301- other errors but files an oops report for these.
302- """
303- try:
304- super().finish()
305- except self.suppressed_errors:
306- pass
307- except Exception:
308- from lp.services.webapp import errorlog
309-
310- errorlog.globalErrorUtility.raising(sys.exc_info())
311-
312-
313-# Per-thread "unreliable" sessions.
314-unreliable_session = RabbitUnreliableSession()
315-unreliable_session_finish_handler = lambda event: unreliable_session.finish()
316-
317-
318-class RabbitMessageBase:
319- """Base class for all RabbitMQ messaging."""
320-
321- def __init__(self, session):
322- self.session = IMessageSession(session)
323- self._channel = None
324-
325- @property
326- def channel(self):
327- if self._channel is None or not self._channel.is_open:
328- connection = self.session.connect()
329- self._channel = connection.channel()
330- self._channel.exchange_declare(
331- self.session.exchange,
332- "direct",
333- durable=False,
334- auto_delete=False,
335- nowait=False,
336- )
337- return self._channel
338-
339-
340-@implementer(IMessageProducer)
341-class RabbitRoutingKey(RabbitMessageBase):
342- """A RabbitMQ data origination point."""
343-
344- def __init__(self, session, routing_key):
345- super().__init__(session)
346- self.key = routing_key
347-
348- def associateConsumer(self, consumer):
349- """Only receive messages for requested routing key."""
350- self.session.defer(self.associateConsumerNow, consumer)
351-
352- def associateConsumerNow(self, consumer):
353- """Only receive messages for requested routing key."""
354- # The queue will be auto-deleted 5 minutes after its last use.
355- # http://www.rabbitmq.com/extensions.html#queue-leases
356- self.channel.queue_declare(
357- consumer.name,
358- nowait=False,
359- auto_delete=False,
360- arguments={"x-expires": 300000},
361- ) # 5 minutes.
362- self.channel.queue_bind(
363- queue=consumer.name,
364- exchange=self.session.exchange,
365- routing_key=self.key,
366- nowait=False,
367- )
368-
369- def send(self, data):
370- """See `IMessageProducer`."""
371- self.session.defer(self.sendNow, data)
372-
373- def sendNow(self, data):
374- """Immediately send a message to the broker."""
375- json_data = json.dumps(data)
376- msg = amqp.Message(json_data)
377- self.channel.basic_publish(
378- exchange=self.session.exchange, routing_key=self.key, msg=msg
379- )
380-
381-
382-@implementer(IMessageConsumer)
383-class RabbitQueue(RabbitMessageBase):
384- """A RabbitMQ Queue."""
385-
386- def __init__(self, session, name):
387- super().__init__(session)
388- self.name = name
389-
390- def receive(self, timeout=0.0):
391- """Pull a message from the queue.
392-
393- :param timeout: Wait a maximum of `timeout` seconds before giving up,
394- trying at least once.
395- :raises QueueEmpty: if the timeout passes.
396- """
397- endtime = time.time() + timeout
398- while True:
399- try:
400- message = self.channel.basic_get(self.name)
401- if message is None:
402- if time.time() > endtime:
403- raise QueueEmpty()
404- time.sleep(0.1)
405- else:
406- self.channel.basic_ack(message.delivery_tag)
407- return json.loads(message.body)
408- except amqp.ChannelError as error:
409- if error.reply_code == 404:
410- raise QueueNotFound()
411- else:
412- raise
413diff --git a/lib/lp/services/messaging/tests/__init__.py b/lib/lp/services/messaging/tests/__init__.py
414deleted file mode 100644
415index e69de29..0000000
416--- a/lib/lp/services/messaging/tests/__init__.py
417+++ /dev/null
418diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py
419deleted file mode 100644
420index f82a0a1..0000000
421--- a/lib/lp/services/messaging/tests/test_rabbit.py
422+++ /dev/null
423@@ -1,447 +0,0 @@
424-# Copyright 2011 Canonical Ltd. This software is licensed under the
425-# GNU Affero General Public License version 3 (see the file LICENSE).
426-
427-"""Messaging utility tests."""
428-
429-from functools import partial
430-from itertools import count
431-
432-import transaction
433-from testtools.testcase import ExpectedException
434-from transaction._transaction import Status as TransactionStatus
435-from zope.component import getUtility
436-from zope.event import notify
437-
438-from lp.services.messaging.interfaces import (
439- IMessageConsumer,
440- IMessageProducer,
441- IMessageSession,
442- MessagingUnavailable,
443- QueueEmpty,
444- QueueNotFound,
445-)
446-from lp.services.messaging.rabbit import (
447- RabbitMessageBase,
448- RabbitQueue,
449- RabbitRoutingKey,
450- RabbitSession,
451- RabbitSessionTransactionSync,
452- RabbitUnreliableSession,
453-)
454-from lp.services.messaging.rabbit import session as global_session
455-from lp.services.messaging.rabbit import (
456- unreliable_session as global_unreliable_session,
457-)
458-from lp.services.webapp.interfaces import FinishReadOnlyRequestEvent
459-from lp.testing import TestCase, monkey_patch
460-from lp.testing.fakemethod import FakeMethod
461-from lp.testing.faketransaction import FakeTransaction
462-from lp.testing.layers import LaunchpadFunctionalLayer, RabbitMQLayer
463-from lp.testing.matchers import Provides
464-
465-# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
466-# of distinct names.
467-queue_names = ("queue.%d" % num for num in count(1))
468-key_names = ("key.%d" % num for num in count(1))
469-
470-
471-class FakeRabbitSession:
472- def __init__(self):
473- self.log = []
474-
475- def finish(self):
476- self.log.append("finish")
477-
478- def reset(self):
479- self.log.append("reset")
480-
481-
482-class TestRabbitSessionTransactionSync(TestCase):
483- def test_interface(self):
484- self.assertThat(
485- RabbitSessionTransactionSync(None),
486- Provides(transaction.interfaces.ISynchronizer),
487- )
488-
489- def test_afterCompletion_COMMITTED(self):
490- txn = FakeTransaction()
491- txn.status = TransactionStatus.COMMITTED
492- fake_session = FakeRabbitSession()
493- sync = RabbitSessionTransactionSync(fake_session)
494- sync.afterCompletion(txn)
495- self.assertEqual(["finish"], fake_session.log)
496-
497- def test_afterCompletion_ACTIVE(self):
498- txn = FakeTransaction()
499- txn.status = TransactionStatus.ACTIVE
500- fake_session = FakeRabbitSession()
501- sync = RabbitSessionTransactionSync(fake_session)
502- sync.afterCompletion(txn)
503- self.assertEqual(["reset"], fake_session.log)
504-
505-
506-class RabbitTestCase(TestCase):
507-
508- layer = RabbitMQLayer
509-
510- def tearDown(self):
511- super().tearDown()
512- global_session.reset()
513- global_unreliable_session.reset()
514-
515-
516-class TestRabbitSession(RabbitTestCase):
517-
518- session_factory = RabbitSession
519-
520- def test_interface(self):
521- session = self.session_factory()
522- self.assertThat(session, Provides(IMessageSession))
523-
524- def test_connect(self):
525- session = self.session_factory()
526- self.assertFalse(session.is_connected)
527- connection = session.connect()
528- self.assertTrue(session.is_connected)
529- self.assertIs(connection, session._connection)
530-
531- def test_connect_with_incomplete_configuration(self):
532- self.pushConfig("rabbitmq", host="none")
533- session = self.session_factory()
534- with ExpectedException(
535- MessagingUnavailable, "Incomplete configuration"
536- ):
537- session.connect()
538-
539- def test_disconnect(self):
540- session = self.session_factory()
541- session.connect()
542- session.disconnect()
543- self.assertFalse(session.is_connected)
544-
545- def test_disconnect_with_error(self):
546- session = self.session_factory()
547- session.connect()
548- old_close = session._connection.close
549-
550- def new_close(*args, **kwargs):
551- old_close(*args, **kwargs)
552- raise OSError
553-
554- with monkey_patch(session._connection, close=new_close):
555- session.disconnect()
556- self.assertFalse(session.is_connected)
557-
558- def test_is_connected(self):
559- # is_connected is False once a connection has been closed.
560- session = self.session_factory()
561- session.connect()
562- # Close the connection without using disconnect().
563- session._connection.close()
564- self.assertFalse(session.is_connected)
565-
566- def test_defer(self):
567- task = lambda foo, bar: None
568- session = self.session_factory()
569- session.defer(task, "foo", bar="baz")
570- self.assertEqual(1, len(session._deferred))
571- [deferred_task] = session._deferred
572- self.assertIsInstance(deferred_task, partial)
573- self.assertIs(task, deferred_task.func)
574- self.assertEqual(("foo",), deferred_task.args)
575- self.assertEqual({"bar": "baz"}, deferred_task.keywords)
576-
577- def test_flush(self):
578- # RabbitSession.flush() runs deferred tasks.
579- log = []
580- task = lambda: log.append("task")
581- session = self.session_factory()
582- session.defer(task)
583- session.connect()
584- session.flush()
585- self.assertEqual(["task"], log)
586- self.assertEqual([], list(session._deferred))
587- self.assertTrue(session.is_connected)
588-
589- def test_reset(self):
590- # RabbitSession.reset() resets session variables and does not run
591- # deferred tasks.
592- log = []
593- task = lambda: log.append("task")
594- session = self.session_factory()
595- session.defer(task)
596- session.connect()
597- session.reset()
598- self.assertEqual([], log)
599- self.assertEqual([], list(session._deferred))
600- self.assertFalse(session.is_connected)
601-
602- def test_finish(self):
603- # RabbitSession.finish() resets session variables after running
604- # deferred tasks.
605- log = []
606- task = lambda: log.append("task")
607- session = self.session_factory()
608- session.defer(task)
609- session.connect()
610- session.finish()
611- self.assertEqual(["task"], log)
612- self.assertEqual([], list(session._deferred))
613- self.assertFalse(session.is_connected)
614-
615- def test_getProducer(self):
616- session = self.session_factory()
617- producer = session.getProducer("foo")
618- self.assertIsInstance(producer, RabbitRoutingKey)
619- self.assertIs(session, producer.session)
620- self.assertEqual("foo", producer.key)
621-
622- def test_getConsumer(self):
623- session = self.session_factory()
624- consumer = session.getConsumer("foo")
625- self.assertIsInstance(consumer, RabbitQueue)
626- self.assertIs(session, consumer.session)
627- self.assertEqual("foo", consumer.name)
628-
629-
630-class TestRabbitUnreliableSession(TestRabbitSession):
631-
632- session_factory = RabbitUnreliableSession
633- layer = RabbitMQLayer
634-
635- def setUp(self):
636- super().setUp()
637- self.prev_oops = self.getOops()
638-
639- def getOops(self):
640- try:
641- self.oops_capture.sync()
642- return self.oopses[-1]
643- except IndexError:
644- return None
645-
646- def assertNoOops(self):
647- oops_report = self.getOops()
648- self.assertEqual(repr(self.prev_oops), repr(oops_report))
649-
650- def assertOops(self, text_in_oops):
651- oops_report = self.getOops()
652- self.assertNotEqual(
653- repr(self.prev_oops), repr(oops_report), "No OOPS reported!"
654- )
655- self.assertIn(text_in_oops, str(oops_report))
656-
657- def _test_finish_suppresses_exception(self, exception):
658- # Simple helper to test that the given exception is suppressed
659- # when raised by finish().
660- session = self.session_factory()
661- session.defer(FakeMethod(failure=exception))
662- session.finish() # Look, no exceptions!
663-
664- def test_finish_suppresses_MessagingUnavailable(self):
665- self._test_finish_suppresses_exception(
666- MessagingUnavailable("Messaging borked.")
667- )
668- self.assertNoOops()
669-
670- def test_finish_suppresses_other_errors_with_oopses(self):
671- exception = Exception("That hent worked.")
672- self._test_finish_suppresses_exception(exception)
673- self.assertOops(str(exception))
674-
675-
676-class TestRabbitMessageBase(RabbitTestCase):
677- def test_session(self):
678- base = RabbitMessageBase(global_session)
679- self.assertIs(global_session, base.session)
680-
681- def test_channel(self):
682- # Referencing the channel property causes the session to connect.
683- base = RabbitMessageBase(global_session)
684- self.assertFalse(base.session.is_connected)
685- channel = base.channel
686- self.assertTrue(base.session.is_connected)
687- self.assertIsNot(None, channel)
688- # The same channel is returned every time.
689- self.assertIs(channel, base.channel)
690-
691- def test_channel_session_closed(self):
692- # When the session is disconnected the channel is thrown away too.
693- base = RabbitMessageBase(global_session)
694- channel1 = base.channel
695- base.session.disconnect()
696- channel2 = base.channel
697- self.assertNotEqual(channel1, channel2)
698-
699-
700-class TestRabbitRoutingKey(RabbitTestCase):
701- def test_interface(self):
702- routing_key = RabbitRoutingKey(global_session, next(key_names))
703- self.assertThat(routing_key, Provides(IMessageProducer))
704-
705- def test_associateConsumer(self):
706- # associateConsumer() only associates the consumer at transaction
707- # commit time. However, order is preserved.
708- consumer = RabbitQueue(global_session, next(queue_names))
709- routing_key = RabbitRoutingKey(global_session, next(key_names))
710- routing_key.associateConsumer(consumer)
711- # The session is still not connected.
712- self.assertFalse(global_session.is_connected)
713- routing_key.sendNow("now")
714- routing_key.send("later")
715- # The queue is not found because the consumer has not yet been
716- # associated with the routing key and the queue declared.
717- self.assertRaises(QueueNotFound, consumer.receive, timeout=2)
718- transaction.commit()
719- # Now that the transaction has been committed, the consumer is
720- # associated, and receives the deferred message.
721- self.assertEqual("later", consumer.receive(timeout=2))
722-
723- def test_associateConsumerNow(self):
724- # associateConsumerNow() associates the consumer right away.
725- consumer = RabbitQueue(global_session, next(queue_names))
726- routing_key = RabbitRoutingKey(global_session, next(key_names))
727- routing_key.associateConsumerNow(consumer)
728- routing_key.sendNow("now")
729- routing_key.send("later")
730- # There is already something in the queue.
731- self.assertEqual("now", consumer.receive(timeout=2))
732- transaction.commit()
733- # Now that the transaction has been committed there is another item in
734- # the queue.
735- self.assertEqual("later", consumer.receive(timeout=2))
736-
737- def test_send(self):
738- consumer = RabbitQueue(global_session, next(queue_names))
739- routing_key = RabbitRoutingKey(global_session, next(key_names))
740- routing_key.associateConsumerNow(consumer)
741-
742- for data in range(90, 100):
743- routing_key.send(data)
744-
745- routing_key.sendNow("sync")
746- # There is nothing in the queue except the sync we just sent.
747- self.assertEqual("sync", consumer.receive(timeout=2))
748-
749- # Messages get sent on commit
750- transaction.commit()
751- for data in range(90, 100):
752- self.assertEqual(data, consumer.receive())
753-
754- # There are no more messages. They have all been consumed.
755- routing_key.sendNow("sync")
756- self.assertEqual("sync", consumer.receive(timeout=2))
757-
758- def test_sendNow(self):
759- consumer = RabbitQueue(global_session, next(queue_names))
760- routing_key = RabbitRoutingKey(global_session, next(key_names))
761- routing_key.associateConsumerNow(consumer)
762-
763- for data in range(50, 60):
764- routing_key.sendNow(data)
765- received_data = consumer.receive(timeout=2)
766- self.assertEqual(data, received_data)
767-
768- def test_does_not_connect_session_immediately(self):
769- # RabbitRoutingKey does not connect the session until necessary.
770- RabbitRoutingKey(global_session, next(key_names))
771- self.assertFalse(global_session.is_connected)
772-
773-
774-class TestRabbitQueue(RabbitTestCase):
775- def test_interface(self):
776- consumer = RabbitQueue(global_session, next(queue_names))
777- self.assertThat(consumer, Provides(IMessageConsumer))
778-
779- def test_receive(self):
780- consumer = RabbitQueue(global_session, next(queue_names))
781- routing_key = RabbitRoutingKey(global_session, next(key_names))
782- routing_key.associateConsumerNow(consumer)
783-
784- for data in range(55, 65):
785- routing_key.sendNow(data)
786- self.assertEqual(data, consumer.receive(timeout=2))
787-
788- # All the messages received were consumed.
789- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
790-
791- # New connections to the queue see an empty queue too.
792- consumer.session.disconnect()
793- consumer = RabbitQueue(global_session, next(queue_names))
794- routing_key = RabbitRoutingKey(global_session, next(key_names))
795- routing_key.associateConsumerNow(consumer)
796- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
797-
798- def test_does_not_connect_session_immediately(self):
799- # RabbitQueue does not connect the session until necessary.
800- RabbitQueue(global_session, next(queue_names))
801- self.assertFalse(global_session.is_connected)
802-
803-
804-class TestRabbit(RabbitTestCase):
805- """Integration-like tests for the RabbitMQ messaging abstractions."""
806-
807- def get_synced_sessions(self):
808- try:
809- syncs_set = transaction.manager.manager._synchs
810- except KeyError:
811- return set()
812- else:
813- return {
814- sync.session
815- for sync in syncs_set.data.values()
816- if isinstance(sync, RabbitSessionTransactionSync)
817- }
818-
819- def test_global_session(self):
820- self.assertIsInstance(global_session, RabbitSession)
821- self.assertIn(global_session, self.get_synced_sessions())
822-
823- def test_global_unreliable_session(self):
824- self.assertIsInstance(
825- global_unreliable_session, RabbitUnreliableSession
826- )
827- self.assertIn(global_unreliable_session, self.get_synced_sessions())
828-
829- def test_abort(self):
830- consumer = RabbitQueue(global_session, next(queue_names))
831- routing_key = RabbitRoutingKey(global_session, next(key_names))
832- routing_key.associateConsumerNow(consumer)
833-
834- for data in range(90, 100):
835- routing_key.send(data)
836-
837- # Messages sent using send() are forgotten on abort.
838- transaction.abort()
839- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
840-
841-
842-class TestRabbitWithLaunchpad(RabbitTestCase):
843- """Integration-like tests for the RabbitMQ messaging abstractions."""
844-
845- layer = LaunchpadFunctionalLayer
846-
847- def test_utility(self):
848- # The unreliable session is registered as the default IMessageSession
849- # utility.
850- self.assertIs(global_unreliable_session, getUtility(IMessageSession))
851-
852- def _test_session_finish_read_only_request(self, session):
853- # When a read-only request ends the session is also finished.
854- log = []
855- task = lambda: log.append("task")
856- session.defer(task)
857- session.connect()
858- notify(FinishReadOnlyRequestEvent(None, None))
859- self.assertEqual(["task"], log)
860- self.assertEqual([], list(session._deferred))
861- self.assertFalse(session.is_connected)
862-
863- def test_global_session_finish_read_only_request(self):
864- # When a read-only request ends the global_session is finished too.
865- self._test_session_finish_read_only_request(global_session)
866-
867- def test_global_unreliable_session_finish_read_only_request(self):
868- # When a read-only request ends the global_unreliable_session is
869- # finished too.
870- self._test_session_finish_read_only_request(global_unreliable_session)
871diff --git a/lib/lp/services/webapp/doc/webapp-publication.rst b/lib/lp/services/webapp/doc/webapp-publication.rst
872index 0fbbbe6..c478f92 100644
873--- a/lib/lp/services/webapp/doc/webapp-publication.rst
874+++ b/lib/lp/services/webapp/doc/webapp-publication.rst
875@@ -1001,7 +1001,7 @@ afterCall(). For example, this publication subclass will simply print
876 some string in its finishReadOnlyRequest().
877
878 >>> class MyPublication(LaunchpadBrowserPublication):
879- ... def finishReadOnlyRequest(self, request, ob, txn):
880+ ... def finishReadOnlyRequest(self, txn):
881 ... print("booo!")
882 ...
883
884diff --git a/lib/lp/services/webapp/interfaces.py b/lib/lp/services/webapp/interfaces.py
885index aa53686..c6ef41a 100644
886--- a/lib/lp/services/webapp/interfaces.py
887+++ b/lib/lp/services/webapp/interfaces.py
888@@ -818,23 +818,6 @@ class ICheckBoxWidgetLayout(IAlwaysSubmittedWidget):
889 """A widget that is displayed like a check box with label to the right."""
890
891
892-class IFinishReadOnlyRequestEvent(Interface):
893- """An event which gets sent when the publication is ended"""
894-
895- object = Attribute("The object to which this request pertains.")
896-
897- request = Attribute("The active request.")
898-
899-
900-@implementer(IFinishReadOnlyRequestEvent)
901-class FinishReadOnlyRequestEvent:
902- """An event which gets sent when the publication is ended"""
903-
904- def __init__(self, ob, request):
905- self.object = ob
906- self.request = request
907-
908-
909 class StormRangeFactoryError(Exception):
910 """Raised when a Storm result set cannot be used for slicing by a
911 StormRangeFactory.
912diff --git a/lib/lp/services/webapp/publication.py b/lib/lp/services/webapp/publication.py
913index bbe8b83..1557a46 100644
914--- a/lib/lp/services/webapp/publication.py
915+++ b/lib/lp/services/webapp/publication.py
916@@ -54,7 +54,6 @@ from lp.services.features.flags import NullFeatureController
917 from lp.services.oauth.interfaces import IOAuthSignedRequest
918 from lp.services.statsd.interfaces.statsd_client import IStatsdClient
919 from lp.services.webapp.interfaces import (
920- FinishReadOnlyRequestEvent,
921 ILaunchpadRoot,
922 IOpenLaunchBag,
923 IPlacelessAuthUtility,
924@@ -506,7 +505,7 @@ class LaunchpadBrowserPublication(
925 # Abort the transaction on a read-only request.
926 # NOTHING AFTER THIS SHOULD CAUSE A RETRY.
927 if request.method in ["GET", "HEAD"]:
928- self.finishReadOnlyRequest(request, ob, txn)
929+ self.finishReadOnlyRequest(txn)
930 elif txn.isDoomed():
931 # The following sends an abort to the database, even though the
932 # transaction is still doomed.
933@@ -528,13 +527,12 @@ class LaunchpadBrowserPublication(
934 # calling beforeTraversal or doing proper cleanup.
935 pass
936
937- def finishReadOnlyRequest(self, request, ob, txn):
938+ def finishReadOnlyRequest(self, txn):
939 """Hook called at the end of a read-only request.
940
941 By default it abort()s the transaction, but subclasses may need to
942 commit it instead, so they must overwrite this.
943 """
944- notify(FinishReadOnlyRequestEvent(ob, request))
945 txn.abort()
946
947 def callTraversalHooks(self, request, ob):
948diff --git a/lib/lp/services/webapp/tests/test_servers.py b/lib/lp/services/webapp/tests/test_servers.py
949index e0fa7e6..14820e0 100644
950--- a/lib/lp/services/webapp/tests/test_servers.py
951+++ b/lib/lp/services/webapp/tests/test_servers.py
952@@ -32,8 +32,6 @@ from lp.services.auth.enums import AccessTokenScope
953 from lp.services.identity.interfaces.account import AccountStatus
954 from lp.services.oauth.interfaces import TokenException
955 from lp.services.webapp.interaction import get_interaction_extras
956-from lp.services.webapp.interfaces import IFinishReadOnlyRequestEvent
957-from lp.services.webapp.publication import LaunchpadBrowserPublication
958 from lp.services.webapp.servers import (
959 ApplicationServerSettingRequestFactory,
960 FeedsBrowserRequest,
961@@ -49,7 +47,7 @@ from lp.services.webapp.servers import (
962 WebServiceTestRequest,
963 web_service_request_to_browser_request,
964 )
965-from lp.testing import EventRecorder, TestCase, TestCaseWithFactory, logout
966+from lp.testing import TestCase, TestCaseWithFactory, logout
967 from lp.testing.layers import DatabaseFunctionalLayer, FunctionalLayer
968 from lp.testing.publication import get_request_and_publication
969
970@@ -782,55 +780,6 @@ class LoggingTransaction:
971 self.log.append("ABORT")
972
973
974-class TestFinishReadOnlyRequest(TestCase):
975- # Publications that have a finishReadOnlyRequest() method are obliged to
976- # fire an IFinishReadOnlyRequestEvent.
977-
978- def _test_publication(self, publication, expected_transaction_log):
979- # publication.finishReadOnlyRequest() issues an
980- # IFinishReadOnlyRequestEvent and alters the transaction.
981- fake_request = object()
982- fake_object = object()
983- fake_transaction = LoggingTransaction()
984-
985- with EventRecorder() as event_recorder:
986- publication.finishReadOnlyRequest(
987- fake_request, fake_object, fake_transaction
988- )
989-
990- self.assertEqual(expected_transaction_log, fake_transaction.log)
991-
992- finish_events = [
993- event
994- for event in event_recorder.events
995- if IFinishReadOnlyRequestEvent.providedBy(event)
996- ]
997- self.assertEqual(
998- 1,
999- len(finish_events),
1000- (
1001- "Expected only one IFinishReadOnlyRequestEvent, but "
1002- "got: %r" % finish_events
1003- ),
1004- )
1005-
1006- [finish_event] = finish_events
1007- self.assertIs(fake_request, finish_event.request)
1008- self.assertIs(fake_object, finish_event.object)
1009-
1010- def test_WebServicePub_fires_FinishReadOnlyRequestEvent(self):
1011- # WebServicePublication.finishReadOnlyRequest() issues an
1012- # IFinishReadOnlyRequestEvent and aborts the transaction.
1013- publication = WebServicePublication(None)
1014- self._test_publication(publication, ["ABORT"])
1015-
1016- def test_LaunchpadBrowserPub_fires_FinishReadOnlyRequestEvent(self):
1017- # LaunchpadBrowserPublication.finishReadOnlyRequest() issues an
1018- # IFinishReadOnlyRequestEvent and aborts the transaction.
1019- publication = LaunchpadBrowserPublication(None)
1020- self._test_publication(publication, ["ABORT"])
1021-
1022-
1023 class TestWebServiceAccessTokens(TestCaseWithFactory):
1024 """Test personal access tokens for the webservice.
1025

Subscribers

People subscribed via source and target branches

to status/vote changes: