Merge lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp
- py3
- Merge into trunk
Proposed by
Colin Watson
Status: | Merged |
---|---|
Merged at revision: | 22 |
Proposed branch: | lp:~cjwatson/python-oops-amqp/py3 |
Merge into: | lp:python-oops-amqp |
Diff against target: |
594 lines (+114/-57) 14 files modified
.bzrignore (+1/-0) NEWS (+2/-0) README (+4/-4) oops_amqp/__init__.py (+5/-3) oops_amqp/anybson.py (+2/-0) oops_amqp/publisher.py (+10/-6) oops_amqp/receiver.py (+13/-6) oops_amqp/tests/__init__.py (+19/-4) oops_amqp/tests/test_publisher.py (+18/-6) oops_amqp/tests/test_receiver.py (+23/-17) oops_amqp/trace.py (+3/-3) oops_amqp/utils.py (+5/-4) setup.py (+6/-3) versions.cfg (+3/-1) |
To merge this branch: | bzr merge lp:~cjwatson/python-oops-amqp/py3 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
William Grant | code | | Approve |
Review via email: mp+341298@code.launchpad.net |
Commit message
Add Python 3 support.
Description of the change
This also involves porting from amqplib to the better-maintained amqp.
To post a comment you must log in.
Revision history for this message
William Grant (wgrant) : | # |
review:
Approve
(code)
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file '.bzrignore' | |||
2 | --- .bzrignore 2011-12-08 10:50:34 +0000 | |||
3 | +++ .bzrignore 2018-03-12 11:56:56 +0000 | |||
4 | @@ -1,3 +1,4 @@ | |||
5 | 1 | __pycache__ | ||
6 | 1 | ./eggs/* | 2 | ./eggs/* |
7 | 2 | ./.installed.cfg | 3 | ./.installed.cfg |
8 | 3 | ./develop-eggs | 4 | ./develop-eggs |
9 | 4 | 5 | ||
10 | === modified file 'NEWS' | |||
11 | --- NEWS 2015-10-08 14:49:18 +0000 | |||
12 | +++ NEWS 2018-03-12 11:56:56 +0000 | |||
13 | @@ -9,6 +9,8 @@ | |||
14 | 9 | * Dropped dependency on pymongo in favor of bson. This avoids having | 9 | * Dropped dependency on pymongo in favor of bson. This avoids having |
15 | 10 | to depend both on bson and pymongo when installing in conjunction | 10 | to depend both on bson and pymongo when installing in conjunction |
16 | 11 | with oops-datedir-repo. (Ricardo Kirkner) | 11 | with oops-datedir-repo. (Ricardo Kirkner) |
17 | 12 | * Port from amqplib to amqp. (Colin Watson) | ||
18 | 13 | * Add Python 3 support. (Colin Watson) | ||
19 | 12 | 14 | ||
20 | 13 | 0.0.7 | 15 | 0.0.7 |
21 | 14 | ----- | 16 | ----- |
22 | 15 | 17 | ||
23 | === modified file 'README' | |||
24 | --- README 2012-08-10 01:41:44 +0000 | |||
25 | +++ README 2018-03-12 11:56:56 +0000 | |||
26 | @@ -31,7 +31,7 @@ | |||
27 | 31 | 31 | ||
28 | 32 | * oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer. | 32 | * oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer. |
29 | 33 | 33 | ||
31 | 34 | * amqplib | 34 | * amqp |
32 | 35 | 35 | ||
33 | 36 | Testing Dependencies | 36 | Testing Dependencies |
34 | 37 | ==================== | 37 | ==================== |
35 | @@ -57,7 +57,7 @@ | |||
36 | 57 | connection - and the exchange name and routing key to submit to. | 57 | connection - and the exchange name and routing key to submit to. |
37 | 58 | 58 | ||
38 | 59 | >>> factory = partial(amqp.Connection, host="localhost:5672", | 59 | >>> factory = partial(amqp.Connection, host="localhost:5672", |
40 | 60 | ... userid="guest", password="guest", virtual_host="/", insist=False) | 60 | ... userid="guest", password="guest", virtual_host="/") |
41 | 61 | >>> publisher = oops_amqp.Publisher(factory, "oopses", "") | 61 | >>> publisher = oops_amqp.Publisher(factory, "oopses", "") |
42 | 62 | 62 | ||
43 | 63 | Provide the publisher to your OOPS config:: | 63 | Provide the publisher to your OOPS config:: |
44 | @@ -70,7 +70,7 @@ | |||
45 | 70 | OOPS ids are generating by hashing the oops message (without the id field) - | 70 | OOPS ids are generating by hashing the oops message (without the id field) - |
46 | 71 | this ensures unique ids. | 71 | this ensures unique ids. |
47 | 72 | 72 | ||
49 | 73 | The reason a factory is used is because amqplib is not threadsafe - the | 73 | The reason a factory is used is because amqp is not threadsafe - the |
50 | 74 | publisher maintains a thread locals object to hold the factories and creates | 74 | publisher maintains a thread locals object to hold the factories and creates |
51 | 75 | connections when new threads are created(when they first generate an OOPS). | 75 | connections when new threads are created(when they first generate an OOPS). |
52 | 76 | 76 | ||
53 | @@ -87,7 +87,7 @@ | |||
54 | 87 | method failed:: | 87 | method failed:: |
55 | 88 | 88 | ||
56 | 89 | >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672", | 89 | >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672", |
58 | 90 | ... userid="guest", password="guest", virtual_host="/", insist=False) | 90 | ... userid="guest", password="guest", virtual_host="/") |
59 | 91 | >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "") | 91 | >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "") |
60 | 92 | >>> config.publisher = publish_with_fallback(publisher, fallback_publisher) | 92 | >>> config.publisher = publish_with_fallback(publisher, fallback_publisher) |
61 | 93 | 93 | ||
62 | 94 | 94 | ||
63 | === modified file 'oops_amqp/__init__.py' | |||
64 | --- oops_amqp/__init__.py 2015-10-02 16:26:57 +0000 | |||
65 | +++ oops_amqp/__init__.py 2018-03-12 11:56:56 +0000 | |||
66 | @@ -32,7 +32,7 @@ | |||
67 | 32 | connection - and the exchange name and routing key to submit to. | 32 | connection - and the exchange name and routing key to submit to. |
68 | 33 | 33 | ||
69 | 34 | >>> factory = partial(amqp.Connection, host="localhost:5672", | 34 | >>> factory = partial(amqp.Connection, host="localhost:5672", |
71 | 35 | ... userid="guest", password="guest", virtual_host="/", insist=False) | 35 | ... userid="guest", password="guest", virtual_host="/") |
72 | 36 | >>> publisher = oops_amqp.Publisher(factory, "oopses", "") | 36 | >>> publisher = oops_amqp.Publisher(factory, "oopses", "") |
73 | 37 | 37 | ||
74 | 38 | Provide the publisher to your OOPS config:: | 38 | Provide the publisher to your OOPS config:: |
75 | @@ -45,7 +45,7 @@ | |||
76 | 45 | OOPS ids are generating by hashing the oops message (without the id field) - | 45 | OOPS ids are generating by hashing the oops message (without the id field) - |
77 | 46 | this ensures unique ids. | 46 | this ensures unique ids. |
78 | 47 | 47 | ||
80 | 48 | The reason a factory is used is because amqplib is not threadsafe - the | 48 | The reason a factory is used is because amqp is not threadsafe - the |
81 | 49 | publisher maintains a thread locals object to hold the factories and creates | 49 | publisher maintains a thread locals object to hold the factories and creates |
82 | 50 | connections when new threads are created(when they first generate an OOPS). | 50 | connections when new threads are created(when they first generate an OOPS). |
83 | 51 | 51 | ||
84 | @@ -62,7 +62,7 @@ | |||
85 | 62 | method failed:: | 62 | method failed:: |
86 | 63 | 63 | ||
87 | 64 | >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672", | 64 | >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672", |
89 | 65 | ... userid="guest", password="guest", virtual_host="/", insist=False) | 65 | ... userid="guest", password="guest", virtual_host="/") |
90 | 66 | >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "") | 66 | >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "") |
91 | 67 | >>> config.publisher = publish_with_fallback(publisher, fallback_publisher) | 67 | >>> config.publisher = publish_with_fallback(publisher, fallback_publisher) |
92 | 68 | 68 | ||
93 | @@ -86,6 +86,8 @@ | |||
94 | 86 | >>> receiver.run_forever() | 86 | >>> receiver.run_forever() |
95 | 87 | """ | 87 | """ |
96 | 88 | 88 | ||
97 | 89 | from __future__ import absolute_import, print_function | ||
98 | 90 | |||
99 | 89 | # same format as sys.version_info: "A tuple containing the five components of | 91 | # same format as sys.version_info: "A tuple containing the five components of |
100 | 90 | # the version number: major, minor, micro, releaselevel, and serial. All | 92 | # the version number: major, minor, micro, releaselevel, and serial. All |
101 | 91 | # values except releaselevel are integers; the release level is 'alpha', | 93 | # values except releaselevel are integers; the release level is 'alpha', |
102 | 92 | 94 | ||
103 | === modified file 'oops_amqp/anybson.py' | |||
104 | --- oops_amqp/anybson.py 2012-02-10 19:27:09 +0000 | |||
105 | +++ oops_amqp/anybson.py 2018-03-12 11:56:56 +0000 | |||
106 | @@ -13,6 +13,8 @@ | |||
107 | 13 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | 13 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
108 | 14 | # GNU Lesser General Public License version 3 (see the file LICENSE). | 14 | # GNU Lesser General Public License version 3 (see the file LICENSE). |
109 | 15 | 15 | ||
110 | 16 | from __future__ import absolute_import, print_function | ||
111 | 17 | |||
112 | 16 | __all__ = [ | 18 | __all__ = [ |
113 | 17 | 'dumps', | 19 | 'dumps', |
114 | 18 | 'loads', | 20 | 'loads', |
115 | 19 | 21 | ||
116 | === modified file 'oops_amqp/publisher.py' | |||
117 | --- oops_amqp/publisher.py 2012-08-10 01:41:44 +0000 | |||
118 | +++ oops_amqp/publisher.py 2018-03-12 11:56:56 +0000 | |||
119 | @@ -15,15 +15,17 @@ | |||
120 | 15 | 15 | ||
121 | 16 | """Publish OOPS reports over amqp.""" | 16 | """Publish OOPS reports over amqp.""" |
122 | 17 | 17 | ||
123 | 18 | from __future__ import absolute_import, print_function | ||
124 | 19 | |||
125 | 18 | __metaclass__ = type | 20 | __metaclass__ = type |
126 | 19 | 21 | ||
127 | 20 | from hashlib import md5 | 22 | from hashlib import md5 |
128 | 21 | from threading import local | 23 | from threading import local |
129 | 22 | 24 | ||
132 | 23 | from amqplib import client_0_8 as amqp | 25 | import amqp |
131 | 24 | from anybson import dumps | ||
133 | 25 | 26 | ||
135 | 26 | from utils import ( | 27 | from oops_amqp.anybson import dumps |
136 | 28 | from oops_amqp.utils import ( | ||
137 | 27 | amqplib_error_types, | 29 | amqplib_error_types, |
138 | 28 | is_amqplib_connection_error, | 30 | is_amqplib_connection_error, |
139 | 29 | ) | 31 | ) |
140 | @@ -63,8 +65,10 @@ | |||
141 | 63 | def get_channel(self): | 65 | def get_channel(self): |
142 | 64 | if getattr(self.channels, 'channel', None) is None: | 66 | if getattr(self.channels, 'channel', None) is None: |
143 | 65 | try: | 67 | try: |
146 | 66 | self.channels.channel = self.connection_factory().channel() | 68 | connection = self.connection_factory() |
147 | 67 | except amqplib_error_types, e: | 69 | connection.connect() |
148 | 70 | self.channels.channel = connection.channel() | ||
149 | 71 | except amqplib_error_types as e: | ||
150 | 68 | if is_amqplib_connection_error(e): | 72 | if is_amqplib_connection_error(e): |
151 | 69 | # Could not connect | 73 | # Could not connect |
152 | 70 | return None | 74 | return None |
153 | @@ -92,7 +96,7 @@ | |||
154 | 92 | try: | 96 | try: |
155 | 93 | channel.basic_publish( | 97 | channel.basic_publish( |
156 | 94 | message, self.exchange_name, routing_key=self.routing_key) | 98 | message, self.exchange_name, routing_key=self.routing_key) |
158 | 95 | except amqplib_error_types, e: | 99 | except amqplib_error_types as e: |
159 | 96 | self.channels.channel = None | 100 | self.channels.channel = None |
160 | 97 | if is_amqplib_connection_error(e): | 101 | if is_amqplib_connection_error(e): |
161 | 98 | # Could not connect / interrupted connection | 102 | # Could not connect / interrupted connection |
162 | 99 | 103 | ||
163 | === modified file 'oops_amqp/receiver.py' | |||
164 | --- oops_amqp/receiver.py 2012-02-09 23:13:45 +0000 | |||
165 | +++ oops_amqp/receiver.py 2018-03-12 11:56:56 +0000 | |||
166 | @@ -15,12 +15,14 @@ | |||
167 | 15 | 15 | ||
168 | 16 | """Receive OOPS reports over amqp and republish locally.""" | 16 | """Receive OOPS reports over amqp and republish locally.""" |
169 | 17 | 17 | ||
170 | 18 | from __future__ import absolute_import, print_function | ||
171 | 19 | |||
172 | 18 | __metaclass__ = type | 20 | __metaclass__ = type |
173 | 19 | 21 | ||
174 | 20 | import time | 22 | import time |
175 | 21 | 23 | ||
178 | 22 | import anybson as bson | 24 | from oops_amqp import anybson as bson |
179 | 23 | from utils import ( | 25 | from oops_amqp.utils import ( |
180 | 24 | amqplib_error_types, | 26 | amqplib_error_types, |
181 | 25 | close_ignoring_connection_errors, | 27 | close_ignoring_connection_errors, |
182 | 26 | is_amqplib_connection_error, | 28 | is_amqplib_connection_error, |
183 | @@ -56,12 +58,16 @@ | |||
184 | 56 | self.sentinel = None | 58 | self.sentinel = None |
185 | 57 | 59 | ||
186 | 58 | def handle_report(self, message): | 60 | def handle_report(self, message): |
188 | 59 | if message.body == self.sentinel: | 61 | # bson requires bytes. |
189 | 62 | body = message.body | ||
190 | 63 | if not isinstance(body, bytes): | ||
191 | 64 | body = body.encode(message.content_encoding or 'UTF-8') | ||
192 | 65 | if body == self.sentinel: | ||
193 | 60 | self.stopping = True | 66 | self.stopping = True |
194 | 61 | self.channel.basic_ack(message.delivery_tag) | 67 | self.channel.basic_ack(message.delivery_tag) |
195 | 62 | return | 68 | return |
196 | 63 | try: | 69 | try: |
198 | 64 | report = bson.loads(message.body) | 70 | report = bson.loads(body) |
199 | 65 | except KeyError: | 71 | except KeyError: |
200 | 66 | # Garbage in the queue. Possibly this should raise an OOPS itself | 72 | # Garbage in the queue. Possibly this should raise an OOPS itself |
201 | 67 | # (through a different config) or log an info level message. | 73 | # (through a different config) or log an info level message. |
202 | @@ -82,7 +88,7 @@ | |||
203 | 82 | (not self.went_bad or time.time() < self.went_bad + 120)): | 88 | (not self.went_bad or time.time() < self.went_bad + 120)): |
204 | 83 | try: | 89 | try: |
205 | 84 | self._run_forever() | 90 | self._run_forever() |
207 | 85 | except amqplib_error_types, e: | 91 | except amqplib_error_types as e: |
208 | 86 | if not is_amqplib_connection_error(e): | 92 | if not is_amqplib_connection_error(e): |
209 | 87 | # Something unknown went wrong. | 93 | # Something unknown went wrong. |
210 | 88 | raise | 94 | raise |
211 | @@ -94,6 +100,7 @@ | |||
212 | 94 | 100 | ||
213 | 95 | def _run_forever(self): | 101 | def _run_forever(self): |
214 | 96 | self.connection = self.connection_factory() | 102 | self.connection = self.connection_factory() |
215 | 103 | self.connection.connect() | ||
216 | 97 | # A successful connection: record this so run_forever won't bail early. | 104 | # A successful connection: record this so run_forever won't bail early. |
217 | 98 | self.went_bad = None | 105 | self.went_bad = None |
218 | 99 | try: | 106 | try: |
219 | @@ -103,7 +110,7 @@ | |||
220 | 103 | self.queue_name, callback=self.handle_report) | 110 | self.queue_name, callback=self.handle_report) |
221 | 104 | try: | 111 | try: |
222 | 105 | while True: | 112 | while True: |
224 | 106 | self.channel.wait() | 113 | self.connection.drain_events(timeout=1) |
225 | 107 | if self.stopping: | 114 | if self.stopping: |
226 | 108 | break | 115 | break |
227 | 109 | finally: | 116 | finally: |
228 | 110 | 117 | ||
229 | === modified file 'oops_amqp/tests/__init__.py' | |||
230 | --- oops_amqp/tests/__init__.py 2011-12-08 10:15:23 +0000 | |||
231 | +++ oops_amqp/tests/__init__.py 2018-03-12 11:56:56 +0000 | |||
232 | @@ -15,16 +15,20 @@ | |||
233 | 15 | 15 | ||
234 | 16 | """Tests for oops_amqp.""" | 16 | """Tests for oops_amqp.""" |
235 | 17 | 17 | ||
236 | 18 | from __future__ import absolute_import, print_function | ||
237 | 19 | |||
238 | 18 | from unittest import TestLoader | 20 | from unittest import TestLoader |
239 | 19 | 21 | ||
241 | 20 | from amqplib import client_0_8 as amqp | 22 | import amqp |
242 | 21 | from fixtures import Fixture | 23 | from fixtures import Fixture |
243 | 22 | from rabbitfixture.server import RabbitServer | 24 | from rabbitfixture.server import RabbitServer |
244 | 23 | import testtools | 25 | import testtools |
245 | 24 | from testresources import ( | 26 | from testresources import ( |
246 | 27 | _get_result, | ||
247 | 25 | FixtureResource, | 28 | FixtureResource, |
248 | 26 | OptimisingTestSuite, | 29 | OptimisingTestSuite, |
250 | 27 | ResourcedTestCase, | 30 | setUpResources, |
251 | 31 | tearDownResources, | ||
252 | 28 | ) | 32 | ) |
253 | 29 | 33 | ||
254 | 30 | from oops_amqp.utils import close_ignoring_connection_errors | 34 | from oops_amqp.utils import close_ignoring_connection_errors |
255 | @@ -94,16 +98,27 @@ | |||
256 | 94 | def setUp(self): | 98 | def setUp(self): |
257 | 95 | super(ChannelFixture, self).setUp() | 99 | super(ChannelFixture, self).setUp() |
258 | 96 | self.connection = self.connection_factory() | 100 | self.connection = self.connection_factory() |
259 | 101 | self.connection.connect() | ||
260 | 97 | self.addCleanup(close_ignoring_connection_errors, self.connection) | 102 | self.addCleanup(close_ignoring_connection_errors, self.connection) |
261 | 98 | self.channel = self.connection.channel() | 103 | self.channel = self.connection.channel() |
262 | 99 | self.addCleanup(close_ignoring_connection_errors, self.channel) | 104 | self.addCleanup(close_ignoring_connection_errors, self.channel) |
263 | 100 | 105 | ||
264 | 101 | 106 | ||
267 | 102 | class TestCase(testtools.TestCase, ResourcedTestCase): | 107 | class TestCase(testtools.TestCase): |
268 | 103 | """Subclass to mix in testresources ResourcedTestCase.""" | 108 | """Subclass to start a RabbitMQ server.""" |
269 | 104 | 109 | ||
270 | 105 | resources = [('rabbit', FixtureResource(RabbitServer()))] | 110 | resources = [('rabbit', FixtureResource(RabbitServer()))] |
271 | 106 | 111 | ||
272 | 112 | def setUp(self): | ||
273 | 113 | super(TestCase, self).setUp() | ||
274 | 114 | # ResourcedTestCase handles teardown in the wrong order for us (we | ||
275 | 115 | # need to ensure that the RabbitServer fixture is only cleaned up | ||
276 | 116 | # after any other fixtures registered by individual tests), so we | ||
277 | 117 | # imitate it manually. | ||
278 | 118 | result = _get_result() | ||
279 | 119 | setUpResources(self, self.resources, result) | ||
280 | 120 | self.addCleanup(tearDownResources, self, self.resources, result) | ||
281 | 121 | |||
282 | 107 | def connection_factory(self): | 122 | def connection_factory(self): |
283 | 108 | """When called, return an amqplib connection.""" | 123 | """When called, return an amqplib connection.""" |
284 | 109 | return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname, | 124 | return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname, |
285 | 110 | 125 | ||
286 | === modified file 'oops_amqp/tests/test_publisher.py' | |||
287 | --- oops_amqp/tests/test_publisher.py 2012-08-10 01:41:44 +0000 | |||
288 | +++ oops_amqp/tests/test_publisher.py 2018-03-12 11:56:56 +0000 | |||
289 | @@ -15,6 +15,8 @@ | |||
290 | 15 | 15 | ||
291 | 16 | """Tests for AMQP publishing.""" | 16 | """Tests for AMQP publishing.""" |
292 | 17 | 17 | ||
293 | 18 | from __future__ import absolute_import, print_function | ||
294 | 19 | |||
295 | 18 | from hashlib import md5 | 20 | from hashlib import md5 |
296 | 19 | 21 | ||
297 | 20 | from oops_amqp import ( | 22 | from oops_amqp import ( |
298 | @@ -47,13 +49,16 @@ | |||
299 | 47 | self.assertEqual(reference_oops, oops) | 49 | self.assertEqual(reference_oops, oops) |
300 | 48 | # The received OOPS should have the ID embedded and be a bson dict. | 50 | # The received OOPS should have the ID embedded and be a bson dict. |
301 | 49 | def check_oops(msg): | 51 | def check_oops(msg): |
303 | 50 | self.assertEqual(reference_oops, bson.loads(msg.body)) | 52 | body = msg.body |
304 | 53 | if not isinstance(body, bytes): | ||
305 | 54 | body = body.encode(msg.content_encoding or 'UTF-8') | ||
306 | 55 | self.assertEqual(reference_oops, bson.loads(body)) | ||
307 | 51 | channel.basic_ack(msg.delivery_tag) | 56 | channel.basic_ack(msg.delivery_tag) |
308 | 52 | channel.basic_cancel(queue.queue_name) | 57 | channel.basic_cancel(queue.queue_name) |
309 | 53 | channel.basic_consume( | 58 | channel.basic_consume( |
310 | 54 | queue.queue_name, callback=check_oops, | 59 | queue.queue_name, callback=check_oops, |
311 | 55 | consumer_tag=queue.queue_name) | 60 | consumer_tag=queue.queue_name) |
313 | 56 | channel.wait() | 61 | channel.connection.drain_events() |
314 | 57 | 62 | ||
315 | 58 | def test_publish(self): | 63 | def test_publish(self): |
316 | 59 | # Publishing an oops sends it to the exchange, making a connection as | 64 | # Publishing an oops sends it to the exchange, making a connection as |
317 | @@ -75,13 +80,16 @@ | |||
318 | 75 | expected_oops = dict(reference_oops) | 80 | expected_oops = dict(reference_oops) |
319 | 76 | expected_oops['id'] = oops_ids[0] | 81 | expected_oops['id'] = oops_ids[0] |
320 | 77 | def check_oops(msg): | 82 | def check_oops(msg): |
322 | 78 | self.assertEqual(expected_oops, bson.loads(msg.body)) | 83 | body = msg.body |
323 | 84 | if not isinstance(body, bytes): | ||
324 | 85 | body = body.encode(msg.content_encoding or 'UTF-8') | ||
325 | 86 | self.assertEqual(expected_oops, bson.loads(body)) | ||
326 | 79 | channel.basic_ack(msg.delivery_tag) | 87 | channel.basic_ack(msg.delivery_tag) |
327 | 80 | channel.basic_cancel(queue.queue_name) | 88 | channel.basic_cancel(queue.queue_name) |
328 | 81 | channel.basic_consume( | 89 | channel.basic_consume( |
329 | 82 | queue.queue_name, callback=check_oops, | 90 | queue.queue_name, callback=check_oops, |
330 | 83 | consumer_tag=queue.queue_name) | 91 | consumer_tag=queue.queue_name) |
332 | 84 | channel.wait() | 92 | channel.connection.drain_events() |
333 | 85 | 93 | ||
334 | 86 | def test_publish_amqp_already_down(self): | 94 | def test_publish_amqp_already_down(self): |
335 | 87 | # If amqp is down when a connection is attempted, None is returned to | 95 | # If amqp is down when a connection is attempted, None is returned to |
336 | @@ -101,7 +109,9 @@ | |||
337 | 101 | self.assertEqual([], publisher(oops)) | 109 | self.assertEqual([], publisher(oops)) |
338 | 102 | finally: | 110 | finally: |
339 | 103 | self.rabbit.runner._start() | 111 | self.rabbit.runner._start() |
341 | 104 | queue.channel = self.connection_factory().channel() | 112 | connection = self.connection_factory() |
342 | 113 | connection.connect() | ||
343 | 114 | queue.channel = connection.channel() | ||
344 | 105 | self.assertNotEqual([], publisher(oops)) | 115 | self.assertNotEqual([], publisher(oops)) |
345 | 106 | 116 | ||
346 | 107 | def test_publish_amqp_down_after_use(self): | 117 | def test_publish_amqp_down_after_use(self): |
347 | @@ -122,6 +132,8 @@ | |||
348 | 122 | self.assertEqual([], publisher(oops)) | 132 | self.assertEqual([], publisher(oops)) |
349 | 123 | finally: | 133 | finally: |
350 | 124 | self.rabbit.runner._start() | 134 | self.rabbit.runner._start() |
352 | 125 | queue.channel = self.connection_factory().channel() | 135 | connection = self.connection_factory() |
353 | 136 | connection.connect() | ||
354 | 137 | queue.channel = connection.channel() | ||
355 | 126 | self.assertNotEqual([], publisher(oops)) | 138 | self.assertNotEqual([], publisher(oops)) |
356 | 127 | 139 | ||
357 | 128 | 140 | ||
358 | === modified file 'oops_amqp/tests/test_receiver.py' | |||
359 | --- oops_amqp/tests/test_receiver.py 2012-08-10 01:41:44 +0000 | |||
360 | +++ oops_amqp/tests/test_receiver.py 2018-03-12 11:56:56 +0000 | |||
361 | @@ -15,11 +15,14 @@ | |||
362 | 15 | 15 | ||
363 | 16 | """Tests for AMQP receiving.""" | 16 | """Tests for AMQP receiving.""" |
364 | 17 | 17 | ||
365 | 18 | from __future__ import absolute_import, print_function | ||
366 | 19 | |||
367 | 18 | import errno | 20 | import errno |
368 | 19 | import socket | 21 | import socket |
369 | 20 | 22 | ||
371 | 21 | from amqplib import client_0_8 as amqp | 23 | import amqp |
372 | 22 | from oops import Config | 24 | from oops import Config |
373 | 25 | import six | ||
374 | 23 | 26 | ||
375 | 24 | from oops_amqp import ( | 27 | from oops_amqp import ( |
376 | 25 | anybson as bson, | 28 | anybson as bson, |
377 | @@ -47,7 +50,7 @@ | |||
378 | 47 | queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) | 50 | queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
379 | 48 | channel.basic_publish( | 51 | channel.basic_publish( |
380 | 49 | message, queue.exchange_name, routing_key="") | 52 | message, queue.exchange_name, routing_key="") |
382 | 50 | sentinel = "xxx" | 53 | sentinel = b"xxx" |
383 | 51 | channel.basic_publish( | 54 | channel.basic_publish( |
384 | 52 | amqp.Message(sentinel), queue.exchange_name, routing_key="") | 55 | amqp.Message(sentinel), queue.exchange_name, routing_key="") |
385 | 53 | config = Config() | 56 | config = Config() |
386 | @@ -81,9 +84,9 @@ | |||
387 | 81 | def new_channel(): | 84 | def new_channel(): |
388 | 82 | result = old_channel() | 85 | result = old_channel() |
389 | 83 | old_wait = result.wait | 86 | old_wait = result.wait |
391 | 84 | def new_wait(allowed_methods=None): | 87 | def new_wait(*args, **kwargs): |
392 | 85 | receiver.stopping = True | 88 | receiver.stopping = True |
394 | 86 | return old_wait(allowed_methods=allowed_methods) | 89 | return old_wait(*args, **kwargs) |
395 | 87 | result.wait = new_wait | 90 | result.wait = new_wait |
396 | 88 | return result | 91 | return result |
397 | 89 | connection.channel = new_channel | 92 | connection.channel = new_channel |
398 | @@ -93,8 +96,7 @@ | |||
399 | 93 | self.assertEqual([expected_report], reports) | 96 | self.assertEqual([expected_report], reports) |
400 | 94 | 97 | ||
401 | 95 | def test_run_forever(self): | 98 | def test_run_forever(self): |
404 | 96 | # run_forever subscribes and then calls wait in a loop. | 99 | # run_forever subscribes and then calls drain_events in a loop. |
403 | 97 | config = None | ||
405 | 98 | calls = [] | 100 | calls = [] |
406 | 99 | class FakeChannel: | 101 | class FakeChannel: |
407 | 100 | def __init__(self, calls): | 102 | def __init__(self, calls): |
408 | @@ -103,25 +105,29 @@ | |||
409 | 103 | def basic_consume(self, queue_name, callback=None): | 105 | def basic_consume(self, queue_name, callback=None): |
410 | 104 | self.calls.append(('basic_consume', queue_name, callback)) | 106 | self.calls.append(('basic_consume', queue_name, callback)) |
411 | 105 | return 'tag' | 107 | return 'tag' |
412 | 106 | def wait(self): | ||
413 | 107 | self.calls.append(('wait',)) | ||
414 | 108 | if len(self.calls) > 2: | ||
415 | 109 | receiver.stopping = True | ||
416 | 110 | def basic_cancel(self, tag): | 108 | def basic_cancel(self, tag): |
417 | 111 | self.calls.append(('basic_cancel', tag)) | 109 | self.calls.append(('basic_cancel', tag)) |
418 | 112 | def close(self): | 110 | def close(self): |
419 | 113 | self.is_open = False | 111 | self.is_open = False |
420 | 114 | class FakeConnection: | 112 | class FakeConnection: |
421 | 113 | def __init__(self, calls): | ||
422 | 114 | self.calls = calls | ||
423 | 115 | def connect(self): | ||
424 | 116 | pass | ||
425 | 115 | def channel(self): | 117 | def channel(self): |
426 | 116 | return FakeChannel(calls) | 118 | return FakeChannel(calls) |
427 | 119 | def drain_events(self, timeout=None): | ||
428 | 120 | self.calls.append(('drain_events', timeout)) | ||
429 | 121 | if len(self.calls) > 2: | ||
430 | 122 | receiver.stopping = True | ||
431 | 117 | def close(self): | 123 | def close(self): |
432 | 118 | pass | 124 | pass |
434 | 119 | receiver = Receiver(None, FakeConnection, 'foo') | 125 | receiver = Receiver(None, lambda: FakeConnection(calls), 'foo') |
435 | 120 | receiver.run_forever() | 126 | receiver.run_forever() |
436 | 121 | self.assertEqual( | 127 | self.assertEqual( |
437 | 122 | [('basic_consume', 'foo', receiver.handle_report), | 128 | [('basic_consume', 'foo', receiver.handle_report), |
440 | 123 | ('wait',), | 129 | ('drain_events', 1), |
441 | 124 | ('wait',), | 130 | ('drain_events', 1), |
442 | 125 | ('basic_cancel', 'tag')], | 131 | ('basic_cancel', 'tag')], |
443 | 126 | calls) | 132 | calls) |
444 | 127 | 133 | ||
445 | @@ -146,7 +152,7 @@ | |||
446 | 146 | state = {} | 152 | state = {} |
447 | 147 | def error_once(func): | 153 | def error_once(func): |
448 | 148 | def wrapped(*args, **kwargs): | 154 | def wrapped(*args, **kwargs): |
450 | 149 | func_ref = func.func_code | 155 | func_ref = six.get_function_code(func) |
451 | 150 | if func_ref in state: | 156 | if func_ref in state: |
452 | 151 | return func(*args, **kwargs) | 157 | return func(*args, **kwargs) |
453 | 152 | else: | 158 | else: |
454 | @@ -162,17 +168,17 @@ | |||
455 | 162 | @error_once | 168 | @error_once |
456 | 163 | def new_channel(): | 169 | def new_channel(): |
457 | 164 | result = old_channel() | 170 | result = old_channel() |
458 | 165 | result.wait = error_once(result.wait) | ||
459 | 166 | result.basic_consume = error_once(result.basic_consume) | 171 | result.basic_consume = error_once(result.basic_consume) |
460 | 167 | result.basic_cancel = error_once(result.basic_cancel) | 172 | result.basic_cancel = error_once(result.basic_cancel) |
461 | 168 | result.close = error_once(result.close) | 173 | result.close = error_once(result.close) |
462 | 169 | return result | 174 | return result |
463 | 170 | connection.channel = new_channel | 175 | connection.channel = new_channel |
464 | 176 | connection.drain_events = error_once(connection.drain_events) | ||
465 | 171 | connection.close = error_once(connection.close) | 177 | connection.close = error_once(connection.close) |
466 | 172 | return connection | 178 | return connection |
467 | 173 | receiver = Receiver(config, patching_factory, queue.queue_name) | 179 | receiver = Receiver(config, patching_factory, queue.queue_name) |
469 | 174 | receiver.sentinel = "arhh" | 180 | receiver.sentinel = b"arhh" |
470 | 175 | channel.basic_publish( | 181 | channel.basic_publish( |
472 | 176 | amqp.Message("arhh"), queue.exchange_name, routing_key="") | 182 | amqp.Message(b"arhh"), queue.exchange_name, routing_key="") |
473 | 177 | receiver.run_forever() | 183 | receiver.run_forever() |
474 | 178 | self.assertEqual([expected_report], reports) | 184 | self.assertEqual([expected_report], reports) |
475 | 179 | 185 | ||
476 | === modified file 'oops_amqp/trace.py' | |||
477 | --- oops_amqp/trace.py 2012-08-10 02:56:01 +0000 | |||
478 | +++ oops_amqp/trace.py 2018-03-12 11:56:56 +0000 | |||
479 | @@ -15,17 +15,17 @@ | |||
480 | 15 | 15 | ||
481 | 16 | """Trace OOPS reports coming from an AMQP queue.""" | 16 | """Trace OOPS reports coming from an AMQP queue.""" |
482 | 17 | 17 | ||
483 | 18 | from __future__ import absolute_import, print_function | ||
484 | 19 | |||
485 | 18 | from functools import partial | 20 | from functools import partial |
486 | 19 | import sys | 21 | import sys |
487 | 20 | import optparse | 22 | import optparse |
488 | 21 | from textwrap import dedent | 23 | from textwrap import dedent |
489 | 22 | 24 | ||
491 | 23 | import amqplib.client_0_8 as amqp | 25 | import amqp |
492 | 24 | import oops | 26 | import oops |
493 | 25 | import oops_amqp | 27 | import oops_amqp |
494 | 26 | 28 | ||
495 | 27 | import anybson as bson | ||
496 | 28 | |||
497 | 29 | 29 | ||
498 | 30 | def main(argv=None): | 30 | def main(argv=None): |
499 | 31 | if argv is None: | 31 | if argv is None: |
500 | 32 | 32 | ||
501 | === modified file 'oops_amqp/utils.py' | |||
502 | --- oops_amqp/utils.py 2011-12-08 10:15:23 +0000 | |||
503 | +++ oops_amqp/utils.py 2018-03-12 11:56:56 +0000 | |||
504 | @@ -15,10 +15,11 @@ | |||
505 | 15 | 15 | ||
506 | 16 | """Utility functions for oops_amqp.""" | 16 | """Utility functions for oops_amqp.""" |
507 | 17 | 17 | ||
509 | 18 | import errno | 18 | from __future__ import absolute_import, print_function |
510 | 19 | |||
511 | 19 | import socket | 20 | import socket |
512 | 20 | 21 | ||
514 | 21 | from amqplib.client_0_8.exceptions import AMQPConnectionException | 22 | from amqp.exceptions import ConnectionError |
515 | 22 | 23 | ||
516 | 23 | __all__ = [ | 24 | __all__ = [ |
517 | 24 | 'amqplib_error_types', | 25 | 'amqplib_error_types', |
518 | @@ -30,7 +31,7 @@ | |||
519 | 30 | # These exception types always indicate an AMQP connection error/closure. | 31 | # These exception types always indicate an AMQP connection error/closure. |
520 | 31 | # However you should catch amqplib_error_types and post-filter with | 32 | # However you should catch amqplib_error_types and post-filter with |
521 | 32 | # is_amqplib_connection_error. | 33 | # is_amqplib_connection_error. |
523 | 33 | amqplib_connection_errors = (socket.error, AMQPConnectionException) | 34 | amqplib_connection_errors = (socket.error, ConnectionError) |
524 | 34 | # A tuple to reduce duplication in different code paths. Lists the types of | 35 | # A tuple to reduce duplication in different code paths. Lists the types of |
525 | 35 | # exceptions legitimately raised by amqplib when the AMQP server goes down. | 36 | # exceptions legitimately raised by amqplib when the AMQP server goes down. |
526 | 36 | # Not all exceptions *will* be such errors - use is_amqplib_connection_error to | 37 | # Not all exceptions *will* be such errors - use is_amqplib_connection_error to |
527 | @@ -41,7 +42,7 @@ | |||
528 | 41 | def close_ignoring_connection_errors(closable): | 42 | def close_ignoring_connection_errors(closable): |
529 | 42 | try: | 43 | try: |
530 | 43 | return closable.close() | 44 | return closable.close() |
532 | 44 | except amqplib_error_types, e: | 45 | except amqplib_error_types as e: |
533 | 45 | if is_amqplib_connection_error(e): | 46 | if is_amqplib_connection_error(e): |
534 | 46 | return | 47 | return |
535 | 47 | raise | 48 | raise |
536 | 48 | 49 | ||
537 | === modified file 'setup.py' | |||
538 | --- setup.py 2015-10-02 16:25:32 +0000 | |||
539 | +++ setup.py 2018-03-12 11:56:56 +0000 | |||
540 | @@ -19,8 +19,8 @@ | |||
541 | 19 | from distutils.core import setup | 19 | from distutils.core import setup |
542 | 20 | import os.path | 20 | import os.path |
543 | 21 | 21 | ||
546 | 22 | description = file( | 22 | with open(os.path.join(os.path.dirname(__file__), 'README')) as f: |
547 | 23 | os.path.join(os.path.dirname(__file__), 'README'), 'rb').read() | 23 | description = f.read() |
548 | 24 | 24 | ||
549 | 25 | setup(name="oops_amqp", | 25 | setup(name="oops_amqp", |
550 | 26 | version="0.0.8b1", | 26 | version="0.0.8b1", |
551 | @@ -38,15 +38,18 @@ | |||
552 | 38 | 'License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)', | 38 | 'License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)', |
553 | 39 | 'Operating System :: OS Independent', | 39 | 'Operating System :: OS Independent', |
554 | 40 | 'Programming Language :: Python', | 40 | 'Programming Language :: Python', |
555 | 41 | 'Programming Language :: Python :: 2', | ||
556 | 42 | 'Programming Language :: Python :: 3', | ||
557 | 41 | ], | 43 | ], |
558 | 42 | install_requires = [ | 44 | install_requires = [ |
559 | 43 | 'bson', | 45 | 'bson', |
560 | 44 | 'oops>=0.0.11', | 46 | 'oops>=0.0.11', |
562 | 45 | 'amqplib', | 47 | 'amqp>=2.0.0', |
563 | 46 | ], | 48 | ], |
564 | 47 | extras_require = dict( | 49 | extras_require = dict( |
565 | 48 | test=[ | 50 | test=[ |
566 | 49 | 'rabbitfixture', | 51 | 'rabbitfixture', |
567 | 52 | 'six', | ||
568 | 50 | 'testresources', | 53 | 'testresources', |
569 | 51 | 'testtools', | 54 | 'testtools', |
570 | 52 | ] | 55 | ] |
571 | 53 | 56 | ||
572 | === modified file 'versions.cfg' | |||
573 | --- versions.cfg 2012-08-10 01:41:44 +0000 | |||
574 | +++ versions.cfg 2018-03-12 11:56:56 +0000 | |||
575 | @@ -2,7 +2,7 @@ | |||
576 | 2 | versions = versions | 2 | versions = versions |
577 | 3 | 3 | ||
578 | 4 | [versions] | 4 | [versions] |
580 | 5 | amqplib = 0.6.1 | 5 | amqp = 2.2.2 |
581 | 6 | bson = 0.3.2 | 6 | bson = 0.3.2 |
582 | 7 | fixtures = 0.3.6 | 7 | fixtures = 0.3.6 |
583 | 8 | iso8601 = 0.1.4 | 8 | iso8601 = 0.1.4 |
584 | @@ -11,8 +11,10 @@ | |||
585 | 11 | pytz = 2010o | 11 | pytz = 2010o |
586 | 12 | rabbitfixture = 0.3.2 | 12 | rabbitfixture = 0.3.2 |
587 | 13 | setuptools = 0.6c11 | 13 | setuptools = 0.6c11 |
588 | 14 | six = 1.11.0 | ||
589 | 14 | testresources = 0.2.4-r58 | 15 | testresources = 0.2.4-r58 |
590 | 15 | testtools = 0.9.12-r228 | 16 | testtools = 0.9.12-r228 |
591 | 17 | vine = 1.1.4 | ||
592 | 16 | zc.recipe.egg = 1.3.2 | 18 | zc.recipe.egg = 1.3.2 |
593 | 17 | z3c.recipe.filetemplate = 2.1.0 | 19 | z3c.recipe.filetemplate = 2.1.0 |
594 | 18 | z3c.recipe.scripts = 1.0.1 | 20 | z3c.recipe.scripts = 1.0.1 |