Merge lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp

Proposed by Colin Watson
Status: Merged
Merged at revision: 22
Proposed branch: lp:~cjwatson/python-oops-amqp/py3
Merge into: lp:python-oops-amqp
Diff against target: 594 lines (+114/-57)
14 files modified
.bzrignore (+1/-0)
NEWS (+2/-0)
README (+4/-4)
oops_amqp/__init__.py (+5/-3)
oops_amqp/anybson.py (+2/-0)
oops_amqp/publisher.py (+10/-6)
oops_amqp/receiver.py (+13/-6)
oops_amqp/tests/__init__.py (+19/-4)
oops_amqp/tests/test_publisher.py (+18/-6)
oops_amqp/tests/test_receiver.py (+23/-17)
oops_amqp/trace.py (+3/-3)
oops_amqp/utils.py (+5/-4)
setup.py (+6/-3)
versions.cfg (+3/-1)
To merge this branch: bzr merge lp:~cjwatson/python-oops-amqp/py3
Reviewer        Review Type    Date Requested    Status
William Grant   code                             Approve
Review via email: mp+341298@code.launchpad.net

Commit message

Add Python 3 support.

Description of the change

This also involves porting from amqplib to the better-maintained amqp.
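
For readers unfamiliar with the two libraries, here is a minimal sketch of the call-pattern changes this port relies on: amqp >= 2.0 no longer connects in the Connection constructor, and channel.wait() is replaced by connection.drain_events(). The helper names open_channel and drain_until are hypothetical and do not appear in the branch, and treating socket.timeout as "no event yet" is an assumption of this sketch.

```python
import socket


def open_channel(connection_factory):
    """Open a connection and channel in the style amqp >= 2.0 expects.

    Hypothetical helper: connect() must be called explicitly, because
    the Connection constructor no longer opens the socket.
    """
    connection = connection_factory()
    connection.connect()
    return connection, connection.channel()


def drain_until(connection, should_stop, timeout=1):
    """Process events until should_stop() returns true.

    Hypothetical helper: drain_events() takes the place of amqplib's
    channel.wait(). A socket.timeout is treated here as "nothing
    arrived within the timeout", which is an assumption of the sketch.
    """
    while not should_stop():
        try:
            connection.drain_events(timeout=timeout)
        except socket.timeout:
            continue
```

With a real broker the factory would be something like partial(amqp.Connection, host="localhost:5672", ...), as in the README; any object with connect(), channel() and drain_events() works for experimenting.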

William Grant (wgrant) :
review: Approve (code)

Preview Diff

=== modified file '.bzrignore'
--- .bzrignore 2011-12-08 10:50:34 +0000
+++ .bzrignore 2018-03-12 11:56:56 +0000
@@ -1,3 +1,4 @@
+__pycache__
 ./eggs/*
 ./.installed.cfg
 ./develop-eggs

=== modified file 'NEWS'
--- NEWS 2015-10-08 14:49:18 +0000
+++ NEWS 2018-03-12 11:56:56 +0000
@@ -9,6 +9,8 @@
 * Dropped dependency on pymongo in favor of bson. This avoids having
   to depend both on bson and pymongo when installing in conjunction
   with oops-datedir-repo. (Ricardo Kirkner)
+* Port from amqplib to amqp. (Colin Watson)
+* Add Python 3 support. (Colin Watson)
 
 0.0.7
 -----

=== modified file 'README'
--- README 2012-08-10 01:41:44 +0000
+++ README 2018-03-12 11:56:56 +0000
@@ -31,7 +31,7 @@
 
 * oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
 
-* amqplib
+* amqp
 
 Testing Dependencies
 ====================
@@ -57,7 +57,7 @@
 connection - and the exchange name and routing key to submit to.
 
   >>> factory = partial(amqp.Connection, host="localhost:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> publisher = oops_amqp.Publisher(factory, "oopses", "")
 
 Provide the publisher to your OOPS config::
@@ -70,7 +70,7 @@
 OOPS ids are generating by hashing the oops message (without the id field) -
 this ensures unique ids.
 
-The reason a factory is used is because amqplib is not threadsafe - the
+The reason a factory is used is because amqp is not threadsafe - the
 publisher maintains a thread locals object to hold the factories and creates
 connections when new threads are created(when they first generate an OOPS).
 
@@ -87,7 +87,7 @@
 method failed::
 
   >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
   >>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
 

=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py 2015-10-02 16:26:57 +0000
+++ oops_amqp/__init__.py 2018-03-12 11:56:56 +0000
@@ -32,7 +32,7 @@
 connection - and the exchange name and routing key to submit to.
 
   >>> factory = partial(amqp.Connection, host="localhost:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> publisher = oops_amqp.Publisher(factory, "oopses", "")
 
 Provide the publisher to your OOPS config::
@@ -45,7 +45,7 @@
 OOPS ids are generating by hashing the oops message (without the id field) -
 this ensures unique ids.
 
-The reason a factory is used is because amqplib is not threadsafe - the
+The reason a factory is used is because amqp is not threadsafe - the
 publisher maintains a thread locals object to hold the factories and creates
 connections when new threads are created(when they first generate an OOPS).
 
@@ -62,7 +62,7 @@
 method failed::
 
   >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
   >>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
 
@@ -86,6 +86,8 @@
   >>> receiver.run_forever()
 """
 
+from __future__ import absolute_import, print_function
+
 # same format as sys.version_info: "A tuple containing the five components of
 # the version number: major, minor, micro, releaselevel, and serial. All
 # values except releaselevel are integers; the release level is 'alpha',

=== modified file 'oops_amqp/anybson.py'
--- oops_amqp/anybson.py 2012-02-10 19:27:09 +0000
+++ oops_amqp/anybson.py 2018-03-12 11:56:56 +0000
@@ -13,6 +13,8 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 # GNU Lesser General Public License version 3 (see the file LICENSE).
 
+from __future__ import absolute_import, print_function
+
 __all__ = [
     'dumps',
     'loads',

=== modified file 'oops_amqp/publisher.py'
--- oops_amqp/publisher.py 2012-08-10 01:41:44 +0000
+++ oops_amqp/publisher.py 2018-03-12 11:56:56 +0000
@@ -15,15 +15,17 @@
 
 """Publish OOPS reports over amqp."""
 
+from __future__ import absolute_import, print_function
+
 __metaclass__ = type
 
 from hashlib import md5
 from threading import local
 
-from amqplib import client_0_8 as amqp
-from anybson import dumps
+import amqp
 
-from utils import (
+from oops_amqp.anybson import dumps
+from oops_amqp.utils import (
     amqplib_error_types,
     is_amqplib_connection_error,
     )
@@ -63,8 +65,10 @@
     def get_channel(self):
         if getattr(self.channels, 'channel', None) is None:
             try:
-                self.channels.channel = self.connection_factory().channel()
-            except amqplib_error_types, e:
+                connection = self.connection_factory()
+                connection.connect()
+                self.channels.channel = connection.channel()
+            except amqplib_error_types as e:
                 if is_amqplib_connection_error(e):
                     # Could not connect
                     return None
@@ -92,7 +96,7 @@
         try:
             channel.basic_publish(
                 message, self.exchange_name, routing_key=self.routing_key)
-        except amqplib_error_types, e:
+        except amqplib_error_types as e:
             self.channels.channel = None
             if is_amqplib_connection_error(e):
                 # Could not connect / interrupted connection

=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py 2012-02-09 23:13:45 +0000
+++ oops_amqp/receiver.py 2018-03-12 11:56:56 +0000
@@ -15,12 +15,14 @@
 
 """Receive OOPS reports over amqp and republish locally."""
 
+from __future__ import absolute_import, print_function
+
 __metaclass__ = type
 
 import time
 
-import anybson as bson
-from utils import (
+from oops_amqp import anybson as bson
+from oops_amqp.utils import (
     amqplib_error_types,
     close_ignoring_connection_errors,
     is_amqplib_connection_error,
@@ -56,12 +58,16 @@
         self.sentinel = None
 
     def handle_report(self, message):
-        if message.body == self.sentinel:
+        # bson requires bytes.
+        body = message.body
+        if not isinstance(body, bytes):
+            body = body.encode(message.content_encoding or 'UTF-8')
+        if body == self.sentinel:
             self.stopping = True
             self.channel.basic_ack(message.delivery_tag)
             return
         try:
-            report = bson.loads(message.body)
+            report = bson.loads(body)
         except KeyError:
             # Garbage in the queue. Possibly this should raise an OOPS itself
             # (through a different config) or log an info level message.
@@ -82,7 +88,7 @@
                 (not self.went_bad or time.time() < self.went_bad + 120)):
             try:
                 self._run_forever()
-            except amqplib_error_types, e:
+            except amqplib_error_types as e:
                 if not is_amqplib_connection_error(e):
                     # Something unknown went wrong.
                     raise
@@ -94,6 +100,7 @@
 
     def _run_forever(self):
         self.connection = self.connection_factory()
+        self.connection.connect()
         # A successful connection: record this so run_forever won't bail early.
         self.went_bad = None
         try:
@@ -103,7 +110,7 @@
                 self.queue_name, callback=self.handle_report)
             try:
                 while True:
-                    self.channel.wait()
+                    self.connection.drain_events(timeout=1)
                     if self.stopping:
                         break
             finally:

=== modified file 'oops_amqp/tests/__init__.py'
--- oops_amqp/tests/__init__.py 2011-12-08 10:15:23 +0000
+++ oops_amqp/tests/__init__.py 2018-03-12 11:56:56 +0000
@@ -15,16 +15,20 @@
 
 """Tests for oops_amqp."""
 
+from __future__ import absolute_import, print_function
+
 from unittest import TestLoader
 
-from amqplib import client_0_8 as amqp
+import amqp
 from fixtures import Fixture
 from rabbitfixture.server import RabbitServer
 import testtools
 from testresources import (
+    _get_result,
     FixtureResource,
     OptimisingTestSuite,
-    ResourcedTestCase,
+    setUpResources,
+    tearDownResources,
     )
 
 from oops_amqp.utils import close_ignoring_connection_errors
@@ -94,16 +98,27 @@
     def setUp(self):
         super(ChannelFixture, self).setUp()
         self.connection = self.connection_factory()
+        self.connection.connect()
         self.addCleanup(close_ignoring_connection_errors, self.connection)
         self.channel = self.connection.channel()
         self.addCleanup(close_ignoring_connection_errors, self.channel)
 
 
-class TestCase(testtools.TestCase, ResourcedTestCase):
-    """Subclass to mix in testresources ResourcedTestCase."""
+class TestCase(testtools.TestCase):
+    """Subclass to start a RabbitMQ server."""
 
     resources = [('rabbit', FixtureResource(RabbitServer()))]
 
+    def setUp(self):
+        super(TestCase, self).setUp()
+        # ResourcedTestCase handles teardown in the wrong order for us (we
+        # need to ensure that the RabbitServer fixture is only cleaned up
+        # after any other fixtures registered by individual tests), so we
+        # imitate it manually.
+        result = _get_result()
+        setUpResources(self, self.resources, result)
+        self.addCleanup(tearDownResources, self, self.resources, result)
+
     def connection_factory(self):
         """When called, return an amqplib connection."""
         return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,

=== modified file 'oops_amqp/tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py 2012-08-10 01:41:44 +0000
+++ oops_amqp/tests/test_publisher.py 2018-03-12 11:56:56 +0000
@@ -15,6 +15,8 @@
 
 """Tests for AMQP publishing."""
 
+from __future__ import absolute_import, print_function
+
 from hashlib import md5
 
 from oops_amqp import (
@@ -47,13 +49,16 @@
         self.assertEqual(reference_oops, oops)
         # The received OOPS should have the ID embedded and be a bson dict.
         def check_oops(msg):
-            self.assertEqual(reference_oops, bson.loads(msg.body))
+            body = msg.body
+            if not isinstance(body, bytes):
+                body = body.encode(msg.content_encoding or 'UTF-8')
+            self.assertEqual(reference_oops, bson.loads(body))
             channel.basic_ack(msg.delivery_tag)
             channel.basic_cancel(queue.queue_name)
         channel.basic_consume(
             queue.queue_name, callback=check_oops,
             consumer_tag=queue.queue_name)
-        channel.wait()
+        channel.connection.drain_events()
 
     def test_publish(self):
         # Publishing an oops sends it to the exchange, making a connection as
@@ -75,13 +80,16 @@
         expected_oops = dict(reference_oops)
         expected_oops['id'] = oops_ids[0]
         def check_oops(msg):
-            self.assertEqual(expected_oops, bson.loads(msg.body))
+            body = msg.body
+            if not isinstance(body, bytes):
+                body = body.encode(msg.content_encoding or 'UTF-8')
+            self.assertEqual(expected_oops, bson.loads(body))
             channel.basic_ack(msg.delivery_tag)
             channel.basic_cancel(queue.queue_name)
         channel.basic_consume(
             queue.queue_name, callback=check_oops,
             consumer_tag=queue.queue_name)
-        channel.wait()
+        channel.connection.drain_events()
 
     def test_publish_amqp_already_down(self):
         # If amqp is down when a connection is attempted, None is returned to
@@ -101,7 +109,9 @@
             self.assertEqual([], publisher(oops))
         finally:
             self.rabbit.runner._start()
-            queue.channel = self.connection_factory().channel()
+            connection = self.connection_factory()
+            connection.connect()
+            queue.channel = connection.channel()
         self.assertNotEqual([], publisher(oops))
 
     def test_publish_amqp_down_after_use(self):
@@ -122,6 +132,8 @@
             self.assertEqual([], publisher(oops))
         finally:
             self.rabbit.runner._start()
-            queue.channel = self.connection_factory().channel()
+            connection = self.connection_factory()
+            connection.connect()
+            queue.channel = connection.channel()
         self.assertNotEqual([], publisher(oops))
 

=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py 2012-08-10 01:41:44 +0000
+++ oops_amqp/tests/test_receiver.py 2018-03-12 11:56:56 +0000
@@ -15,11 +15,14 @@
 
 """Tests for AMQP receiving."""
 
+from __future__ import absolute_import, print_function
+
 import errno
 import socket
 
-from amqplib import client_0_8 as amqp
+import amqp
 from oops import Config
+import six
 
 from oops_amqp import (
     anybson as bson,
@@ -47,7 +50,7 @@
         queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
         channel.basic_publish(
             message, queue.exchange_name, routing_key="")
-        sentinel = "xxx"
+        sentinel = b"xxx"
         channel.basic_publish(
             amqp.Message(sentinel), queue.exchange_name, routing_key="")
         config = Config()
@@ -81,9 +84,9 @@
             def new_channel():
                 result = old_channel()
                 old_wait = result.wait
-                def new_wait(allowed_methods=None):
+                def new_wait(*args, **kwargs):
                     receiver.stopping = True
-                    return old_wait(allowed_methods=allowed_methods)
+                    return old_wait(*args, **kwargs)
                 result.wait = new_wait
                 return result
             connection.channel = new_channel
@@ -93,8 +96,7 @@
         self.assertEqual([expected_report], reports)
 
     def test_run_forever(self):
-        # run_forever subscribes and then calls wait in a loop.
-        config = None
+        # run_forever subscribes and then calls drain_events in a loop.
         calls = []
         class FakeChannel:
             def __init__(self, calls):
@@ -103,25 +105,29 @@
             def basic_consume(self, queue_name, callback=None):
                 self.calls.append(('basic_consume', queue_name, callback))
                 return 'tag'
-            def wait(self):
-                self.calls.append(('wait',))
-                if len(self.calls) > 2:
-                    receiver.stopping = True
             def basic_cancel(self, tag):
                 self.calls.append(('basic_cancel', tag))
             def close(self):
                 self.is_open = False
         class FakeConnection:
+            def __init__(self, calls):
+                self.calls = calls
+            def connect(self):
+                pass
             def channel(self):
                 return FakeChannel(calls)
+            def drain_events(self, timeout=None):
+                self.calls.append(('drain_events', timeout))
+                if len(self.calls) > 2:
+                    receiver.stopping = True
             def close(self):
                 pass
-        receiver = Receiver(None, FakeConnection, 'foo')
+        receiver = Receiver(None, lambda: FakeConnection(calls), 'foo')
         receiver.run_forever()
         self.assertEqual(
             [('basic_consume', 'foo', receiver.handle_report),
-             ('wait',),
-             ('wait',),
+             ('drain_events', 1),
+             ('drain_events', 1),
              ('basic_cancel', 'tag')],
             calls)
 
@@ -146,7 +152,7 @@
         state = {}
         def error_once(func):
             def wrapped(*args, **kwargs):
-                func_ref = func.func_code
+                func_ref = six.get_function_code(func)
                 if func_ref in state:
                     return func(*args, **kwargs)
                 else:
@@ -162,17 +168,17 @@
             @error_once
             def new_channel():
                 result = old_channel()
-                result.wait = error_once(result.wait)
                 result.basic_consume = error_once(result.basic_consume)
                 result.basic_cancel = error_once(result.basic_cancel)
                 result.close = error_once(result.close)
                 return result
             connection.channel = new_channel
+            connection.drain_events = error_once(connection.drain_events)
             connection.close = error_once(connection.close)
             return connection
         receiver = Receiver(config, patching_factory, queue.queue_name)
-        receiver.sentinel = "arhh"
+        receiver.sentinel = b"arhh"
         channel.basic_publish(
-            amqp.Message("arhh"), queue.exchange_name, routing_key="")
+            amqp.Message(b"arhh"), queue.exchange_name, routing_key="")
         receiver.run_forever()
         self.assertEqual([expected_report], reports)

=== modified file 'oops_amqp/trace.py'
--- oops_amqp/trace.py 2012-08-10 02:56:01 +0000
+++ oops_amqp/trace.py 2018-03-12 11:56:56 +0000
@@ -15,17 +15,17 @@
 
 """Trace OOPS reports coming from an AMQP queue."""
 
+from __future__ import absolute_import, print_function
+
 from functools import partial
 import sys
 import optparse
 from textwrap import dedent
 
-import amqplib.client_0_8 as amqp
+import amqp
 import oops
 import oops_amqp
 
-import anybson as bson
-
 
 def main(argv=None):
     if argv is None:

=== modified file 'oops_amqp/utils.py'
--- oops_amqp/utils.py 2011-12-08 10:15:23 +0000
+++ oops_amqp/utils.py 2018-03-12 11:56:56 +0000
@@ -15,10 +15,11 @@
 
 """Utility functions for oops_amqp."""
 
-import errno
+from __future__ import absolute_import, print_function
+
 import socket
 
-from amqplib.client_0_8.exceptions import AMQPConnectionException
+from amqp.exceptions import ConnectionError
 
 __all__ = [
     'amqplib_error_types',
@@ -30,7 +31,7 @@
 # These exception types always indicate an AMQP connection error/closure.
 # However you should catch amqplib_error_types and post-filter with
 # is_amqplib_connection_error.
-amqplib_connection_errors = (socket.error, AMQPConnectionException)
+amqplib_connection_errors = (socket.error, ConnectionError)
 # A tuple to reduce duplication in different code paths. Lists the types of
 # exceptions legitimately raised by amqplib when the AMQP server goes down.
 # Not all exceptions *will* be such errors - use is_amqplib_connection_error to
@@ -41,7 +42,7 @@
 def close_ignoring_connection_errors(closable):
     try:
         return closable.close()
-    except amqplib_error_types, e:
+    except amqplib_error_types as e:
         if is_amqplib_connection_error(e):
             return
         raise

=== modified file 'setup.py'
--- setup.py 2015-10-02 16:25:32 +0000
+++ setup.py 2018-03-12 11:56:56 +0000
@@ -19,8 +19,8 @@
 from distutils.core import setup
 import os.path
 
-description = file(
-        os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
+with open(os.path.join(os.path.dirname(__file__), 'README')) as f:
+    description = f.read()
 
 setup(name="oops_amqp",
       version="0.0.8b1",
@@ -38,15 +38,18 @@
         'License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)',
         'Operating System :: OS Independent',
         'Programming Language :: Python',
+        'Programming Language :: Python :: 2',
+        'Programming Language :: Python :: 3',
         ],
       install_requires = [
           'bson',
           'oops>=0.0.11',
-          'amqplib',
+          'amqp>=2.0.0',
           ],
       extras_require = dict(
           test=[
               'rabbitfixture',
+              'six',
               'testresources',
               'testtools',
               ]

=== modified file 'versions.cfg'
--- versions.cfg 2012-08-10 01:41:44 +0000
+++ versions.cfg 2018-03-12 11:56:56 +0000
@@ -2,7 +2,7 @@
 versions = versions
 
 [versions]
-amqplib = 0.6.1
+amqp = 2.2.2
 bson = 0.3.2
 fixtures = 0.3.6
 iso8601 = 0.1.4
@@ -11,8 +11,10 @@
 pytz = 2010o
 rabbitfixture = 0.3.2
 setuptools = 0.6c11
+six = 1.11.0
 testresources = 0.2.4-r58
 testtools = 0.9.12-r228
+vine = 1.1.4
 zc.recipe.egg = 1.3.2
 z3c.recipe.filetemplate = 2.1.0
 z3c.recipe.scripts = 1.0.1
