Merge lp:~lifeless/python-oops-amqp/0.0.2 into lp:python-oops-amqp

Proposed by Robert Collins
Status: Merged
Merged at revision: 3
Proposed branch: lp:~lifeless/python-oops-amqp/0.0.2
Merge into: lp:python-oops-amqp
Diff against target: 653 lines (+332/-83)
10 files modified
NEWS (+27/-0)
README (+5/-6)
oops_amqp/__init__.py (+6/-4)
oops_amqp/publisher.py (+15/-9)
oops_amqp/receiver.py (+58/-9)
oops_amqp/tests/__init__.py (+22/-9)
oops_amqp/tests/test_publisher.py (+35/-13)
oops_amqp/tests/test_receiver.py (+131/-32)
oops_amqp/utils.py (+32/-0)
setup.py (+1/-1)
To merge this branch: bzr merge lp:~lifeless/python-oops-amqp/0.0.2
Reviewer Review Type Date Requested Status
William Grant code Approve
Review via email: mp+79637@code.launchpad.net

Description of the change

0.0.2
-----

* Fix documentation warts from initial release, updated to 0.0.2 and prepare
  for making the receiver deal with interrupted services.
  (Robert Collins, #875976, #875984)

* Fix Receiver.run_forever to actually run forever. (Robert Collins)

* Change API for constructing Receiver to take a connection factory rather than
  a channel. This will permit handling transient faults internally rather than
  forcing a restart. (Robert Collins)

* Implement resiliency for the Receiver: automatically reconnect if a socket
  error is received from rabbit, for up to two minutes of downtime.
  (Robert Collins)

To post a comment you must log in.
Revision history for this message
Robert Collins (lifeless) wrote :

I forgot to add improvements from the work on LP, moved to WIP

lp:~lifeless/python-oops-amqp/0.0.2 updated
10. By Robert Collins

Permit supplying the oops id when publishing.

Revision history for this message
William Grant (wgrant) wrote :

183 - def __init__(self, config, channel, queue_name):
184 + def __init__(self, config, connection_factory, queue_name):
185 """Create a Receiver.
186
187 :param config: An oops.Config to republish the OOPS reports.
188 - :param channel: An amqplib Channel to listen for reports on.
189 + :param connection: An amqplib connection factory, used to make the
190 + initial connection and to reconnect if that connection is
191 + interrupted.

docstring says the param is "connection", but it's actually "connection_factory".

227 + while not self.stopping and time.time() < self.connection_start + 120:

This is going to stop retrying immediately if the connection dies after more than two minutes.

321 + connection = self.connection_factory()
322 + self.addCleanup(connection.close)
323 + channel = connection.channel()
324 + self.addCleanup(channel.close)
325 + queue = self.useFixture(QueueFixture(channel, self.getUniqueString))

This is now roughly quadruplicated. Put it in a helper or setUp?

318 + def test_publish_inherit_id(self):

Are test_publish and test_publish_inherit_id around the wrong way?

304 + # Publication returns the oops ID allocated.

s/allocated/provided/?

371 - def test_receive_one(self):

Where'd that go?

398 + connection = self.connection_factory()
399 + self.addCleanup(connection.close)
400 + channel = connection.channel()
401 + self.addCleanup(channel.close)
402 + queue = self.useFixture(QueueFixture(channel, self.getUniqueString))

More duplication.

410 + receiver.sentinel = sentinel
411 + receiver.run_forever()
412 + self.assertEqual([expected_report], reports)

May be worth a comment around run_forever to say that it will return quickly since you've already sent the sentinel.

422 + connection = self.connection_factory()
423 + self.addCleanup(connection.close)
424 + channel = connection.channel()
425 + self.addCleanup(channel.close)
426 + queue = self.useFixture(QueueFixture(channel, self.getUniqueString))

Guess what...

499 + # publisher tests is what le aks through when rabbit is shutdown).

You've leaked some whitespace there.

507 + connection = self.connection_factory()
508 + self.addCleanup(connection.close)
509 + channel = connection.channel()
510 + self.addCleanup(channel.close)
511 + queue = self.useFixture(QueueFixture(channel, self.getUniqueString))

Ahem.

review: Needs Fixing (code)
Revision history for this message
Robert Collins (lifeless) wrote :

receive_one got renamed stop_via_stopping

I see an fixtures_amqp coming on sometime soon.

lp:~lifeless/python-oops-amqp/0.0.2 updated
11. By Robert Collins

Review feedback (fix retry, docstring).

12. By Robert Collins

Refactor test support.

Revision history for this message
William Grant (wgrant) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2011-10-10 21:49:50 +0000
3+++ NEWS 2011-10-18 03:36:23 +0000
4@@ -6,6 +6,33 @@
5 NEXT
6 ----
7
8+0.0.2
9+-----
10+
11+* Fix documentation warts from initial release, updated to 0.0.2 and prepare
12+ for making the receiver deal with interrupted services.
13+ (Robert Collins, #875976, #875984)
14+
15+* Fix Receiver.run_forever to actually run forever. (Robert Collins)
16+
17+* Change API for constructing Receiver to take a connection factory rather than
18+ a channel. This will permit handling transient faults internally rather than
19+ forcing a restart. (Robert Collins)
20+
21+* Implement resiliency for the Receiver: automatically reconnect if a socket
22+ error is received from rabbit, for up to two minutes of downtime.
23+ (Robert Collins)
24+
25+* The publisher can honour any existing oops id if desired (set
26+ inherit_id=True). This is useful if using a chain of publishers where
27+ AMQP is the last stage rather than the primary mechanism.
28+ (Robert Collins)
29+
30+* Receiver.sentinel may be set and used to cause the run_forever loop to exit
31+ when a crafted message is received. This is useful for testing, and defaults
32+ to None (so cannot be used to shutdown a normally running service).
33+ (Robert Collins)
34+
35 0.0.1
36 -----
37
38
39=== modified file 'README'
40--- README 2011-10-10 21:49:50 +0000
41+++ README 2011-10-18 03:36:23 +0000
42@@ -98,17 +98,16 @@
43 AMQP. To use it you need to configure a local config to publish the received
44 reports. A full config is used because that includes support for filtering
45 (which can be useful if you need to throttle volume, for instance).
46-Additionally you need an amqp channel object and a queue name to receive from.
47+Additionally you need an amqp connection factory (to handle the amqp server
48+being restarted) and a queue name to receive from.
49
50-This example uses the OOPSDateDirRepo publisher, telling it to accept whatever
51+This example uses the DateDirRepo publisher, telling it to accept whatever
52 id was assigned by the process publishing to AMQP::
53
54- >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True)
55+ >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
56 >>> config = oops.Config()
57 >>> config.publishers.append(publisher.publish)
58- >>> connection = amqp.Connection(host="localhost:5672",
59- ... userid="guest", password="guest", virtual_host="/", insist=False)
60- >>> receiver = oops_amqp.Receiver(config, connection, "my queue")
61+ >>> receiver = oops_amqp.Receiver(config, factory, "my queue")
62 >>> receiver.run_forever()
63
64 For more information see pydoc oops_amqp.
65
66=== modified file 'oops_amqp/__init__.py'
67--- oops_amqp/__init__.py 2011-10-10 21:49:50 +0000
68+++ oops_amqp/__init__.py 2011-10-18 03:36:23 +0000
69@@ -73,14 +73,16 @@
70 AMQP. To use it you need to configure a local config to publish the received
71 reports. A full config is used because that includes support for filtering
72 (which can be useful if you need to throttle volume, for instance).
73+Additionally you need an amqp connection factory (to handle the amqp server
74+being restarted) and a queue name to receive from.
75
76-This example uses the OOPSDateDirRepo publisher, telling it to accept whatever
77+This example uses the DateDirRepo publisher, telling it to accept whatever
78 id was assigned by the process publishing to AMQP::
79
80- >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True)
81+ >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
82 >>> config = oops.Config()
83 >>> config.publishers.append(publisher.publish)
84- >>> receiver = oops_amqp.Receiver(config)
85+ >>> receiver = oops_amqp.Receiver(config, factory, "my queue")
86 >>> receiver.run_forever()
87 """
88
89@@ -95,7 +97,7 @@
90 # established at this point, and setup.py will use a version of next-$(revno).
91 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
92 # Otherwise it is major.minor.micro~$(revno).
93-__version__ = (0, 0, 1, 'beta', 0)
94+__version__ = (0, 0, 2, 'beta', 0)
95
96 __all__ = [
97 'Publisher',
98
99=== modified file 'oops_amqp/publisher.py'
100--- oops_amqp/publisher.py 2011-10-10 21:51:53 +0000
101+++ oops_amqp/publisher.py 2011-10-18 03:36:23 +0000
102@@ -36,7 +36,8 @@
103 supplied exchange + routing key.
104 """
105
106- def __init__(self, connection_factory, exchange_name, routing_key):
107+ def __init__(self, connection_factory, exchange_name, routing_key,
108+ inherit_id=False):
109 """Create a publisher.
110
111 :param connection_factory: A callable which creates an amqplib
112@@ -46,11 +47,15 @@
113 across threads.
114 :param exchange_name: The name of the exchange to publish to.
115 :param routing_key: The routing key for messages.
116+ :param inherit_id: If True any 'True' 'id' in an OOPS report is
117+ preserved. Handy if an id that has already been shown to a user is
118+ being published (but uniqueness cannot be guaranteed).
119 """
120 self.connection_factory = connection_factory
121 self.exchange_name = exchange_name
122 self.routing_key = routing_key
123 self.channels = local()
124+ self.inherit_id = inherit_id
125
126 def get_channel(self):
127 if getattr(self.channels, 'channel', None) is None:
128@@ -64,13 +69,14 @@
129 def __call__(self, report):
130 # Don't mess with the passed in report.
131 report = dict(report)
132- # Discard any existing id.
133- report.pop('id', None)
134- # Hash it, to make an ID
135- oops_id = "OOPS-%s" % md5(dumps(report)).hexdigest()
136- # Store the id in what we send on the wire, so that the recipient has
137- # it.
138- report['id'] = oops_id
139+ if not self.inherit_id or not report.get('id'):
140+ # Discard any existing id.
141+ original_id = report.pop('id', None)
142+ # Hash it, to make an ID
143+ oops_id = "OOPS-%s" % md5(dumps(report)).hexdigest()
144+ # Store the id in what we send on the wire, so that the recipient
145+ # has it.
146+ report['id'] = oops_id
147 message = amqp.Message(dumps(report))
148 # We don't want to drop OOPS on the floor if rabbit is restarted.
149 message.properties["delivery_mode"] = 2
150@@ -83,4 +89,4 @@
151 except socket.error:
152 self.channels.channel = None
153 return None
154- return oops_id
155+ return report['id']
156
157=== modified file 'oops_amqp/receiver.py'
158--- oops_amqp/receiver.py 2011-10-10 21:49:50 +0000
159+++ oops_amqp/receiver.py 2011-10-18 03:36:23 +0000
160@@ -18,8 +18,13 @@
161
162 __metaclass__ = type
163
164+import socket
165+import time
166+
167 import bson
168
169+from utils import close_ignoring_EPIPE
170+
171 __all__ = [
172 'Receiver',
173 ]
174@@ -28,23 +33,32 @@
175 """Republish OOPS reports from AMQP to a local oops.Config.
176
177 :ivar stopping: When True will cause Receiver to break out of run_forever.
178+ Calls to run_forever reset this to False.
179+ :ivar sentinel: If a message identical to the sentinel is received,
180+ handle_report will set stopping to True.
181 """
182
183- def __init__(self, config, channel, queue_name):
184+ def __init__(self, config, connection_factory, queue_name):
185 """Create a Receiver.
186
187 :param config: An oops.Config to republish the OOPS reports.
188- :param channel: An amqplib Channel to listen for reports on.
189+ :param connection_factory: An amqplib connection factory, used to make
190+ the initial connection and to reconnect if that connection is
191+ interrupted.
192 :param queue_name: The queue to listen for reports on.
193 """
194 self.config = config
195- self.channel = channel
196+ self.connection = None
197+ self.channel = None
198+ self.connection_factory = connection_factory
199 self.queue_name = queue_name
200- self.stopping = False
201+ self.sentinel = None
202
203 def handle_report(self, message):
204- if self.stopping:
205- self.channel.basic_cancel(self.consume_tag)
206+ if message.body == self.sentinel:
207+ self.stopping = True
208+ self.channel.basic_ack(message.delivery_tag)
209+ return
210 try:
211 report = bson.loads(message.body)
212 except KeyError:
213@@ -56,6 +70,41 @@
214 self.channel.basic_ack(message.delivery_tag)
215
216 def run_forever(self):
217- self.consume_tag = self.channel.basic_consume(
218- self.queue_name, callback=self.handle_report)
219- self.channel.wait()
220+ """Run in a loop handling messages.
221+
222+ If the amqp server is down or uncontactable for > 120 seconds, error
223+ out.
224+ """
225+ self.stopping = False
226+ self.went_bad = None
227+ while (not self.stopping and
228+ (not self.went_bad or time.time() < self.went_bad + 120)):
229+ try:
230+ self._run_forever()
231+ except socket.error:
232+ if not self.went_bad:
233+ self.went_bad = time.time()
234+ # Don't probe immediately, give the network/process time to
235+ # come back.
236+ time.sleep(0.1)
237+
238+ def _run_forever(self):
239+ self.connection = self.connection_factory()
240+ # A successful connection: record this so run_forever won't bail early.
241+ self.went_bad = None
242+ try:
243+ self.channel = self.connection.channel()
244+ try:
245+ self.consume_tag = self.channel.basic_consume(
246+ self.queue_name, callback=self.handle_report)
247+ try:
248+ while True:
249+ self.channel.wait()
250+ if self.stopping:
251+ break
252+ finally:
253+ self.channel.basic_cancel(self.consume_tag)
254+ finally:
255+ close_ignoring_EPIPE(self.channel)
256+ finally:
257+ close_ignoring_EPIPE(self.connection)
258
259=== modified file 'oops_amqp/tests/__init__.py'
260--- oops_amqp/tests/__init__.py 2011-10-10 21:49:50 +0000
261+++ oops_amqp/tests/__init__.py 2011-10-18 03:36:23 +0000
262@@ -16,8 +16,6 @@
263
264 """Tests for oops_amqp."""
265
266-import errno
267-import socket
268 from unittest import TestLoader
269
270 from amqplib import client_0_8 as amqp
271@@ -30,7 +28,10 @@
272 ResourcedTestCase,
273 )
274
275+from oops_amqp.utils import close_ignoring_EPIPE
276+
277 __all__ = [
278+ 'ChannelFixture',
279 'QueueFixture',
280 'test_suite',
281 'TestCase',
282@@ -80,6 +81,25 @@
283 self.channel.exchange_delete(self.exchange_name)
284
285
286+class ChannelFixture(Fixture):
287+ """Create an AMQP connection and channel for tests.
288+
289+ :ivar connection: an amqplib connection.
290+ :ivar channel: an amqplib channel
291+ """
292+
293+ def __init__(self, connection_factory):
294+ super(ChannelFixture, self).__init__()
295+ self.connection_factory = connection_factory
296+
297+ def setUp(self):
298+ super(ChannelFixture, self).setUp()
299+ self.connection = self.connection_factory()
300+ self.addCleanup(close_ignoring_EPIPE, self.connection)
301+ self.channel = self.connection.channel()
302+ self.addCleanup(close_ignoring_EPIPE, self.channel)
303+
304+
305 class TestCase(testtools.TestCase, ResourcedTestCase):
306 """Subclass to mix in testresources ResourcedTestCase."""
307
308@@ -91,13 +111,6 @@
309 self.rabbit.config.port), userid="guest", password="guest",
310 virtual_host="/")
311
312- def close_ignoring_EPIPE(self, closable):
313- try:
314- closable.close()
315- except socket.error, e:
316- if e.errno != errno.EPIPE:
317- raise
318-
319
320 def test_suite():
321 test_mod_names = [
322
323=== modified file 'oops_amqp/tests/test_publisher.py'
324--- oops_amqp/tests/test_publisher.py 2011-10-10 21:49:50 +0000
325+++ oops_amqp/tests/test_publisher.py 2011-10-18 03:36:23 +0000
326@@ -21,7 +21,9 @@
327 import bson
328
329 from oops_amqp import Publisher
330+from oops_amqp.utils import close_ignoring_EPIPE
331 from oops_amqp.tests import (
332+ ChannelFixture,
333 QueueFixture,
334 TestCase,
335 )
336@@ -29,13 +31,36 @@
337
338 class TestPublisher(TestCase):
339
340+ def test_publish_inherit_id(self):
341+ # OOPS id's can be set outside of Publisher().
342+ channel = self.useFixture(
343+ ChannelFixture(self.connection_factory)).channel
344+ queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
345+ publisher = Publisher(self.connection_factory, queue.exchange_name, "",
346+ inherit_id=True)
347+ reference_oops = {'id': 'kept', 'akey': 'avalue'}
348+ oops = dict(reference_oops)
349+ expected_id = 'kept'
350+ oops_id = publisher(oops)
351+ # Publication returns the oops ID allocated.
352+ self.assertEqual(expected_id, oops_id)
353+ # The oops should not be altered by publication.
354+ self.assertEqual(reference_oops, oops)
355+ # The received OOPS should have the ID embedded and be a bson dict.
356+ def check_oops(msg):
357+ self.assertEqual(reference_oops, bson.loads(msg.body))
358+ channel.basic_ack(msg.delivery_tag)
359+ channel.basic_cancel(queue.queue_name)
360+ channel.basic_consume(
361+ queue.queue_name, callback=check_oops,
362+ consumer_tag=queue.queue_name)
363+ channel.wait()
364+
365 def test_publish(self):
366 # Publishing an oops sends it to the exchange, making a connection as
367 # it goes.
368- connection = self.connection_factory()
369- self.addCleanup(connection.close)
370- channel = connection.channel()
371- self.addCleanup(channel.close)
372+ channel = self.useFixture(
373+ ChannelFixture(self.connection_factory)).channel
374 queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
375 publisher = Publisher(self.connection_factory, queue.exchange_name, "")
376 reference_oops = {'akey': 'avalue'}
377@@ -63,10 +88,8 @@
378 # If amqp is down when a connection is attempted, None is returned to
379 # indicate that publication failed - and publishing after it comes back
380 # works.
381- connection = self.connection_factory()
382- self.addCleanup(self.close_ignoring_EPIPE, connection)
383- channel = connection.channel()
384- self.addCleanup(self.close_ignoring_EPIPE, channel)
385+ channel = self.useFixture(
386+ ChannelFixture(self.connection_factory)).channel
387 queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
388 # The private method use and the restart of rabbit before it gets torn
389 # down are bugs in rabbitfixture that will be fixed in a future
390@@ -86,11 +109,10 @@
391 # If amqp goes down after its been successfully used, None is returned
392 # to indicate that publication failed - and publishing after it comes
393 # back works.
394- connection = self.connection_factory()
395- self.addCleanup(self.close_ignoring_EPIPE, connection)
396- channel = connection.channel()
397- self.addCleanup(self.close_ignoring_EPIPE, channel)
398- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
399+ channel = self.useFixture(
400+ ChannelFixture(self.connection_factory)).channel
401+ queue = self.useFixture(
402+ QueueFixture(channel, self.getUniqueString))
403 publisher = Publisher(self.connection_factory, queue.exchange_name, "")
404 oops = {'akey': 42}
405 self.assertNotEqual(None, publisher(oops))
406
407=== modified file 'oops_amqp/tests/test_receiver.py'
408--- oops_amqp/tests/test_receiver.py 2011-10-10 21:49:50 +0000
409+++ oops_amqp/tests/test_receiver.py 2011-10-18 03:36:23 +0000
410@@ -16,12 +16,16 @@
411
412 """Tests for AMQP receiving."""
413
414+import errno
415+import socket
416+
417 from amqplib import client_0_8 as amqp
418 import bson
419 from oops import Config
420
421 from oops_amqp import Receiver
422 from oops_amqp.tests import (
423+ ChannelFixture,
424 QueueFixture,
425 TestCase,
426 )
427@@ -29,49 +33,144 @@
428
429 class TestReceiver(TestCase):
430
431- def test_receive_one(self):
432- # Receiving a message from AMQP should republish it unaltered.
433- reports = []
434- def capture(report):
435- reports.append(report)
436- return report['id']
437- expected_report = {'id': 'foo', 'otherkey': 42}
438- message = amqp.Message(bson.dumps(expected_report))
439- connection = self.connection_factory()
440- self.addCleanup(connection.close)
441- channel = connection.channel()
442- self.addCleanup(channel.close)
443- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
444- channel.basic_publish(message, queue.exchange_name, routing_key="")
445- config = Config()
446- config.publishers.append(capture)
447- receiver_channel = connection.channel()
448- self.addCleanup(receiver_channel.close)
449- receiver = Receiver(config, receiver_channel, queue.queue_name)
450+ def test_stop_on_sentinel(self):
451+ # A sentinel can be used to stop the receiver (useful for testing).
452+ reports = []
453+ def capture(report):
454+ reports.append(report)
455+ return report['id']
456+ expected_report = {'id': 'foo', 'otherkey': 42}
457+ message = amqp.Message(bson.dumps(expected_report))
458+ channel = self.useFixture(
459+ ChannelFixture(self.connection_factory)).channel
460+ queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
461+ channel.basic_publish(
462+ message, queue.exchange_name, routing_key="")
463+ sentinel = "xxx"
464+ channel.basic_publish(
465+ amqp.Message(sentinel), queue.exchange_name, routing_key="")
466+ config = Config()
467+ config.publishers.append(capture)
468+ receiver = Receiver(config, self.connection_factory, queue.queue_name)
469+ receiver.sentinel = sentinel
470+ receiver.run_forever()
471+ self.assertEqual([expected_report], reports)
472+
473+ def test_stop_via_stopping(self):
474+ # Setting the stopping field should stop the run_forever loop.
475+ reports = []
476+ def capture(report):
477+ reports.append(report)
478+ return report['id']
479+ expected_report = {'id': 'foo', 'otherkey': 42}
480+ message = amqp.Message(bson.dumps(expected_report))
481+ channel = self.useFixture(
482+ ChannelFixture(self.connection_factory)).channel
483+ queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
484+ channel.basic_publish(
485+ message, queue.exchange_name, routing_key="")
486+ config = Config()
487+ config.publishers.append(capture)
488 # We don't want to loop forever: patch the channel so that after one
489 # call to wait (which will get our injected message) the loop will shut
490- # down. This also checks we use the consume_tag correctly.
491- old_wait = receiver_channel.wait
492- def new_wait(allowed_methods=None):
493- receiver.stopping = True
494- return old_wait(allowed_methods=allowed_methods)
495- receiver_channel.wait = new_wait
496+ # down.
497+ def patching_factory():
498+ connection = self.connection_factory()
499+ old_channel = connection.channel
500+ def new_channel():
501+ result = old_channel()
502+ old_wait = result.wait
503+ def new_wait(allowed_methods=None):
504+ receiver.stopping = True
505+ return old_wait(allowed_methods=allowed_methods)
506+ result.wait = new_wait
507+ return result
508+ connection.channel = new_channel
509+ return connection
510+ receiver = Receiver(config, patching_factory, queue.queue_name)
511 receiver.run_forever()
512 self.assertEqual([expected_report], reports)
513
514 def test_run_forever(self):
515- # run_forever subscribes and then calls wait.
516+ # run_forever subscribes and then calls wait in a loop.
517 config = None
518+ calls = []
519 class FakeChannel:
520- def __init__(self):
521- self.calls = []
522+ def __init__(self, calls):
523+ self.calls = calls
524 def basic_consume(self, queue_name, callback=None):
525 self.calls.append(('basic_consume', queue_name, callback))
526+ return 'tag'
527 def wait(self):
528 self.calls.append(('wait',))
529- channel = FakeChannel()
530- receiver = Receiver(None, channel, 'foo')
531+ if len(self.calls) > 2:
532+ receiver.stopping = True
533+ def basic_cancel(self, tag):
534+ self.calls.append(('basic_cancel', tag))
535+ def close(self):
536+ pass
537+ class FakeConnection:
538+ def channel(self):
539+ return FakeChannel(calls)
540+ def close(self):
541+ pass
542+ receiver = Receiver(None, FakeConnection, 'foo')
543 receiver.run_forever()
544 self.assertEqual(
545- [('basic_consume', 'foo', receiver.handle_report), ('wait',)],
546- channel.calls)
547+ [('basic_consume', 'foo', receiver.handle_report),
548+ ('wait',),
549+ ('wait',),
550+ ('basic_cancel', 'tag')],
551+ calls)
552+
553+ def test_tolerates_amqp_trouble(self):
554+ # If the AMQP server is unavailable for a short period, the receiver
555+ # will automatically reconnect.
556+ # Break a connection to raise socket.error (which we know from the
557+ # publisher tests is what leaks through when rabbit is shutdown).
558+ # We raise it the first time on each amqp method call.
559+ reports = []
560+ def capture(report):
561+ reports.append(report)
562+ return report['id']
563+ expected_report = {'id': 'foo', 'otherkey': 42}
564+ message = amqp.Message(bson.dumps(expected_report))
565+ channel = self.useFixture(
566+ ChannelFixture(self.connection_factory)).channel
567+ queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
568+ channel.basic_publish(message, queue.exchange_name, routing_key="")
569+ config = Config()
570+ config.publishers.append(capture)
571+ state = {}
572+ def error_once(func):
573+ def wrapped(*args, **kwargs):
574+ func_ref = func.func_code
575+ if func_ref in state:
576+ return func(*args, **kwargs)
577+ else:
578+ state[func_ref] = True
579+ # Use EPIPE because the close() code checks that (though
580+ # the rest doesn't)
581+ raise socket.error(errno.EPIPE, "booyah")
582+ return wrapped
583+ @error_once
584+ def patching_factory():
585+ connection = self.connection_factory()
586+ old_channel = connection.channel
587+ @error_once
588+ def new_channel():
589+ result = old_channel()
590+ result.wait = error_once(result.wait)
591+ result.basic_consume = error_once(result.basic_consume)
592+ result.basic_cancel = error_once(result.basic_cancel)
593+ result.close = error_once(result.close)
594+ return result
595+ connection.channel = new_channel
596+ connection.close = error_once(connection.close)
597+ return connection
598+ receiver = Receiver(config, patching_factory, queue.queue_name)
599+ receiver.sentinel = "arhh"
600+ channel.basic_publish(
601+ amqp.Message("arhh"), queue.exchange_name, routing_key="")
602+ receiver.run_forever()
603+ self.assertEqual([expected_report], reports)
604
605=== added file 'oops_amqp/utils.py'
606--- oops_amqp/utils.py 1970-01-01 00:00:00 +0000
607+++ oops_amqp/utils.py 2011-10-18 03:36:23 +0000
608@@ -0,0 +1,32 @@
609+# Copyright (c) 2011, Canonical Ltd
610+#
611+# This program is free software: you can redistribute it and/or modify
612+# it under the terms of the GNU Affero General Public License as published by
613+# the Free Software Foundation, either version 3 of the License, or
614+# (at your option) any later version.
615+#
616+# This program is distributed in the hope that it will be useful,
617+# but WITHOUT ANY WARRANTY; without even the implied warranty of
618+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
619+# GNU Affero General Public License for more details.
620+#
621+# You should have received a copy of the GNU Affero General Public License
622+# along with this program. If not, see <http://www.gnu.org/licenses/>.
623+# GNU Affero General Public License version 3 (see the file LICENSE).
624+
625+"""Utility functions for oops_amqp."""
626+
627+import errno
628+import socket
629+
630+__all__ = [
631+ 'close_ignoring_EPIPE',
632+ ]
633+
634+
635+def close_ignoring_EPIPE(closable):
636+ try:
637+ return closable.close()
638+ except socket.error, e:
639+ if e.errno != errno.EPIPE:
640+ raise
641
642=== modified file 'setup.py'
643--- setup.py 2011-10-10 21:49:50 +0000
644+++ setup.py 2011-10-18 03:36:23 +0000
645@@ -23,7 +23,7 @@
646 os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
647
648 setup(name="oops_amqp",
649- version="0.0.1",
650+ version="0.0.2",
651 description=\
652 "OOPS AMQP transport.",
653 long_description=description,

Subscribers

People subscribed via source and target branches

to all changes: