Merge lp:~lifeless/python-oops-amqp/0.0.2 into lp:python-oops-amqp
- 0.0.2
- Merge into trunk
Status: | Merged | ||||||||
---|---|---|---|---|---|---|---|---|---|
Merged at revision: | 3 | ||||||||
Proposed branch: | lp:~lifeless/python-oops-amqp/0.0.2 | ||||||||
Merge into: | lp:python-oops-amqp | ||||||||
Diff against target: |
653 lines (+332/-83) 10 files modified
NEWS (+27/-0) README (+5/-6) oops_amqp/__init__.py (+6/-4) oops_amqp/publisher.py (+15/-9) oops_amqp/receiver.py (+58/-9) oops_amqp/tests/__init__.py (+22/-9) oops_amqp/tests/test_publisher.py (+35/-13) oops_amqp/tests/test_receiver.py (+131/-32) oops_amqp/utils.py (+32/-0) setup.py (+1/-1) |
||||||||
To merge this branch: | bzr merge lp:~lifeless/python-oops-amqp/0.0.2 | ||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
William Grant | code | Approve | |
Review via email: mp+79637@code.launchpad.net |
Commit message
Description of the change
0.0.2
-----
* Fix documentation warts from initial release, updated to 0.0.2 and prepare
for making the receiver deal with interrupted services.
(Robert Collins, #875976, #875984)
* Fix Receiver.
* Change API for constructing Receiver to take a connection factory rather than
a channel. This will permit handling transient faults internally rather than
forcing a restart. (Robert Collins)
* Implement resiliency for the Receiver: automatically reconnect if a socket
error is received from rabbit, for up to two minutes of downtime.
(Robert Collins)
Robert Collins (lifeless) wrote : | # |
- 10. By Robert Collins
-
Permit supplying the oops id when publishing.
William Grant (wgrant) wrote : | # |
183 - def __init__(self, config, channel, queue_name):
184 + def __init__(self, config, connection_factory, queue_name):
185 """Create a Receiver.
186
187 :param config: An oops.Config to republish the OOPS reports.
188 - :param channel: An amqplib Channel to listen for reports on.
189 + :param connection: An amqplib connection factory, used to make the
190 + initial connection and to reconnect if that connection is
191 + interrupted.
docstring says the param is "connection", but it's actually "connection_
227 + while not self.stopping and time.time() < self.connection
This is going to stop retrying immediately if the connection dies after more than two minutes.
321 + connection = self.connection
322 + self.addCleanup
323 + channel = connection.
324 + self.addCleanup
325 + queue = self.useFixture
This is now roughly quadruplicated. Put it in a helper or setUp?
318 + def test_publish_
Are test_publish and test_publish_
304 + # Publication returns the oops ID allocated.
s/allocated/
371 - def test_receive_
Where'd that go?
398 + connection = self.connection
399 + self.addCleanup
400 + channel = connection.
401 + self.addCleanup
402 + queue = self.useFixture
More duplication.
410 + receiver.sentinel = sentinel
411 + receiver.
412 + self.assertEqua
May be worth a comment around run_forever to say that it will return quickly since you've already sent the sentinel.
422 + connection = self.connection
423 + self.addCleanup
424 + channel = connection.
425 + self.addCleanup
426 + queue = self.useFixture
Guess what...
499 + # publisher tests is what le aks through when rabbit is shutdown).
You've leaked some whitespace there.
507 + connection = self.connection
508 + self.addCleanup
509 + channel = connection.
510 + self.addCleanup
511 + queue = self.useFixture
Ahem.
Robert Collins (lifeless) wrote : | # |
receive_one got renamed stop_via_stopping
I see an fixtures_amqp coming on sometime soon.
- 11. By Robert Collins
-
Review feedback (fix retry, docstring).
- 12. By Robert Collins
-
Refactor test support.
William Grant (wgrant) : | # |
Preview Diff
1 | === modified file 'NEWS' |
2 | --- NEWS 2011-10-10 21:49:50 +0000 |
3 | +++ NEWS 2011-10-18 03:36:23 +0000 |
4 | @@ -6,6 +6,33 @@ |
5 | NEXT |
6 | ---- |
7 | |
8 | +0.0.2 |
9 | +----- |
10 | + |
11 | +* Fix documentation warts from initial release, updated to 0.0.2 and prepare |
12 | + for making the receiver deal with interrupted services. |
13 | + (Robert Collins, #875976, #875984) |
14 | + |
15 | +* Fix Receiver.run_forever to actually run forever. (Robert Collins) |
16 | + |
17 | +* Change API for constructing Receiver to take a connection factory rather than |
18 | + a channel. This will permit handling transient faults internally rather than |
19 | + forcing a restart. (Robert Collins) |
20 | + |
21 | +* Implement resiliency for the Receiver: automatically reconnect if a socket |
22 | + error is received from rabbit, for up to two minutes of downtime. |
23 | + (Robert Collins) |
24 | + |
25 | +* The publisher can honour any existing oops id if desired (set |
26 | + inherit_id=True). This is useful if using a chain of publishers where |
27 | + AMQP is the last stage rather than the primary mechanism. |
28 | + (Robert Collins) |
29 | + |
30 | +* Receiver.sentinel may be set and used to cause the run_forever loop to exit |
31 | + when a crafted message is received. This is useful for testing, and defaults |
32 | + to None (so cannot be used to shutdown a normally running service). |
33 | + (Robert Collins) |
34 | + |
35 | 0.0.1 |
36 | ----- |
37 | |
38 | |
39 | === modified file 'README' |
40 | --- README 2011-10-10 21:49:50 +0000 |
41 | +++ README 2011-10-18 03:36:23 +0000 |
42 | @@ -98,17 +98,16 @@ |
43 | AMQP. To use it you need to configure a local config to publish the received |
44 | reports. A full config is used because that includes support for filtering |
45 | (which can be useful if you need to throttle volume, for instance). |
46 | -Additionally you need an amqp channel object and a queue name to receive from. |
47 | +Additionally you need an amqp connection factory (to handle the amqp server |
48 | +being restarted) and a queue name to receive from. |
49 | |
50 | -This example uses the OOPSDateDirRepo publisher, telling it to accept whatever |
51 | +This example uses the DateDirRepo publisher, telling it to accept whatever |
52 | id was assigned by the process publishing to AMQP:: |
53 | |
54 | - >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True) |
55 | + >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True) |
56 | >>> config = oops.Config() |
57 | >>> config.publishers.append(publisher.publish) |
58 | - >>> connection = amqp.Connection(host="localhost:5672", |
59 | - ... userid="guest", password="guest", virtual_host="/", insist=False) |
60 | - >>> receiver = oops_amqp.Receiver(config, connection, "my queue") |
61 | + >>> receiver = oops_amqp.Receiver(config, factory, "my queue") |
62 | >>> receiver.run_forever() |
63 | |
64 | For more information see pydoc oops_amqp. |
65 | |
66 | === modified file 'oops_amqp/__init__.py' |
67 | --- oops_amqp/__init__.py 2011-10-10 21:49:50 +0000 |
68 | +++ oops_amqp/__init__.py 2011-10-18 03:36:23 +0000 |
69 | @@ -73,14 +73,16 @@ |
70 | AMQP. To use it you need to configure a local config to publish the received |
71 | reports. A full config is used because that includes support for filtering |
72 | (which can be useful if you need to throttle volume, for instance). |
73 | +Additionally you need an amqp connection factory (to handle the amqp server |
74 | +being restarted) and a queue name to receive from. |
75 | |
76 | -This example uses the OOPSDateDirRepo publisher, telling it to accept whatever |
77 | +This example uses the DateDirRepo publisher, telling it to accept whatever |
78 | id was assigned by the process publishing to AMQP:: |
79 | |
80 | - >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True) |
81 | + >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True) |
82 | >>> config = oops.Config() |
83 | >>> config.publishers.append(publisher.publish) |
84 | - >>> receiver = oops_amqp.Receiver(config) |
85 | + >>> receiver = oops_amqp.Receiver(config, factory, "my queue") |
86 | >>> receiver.run_forever() |
87 | """ |
88 | |
89 | @@ -95,7 +97,7 @@ |
90 | # established at this point, and setup.py will use a version of next-$(revno). |
91 | # If the releaselevel is 'final', then the tarball will be major.minor.micro. |
92 | # Otherwise it is major.minor.micro~$(revno). |
93 | -__version__ = (0, 0, 1, 'beta', 0) |
94 | +__version__ = (0, 0, 2, 'beta', 0) |
95 | |
96 | __all__ = [ |
97 | 'Publisher', |
98 | |
99 | === modified file 'oops_amqp/publisher.py' |
100 | --- oops_amqp/publisher.py 2011-10-10 21:51:53 +0000 |
101 | +++ oops_amqp/publisher.py 2011-10-18 03:36:23 +0000 |
102 | @@ -36,7 +36,8 @@ |
103 | supplied exchange + routing key. |
104 | """ |
105 | |
106 | - def __init__(self, connection_factory, exchange_name, routing_key): |
107 | + def __init__(self, connection_factory, exchange_name, routing_key, |
108 | + inherit_id=False): |
109 | """Create a publisher. |
110 | |
111 | :param connection_factory: A callable which creates an amqplib |
112 | @@ -46,11 +47,15 @@ |
113 | across threads. |
114 | :param exchange_name: The name of the exchange to publish to. |
115 | :param routing_key: The routing key for messages. |
116 | + :param inherit_id: If True any 'True' 'id' in an OOPS report is |
117 | + preserved. Handy if an id that has already been shown to a user is |
118 | + being published (but uniqueness cannot be guaranteed). |
119 | """ |
120 | self.connection_factory = connection_factory |
121 | self.exchange_name = exchange_name |
122 | self.routing_key = routing_key |
123 | self.channels = local() |
124 | + self.inherit_id = inherit_id |
125 | |
126 | def get_channel(self): |
127 | if getattr(self.channels, 'channel', None) is None: |
128 | @@ -64,13 +69,14 @@ |
129 | def __call__(self, report): |
130 | # Don't mess with the passed in report. |
131 | report = dict(report) |
132 | - # Discard any existing id. |
133 | - report.pop('id', None) |
134 | - # Hash it, to make an ID |
135 | - oops_id = "OOPS-%s" % md5(dumps(report)).hexdigest() |
136 | - # Store the id in what we send on the wire, so that the recipient has |
137 | - # it. |
138 | - report['id'] = oops_id |
139 | + if not self.inherit_id or not report.get('id'): |
140 | + # Discard any existing id. |
141 | + original_id = report.pop('id', None) |
142 | + # Hash it, to make an ID |
143 | + oops_id = "OOPS-%s" % md5(dumps(report)).hexdigest() |
144 | + # Store the id in what we send on the wire, so that the recipient |
145 | + # has it. |
146 | + report['id'] = oops_id |
147 | message = amqp.Message(dumps(report)) |
148 | # We don't want to drop OOPS on the floor if rabbit is restarted. |
149 | message.properties["delivery_mode"] = 2 |
150 | @@ -83,4 +89,4 @@ |
151 | except socket.error: |
152 | self.channels.channel = None |
153 | return None |
154 | - return oops_id |
155 | + return report['id'] |
156 | |
157 | === modified file 'oops_amqp/receiver.py' |
158 | --- oops_amqp/receiver.py 2011-10-10 21:49:50 +0000 |
159 | +++ oops_amqp/receiver.py 2011-10-18 03:36:23 +0000 |
160 | @@ -18,8 +18,13 @@ |
161 | |
162 | __metaclass__ = type |
163 | |
164 | +import socket |
165 | +import time |
166 | + |
167 | import bson |
168 | |
169 | +from utils import close_ignoring_EPIPE |
170 | + |
171 | __all__ = [ |
172 | 'Receiver', |
173 | ] |
174 | @@ -28,23 +33,32 @@ |
175 | """Republish OOPS reports from AMQP to a local oops.Config. |
176 | |
177 | :ivar stopping: When True will cause Receiver to break out of run_forever. |
178 | + Calls to run_forever reset this to False. |
179 | + :ivar sentinel: If a message identical to the sentinel is received, |
180 | + handle_report will set stopping to True. |
181 | """ |
182 | |
183 | - def __init__(self, config, channel, queue_name): |
184 | + def __init__(self, config, connection_factory, queue_name): |
185 | """Create a Receiver. |
186 | |
187 | :param config: An oops.Config to republish the OOPS reports. |
188 | - :param channel: An amqplib Channel to listen for reports on. |
189 | + :param connection_factory: An amqplib connection factory, used to make |
190 | + the initial connection and to reconnect if that connection is |
191 | + interrupted. |
192 | :param queue_name: The queue to listen for reports on. |
193 | """ |
194 | self.config = config |
195 | - self.channel = channel |
196 | + self.connection = None |
197 | + self.channel = None |
198 | + self.connection_factory = connection_factory |
199 | self.queue_name = queue_name |
200 | - self.stopping = False |
201 | + self.sentinel = None |
202 | |
203 | def handle_report(self, message): |
204 | - if self.stopping: |
205 | - self.channel.basic_cancel(self.consume_tag) |
206 | + if message.body == self.sentinel: |
207 | + self.stopping = True |
208 | + self.channel.basic_ack(message.delivery_tag) |
209 | + return |
210 | try: |
211 | report = bson.loads(message.body) |
212 | except KeyError: |
213 | @@ -56,6 +70,41 @@ |
214 | self.channel.basic_ack(message.delivery_tag) |
215 | |
216 | def run_forever(self): |
217 | - self.consume_tag = self.channel.basic_consume( |
218 | - self.queue_name, callback=self.handle_report) |
219 | - self.channel.wait() |
220 | + """Run in a loop handling messages. |
221 | + |
222 | + If the amqp server is down or uncontactable for > 120 seconds, error |
223 | + out. |
224 | + """ |
225 | + self.stopping = False |
226 | + self.went_bad = None |
227 | + while (not self.stopping and |
228 | + (not self.went_bad or time.time() < self.went_bad + 120)): |
229 | + try: |
230 | + self._run_forever() |
231 | + except socket.error: |
232 | + if not self.went_bad: |
233 | + self.went_bad = time.time() |
234 | + # Don't probe immediately, give the network/process time to |
235 | + # come back. |
236 | + time.sleep(0.1) |
237 | + |
238 | + def _run_forever(self): |
239 | + self.connection = self.connection_factory() |
240 | + # A successful connection: record this so run_forever won't bail early. |
241 | + self.went_bad = None |
242 | + try: |
243 | + self.channel = self.connection.channel() |
244 | + try: |
245 | + self.consume_tag = self.channel.basic_consume( |
246 | + self.queue_name, callback=self.handle_report) |
247 | + try: |
248 | + while True: |
249 | + self.channel.wait() |
250 | + if self.stopping: |
251 | + break |
252 | + finally: |
253 | + self.channel.basic_cancel(self.consume_tag) |
254 | + finally: |
255 | + close_ignoring_EPIPE(self.channel) |
256 | + finally: |
257 | + close_ignoring_EPIPE(self.connection) |
258 | |
259 | === modified file 'oops_amqp/tests/__init__.py' |
260 | --- oops_amqp/tests/__init__.py 2011-10-10 21:49:50 +0000 |
261 | +++ oops_amqp/tests/__init__.py 2011-10-18 03:36:23 +0000 |
262 | @@ -16,8 +16,6 @@ |
263 | |
264 | """Tests for oops_amqp.""" |
265 | |
266 | -import errno |
267 | -import socket |
268 | from unittest import TestLoader |
269 | |
270 | from amqplib import client_0_8 as amqp |
271 | @@ -30,7 +28,10 @@ |
272 | ResourcedTestCase, |
273 | ) |
274 | |
275 | +from oops_amqp.utils import close_ignoring_EPIPE |
276 | + |
277 | __all__ = [ |
278 | + 'ChannelFixture', |
279 | 'QueueFixture', |
280 | 'test_suite', |
281 | 'TestCase', |
282 | @@ -80,6 +81,25 @@ |
283 | self.channel.exchange_delete(self.exchange_name) |
284 | |
285 | |
286 | +class ChannelFixture(Fixture): |
287 | + """Create an AMQP connection and channel for tests. |
288 | + |
289 | + :ivar connection: an amqplib connection. |
290 | + :ivar channel: an amqplib channel |
291 | + """ |
292 | + |
293 | + def __init__(self, connection_factory): |
294 | + super(ChannelFixture, self).__init__() |
295 | + self.connection_factory = connection_factory |
296 | + |
297 | + def setUp(self): |
298 | + super(ChannelFixture, self).setUp() |
299 | + self.connection = self.connection_factory() |
300 | + self.addCleanup(close_ignoring_EPIPE, self.connection) |
301 | + self.channel = self.connection.channel() |
302 | + self.addCleanup(close_ignoring_EPIPE, self.channel) |
303 | + |
304 | + |
305 | class TestCase(testtools.TestCase, ResourcedTestCase): |
306 | """Subclass to mix in testresources ResourcedTestCase.""" |
307 | |
308 | @@ -91,13 +111,6 @@ |
309 | self.rabbit.config.port), userid="guest", password="guest", |
310 | virtual_host="/") |
311 | |
312 | - def close_ignoring_EPIPE(self, closable): |
313 | - try: |
314 | - closable.close() |
315 | - except socket.error, e: |
316 | - if e.errno != errno.EPIPE: |
317 | - raise |
318 | - |
319 | |
320 | def test_suite(): |
321 | test_mod_names = [ |
322 | |
323 | === modified file 'oops_amqp/tests/test_publisher.py' |
324 | --- oops_amqp/tests/test_publisher.py 2011-10-10 21:49:50 +0000 |
325 | +++ oops_amqp/tests/test_publisher.py 2011-10-18 03:36:23 +0000 |
326 | @@ -21,7 +21,9 @@ |
327 | import bson |
328 | |
329 | from oops_amqp import Publisher |
330 | +from oops_amqp.utils import close_ignoring_EPIPE |
331 | from oops_amqp.tests import ( |
332 | + ChannelFixture, |
333 | QueueFixture, |
334 | TestCase, |
335 | ) |
336 | @@ -29,13 +31,36 @@ |
337 | |
338 | class TestPublisher(TestCase): |
339 | |
340 | + def test_publish_inherit_id(self): |
341 | + # OOPS id's can be set outside of Publisher(). |
342 | + channel = self.useFixture( |
343 | + ChannelFixture(self.connection_factory)).channel |
344 | + queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
345 | + publisher = Publisher(self.connection_factory, queue.exchange_name, "", |
346 | + inherit_id=True) |
347 | + reference_oops = {'id': 'kept', 'akey': 'avalue'} |
348 | + oops = dict(reference_oops) |
349 | + expected_id = 'kept' |
350 | + oops_id = publisher(oops) |
351 | + # Publication returns the oops ID allocated. |
352 | + self.assertEqual(expected_id, oops_id) |
353 | + # The oops should not be altered by publication. |
354 | + self.assertEqual(reference_oops, oops) |
355 | + # The received OOPS should have the ID embedded and be a bson dict. |
356 | + def check_oops(msg): |
357 | + self.assertEqual(reference_oops, bson.loads(msg.body)) |
358 | + channel.basic_ack(msg.delivery_tag) |
359 | + channel.basic_cancel(queue.queue_name) |
360 | + channel.basic_consume( |
361 | + queue.queue_name, callback=check_oops, |
362 | + consumer_tag=queue.queue_name) |
363 | + channel.wait() |
364 | + |
365 | def test_publish(self): |
366 | # Publishing an oops sends it to the exchange, making a connection as |
367 | # it goes. |
368 | - connection = self.connection_factory() |
369 | - self.addCleanup(connection.close) |
370 | - channel = connection.channel() |
371 | - self.addCleanup(channel.close) |
372 | + channel = self.useFixture( |
373 | + ChannelFixture(self.connection_factory)).channel |
374 | queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
375 | publisher = Publisher(self.connection_factory, queue.exchange_name, "") |
376 | reference_oops = {'akey': 'avalue'} |
377 | @@ -63,10 +88,8 @@ |
378 | # If amqp is down when a connection is attempted, None is returned to |
379 | # indicate that publication failed - and publishing after it comes back |
380 | # works. |
381 | - connection = self.connection_factory() |
382 | - self.addCleanup(self.close_ignoring_EPIPE, connection) |
383 | - channel = connection.channel() |
384 | - self.addCleanup(self.close_ignoring_EPIPE, channel) |
385 | + channel = self.useFixture( |
386 | + ChannelFixture(self.connection_factory)).channel |
387 | queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
388 | # The private method use and the restart of rabbit before it gets torn |
389 | # down are bugs in rabbitfixture that will be fixed in a future |
390 | @@ -86,11 +109,10 @@ |
391 | # If amqp goes down after its been successfully used, None is returned |
392 | # to indicate that publication failed - and publishing after it comes |
393 | # back works. |
394 | - connection = self.connection_factory() |
395 | - self.addCleanup(self.close_ignoring_EPIPE, connection) |
396 | - channel = connection.channel() |
397 | - self.addCleanup(self.close_ignoring_EPIPE, channel) |
398 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
399 | + channel = self.useFixture( |
400 | + ChannelFixture(self.connection_factory)).channel |
401 | + queue = self.useFixture( |
402 | + QueueFixture(channel, self.getUniqueString)) |
403 | publisher = Publisher(self.connection_factory, queue.exchange_name, "") |
404 | oops = {'akey': 42} |
405 | self.assertNotEqual(None, publisher(oops)) |
406 | |
407 | === modified file 'oops_amqp/tests/test_receiver.py' |
408 | --- oops_amqp/tests/test_receiver.py 2011-10-10 21:49:50 +0000 |
409 | +++ oops_amqp/tests/test_receiver.py 2011-10-18 03:36:23 +0000 |
410 | @@ -16,12 +16,16 @@ |
411 | |
412 | """Tests for AMQP receiving.""" |
413 | |
414 | +import errno |
415 | +import socket |
416 | + |
417 | from amqplib import client_0_8 as amqp |
418 | import bson |
419 | from oops import Config |
420 | |
421 | from oops_amqp import Receiver |
422 | from oops_amqp.tests import ( |
423 | + ChannelFixture, |
424 | QueueFixture, |
425 | TestCase, |
426 | ) |
427 | @@ -29,49 +33,144 @@ |
428 | |
429 | class TestReceiver(TestCase): |
430 | |
431 | - def test_receive_one(self): |
432 | - # Receiving a message from AMQP should republish it unaltered. |
433 | - reports = [] |
434 | - def capture(report): |
435 | - reports.append(report) |
436 | - return report['id'] |
437 | - expected_report = {'id': 'foo', 'otherkey': 42} |
438 | - message = amqp.Message(bson.dumps(expected_report)) |
439 | - connection = self.connection_factory() |
440 | - self.addCleanup(connection.close) |
441 | - channel = connection.channel() |
442 | - self.addCleanup(channel.close) |
443 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
444 | - channel.basic_publish(message, queue.exchange_name, routing_key="") |
445 | - config = Config() |
446 | - config.publishers.append(capture) |
447 | - receiver_channel = connection.channel() |
448 | - self.addCleanup(receiver_channel.close) |
449 | - receiver = Receiver(config, receiver_channel, queue.queue_name) |
450 | + def test_stop_on_sentinel(self): |
451 | + # A sentinel can be used to stop the receiver (useful for testing). |
452 | + reports = [] |
453 | + def capture(report): |
454 | + reports.append(report) |
455 | + return report['id'] |
456 | + expected_report = {'id': 'foo', 'otherkey': 42} |
457 | + message = amqp.Message(bson.dumps(expected_report)) |
458 | + channel = self.useFixture( |
459 | + ChannelFixture(self.connection_factory)).channel |
460 | + queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
461 | + channel.basic_publish( |
462 | + message, queue.exchange_name, routing_key="") |
463 | + sentinel = "xxx" |
464 | + channel.basic_publish( |
465 | + amqp.Message(sentinel), queue.exchange_name, routing_key="") |
466 | + config = Config() |
467 | + config.publishers.append(capture) |
468 | + receiver = Receiver(config, self.connection_factory, queue.queue_name) |
469 | + receiver.sentinel = sentinel |
470 | + receiver.run_forever() |
471 | + self.assertEqual([expected_report], reports) |
472 | + |
473 | + def test_stop_via_stopping(self): |
474 | + # Setting the stopping field should stop the run_forever loop. |
475 | + reports = [] |
476 | + def capture(report): |
477 | + reports.append(report) |
478 | + return report['id'] |
479 | + expected_report = {'id': 'foo', 'otherkey': 42} |
480 | + message = amqp.Message(bson.dumps(expected_report)) |
481 | + channel = self.useFixture( |
482 | + ChannelFixture(self.connection_factory)).channel |
483 | + queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
484 | + channel.basic_publish( |
485 | + message, queue.exchange_name, routing_key="") |
486 | + config = Config() |
487 | + config.publishers.append(capture) |
488 | # We don't want to loop forever: patch the channel so that after one |
489 | # call to wait (which will get our injected message) the loop will shut |
490 | - # down. This also checks we use the consume_tag correctly. |
491 | - old_wait = receiver_channel.wait |
492 | - def new_wait(allowed_methods=None): |
493 | - receiver.stopping = True |
494 | - return old_wait(allowed_methods=allowed_methods) |
495 | - receiver_channel.wait = new_wait |
496 | + # down. |
497 | + def patching_factory(): |
498 | + connection = self.connection_factory() |
499 | + old_channel = connection.channel |
500 | + def new_channel(): |
501 | + result = old_channel() |
502 | + old_wait = result.wait |
503 | + def new_wait(allowed_methods=None): |
504 | + receiver.stopping = True |
505 | + return old_wait(allowed_methods=allowed_methods) |
506 | + result.wait = new_wait |
507 | + return result |
508 | + connection.channel = new_channel |
509 | + return connection |
510 | + receiver = Receiver(config, patching_factory, queue.queue_name) |
511 | receiver.run_forever() |
512 | self.assertEqual([expected_report], reports) |
513 | |
514 | def test_run_forever(self): |
515 | - # run_forever subscribes and then calls wait. |
516 | + # run_forever subscribes and then calls wait in a loop. |
517 | config = None |
518 | + calls = [] |
519 | class FakeChannel: |
520 | - def __init__(self): |
521 | - self.calls = [] |
522 | + def __init__(self, calls): |
523 | + self.calls = calls |
524 | def basic_consume(self, queue_name, callback=None): |
525 | self.calls.append(('basic_consume', queue_name, callback)) |
526 | + return 'tag' |
527 | def wait(self): |
528 | self.calls.append(('wait',)) |
529 | - channel = FakeChannel() |
530 | - receiver = Receiver(None, channel, 'foo') |
531 | + if len(self.calls) > 2: |
532 | + receiver.stopping = True |
533 | + def basic_cancel(self, tag): |
534 | + self.calls.append(('basic_cancel', tag)) |
535 | + def close(self): |
536 | + pass |
537 | + class FakeConnection: |
538 | + def channel(self): |
539 | + return FakeChannel(calls) |
540 | + def close(self): |
541 | + pass |
542 | + receiver = Receiver(None, FakeConnection, 'foo') |
543 | receiver.run_forever() |
544 | self.assertEqual( |
545 | - [('basic_consume', 'foo', receiver.handle_report), ('wait',)], |
546 | - channel.calls) |
547 | + [('basic_consume', 'foo', receiver.handle_report), |
548 | + ('wait',), |
549 | + ('wait',), |
550 | + ('basic_cancel', 'tag')], |
551 | + calls) |
552 | + |
553 | + def test_tolerates_amqp_trouble(self): |
554 | + # If the AMQP server is unavailable for a short period, the receiver |
555 | + # will automatically reconnect. |
556 | + # Break a connection to raise socket.error (which we know from the |
557 | + # publisher tests is what leaks through when rabbit is shutdown). |
558 | + # We raise it the first time on each amqp method call. |
559 | + reports = [] |
560 | + def capture(report): |
561 | + reports.append(report) |
562 | + return report['id'] |
563 | + expected_report = {'id': 'foo', 'otherkey': 42} |
564 | + message = amqp.Message(bson.dumps(expected_report)) |
565 | + channel = self.useFixture( |
566 | + ChannelFixture(self.connection_factory)).channel |
567 | + queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
568 | + channel.basic_publish(message, queue.exchange_name, routing_key="") |
569 | + config = Config() |
570 | + config.publishers.append(capture) |
571 | + state = {} |
572 | + def error_once(func): |
573 | + def wrapped(*args, **kwargs): |
574 | + func_ref = func.func_code |
575 | + if func_ref in state: |
576 | + return func(*args, **kwargs) |
577 | + else: |
578 | + state[func_ref] = True |
579 | + # Use EPIPE because the close() code checks that (though |
580 | + # the rest doesn't) |
581 | + raise socket.error(errno.EPIPE, "booyah") |
582 | + return wrapped |
583 | + @error_once |
584 | + def patching_factory(): |
585 | + connection = self.connection_factory() |
586 | + old_channel = connection.channel |
587 | + @error_once |
588 | + def new_channel(): |
589 | + result = old_channel() |
590 | + result.wait = error_once(result.wait) |
591 | + result.basic_consume = error_once(result.basic_consume) |
592 | + result.basic_cancel = error_once(result.basic_cancel) |
593 | + result.close = error_once(result.close) |
594 | + return result |
595 | + connection.channel = new_channel |
596 | + connection.close = error_once(connection.close) |
597 | + return connection |
598 | + receiver = Receiver(config, patching_factory, queue.queue_name) |
599 | + receiver.sentinel = "arhh" |
600 | + channel.basic_publish( |
601 | + amqp.Message("arhh"), queue.exchange_name, routing_key="") |
602 | + receiver.run_forever() |
603 | + self.assertEqual([expected_report], reports) |
604 | |
605 | === added file 'oops_amqp/utils.py' |
606 | --- oops_amqp/utils.py 1970-01-01 00:00:00 +0000 |
607 | +++ oops_amqp/utils.py 2011-10-18 03:36:23 +0000 |
608 | @@ -0,0 +1,32 @@ |
609 | +# Copyright (c) 2011, Canonical Ltd |
610 | +# |
611 | +# This program is free software: you can redistribute it and/or modify |
612 | +# it under the terms of the GNU Affero General Public License as published by |
613 | +# the Free Software Foundation, either version 3 of the License, or |
614 | +# (at your option) any later version. |
615 | +# |
616 | +# This program is distributed in the hope that it will be useful, |
617 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
618 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
619 | +# GNU Affero General Public License for more details. |
620 | +# |
621 | +# You should have received a copy of the GNU Affero General Public License |
622 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
623 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
624 | + |
625 | +"""Utility functions for oops_amqp.""" |
626 | + |
627 | +import errno |
628 | +import socket |
629 | + |
630 | +__all__ = [ |
631 | + 'close_ignoring_EPIPE', |
632 | + ] |
633 | + |
634 | + |
635 | +def close_ignoring_EPIPE(closable): |
636 | + try: |
637 | + return closable.close() |
638 | + except socket.error, e: |
639 | + if e.errno != errno.EPIPE: |
640 | + raise |
641 | |
642 | === modified file 'setup.py' |
643 | --- setup.py 2011-10-10 21:49:50 +0000 |
644 | +++ setup.py 2011-10-18 03:36:23 +0000 |
645 | @@ -23,7 +23,7 @@ |
646 | os.path.join(os.path.dirname(__file__), 'README'), 'rb').read() |
647 | |
648 | setup(name="oops_amqp", |
649 | - version="0.0.1", |
650 | + version="0.0.2", |
651 | description=\ |
652 | "OOPS AMQP transport.", |
653 | long_description=description, |
I forgot to add improvements from the work on LP, moved to WIP