Merge lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp

Proposed by Colin Watson
Status: Merged
Merged at revision: 22
Proposed branch: lp:~cjwatson/python-oops-amqp/py3
Merge into: lp:python-oops-amqp
Diff against target: 594 lines (+114/-57)
14 files modified
.bzrignore (+1/-0)
NEWS (+2/-0)
README (+4/-4)
oops_amqp/__init__.py (+5/-3)
oops_amqp/anybson.py (+2/-0)
oops_amqp/publisher.py (+10/-6)
oops_amqp/receiver.py (+13/-6)
oops_amqp/tests/__init__.py (+19/-4)
oops_amqp/tests/test_publisher.py (+18/-6)
oops_amqp/tests/test_receiver.py (+23/-17)
oops_amqp/trace.py (+3/-3)
oops_amqp/utils.py (+5/-4)
setup.py (+6/-3)
versions.cfg (+3/-1)
To merge this branch: bzr merge lp:~cjwatson/python-oops-amqp/py3
Reviewer        Review Type   Date Requested   Status
William Grant   code                           Approve
Review via email: mp+341298@code.launchpad.net

Commit message

Add Python 3 support.

Description of the change

This also involves porting from amqplib to the better-maintained amqp.
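
Two patterns recur throughout the diff below, and the following minimal sketch illustrates them in isolation. It is not code from the branch. The first pattern is the bytes check added before each bson.loads call, since a message body may arrive as text under Python 3. The second is that the ported code calls connect() on the object returned by the connection factory before using it, and polls with drain_events(timeout=1) on the connection where it previously called wait() on the channel. The names body_as_bytes, FakeConnection and consume are hypothetical, and FakeConnection is only a stand-in so that the example runs without a broker.

```python
import socket


def body_as_bytes(body, content_encoding=None):
    """Return a message body as bytes, encoding text if necessary.

    Mirrors the isinstance/encode check the diff adds before bson.loads.
    The function name is hypothetical.
    """
    if isinstance(body, bytes):
        return body
    return body.encode(content_encoding or 'UTF-8')


class FakeConnection:
    """Hypothetical stand-in for a connection object; illustration only."""

    def __init__(self):
        self.connected = False
        self.drained = 0

    def connect(self):
        self.connected = True

    def drain_events(self, timeout=None):
        # Refuse to poll a connection that was never opened.
        if not self.connected:
            raise socket.error("connect() was never called")
        self.drained += 1


def consume(connection_factory, max_events):
    """Open a connection explicitly, then poll it a fixed number of times."""
    connection = connection_factory()
    # As in the diff, do not assume the factory returns an open connection.
    connection.connect()
    while connection.drained < max_events:
        connection.drain_events(timeout=1)
    return connection
```

In the branch itself the body check is written inline in handle_report and in the publisher tests, and the polling loop exits on the receiver's stopping flag, not on a counter.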

William Grant (wgrant) :
review: Approve (code)

Preview Diff

1=== modified file '.bzrignore'
2--- .bzrignore 2011-12-08 10:50:34 +0000
3+++ .bzrignore 2018-03-12 11:56:56 +0000
4@@ -1,3 +1,4 @@
5+__pycache__
6 ./eggs/*
7 ./.installed.cfg
8 ./develop-eggs
9
10=== modified file 'NEWS'
11--- NEWS 2015-10-08 14:49:18 +0000
12+++ NEWS 2018-03-12 11:56:56 +0000
13@@ -9,6 +9,8 @@
14 * Dropped dependency on pymongo in favor of bson. This avoids having
15 to depend both on bson and pymongo when installing in conjunction
16 with oops-datedir-repo. (Ricardo Kirkner)
17+* Port from amqplib to amqp. (Colin Watson)
18+* Add Python 3 support. (Colin Watson)
19
20 0.0.7
21 -----
22
23=== modified file 'README'
24--- README 2012-08-10 01:41:44 +0000
25+++ README 2018-03-12 11:56:56 +0000
26@@ -31,7 +31,7 @@
27
28 * oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
29
30-* amqplib
31+* amqp
32
33 Testing Dependencies
34 ====================
35@@ -57,7 +57,7 @@
36 connection - and the exchange name and routing key to submit to.
37
38 >>> factory = partial(amqp.Connection, host="localhost:5672",
39- ... userid="guest", password="guest", virtual_host="/", insist=False)
40+ ... userid="guest", password="guest", virtual_host="/")
41 >>> publisher = oops_amqp.Publisher(factory, "oopses", "")
42
43 Provide the publisher to your OOPS config::
44@@ -70,7 +70,7 @@
45 OOPS ids are generating by hashing the oops message (without the id field) -
46 this ensures unique ids.
47
48-The reason a factory is used is because amqplib is not threadsafe - the
49+The reason a factory is used is because amqp is not threadsafe - the
50 publisher maintains a thread locals object to hold the factories and creates
51 connections when new threads are created(when they first generate an OOPS).
52
53@@ -87,7 +87,7 @@
54 method failed::
55
56 >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
57- ... userid="guest", password="guest", virtual_host="/", insist=False)
58+ ... userid="guest", password="guest", virtual_host="/")
59 >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
60 >>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
61
62
63=== modified file 'oops_amqp/__init__.py'
64--- oops_amqp/__init__.py 2015-10-02 16:26:57 +0000
65+++ oops_amqp/__init__.py 2018-03-12 11:56:56 +0000
66@@ -32,7 +32,7 @@
67 connection - and the exchange name and routing key to submit to.
68
69 >>> factory = partial(amqp.Connection, host="localhost:5672",
70- ... userid="guest", password="guest", virtual_host="/", insist=False)
71+ ... userid="guest", password="guest", virtual_host="/")
72 >>> publisher = oops_amqp.Publisher(factory, "oopses", "")
73
74 Provide the publisher to your OOPS config::
75@@ -45,7 +45,7 @@
76 OOPS ids are generating by hashing the oops message (without the id field) -
77 this ensures unique ids.
78
79-The reason a factory is used is because amqplib is not threadsafe - the
80+The reason a factory is used is because amqp is not threadsafe - the
81 publisher maintains a thread locals object to hold the factories and creates
82 connections when new threads are created(when they first generate an OOPS).
83
84@@ -62,7 +62,7 @@
85 method failed::
86
87 >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
88- ... userid="guest", password="guest", virtual_host="/", insist=False)
89+ ... userid="guest", password="guest", virtual_host="/")
90 >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
91 >>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
92
93@@ -86,6 +86,8 @@
94 >>> receiver.run_forever()
95 """
96
97+from __future__ import absolute_import, print_function
98+
99 # same format as sys.version_info: "A tuple containing the five components of
100 # the version number: major, minor, micro, releaselevel, and serial. All
101 # values except releaselevel are integers; the release level is 'alpha',
102
103=== modified file 'oops_amqp/anybson.py'
104--- oops_amqp/anybson.py 2012-02-10 19:27:09 +0000
105+++ oops_amqp/anybson.py 2018-03-12 11:56:56 +0000
106@@ -13,6 +13,8 @@
107 # along with this program. If not, see <http://www.gnu.org/licenses/>.
108 # GNU Lesser General Public License version 3 (see the file LICENSE).
109
110+from __future__ import absolute_import, print_function
111+
112 __all__ = [
113 'dumps',
114 'loads',
115
116=== modified file 'oops_amqp/publisher.py'
117--- oops_amqp/publisher.py 2012-08-10 01:41:44 +0000
118+++ oops_amqp/publisher.py 2018-03-12 11:56:56 +0000
119@@ -15,15 +15,17 @@
120
121 """Publish OOPS reports over amqp."""
122
123+from __future__ import absolute_import, print_function
124+
125 __metaclass__ = type
126
127 from hashlib import md5
128 from threading import local
129
130-from amqplib import client_0_8 as amqp
131-from anybson import dumps
132+import amqp
133
134-from utils import (
135+from oops_amqp.anybson import dumps
136+from oops_amqp.utils import (
137 amqplib_error_types,
138 is_amqplib_connection_error,
139 )
140@@ -63,8 +65,10 @@
141 def get_channel(self):
142 if getattr(self.channels, 'channel', None) is None:
143 try:
144- self.channels.channel = self.connection_factory().channel()
145- except amqplib_error_types, e:
146+ connection = self.connection_factory()
147+ connection.connect()
148+ self.channels.channel = connection.channel()
149+ except amqplib_error_types as e:
150 if is_amqplib_connection_error(e):
151 # Could not connect
152 return None
153@@ -92,7 +96,7 @@
154 try:
155 channel.basic_publish(
156 message, self.exchange_name, routing_key=self.routing_key)
157- except amqplib_error_types, e:
158+ except amqplib_error_types as e:
159 self.channels.channel = None
160 if is_amqplib_connection_error(e):
161 # Could not connect / interrupted connection
162
163=== modified file 'oops_amqp/receiver.py'
164--- oops_amqp/receiver.py 2012-02-09 23:13:45 +0000
165+++ oops_amqp/receiver.py 2018-03-12 11:56:56 +0000
166@@ -15,12 +15,14 @@
167
168 """Receive OOPS reports over amqp and republish locally."""
169
170+from __future__ import absolute_import, print_function
171+
172 __metaclass__ = type
173
174 import time
175
176-import anybson as bson
177-from utils import (
178+from oops_amqp import anybson as bson
179+from oops_amqp.utils import (
180 amqplib_error_types,
181 close_ignoring_connection_errors,
182 is_amqplib_connection_error,
183@@ -56,12 +58,16 @@
184 self.sentinel = None
185
186 def handle_report(self, message):
187- if message.body == self.sentinel:
188+ # bson requires bytes.
189+ body = message.body
190+ if not isinstance(body, bytes):
191+ body = body.encode(message.content_encoding or 'UTF-8')
192+ if body == self.sentinel:
193 self.stopping = True
194 self.channel.basic_ack(message.delivery_tag)
195 return
196 try:
197- report = bson.loads(message.body)
198+ report = bson.loads(body)
199 except KeyError:
200 # Garbage in the queue. Possibly this should raise an OOPS itself
201 # (through a different config) or log an info level message.
202@@ -82,7 +88,7 @@
203 (not self.went_bad or time.time() < self.went_bad + 120)):
204 try:
205 self._run_forever()
206- except amqplib_error_types, e:
207+ except amqplib_error_types as e:
208 if not is_amqplib_connection_error(e):
209 # Something unknown went wrong.
210 raise
211@@ -94,6 +100,7 @@
212
213 def _run_forever(self):
214 self.connection = self.connection_factory()
215+ self.connection.connect()
216 # A successful connection: record this so run_forever won't bail early.
217 self.went_bad = None
218 try:
219@@ -103,7 +110,7 @@
220 self.queue_name, callback=self.handle_report)
221 try:
222 while True:
223- self.channel.wait()
224+ self.connection.drain_events(timeout=1)
225 if self.stopping:
226 break
227 finally:
228
229=== modified file 'oops_amqp/tests/__init__.py'
230--- oops_amqp/tests/__init__.py 2011-12-08 10:15:23 +0000
231+++ oops_amqp/tests/__init__.py 2018-03-12 11:56:56 +0000
232@@ -15,16 +15,20 @@
233
234 """Tests for oops_amqp."""
235
236+from __future__ import absolute_import, print_function
237+
238 from unittest import TestLoader
239
240-from amqplib import client_0_8 as amqp
241+import amqp
242 from fixtures import Fixture
243 from rabbitfixture.server import RabbitServer
244 import testtools
245 from testresources import (
246+ _get_result,
247 FixtureResource,
248 OptimisingTestSuite,
249- ResourcedTestCase,
250+ setUpResources,
251+ tearDownResources,
252 )
253
254 from oops_amqp.utils import close_ignoring_connection_errors
255@@ -94,16 +98,27 @@
256 def setUp(self):
257 super(ChannelFixture, self).setUp()
258 self.connection = self.connection_factory()
259+ self.connection.connect()
260 self.addCleanup(close_ignoring_connection_errors, self.connection)
261 self.channel = self.connection.channel()
262 self.addCleanup(close_ignoring_connection_errors, self.channel)
263
264
265-class TestCase(testtools.TestCase, ResourcedTestCase):
266- """Subclass to mix in testresources ResourcedTestCase."""
267+class TestCase(testtools.TestCase):
268+ """Subclass to start a RabbitMQ server."""
269
270 resources = [('rabbit', FixtureResource(RabbitServer()))]
271
272+ def setUp(self):
273+ super(TestCase, self).setUp()
274+ # ResourcedTestCase handles teardown in the wrong order for us (we
275+ # need to ensure that the RabbitServer fixture is only cleaned up
276+ # after any other fixtures registered by individual tests), so we
277+ # imitate it manually.
278+ result = _get_result()
279+ setUpResources(self, self.resources, result)
280+ self.addCleanup(tearDownResources, self, self.resources, result)
281+
282 def connection_factory(self):
283 """When called, return an amqplib connection."""
284 return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,
285
286=== modified file 'oops_amqp/tests/test_publisher.py'
287--- oops_amqp/tests/test_publisher.py 2012-08-10 01:41:44 +0000
288+++ oops_amqp/tests/test_publisher.py 2018-03-12 11:56:56 +0000
289@@ -15,6 +15,8 @@
290
291 """Tests for AMQP publishing."""
292
293+from __future__ import absolute_import, print_function
294+
295 from hashlib import md5
296
297 from oops_amqp import (
298@@ -47,13 +49,16 @@
299 self.assertEqual(reference_oops, oops)
300 # The received OOPS should have the ID embedded and be a bson dict.
301 def check_oops(msg):
302- self.assertEqual(reference_oops, bson.loads(msg.body))
303+ body = msg.body
304+ if not isinstance(body, bytes):
305+ body = body.encode(msg.content_encoding or 'UTF-8')
306+ self.assertEqual(reference_oops, bson.loads(body))
307 channel.basic_ack(msg.delivery_tag)
308 channel.basic_cancel(queue.queue_name)
309 channel.basic_consume(
310 queue.queue_name, callback=check_oops,
311 consumer_tag=queue.queue_name)
312- channel.wait()
313+ channel.connection.drain_events()
314
315 def test_publish(self):
316 # Publishing an oops sends it to the exchange, making a connection as
317@@ -75,13 +80,16 @@
318 expected_oops = dict(reference_oops)
319 expected_oops['id'] = oops_ids[0]
320 def check_oops(msg):
321- self.assertEqual(expected_oops, bson.loads(msg.body))
322+ body = msg.body
323+ if not isinstance(body, bytes):
324+ body = body.encode(msg.content_encoding or 'UTF-8')
325+ self.assertEqual(expected_oops, bson.loads(body))
326 channel.basic_ack(msg.delivery_tag)
327 channel.basic_cancel(queue.queue_name)
328 channel.basic_consume(
329 queue.queue_name, callback=check_oops,
330 consumer_tag=queue.queue_name)
331- channel.wait()
332+ channel.connection.drain_events()
333
334 def test_publish_amqp_already_down(self):
335 # If amqp is down when a connection is attempted, None is returned to
336@@ -101,7 +109,9 @@
337 self.assertEqual([], publisher(oops))
338 finally:
339 self.rabbit.runner._start()
340- queue.channel = self.connection_factory().channel()
341+ connection = self.connection_factory()
342+ connection.connect()
343+ queue.channel = connection.channel()
344 self.assertNotEqual([], publisher(oops))
345
346 def test_publish_amqp_down_after_use(self):
347@@ -122,6 +132,8 @@
348 self.assertEqual([], publisher(oops))
349 finally:
350 self.rabbit.runner._start()
351- queue.channel = self.connection_factory().channel()
352+ connection = self.connection_factory()
353+ connection.connect()
354+ queue.channel = connection.channel()
355 self.assertNotEqual([], publisher(oops))
356
357
358=== modified file 'oops_amqp/tests/test_receiver.py'
359--- oops_amqp/tests/test_receiver.py 2012-08-10 01:41:44 +0000
360+++ oops_amqp/tests/test_receiver.py 2018-03-12 11:56:56 +0000
361@@ -15,11 +15,14 @@
362
363 """Tests for AMQP receiving."""
364
365+from __future__ import absolute_import, print_function
366+
367 import errno
368 import socket
369
370-from amqplib import client_0_8 as amqp
371+import amqp
372 from oops import Config
373+import six
374
375 from oops_amqp import (
376 anybson as bson,
377@@ -47,7 +50,7 @@
378 queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
379 channel.basic_publish(
380 message, queue.exchange_name, routing_key="")
381- sentinel = "xxx"
382+ sentinel = b"xxx"
383 channel.basic_publish(
384 amqp.Message(sentinel), queue.exchange_name, routing_key="")
385 config = Config()
386@@ -81,9 +84,9 @@
387 def new_channel():
388 result = old_channel()
389 old_wait = result.wait
390- def new_wait(allowed_methods=None):
391+ def new_wait(*args, **kwargs):
392 receiver.stopping = True
393- return old_wait(allowed_methods=allowed_methods)
394+ return old_wait(*args, **kwargs)
395 result.wait = new_wait
396 return result
397 connection.channel = new_channel
398@@ -93,8 +96,7 @@
399 self.assertEqual([expected_report], reports)
400
401 def test_run_forever(self):
402- # run_forever subscribes and then calls wait in a loop.
403- config = None
404+ # run_forever subscribes and then calls drain_events in a loop.
405 calls = []
406 class FakeChannel:
407 def __init__(self, calls):
408@@ -103,25 +105,29 @@
409 def basic_consume(self, queue_name, callback=None):
410 self.calls.append(('basic_consume', queue_name, callback))
411 return 'tag'
412- def wait(self):
413- self.calls.append(('wait',))
414- if len(self.calls) > 2:
415- receiver.stopping = True
416 def basic_cancel(self, tag):
417 self.calls.append(('basic_cancel', tag))
418 def close(self):
419 self.is_open = False
420 class FakeConnection:
421+ def __init__(self, calls):
422+ self.calls = calls
423+ def connect(self):
424+ pass
425 def channel(self):
426 return FakeChannel(calls)
427+ def drain_events(self, timeout=None):
428+ self.calls.append(('drain_events', timeout))
429+ if len(self.calls) > 2:
430+ receiver.stopping = True
431 def close(self):
432 pass
433- receiver = Receiver(None, FakeConnection, 'foo')
434+ receiver = Receiver(None, lambda: FakeConnection(calls), 'foo')
435 receiver.run_forever()
436 self.assertEqual(
437 [('basic_consume', 'foo', receiver.handle_report),
438- ('wait',),
439- ('wait',),
440+ ('drain_events', 1),
441+ ('drain_events', 1),
442 ('basic_cancel', 'tag')],
443 calls)
444
445@@ -146,7 +152,7 @@
446 state = {}
447 def error_once(func):
448 def wrapped(*args, **kwargs):
449- func_ref = func.func_code
450+ func_ref = six.get_function_code(func)
451 if func_ref in state:
452 return func(*args, **kwargs)
453 else:
454@@ -162,17 +168,17 @@
455 @error_once
456 def new_channel():
457 result = old_channel()
458- result.wait = error_once(result.wait)
459 result.basic_consume = error_once(result.basic_consume)
460 result.basic_cancel = error_once(result.basic_cancel)
461 result.close = error_once(result.close)
462 return result
463 connection.channel = new_channel
464+ connection.drain_events = error_once(connection.drain_events)
465 connection.close = error_once(connection.close)
466 return connection
467 receiver = Receiver(config, patching_factory, queue.queue_name)
468- receiver.sentinel = "arhh"
469+ receiver.sentinel = b"arhh"
470 channel.basic_publish(
471- amqp.Message("arhh"), queue.exchange_name, routing_key="")
472+ amqp.Message(b"arhh"), queue.exchange_name, routing_key="")
473 receiver.run_forever()
474 self.assertEqual([expected_report], reports)
475
476=== modified file 'oops_amqp/trace.py'
477--- oops_amqp/trace.py 2012-08-10 02:56:01 +0000
478+++ oops_amqp/trace.py 2018-03-12 11:56:56 +0000
479@@ -15,17 +15,17 @@
480
481 """Trace OOPS reports coming from an AMQP queue."""
482
483+from __future__ import absolute_import, print_function
484+
485 from functools import partial
486 import sys
487 import optparse
488 from textwrap import dedent
489
490-import amqplib.client_0_8 as amqp
491+import amqp
492 import oops
493 import oops_amqp
494
495-import anybson as bson
496-
497
498 def main(argv=None):
499 if argv is None:
500
501=== modified file 'oops_amqp/utils.py'
502--- oops_amqp/utils.py 2011-12-08 10:15:23 +0000
503+++ oops_amqp/utils.py 2018-03-12 11:56:56 +0000
504@@ -15,10 +15,11 @@
505
506 """Utility functions for oops_amqp."""
507
508-import errno
509+from __future__ import absolute_import, print_function
510+
511 import socket
512
513-from amqplib.client_0_8.exceptions import AMQPConnectionException
514+from amqp.exceptions import ConnectionError
515
516 __all__ = [
517 'amqplib_error_types',
518@@ -30,7 +31,7 @@
519 # These exception types always indicate an AMQP connection error/closure.
520 # However you should catch amqplib_error_types and post-filter with
521 # is_amqplib_connection_error.
522-amqplib_connection_errors = (socket.error, AMQPConnectionException)
523+amqplib_connection_errors = (socket.error, ConnectionError)
524 # A tuple to reduce duplication in different code paths. Lists the types of
525 # exceptions legitimately raised by amqplib when the AMQP server goes down.
526 # Not all exceptions *will* be such errors - use is_amqplib_connection_error to
527@@ -41,7 +42,7 @@
528 def close_ignoring_connection_errors(closable):
529 try:
530 return closable.close()
531- except amqplib_error_types, e:
532+ except amqplib_error_types as e:
533 if is_amqplib_connection_error(e):
534 return
535 raise
536
537=== modified file 'setup.py'
538--- setup.py 2015-10-02 16:25:32 +0000
539+++ setup.py 2018-03-12 11:56:56 +0000
540@@ -19,8 +19,8 @@
541 from distutils.core import setup
542 import os.path
543
544-description = file(
545- os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
546+with open(os.path.join(os.path.dirname(__file__), 'README')) as f:
547+ description = f.read()
548
549 setup(name="oops_amqp",
550 version="0.0.8b1",
551@@ -38,15 +38,18 @@
552 'License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)',
553 'Operating System :: OS Independent',
554 'Programming Language :: Python',
555+ 'Programming Language :: Python :: 2',
556+ 'Programming Language :: Python :: 3',
557 ],
558 install_requires = [
559 'bson',
560 'oops>=0.0.11',
561- 'amqplib',
562+ 'amqp>=2.0.0',
563 ],
564 extras_require = dict(
565 test=[
566 'rabbitfixture',
567+ 'six',
568 'testresources',
569 'testtools',
570 ]
571
572=== modified file 'versions.cfg'
573--- versions.cfg 2012-08-10 01:41:44 +0000
574+++ versions.cfg 2018-03-12 11:56:56 +0000
575@@ -2,7 +2,7 @@
576 versions = versions
577
578 [versions]
579-amqplib = 0.6.1
580+amqp = 2.2.2
581 bson = 0.3.2
582 fixtures = 0.3.6
583 iso8601 = 0.1.4
584@@ -11,8 +11,10 @@
585 pytz = 2010o
586 rabbitfixture = 0.3.2
587 setuptools = 0.6c11
588+six = 1.11.0
589 testresources = 0.2.4-r58
590 testtools = 0.9.12-r228
591+vine = 1.1.4
592 zc.recipe.egg = 1.3.2
593 z3c.recipe.filetemplate = 2.1.0
594 z3c.recipe.scripts = 1.0.1
