Merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 into lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno
- Trusty (14.04)
- juno-lp1318721
- Merge into juno
Proposed by
Xiang Hui
Status: | Needs review | ||||
---|---|---|---|---|---|
Proposed branch: | lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 | ||||
Merge into: | lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno | ||||
Diff against target: |
9657 lines (+9295/-23) 29 files modified
.pc/.quilt_patches (+1/-0) .pc/.quilt_series (+1/-0) .pc/.version (+1/-0) .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py (+444/-0) .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py (+349/-0) .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py (+821/-0) .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py (+374/-0) .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py (+49/-0) .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py (+729/-0) .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py (+832/-0) .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py (+735/-0) .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py (+374/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/drivers/test_impl_rabbit.py (+756/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py (+64/-0) .pc/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py (+865/-0) .pc/applied-patches (+8/-0) .pc/zmq-server-routing.patch/oslo/messaging/_drivers/impl_zmq.py (+995/-0) debian/changelog (+8/-0) debian/patches/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+24/-0) debian/patches/series (+1/-0) oslo/messaging/_drivers/amqpdriver.py (+16/-8) oslo/messaging/_drivers/common.py (+26/-0) oslo/messaging/_drivers/impl_qpid.py (+0/-1) oslo/messaging/_drivers/impl_rabbit.py (+69/-12) 
oslo/messaging/_drivers/impl_zmq.py (+2/-0) tests/drivers/test_impl_rabbit.py (+41/-2) tests/test_utils.py (+32/-0) |
||||
To merge this branch: | bzr merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Cloud Archive Team | Pending | ||
Review via email: mp+280822@code.launchpad.net |
Commit message
Description of the change
* Backport of upstream release (LP: #1318721):
- d/p/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
Redeclare consumers if an exception is caught after self.queue is re-established.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added directory '.pc' | |||
2 | === added file '.pc/.quilt_patches' | |||
3 | --- .pc/.quilt_patches 1970-01-01 00:00:00 +0000 | |||
4 | +++ .pc/.quilt_patches 2015-12-17 11:03:34 +0000 | |||
5 | @@ -0,0 +1,1 @@ | |||
6 | 1 | debian/patches | ||
7 | 0 | 2 | ||
8 | === added file '.pc/.quilt_series' | |||
9 | --- .pc/.quilt_series 1970-01-01 00:00:00 +0000 | |||
10 | +++ .pc/.quilt_series 2015-12-17 11:03:34 +0000 | |||
11 | @@ -0,0 +1,1 @@ | |||
12 | 1 | series | ||
13 | 0 | 2 | ||
14 | === added file '.pc/.version' | |||
15 | --- .pc/.version 1970-01-01 00:00:00 +0000 | |||
16 | +++ .pc/.version 2015-12-17 11:03:34 +0000 | |||
17 | @@ -0,0 +1,1 @@ | |||
18 | 1 | 2 | ||
19 | 0 | 2 | ||
20 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch' | |||
21 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/.timestamp' | |||
22 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo' | |||
23 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging' | |||
24 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers' | |||
25 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py' | |||
26 | --- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 1970-01-01 00:00:00 +0000 | |||
27 | +++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 2015-12-17 11:03:34 +0000 | |||
28 | @@ -0,0 +1,444 @@ | |||
29 | 1 | |||
30 | 2 | # Copyright 2013 Red Hat, Inc. | ||
31 | 3 | # | ||
32 | 4 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
33 | 5 | # not use this file except in compliance with the License. You may obtain | ||
34 | 6 | # a copy of the License at | ||
35 | 7 | # | ||
36 | 8 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
37 | 9 | # | ||
38 | 10 | # Unless required by applicable law or agreed to in writing, software | ||
39 | 11 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
40 | 12 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
41 | 13 | # License for the specific language governing permissions and limitations | ||
42 | 14 | # under the License. | ||
43 | 15 | |||
44 | 16 | __all__ = ['AMQPDriverBase'] | ||
45 | 17 | |||
46 | 18 | import logging | ||
47 | 19 | import threading | ||
48 | 20 | import time | ||
49 | 21 | import uuid | ||
50 | 22 | |||
51 | 23 | from six import moves | ||
52 | 24 | |||
53 | 25 | from oslo import messaging | ||
54 | 26 | from oslo.messaging._drivers import amqp as rpc_amqp | ||
55 | 27 | from oslo.messaging._drivers import base | ||
56 | 28 | from oslo.messaging._drivers import common as rpc_common | ||
57 | 29 | |||
58 | 30 | LOG = logging.getLogger(__name__) | ||
59 | 31 | |||
60 | 32 | |||
61 | 33 | class AMQPIncomingMessage(base.IncomingMessage): | ||
62 | 34 | |||
63 | 35 | def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q): | ||
64 | 36 | super(AMQPIncomingMessage, self).__init__(listener, ctxt, | ||
65 | 37 | dict(message)) | ||
66 | 38 | |||
67 | 39 | self.unique_id = unique_id | ||
68 | 40 | self.msg_id = msg_id | ||
69 | 41 | self.reply_q = reply_q | ||
70 | 42 | self.acknowledge_callback = message.acknowledge | ||
71 | 43 | self.requeue_callback = message.requeue | ||
72 | 44 | |||
73 | 45 | def _send_reply(self, conn, reply=None, failure=None, | ||
74 | 46 | ending=False, log_failure=True): | ||
75 | 47 | if failure: | ||
76 | 48 | failure = rpc_common.serialize_remote_exception(failure, | ||
77 | 49 | log_failure) | ||
78 | 50 | |||
79 | 51 | msg = {'result': reply, 'failure': failure} | ||
80 | 52 | if ending: | ||
81 | 53 | msg['ending'] = True | ||
82 | 54 | |||
83 | 55 | rpc_amqp._add_unique_id(msg) | ||
84 | 56 | |||
85 | 57 | # If a reply_q exists, add the msg_id to the reply and pass the | ||
86 | 58 | # reply_q to direct_send() to use it as the response queue. | ||
87 | 59 | # Otherwise use the msg_id for backward compatibility. | ||
88 | 60 | if self.reply_q: | ||
89 | 61 | msg['_msg_id'] = self.msg_id | ||
90 | 62 | conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) | ||
91 | 63 | else: | ||
92 | 64 | conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) | ||
93 | 65 | |||
94 | 66 | def reply(self, reply=None, failure=None, log_failure=True): | ||
95 | 67 | if not self.msg_id: | ||
96 | 68 | # NOTE(Alexei_987) not sending reply, if msg_id is empty | ||
97 | 69 | # because reply should not be expected by caller side | ||
98 | 70 | return | ||
99 | 71 | with self.listener.driver._get_connection() as conn: | ||
100 | 72 | self._send_reply(conn, reply, failure, log_failure=log_failure) | ||
101 | 73 | self._send_reply(conn, ending=True) | ||
102 | 74 | |||
103 | 75 | def acknowledge(self): | ||
104 | 76 | self.listener.msg_id_cache.add(self.unique_id) | ||
105 | 77 | self.acknowledge_callback() | ||
106 | 78 | |||
107 | 79 | def requeue(self): | ||
108 | 80 | # NOTE(sileht): In case of the connection is lost between receiving the | ||
109 | 81 | # message and requeing it, this requeue call fail | ||
110 | 82 | # but because the message is not acknowledged and not added to the | ||
111 | 83 | # msg_id_cache, the message will be reconsumed, the only difference is | ||
112 | 84 | # the message stay at the beginning of the queue instead of moving to | ||
113 | 85 | # the end. | ||
114 | 86 | self.requeue_callback() | ||
115 | 87 | |||
116 | 88 | |||
117 | 89 | class AMQPListener(base.Listener): | ||
118 | 90 | |||
119 | 91 | def __init__(self, driver, conn): | ||
120 | 92 | super(AMQPListener, self).__init__(driver) | ||
121 | 93 | self.conn = conn | ||
122 | 94 | self.msg_id_cache = rpc_amqp._MsgIdCache() | ||
123 | 95 | self.incoming = [] | ||
124 | 96 | |||
125 | 97 | def __call__(self, message): | ||
126 | 98 | # FIXME(markmc): logging isn't driver specific | ||
127 | 99 | rpc_common._safe_log(LOG.debug, 'received %s', dict(message)) | ||
128 | 100 | |||
129 | 101 | unique_id = self.msg_id_cache.check_duplicate_message(message) | ||
130 | 102 | ctxt = rpc_amqp.unpack_context(self.conf, message) | ||
131 | 103 | |||
132 | 104 | self.incoming.append(AMQPIncomingMessage(self, | ||
133 | 105 | ctxt.to_dict(), | ||
134 | 106 | message, | ||
135 | 107 | unique_id, | ||
136 | 108 | ctxt.msg_id, | ||
137 | 109 | ctxt.reply_q)) | ||
138 | 110 | |||
139 | 111 | def poll(self, timeout=None): | ||
140 | 112 | if timeout is not None: | ||
141 | 113 | deadline = time.time() + timeout | ||
142 | 114 | else: | ||
143 | 115 | deadline = None | ||
144 | 116 | while True: | ||
145 | 117 | if self.incoming: | ||
146 | 118 | return self.incoming.pop(0) | ||
147 | 119 | if deadline is not None: | ||
148 | 120 | timeout = deadline - time.time() | ||
149 | 121 | if timeout < 0: | ||
150 | 122 | return None | ||
151 | 123 | try: | ||
152 | 124 | self.conn.consume(limit=1, timeout=timeout) | ||
153 | 125 | except rpc_common.Timeout: | ||
154 | 126 | return None | ||
155 | 127 | else: | ||
156 | 128 | self.conn.consume(limit=1) | ||
157 | 129 | |||
158 | 130 | |||
159 | 131 | class ReplyWaiters(object): | ||
160 | 132 | |||
161 | 133 | WAKE_UP = object() | ||
162 | 134 | |||
163 | 135 | def __init__(self): | ||
164 | 136 | self._queues = {} | ||
165 | 137 | self._wrn_threshold = 10 | ||
166 | 138 | |||
167 | 139 | def get(self, msg_id, timeout): | ||
168 | 140 | try: | ||
169 | 141 | return self._queues[msg_id].get(block=True, timeout=timeout) | ||
170 | 142 | except moves.queue.Empty: | ||
171 | 143 | raise messaging.MessagingTimeout('Timed out waiting for a reply ' | ||
172 | 144 | 'to message ID %s' % msg_id) | ||
173 | 145 | |||
174 | 146 | def check(self, msg_id): | ||
175 | 147 | try: | ||
176 | 148 | return self._queues[msg_id].get(block=False) | ||
177 | 149 | except moves.queue.Empty: | ||
178 | 150 | return None | ||
179 | 151 | |||
180 | 152 | def put(self, msg_id, message_data): | ||
181 | 153 | queue = self._queues.get(msg_id) | ||
182 | 154 | if not queue: | ||
183 | 155 | LOG.warn('No calling threads waiting for msg_id : %(msg_id)s' | ||
184 | 156 | ', message : %(data)s', {'msg_id': msg_id, | ||
185 | 157 | 'data': message_data}) | ||
186 | 158 | LOG.warn('_queues: %s', self._queues) | ||
187 | 159 | else: | ||
188 | 160 | queue.put(message_data) | ||
189 | 161 | |||
190 | 162 | def wake_all(self, except_id): | ||
191 | 163 | msg_ids = [i for i in self._queues.keys() if i != except_id] | ||
192 | 164 | for msg_id in msg_ids: | ||
193 | 165 | self.put(msg_id, self.WAKE_UP) | ||
194 | 166 | |||
195 | 167 | def add(self, msg_id, queue): | ||
196 | 168 | self._queues[msg_id] = queue | ||
197 | 169 | if len(self._queues) > self._wrn_threshold: | ||
198 | 170 | LOG.warn('Number of call queues is greater than warning ' | ||
199 | 171 | 'threshold: %d. There could be a leak.', | ||
200 | 172 | self._wrn_threshold) | ||
201 | 173 | self._wrn_threshold *= 2 | ||
202 | 174 | |||
203 | 175 | def remove(self, msg_id): | ||
204 | 176 | del self._queues[msg_id] | ||
205 | 177 | |||
206 | 178 | |||
207 | 179 | class ReplyWaiter(object): | ||
208 | 180 | |||
209 | 181 | def __init__(self, conf, reply_q, conn, allowed_remote_exmods): | ||
210 | 182 | self.conf = conf | ||
211 | 183 | self.conn = conn | ||
212 | 184 | self.reply_q = reply_q | ||
213 | 185 | self.allowed_remote_exmods = allowed_remote_exmods | ||
214 | 186 | |||
215 | 187 | self.conn_lock = threading.Lock() | ||
216 | 188 | self.incoming = [] | ||
217 | 189 | self.msg_id_cache = rpc_amqp._MsgIdCache() | ||
218 | 190 | self.waiters = ReplyWaiters() | ||
219 | 191 | |||
220 | 192 | conn.declare_direct_consumer(reply_q, self) | ||
221 | 193 | |||
222 | 194 | def __call__(self, message): | ||
223 | 195 | message.acknowledge() | ||
224 | 196 | self.incoming.append(message) | ||
225 | 197 | |||
226 | 198 | def listen(self, msg_id): | ||
227 | 199 | queue = moves.queue.Queue() | ||
228 | 200 | self.waiters.add(msg_id, queue) | ||
229 | 201 | |||
230 | 202 | def unlisten(self, msg_id): | ||
231 | 203 | self.waiters.remove(msg_id) | ||
232 | 204 | |||
233 | 205 | def _process_reply(self, data): | ||
234 | 206 | result = None | ||
235 | 207 | ending = False | ||
236 | 208 | self.msg_id_cache.check_duplicate_message(data) | ||
237 | 209 | if data['failure']: | ||
238 | 210 | failure = data['failure'] | ||
239 | 211 | result = rpc_common.deserialize_remote_exception( | ||
240 | 212 | failure, self.allowed_remote_exmods) | ||
241 | 213 | elif data.get('ending', False): | ||
242 | 214 | ending = True | ||
243 | 215 | else: | ||
244 | 216 | result = data['result'] | ||
245 | 217 | return result, ending | ||
246 | 218 | |||
247 | 219 | def _poll_connection(self, msg_id, timeout): | ||
248 | 220 | while True: | ||
249 | 221 | while self.incoming: | ||
250 | 222 | message_data = self.incoming.pop(0) | ||
251 | 223 | |||
252 | 224 | incoming_msg_id = message_data.pop('_msg_id', None) | ||
253 | 225 | if incoming_msg_id == msg_id: | ||
254 | 226 | return self._process_reply(message_data) | ||
255 | 227 | |||
256 | 228 | self.waiters.put(incoming_msg_id, message_data) | ||
257 | 229 | |||
258 | 230 | try: | ||
259 | 231 | self.conn.consume(limit=1, timeout=timeout) | ||
260 | 232 | except rpc_common.Timeout: | ||
261 | 233 | raise messaging.MessagingTimeout('Timed out waiting for a ' | ||
262 | 234 | 'reply to message ID %s' | ||
263 | 235 | % msg_id) | ||
264 | 236 | |||
265 | 237 | def _poll_queue(self, msg_id, timeout): | ||
266 | 238 | message = self.waiters.get(msg_id, timeout) | ||
267 | 239 | if message is self.waiters.WAKE_UP: | ||
268 | 240 | return None, None, True # lock was released | ||
269 | 241 | |||
270 | 242 | reply, ending = self._process_reply(message) | ||
271 | 243 | return reply, ending, False | ||
272 | 244 | |||
273 | 245 | def _check_queue(self, msg_id): | ||
274 | 246 | while True: | ||
275 | 247 | message = self.waiters.check(msg_id) | ||
276 | 248 | if message is self.waiters.WAKE_UP: | ||
277 | 249 | continue | ||
278 | 250 | if message is None: | ||
279 | 251 | return None, None, True # queue is empty | ||
280 | 252 | |||
281 | 253 | reply, ending = self._process_reply(message) | ||
282 | 254 | return reply, ending, False | ||
283 | 255 | |||
284 | 256 | def wait(self, msg_id, timeout): | ||
285 | 257 | # | ||
286 | 258 | # NOTE(markmc): we're waiting for a reply for msg_id to come in for on | ||
287 | 259 | # the reply_q, but there may be other threads also waiting for replies | ||
288 | 260 | # to other msg_ids | ||
289 | 261 | # | ||
290 | 262 | # Only one thread can be consuming from the queue using this connection | ||
291 | 263 | # and we don't want to hold open a connection per thread, so instead we | ||
292 | 264 | # have the first thread take responsibility for passing replies not | ||
293 | 265 | # intended for itself to the appropriate thread. | ||
294 | 266 | # | ||
295 | 267 | final_reply = None | ||
296 | 268 | while True: | ||
297 | 269 | if self.conn_lock.acquire(False): | ||
298 | 270 | # Ok, we're the thread responsible for polling the connection | ||
299 | 271 | try: | ||
300 | 272 | # Check the queue to see if a previous lock-holding thread | ||
301 | 273 | # queued up a reply already | ||
302 | 274 | while True: | ||
303 | 275 | reply, ending, empty = self._check_queue(msg_id) | ||
304 | 276 | if empty: | ||
305 | 277 | break | ||
306 | 278 | if not ending: | ||
307 | 279 | final_reply = reply | ||
308 | 280 | else: | ||
309 | 281 | return final_reply | ||
310 | 282 | |||
311 | 283 | # Now actually poll the connection | ||
312 | 284 | while True: | ||
313 | 285 | reply, ending = self._poll_connection(msg_id, timeout) | ||
314 | 286 | if not ending: | ||
315 | 287 | final_reply = reply | ||
316 | 288 | else: | ||
317 | 289 | return final_reply | ||
318 | 290 | finally: | ||
319 | 291 | self.conn_lock.release() | ||
320 | 292 | # We've got our reply, tell the other threads to wake up | ||
321 | 293 | # so that one of them will take over the responsibility for | ||
322 | 294 | # polling the connection | ||
323 | 295 | self.waiters.wake_all(msg_id) | ||
324 | 296 | else: | ||
325 | 297 | # We're going to wait for the first thread to pass us our reply | ||
326 | 298 | reply, ending, trylock = self._poll_queue(msg_id, timeout) | ||
327 | 299 | if trylock: | ||
328 | 300 | # The first thread got its reply, let's try and take over | ||
329 | 301 | # the responsibility for polling | ||
330 | 302 | continue | ||
331 | 303 | if not ending: | ||
332 | 304 | final_reply = reply | ||
333 | 305 | else: | ||
334 | 306 | return final_reply | ||
335 | 307 | |||
336 | 308 | |||
337 | 309 | class AMQPDriverBase(base.BaseDriver): | ||
338 | 310 | |||
339 | 311 | def __init__(self, conf, url, connection_pool, | ||
340 | 312 | default_exchange=None, allowed_remote_exmods=None): | ||
341 | 313 | super(AMQPDriverBase, self).__init__(conf, url, default_exchange, | ||
342 | 314 | allowed_remote_exmods) | ||
343 | 315 | |||
344 | 316 | self._default_exchange = default_exchange | ||
345 | 317 | |||
346 | 318 | self._connection_pool = connection_pool | ||
347 | 319 | |||
348 | 320 | self._reply_q_lock = threading.Lock() | ||
349 | 321 | self._reply_q = None | ||
350 | 322 | self._reply_q_conn = None | ||
351 | 323 | self._waiter = None | ||
352 | 324 | |||
353 | 325 | def _get_exchange(self, target): | ||
354 | 326 | return target.exchange or self._default_exchange | ||
355 | 327 | |||
356 | 328 | def _get_connection(self, pooled=True): | ||
357 | 329 | return rpc_amqp.ConnectionContext(self.conf, | ||
358 | 330 | self._url, | ||
359 | 331 | self._connection_pool, | ||
360 | 332 | pooled=pooled) | ||
361 | 333 | |||
362 | 334 | def _get_reply_q(self): | ||
363 | 335 | with self._reply_q_lock: | ||
364 | 336 | if self._reply_q is not None: | ||
365 | 337 | return self._reply_q | ||
366 | 338 | |||
367 | 339 | reply_q = 'reply_' + uuid.uuid4().hex | ||
368 | 340 | |||
369 | 341 | conn = self._get_connection(pooled=False) | ||
370 | 342 | |||
371 | 343 | self._waiter = ReplyWaiter(self.conf, reply_q, conn, | ||
372 | 344 | self._allowed_remote_exmods) | ||
373 | 345 | |||
374 | 346 | self._reply_q = reply_q | ||
375 | 347 | self._reply_q_conn = conn | ||
376 | 348 | |||
377 | 349 | return self._reply_q | ||
378 | 350 | |||
379 | 351 | def _send(self, target, ctxt, message, | ||
380 | 352 | wait_for_reply=None, timeout=None, | ||
381 | 353 | envelope=True, notify=False, retry=None): | ||
382 | 354 | |||
383 | 355 | # FIXME(markmc): remove this temporary hack | ||
384 | 356 | class Context(object): | ||
385 | 357 | def __init__(self, d): | ||
386 | 358 | self.d = d | ||
387 | 359 | |||
388 | 360 | def to_dict(self): | ||
389 | 361 | return self.d | ||
390 | 362 | |||
391 | 363 | context = Context(ctxt) | ||
392 | 364 | msg = message | ||
393 | 365 | |||
394 | 366 | if wait_for_reply: | ||
395 | 367 | msg_id = uuid.uuid4().hex | ||
396 | 368 | msg.update({'_msg_id': msg_id}) | ||
397 | 369 | LOG.debug('MSG_ID is %s', msg_id) | ||
398 | 370 | msg.update({'_reply_q': self._get_reply_q()}) | ||
399 | 371 | |||
400 | 372 | rpc_amqp._add_unique_id(msg) | ||
401 | 373 | rpc_amqp.pack_context(msg, context) | ||
402 | 374 | |||
403 | 375 | if envelope: | ||
404 | 376 | msg = rpc_common.serialize_msg(msg) | ||
405 | 377 | |||
406 | 378 | if wait_for_reply: | ||
407 | 379 | self._waiter.listen(msg_id) | ||
408 | 380 | |||
409 | 381 | try: | ||
410 | 382 | with self._get_connection() as conn: | ||
411 | 383 | if notify: | ||
412 | 384 | conn.notify_send(self._get_exchange(target), | ||
413 | 385 | target.topic, msg, retry=retry) | ||
414 | 386 | elif target.fanout: | ||
415 | 387 | conn.fanout_send(target.topic, msg, retry=retry) | ||
416 | 388 | else: | ||
417 | 389 | topic = target.topic | ||
418 | 390 | if target.server: | ||
419 | 391 | topic = '%s.%s' % (target.topic, target.server) | ||
420 | 392 | conn.topic_send(exchange_name=self._get_exchange(target), | ||
421 | 393 | topic=topic, msg=msg, timeout=timeout, | ||
422 | 394 | retry=retry) | ||
423 | 395 | |||
424 | 396 | if wait_for_reply: | ||
425 | 397 | result = self._waiter.wait(msg_id, timeout) | ||
426 | 398 | if isinstance(result, Exception): | ||
427 | 399 | raise result | ||
428 | 400 | return result | ||
429 | 401 | finally: | ||
430 | 402 | if wait_for_reply: | ||
431 | 403 | self._waiter.unlisten(msg_id) | ||
432 | 404 | |||
433 | 405 | def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, | ||
434 | 406 | retry=None): | ||
435 | 407 | return self._send(target, ctxt, message, wait_for_reply, timeout, | ||
436 | 408 | retry=retry) | ||
437 | 409 | |||
438 | 410 | def send_notification(self, target, ctxt, message, version, retry=None): | ||
439 | 411 | return self._send(target, ctxt, message, | ||
440 | 412 | envelope=(version == 2.0), notify=True, retry=retry) | ||
441 | 413 | |||
442 | 414 | def listen(self, target): | ||
443 | 415 | conn = self._get_connection(pooled=False) | ||
444 | 416 | |||
445 | 417 | listener = AMQPListener(self, conn) | ||
446 | 418 | |||
447 | 419 | conn.declare_topic_consumer(exchange_name=self._get_exchange(target), | ||
448 | 420 | topic=target.topic, | ||
449 | 421 | callback=listener) | ||
450 | 422 | conn.declare_topic_consumer(exchange_name=self._get_exchange(target), | ||
451 | 423 | topic='%s.%s' % (target.topic, | ||
452 | 424 | target.server), | ||
453 | 425 | callback=listener) | ||
454 | 426 | conn.declare_fanout_consumer(target.topic, listener) | ||
455 | 427 | |||
456 | 428 | return listener | ||
457 | 429 | |||
458 | 430 | def listen_for_notifications(self, targets_and_priorities): | ||
459 | 431 | conn = self._get_connection(pooled=False) | ||
460 | 432 | |||
461 | 433 | listener = AMQPListener(self, conn) | ||
462 | 434 | for target, priority in targets_and_priorities: | ||
463 | 435 | conn.declare_topic_consumer( | ||
464 | 436 | exchange_name=self._get_exchange(target), | ||
465 | 437 | topic='%s.%s' % (target.topic, priority), | ||
466 | 438 | callback=listener) | ||
467 | 439 | return listener | ||
468 | 440 | |||
469 | 441 | def cleanup(self): | ||
470 | 442 | if self._connection_pool: | ||
471 | 443 | self._connection_pool.empty() | ||
472 | 444 | self._connection_pool = None | ||
473 | 0 | 445 | ||
474 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py' | |||
475 | --- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000 | |||
476 | +++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000 | |||
477 | @@ -0,0 +1,349 @@ | |||
478 | 1 | # Copyright 2010 United States Government as represented by the | ||
479 | 2 | # Administrator of the National Aeronautics and Space Administration. | ||
480 | 3 | # All Rights Reserved. | ||
481 | 4 | # Copyright 2011 Red Hat, Inc. | ||
482 | 5 | # | ||
483 | 6 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
484 | 7 | # not use this file except in compliance with the License. You may obtain | ||
485 | 8 | # a copy of the License at | ||
486 | 9 | # | ||
487 | 10 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
488 | 11 | # | ||
489 | 12 | # Unless required by applicable law or agreed to in writing, software | ||
490 | 13 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
491 | 14 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
492 | 15 | # License for the specific language governing permissions and limitations | ||
493 | 16 | # under the License. | ||
494 | 17 | |||
495 | 18 | import copy | ||
496 | 19 | import logging | ||
497 | 20 | import sys | ||
498 | 21 | import traceback | ||
499 | 22 | |||
500 | 23 | import six | ||
501 | 24 | |||
502 | 25 | from oslo import messaging | ||
503 | 26 | from oslo.messaging import _utils as utils | ||
504 | 27 | from oslo.messaging.openstack.common.gettextutils import _ | ||
505 | 28 | from oslo.messaging.openstack.common import jsonutils | ||
506 | 29 | |||
507 | 30 | LOG = logging.getLogger(__name__) | ||
508 | 31 | |||
509 | 32 | _EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' | ||
510 | 33 | |||
511 | 34 | |||
512 | 35 | '''RPC Envelope Version. | ||
513 | 36 | |||
514 | 37 | This version number applies to the top level structure of messages sent out. | ||
515 | 38 | It does *not* apply to the message payload, which must be versioned | ||
516 | 39 | independently. For example, when using rpc APIs, a version number is applied | ||
517 | 40 | for changes to the API being exposed over rpc. This version number is handled | ||
518 | 41 | in the rpc proxy and dispatcher modules. | ||
519 | 42 | |||
520 | 43 | This version number applies to the message envelope that is used in the | ||
521 | 44 | serialization done inside the rpc layer. See serialize_msg() and | ||
522 | 45 | deserialize_msg(). | ||
523 | 46 | |||
524 | 47 | The current message format (version 2.0) is very simple. It is: | ||
525 | 48 | |||
526 | 49 | { | ||
527 | 50 | 'oslo.version': <RPC Envelope Version as a String>, | ||
528 | 51 | 'oslo.message': <Application Message Payload, JSON encoded> | ||
529 | 52 | } | ||
530 | 53 | |||
531 | 54 | Message format version '1.0' is just considered to be the messages we sent | ||
532 | 55 | without a message envelope. | ||
533 | 56 | |||
534 | 57 | So, the current message envelope just includes the envelope version. It may | ||
535 | 58 | eventually contain additional information, such as a signature for the message | ||
536 | 59 | payload. | ||
537 | 60 | |||
538 | 61 | We will JSON encode the application message payload. The message envelope, | ||
539 | 62 | which includes the JSON encoded application message body, will be passed down | ||
540 | 63 | to the messaging libraries as a dict. | ||
541 | 64 | ''' | ||
542 | 65 | _RPC_ENVELOPE_VERSION = '2.0' | ||
543 | 66 | |||
544 | 67 | _VERSION_KEY = 'oslo.version' | ||
545 | 68 | _MESSAGE_KEY = 'oslo.message' | ||
546 | 69 | |||
547 | 70 | _REMOTE_POSTFIX = '_Remote' | ||
548 | 71 | |||
549 | 72 | |||
550 | 73 | class RPCException(Exception): | ||
551 | 74 | msg_fmt = _("An unknown RPC related exception occurred.") | ||
552 | 75 | |||
553 | 76 | def __init__(self, message=None, **kwargs): | ||
554 | 77 | self.kwargs = kwargs | ||
555 | 78 | |||
556 | 79 | if not message: | ||
557 | 80 | try: | ||
558 | 81 | message = self.msg_fmt % kwargs | ||
559 | 82 | |||
560 | 83 | except Exception: | ||
561 | 84 | # kwargs doesn't match a variable in the message | ||
562 | 85 | # log the issue and the kwargs | ||
563 | 86 | LOG.exception(_('Exception in string format operation')) | ||
564 | 87 | for name, value in six.iteritems(kwargs): | ||
565 | 88 | LOG.error("%s: %s", name, value) | ||
566 | 89 | # at least get the core message out if something happened | ||
567 | 90 | message = self.msg_fmt | ||
568 | 91 | |||
569 | 92 | super(RPCException, self).__init__(message) | ||
570 | 93 | |||
571 | 94 | |||
572 | 95 | class Timeout(RPCException): | ||
573 | 96 | """Signifies that a timeout has occurred. | ||
574 | 97 | |||
575 | 98 | This exception is raised if the rpc_response_timeout is reached while | ||
576 | 99 | waiting for a response from the remote side. | ||
577 | 100 | """ | ||
578 | 101 | msg_fmt = _('Timeout while waiting on RPC response - ' | ||
579 | 102 | 'topic: "%(topic)s", RPC method: "%(method)s" ' | ||
580 | 103 | 'info: "%(info)s"') | ||
581 | 104 | |||
582 | 105 | def __init__(self, info=None, topic=None, method=None): | ||
583 | 106 | """Initiates Timeout object. | ||
584 | 107 | |||
585 | 108 | :param info: Extra info to convey to the user | ||
586 | 109 | :param topic: The topic that the rpc call was sent to | ||
587 | 110 | :param rpc_method_name: The name of the rpc method being | ||
588 | 111 | called | ||
589 | 112 | """ | ||
590 | 113 | self.info = info | ||
591 | 114 | self.topic = topic | ||
592 | 115 | self.method = method | ||
593 | 116 | super(Timeout, self).__init__( | ||
594 | 117 | None, | ||
595 | 118 | info=info or _('<unknown>'), | ||
596 | 119 | topic=topic or _('<unknown>'), | ||
597 | 120 | method=method or _('<unknown>')) | ||
598 | 121 | |||
599 | 122 | |||
600 | 123 | class DuplicateMessageError(RPCException): | ||
601 | 124 | msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.") | ||
602 | 125 | |||
603 | 126 | |||
604 | 127 | class InvalidRPCConnectionReuse(RPCException): | ||
605 | 128 | msg_fmt = _("Invalid reuse of an RPC connection.") | ||
606 | 129 | |||
607 | 130 | |||
608 | 131 | class UnsupportedRpcVersion(RPCException): | ||
609 | 132 | msg_fmt = _("Specified RPC version, %(version)s, not supported by " | ||
610 | 133 | "this endpoint.") | ||
611 | 134 | |||
612 | 135 | |||
613 | 136 | class UnsupportedRpcEnvelopeVersion(RPCException): | ||
614 | 137 | msg_fmt = _("Specified RPC envelope version, %(version)s, " | ||
615 | 138 | "not supported by this endpoint.") | ||
616 | 139 | |||
617 | 140 | |||
618 | 141 | class RpcVersionCapError(RPCException): | ||
619 | 142 | msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") | ||
620 | 143 | |||
621 | 144 | |||
622 | 145 | class Connection(object): | ||
623 | 146 | """A connection, returned by rpc.create_connection(). | ||
624 | 147 | |||
625 | 148 | This class represents a connection to the message bus used for rpc. | ||
626 | 149 | An instance of this class should never be created by users of the rpc API. | ||
627 | 150 | Use rpc.create_connection() instead. | ||
628 | 151 | """ | ||
629 | 152 | def close(self): | ||
630 | 153 | """Close the connection. | ||
631 | 154 | |||
632 | 155 | This method must be called when the connection will no longer be used. | ||
633 | 156 | It will ensure that any resources associated with the connection, such | ||
634 | 157 | as a network connection, and cleaned up. | ||
635 | 158 | """ | ||
636 | 159 | raise NotImplementedError() | ||
637 | 160 | |||
638 | 161 | |||
639 | 162 | def _safe_log(log_func, msg, msg_data): | ||
640 | 163 | """Sanitizes the msg_data field before logging.""" | ||
641 | 164 | SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] | ||
642 | 165 | |||
643 | 166 | def _fix_passwords(d): | ||
644 | 167 | """Sanitizes the password fields in the dictionary.""" | ||
645 | 168 | for k in six.iterkeys(d): | ||
646 | 169 | if k.lower().find('password') != -1: | ||
647 | 170 | d[k] = '<SANITIZED>' | ||
648 | 171 | elif k.lower() in SANITIZE: | ||
649 | 172 | d[k] = '<SANITIZED>' | ||
650 | 173 | elif isinstance(d[k], dict): | ||
651 | 174 | _fix_passwords(d[k]) | ||
652 | 175 | return d | ||
653 | 176 | |||
654 | 177 | return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) | ||
655 | 178 | |||
656 | 179 | |||
def serialize_remote_exception(failure_info, log_failure=True):
    """Prepares exception data to be sent over rpc.

    Failure_info should be a sys.exc_info() tuple.

    :param failure_info: a sys.exc_info() tuple (type, value, traceback)
    :param log_failure: when True, log the exception and its traceback at
        error level before serializing
    :returns: a JSON string carrying the exception's class, module,
        message, formatted traceback, args and kwargs
    """
    tb = traceback.format_exception(*failure_info)
    failure = failure_info[1]
    if log_failure:
        LOG.error(_("Returning exception %s to caller"),
                  six.text_type(failure))
        LOG.error(tb)

    # Some exception classes carry extra keyword context (e.g. RPCException
    # subclasses above); forward it so the peer can rebuild the instance.
    kwargs = {}
    if hasattr(failure, 'kwargs'):
        kwargs = failure.kwargs

    # NOTE(matiu): With cells, it's possible to re-raise remote, remote
    # exceptions. Lets turn it back into the original exception type.
    cls_name = six.text_type(failure.__class__.__name__)
    mod_name = six.text_type(failure.__class__.__module__)
    if (cls_name.endswith(_REMOTE_POSTFIX) and
            mod_name.endswith(_REMOTE_POSTFIX)):
        # Strip the "_Remote" suffix added by deserialize_remote_exception
        # so the original type name travels over the wire.
        cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
        mod_name = mod_name[:-len(_REMOTE_POSTFIX)]

    data = {
        'class': cls_name,
        'module': mod_name,
        'message': six.text_type(failure),
        'tb': tb,
        'args': failure.args,
        'kwargs': kwargs
    }

    json_data = jsonutils.dumps(data)

    return json_data
695 | 218 | |||
696 | 219 | |||
def deserialize_remote_exception(data, allowed_remote_exmods):
    """Rebuild an exception serialized by serialize_remote_exception.

    :param data: JSON string produced by serialize_remote_exception
    :param allowed_remote_exmods: modules (in addition to the common
        exceptions module) from which exception classes may be re-imported
    :returns: an exception instance — either the original type retagged
        with a dynamically-built "_Remote" subclass whose str() includes
        the remote traceback, or messaging.RemoteError when the original
        type cannot be safely re-created
    """
    failure = jsonutils.loads(six.text_type(data))

    trace = failure.get('tb', [])
    message = failure.get('message', "") + "\n" + "\n".join(trace)
    name = failure.get('class')
    module = failure.get('module')

    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
    # order to prevent arbitrary code execution.
    if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
        return messaging.RemoteError(name, failure.get('message'), trace)

    try:
        __import__(module)
        mod = sys.modules[module]
        klass = getattr(mod, name)
        if not issubclass(klass, Exception):
            raise TypeError("Can only deserialize Exceptions")

        failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
    except (AttributeError, TypeError, ImportError):
        # Could not re-create the original type (missing class, bad ctor
        # args, missing module); fall back to a generic RemoteError.
        # 'failure' is still the decoded dict in this branch.
        return messaging.RemoteError(name, failure.get('message'), trace)

    ex_type = type(failure)
    str_override = lambda self: message
    # Build a subclass named "<OriginalType>_Remote" whose string form is
    # the remote message plus traceback.
    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
                       {'__str__': str_override, '__unicode__': str_override})
    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
    try:
        # NOTE(ameade): Dynamically create a new exception type and swap it in
        # as the new type for the exception. This only works on user defined
        # Exceptions and not core Python exceptions. This is important because
        # we cannot necessarily change an exception message so we must override
        # the __str__ method.
        failure.__class__ = new_ex_type
    except TypeError:
        # NOTE(ameade): If a core exception then just add the traceback to the
        # first exception argument.
        failure.args = (message,) + failure.args[1:]
    return failure
738 | 261 | |||
739 | 262 | |||
class CommonRpcContext(object):
    """RPC request context backed by a plain dict, with attribute-style
    read access to the stored values."""

    def __init__(self, **kwargs):
        self.values = kwargs

    def __getattr__(self, key):
        # Only called for names not found normally; translate a missing
        # key into the AttributeError that attribute access promises.
        if key in self.values:
            return self.values[key]
        raise AttributeError(key)

    def to_dict(self):
        # Deep copy so callers cannot mutate our backing store through
        # the returned dict.
        return copy.deepcopy(self.values)

    @classmethod
    def from_dict(cls, values):
        """Alternate constructor from a plain dict of context values."""
        return cls(**values)

    def deepcopy(self):
        """Return an independent copy of this context."""
        return self.from_dict(self.to_dict())

    def update_store(self):
        # local.store.context = self
        pass
763 | 286 | |||
764 | 287 | |||
class ClientException(Exception):
    """Encapsulates actual exception expected to be hit by a RPC proxy object.

    Merely instantiating it records the current exception information, which
    will be passed back to the RPC client without exceptional logging.
    """
    def __init__(self):
        # Capture the exception currently being handled so it can be
        # forwarded to the caller later.
        self._exc_info = sys.exc_info()
773 | 296 | |||
774 | 297 | |||
def serialize_msg(raw_msg):
    """Wrap ``raw_msg`` in the versioned RPC envelope.

    :param raw_msg: JSON-serializable message payload
    :returns: dict with the envelope version and the JSON-encoded payload
    """
    # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
    # information about this format.
    msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
           _MESSAGE_KEY: jsonutils.dumps(raw_msg)}

    return msg
782 | 305 | |||
783 | 306 | |||
def deserialize_msg(msg):
    """Unwrap ``msg`` from the RPC envelope, if it carries one.

    Non-dict values and pre-envelope (version 1.0) dicts are returned
    unchanged.

    :raises UnsupportedRpcEnvelopeVersion: when the envelope version is
        not compatible with _RPC_ENVELOPE_VERSION
    """
    # NOTE(russellb): Hang on to your hats, this road is about to
    # get a little bumpy.
    #
    # Robustness Principle:
    #    "Be strict in what you send, liberal in what you accept."
    #
    # At this point we have to do a bit of guessing about what it
    # is we just received.  Here is the set of possibilities:
    #
    # 1) We received a dict.  This could be 2 things:
    #
    #   a) Inspect it to see if it looks like a standard message envelope.
    #      If so, great!
    #
    #   b) If it doesn't look like a standard message envelope, it could either
    #      be a notification, or a message from before we added a message
    #      envelope (referred to as version 1.0).
    #      Just return the message as-is.
    #
    # 2) It's any other non-dict type.  Just return it and hope for the best.
    #    This case covers return values from rpc.call() from before message
    #    envelopes were used.  (messages to call a method were always a dict)

    if not isinstance(msg, dict):
        # See #2 above.
        return msg

    base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
    if not all(map(lambda key: key in msg, base_envelope_keys)):
        # See #1.b above.
        return msg

    # At this point we think we have the message envelope
    # format we were expecting. (#1.a above)

    if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
                                       msg[_VERSION_KEY]):
        raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])

    raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])

    return raw_msg
827 | 0 | 350 | ||
828 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py' | |||
829 | --- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000 | |||
830 | +++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000 | |||
831 | @@ -0,0 +1,821 @@ | |||
832 | 1 | # Copyright 2011 OpenStack Foundation | ||
833 | 2 | # | ||
834 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
835 | 4 | # not use this file except in compliance with the License. You may obtain | ||
836 | 5 | # a copy of the License at | ||
837 | 6 | # | ||
838 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
839 | 8 | # | ||
840 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
841 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
842 | 11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
843 | 12 | # License for the specific language governing permissions and limitations | ||
844 | 13 | # under the License. | ||
845 | 14 | |||
846 | 15 | import functools | ||
847 | 16 | import itertools | ||
848 | 17 | import logging | ||
849 | 18 | import random | ||
850 | 19 | import socket | ||
851 | 20 | import ssl | ||
852 | 21 | import time | ||
853 | 22 | import uuid | ||
854 | 23 | |||
855 | 24 | import kombu | ||
856 | 25 | import kombu.connection | ||
857 | 26 | import kombu.entity | ||
858 | 27 | import kombu.messaging | ||
859 | 28 | import six | ||
860 | 29 | |||
861 | 30 | from oslo.config import cfg | ||
862 | 31 | from oslo.messaging._drivers import amqp as rpc_amqp | ||
863 | 32 | from oslo.messaging._drivers import amqpdriver | ||
864 | 33 | from oslo.messaging._drivers import common as rpc_common | ||
865 | 34 | from oslo.messaging import exceptions | ||
866 | 35 | from oslo.messaging.openstack.common.gettextutils import _ | ||
867 | 36 | from oslo.utils import netutils | ||
868 | 37 | |||
# Configuration options for the kombu/RabbitMQ driver.  The kombu_ssl_*
# options only take effect when rabbit_use_ssl is enabled; rabbit_hosts
# supersedes rabbit_host/rabbit_port for clustered deployments.
rabbit_opts = [
    cfg.StrOpt('kombu_ssl_version',
               default='',
               help='SSL version to use (valid only if SSL enabled). '
                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
                    'be available on some distributions.'
               ),
    cfg.StrOpt('kombu_ssl_keyfile',
               default='',
               help='SSL key file (valid only if SSL enabled).'),
    cfg.StrOpt('kombu_ssl_certfile',
               default='',
               help='SSL cert file (valid only if SSL enabled).'),
    cfg.StrOpt('kombu_ssl_ca_certs',
               default='',
               help='SSL certification authority file '
                    '(valid only if SSL enabled).'),
    cfg.FloatOpt('kombu_reconnect_delay',
                 default=1.0,
                 help='How long to wait before reconnecting in response to an '
                      'AMQP consumer cancel notification.'),
    cfg.StrOpt('rabbit_host',
               default='localhost',
               help='The RabbitMQ broker address where a single node is '
                    'used.'),
    cfg.IntOpt('rabbit_port',
               default=5672,
               help='The RabbitMQ broker port where a single node is used.'),
    cfg.ListOpt('rabbit_hosts',
                default=['$rabbit_host:$rabbit_port'],
                help='RabbitMQ HA cluster host:port pairs.'),
    cfg.BoolOpt('rabbit_use_ssl',
                default=False,
                help='Connect over SSL for RabbitMQ.'),
    cfg.StrOpt('rabbit_userid',
               default='guest',
               help='The RabbitMQ userid.'),
    cfg.StrOpt('rabbit_password',
               default='guest',
               help='The RabbitMQ password.',
               secret=True),
    cfg.StrOpt('rabbit_login_method',
               default='AMQPLAIN',
               help='the RabbitMQ login method'),
    cfg.StrOpt('rabbit_virtual_host',
               default='/',
               help='The RabbitMQ virtual host.'),
    cfg.IntOpt('rabbit_retry_interval',
               default=1,
               help='How frequently to retry connecting with RabbitMQ.'),
    cfg.IntOpt('rabbit_retry_backoff',
               default=2,
               help='How long to backoff for between retries when connecting '
                    'to RabbitMQ.'),
    cfg.IntOpt('rabbit_max_retries',
               default=0,
               help='Maximum number of RabbitMQ connection retries. '
                    'Default is 0 (infinite retry count).'),
    cfg.BoolOpt('rabbit_ha_queues',
                default=False,
                help='Use HA queues in RabbitMQ (x-ha-policy: all). '
                     'If you change this option, you must wipe the '
                     'RabbitMQ database.'),

    # FIXME(markmc): this was toplevel in openstack.common.rpc
    cfg.BoolOpt('fake_rabbit',
                default=False,
                help='If passed, use a fake RabbitMQ provider.'),
]

LOG = logging.getLogger(__name__)
940 | 109 | |||
941 | 110 | |||
942 | 111 | def _get_queue_arguments(conf): | ||
943 | 112 | """Construct the arguments for declaring a queue. | ||
944 | 113 | |||
945 | 114 | If the rabbit_ha_queues option is set, we declare a mirrored queue | ||
946 | 115 | as described here: | ||
947 | 116 | |||
948 | 117 | http://www.rabbitmq.com/ha.html | ||
949 | 118 | |||
950 | 119 | Setting x-ha-policy to all means that the queue will be mirrored | ||
951 | 120 | to all nodes in the cluster. | ||
952 | 121 | """ | ||
953 | 122 | return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} | ||
954 | 123 | |||
955 | 124 | |||
class RabbitMessage(dict):
    """A received message: the deserialized payload as a dict, plus the
    underlying kombu message so it can be ack'ed or requeued."""

    def __init__(self, raw_message):
        super(RabbitMessage, self).__init__(
            rpc_common.deserialize_msg(raw_message.payload))
        # Keep the transport message around for acknowledge()/requeue().
        self._raw_message = raw_message

    def acknowledge(self):
        """Ack the underlying kombu message."""
        self._raw_message.ack()

    def requeue(self):
        """Return the underlying kombu message to the queue."""
        self._raw_message.requeue()
967 | 136 | |||
968 | 137 | |||
class ConsumerBase(object):
    """Consumer base class."""

    def __init__(self, channel, callback, tag, **kwargs):
        """Declare a queue on an amqp channel.

        'channel' is the amqp channel to use
        'callback' is the callback to call when messages are received
        'tag' is a unique ID for the consumer on the channel

        queue name, exchange name, and other kombu options are
        passed in here as a dictionary.
        """
        self.callback = callback
        self.tag = six.text_type(tag)
        self.kwargs = kwargs
        self.queue = None
        # reconnect() does the initial declare as well as re-declares.
        self.reconnect(channel)

    def reconnect(self, channel):
        """Re-declare the queue after a rabbit reconnect."""
        self.channel = channel
        self.kwargs['channel'] = channel
        self.queue = kombu.entity.Queue(**self.kwargs)
        self.queue.declare()

    def _callback_handler(self, message, callback):
        """Call callback with deserialized message.

        Messages that are processed and ack'ed.
        """

        try:
            callback(RabbitMessage(message))
        except Exception:
            # A failing callback must not kill the consumer loop: log,
            # ack the message anyway, and move on (the message is lost).
            LOG.exception(_("Failed to process message"
                            " ... skipping it."))
            message.ack()

    def consume(self, *args, **kwargs):
        """Actually declare the consumer on the amqp channel. This will
        start the flow of messages from the queue. Using the
        Connection.iterconsume() iterator will process the messages,
        calling the appropriate callback.

        If a callback is specified in kwargs, use that. Otherwise,
        use the callback passed during __init__()

        kwargs['nowait'] (default False) is forwarded unchanged to the
        kombu consume call.
        """

        options = {'consumer_tag': self.tag}
        options['nowait'] = kwargs.get('nowait', False)
        callback = kwargs.get('callback', self.callback)
        if not callback:
            raise ValueError("No callback defined")

        def _callback(raw_message):
            # Convert the transport-level frame to a kombu message object
            # before handing it to the handler.
            message = self.channel.message_to_python(raw_message)
            self._callback_handler(message, callback)

        self.queue.consume(*args, callback=_callback, **options)

    def cancel(self):
        """Cancel the consuming from the queue, if it has started."""
        try:
            self.queue.cancel(self.tag)
        except KeyError as e:
            # NOTE(comstud): Kludge to get around an amqplib bug
            if six.text_type(e) != "u'%s'" % self.tag:
                raise
        self.queue = None
1043 | 212 | |||
1044 | 213 | |||
class DirectConsumer(ConsumerBase):
    """Queue/consumer class for 'direct'."""

    def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
        """Init a 'direct' queue.

        'channel' is the amqp channel to use
        'msg_id' is the msg_id to listen on
        'callback' is the callback to call when messages are received
        'tag' is a unique ID for the consumer on the channel

        Other kombu options may be passed
        """
        # Default options
        options = {'durable': False,
                   'queue_arguments': _get_queue_arguments(conf),
                   'auto_delete': True,
                   'exclusive': False}
        options.update(kwargs)
        # Exchange, queue and routing key all share the msg_id name; the
        # exchange mirrors the queue's durable/auto_delete settings.
        exchange = kombu.entity.Exchange(name=msg_id,
                                         type='direct',
                                         durable=options['durable'],
                                         auto_delete=options['auto_delete'])
        super(DirectConsumer, self).__init__(channel,
                                             callback,
                                             tag,
                                             name=msg_id,
                                             exchange=exchange,
                                             routing_key=msg_id,
                                             **options)
1075 | 244 | |||
1076 | 245 | |||
class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'."""

    def __init__(self, conf, channel, topic, callback, tag, exchange_name,
                 name=None, **kwargs):
        """Init a 'topic' queue.

        :param channel: the amqp channel to use
        :param topic: the topic to listen on
        :paramtype topic: str
        :param callback: the callback to call when messages are received
        :param tag: a unique ID for the consumer on the channel
        :param exchange_name: the exchange name to use
        :param name: optional queue name, defaults to topic
        :paramtype name: str

        Other kombu options may be passed as keyword arguments
        """
        # Default options — durability/auto-delete come from configuration,
        # unlike direct/fanout consumers which hard-code them.
        options = {'durable': conf.amqp_durable_queues,
                   'queue_arguments': _get_queue_arguments(conf),
                   'auto_delete': conf.amqp_auto_delete,
                   'exclusive': False}
        options.update(kwargs)
        exchange = kombu.entity.Exchange(name=exchange_name,
                                         type='topic',
                                         durable=options['durable'],
                                         auto_delete=options['auto_delete'])
        super(TopicConsumer, self).__init__(channel,
                                            callback,
                                            tag,
                                            name=name or topic,
                                            exchange=exchange,
                                            routing_key=topic,
                                            **options)
1112 | 281 | |||
1113 | 282 | |||
class FanoutConsumer(ConsumerBase):
    """Consumer class for 'fanout'."""

    def __init__(self, conf, channel, topic, callback, tag, **kwargs):
        """Init a 'fanout' queue.

        'channel' is the amqp channel to use
        'topic' is the topic to listen on
        'callback' is the callback to call when messages are received
        'tag' is a unique ID for the consumer on the channel

        Other kombu options may be passed
        """
        # Each consumer gets its own uniquely-named queue bound to the
        # shared '<topic>_fanout' exchange, so every consumer receives
        # every message.
        unique = uuid.uuid4().hex
        exchange_name = '%s_fanout' % topic
        queue_name = '%s_fanout_%s' % (topic, unique)

        # Default options
        options = {'durable': False,
                   'queue_arguments': _get_queue_arguments(conf),
                   'auto_delete': True,
                   'exclusive': False}
        options.update(kwargs)
        exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
                                         durable=options['durable'],
                                         auto_delete=options['auto_delete'])
        super(FanoutConsumer, self).__init__(channel, callback, tag,
                                             name=queue_name,
                                             exchange=exchange,
                                             routing_key=topic,
                                             **options)
1145 | 314 | |||
1146 | 315 | |||
class Publisher(object):
    """Base Publisher class."""

    def __init__(self, channel, exchange_name, routing_key, **kwargs):
        """Init the Publisher class with the exchange_name, routing_key,
        and other options
        """
        self.exchange_name = exchange_name
        self.routing_key = routing_key
        # Extra kwargs are kombu Exchange options, re-applied on reconnect.
        self.kwargs = kwargs
        self.reconnect(channel)

    def reconnect(self, channel):
        """Re-establish the Producer after a rabbit reconnection."""
        self.exchange = kombu.entity.Exchange(name=self.exchange_name,
                                              **self.kwargs)
        self.producer = kombu.messaging.Producer(exchange=self.exchange,
                                                 channel=channel,
                                                 routing_key=self.routing_key)

    def send(self, msg, timeout=None):
        """Send a message.

        :param msg: the (already serialized) message payload
        :param timeout: optional expiry in seconds, forwarded to AMQP as a
            per-message TTL header
        """
        if timeout:
            #
            # AMQP TTL is in milliseconds when set in the header.
            #
            self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
        else:
            self.producer.publish(msg)
1176 | 345 | |||
1177 | 346 | |||
class DirectPublisher(Publisher):
    """Publisher class for 'direct'."""

    def __init__(self, conf, channel, topic, **kwargs):
        """Init a 'direct' publisher.

        Kombu options may be passed as keyword args to override defaults
        """
        # Defaults mirror DirectConsumer: transient, auto-deleted queues.
        opts = dict(durable=False, auto_delete=True, exclusive=False)
        opts.update(kwargs)
        super(DirectPublisher, self).__init__(channel, topic, topic,
                                              type='direct', **opts)
1192 | 361 | |||
1193 | 362 | |||
class TopicPublisher(Publisher):
    """Publisher class for 'topic'."""

    def __init__(self, conf, channel, exchange_name, topic, **kwargs):
        """Init a 'topic' publisher.

        Kombu options may be passed as keyword args to override defaults
        """
        # Durability/auto-delete follow configuration, matching
        # TopicConsumer.
        opts = dict(durable=conf.amqp_durable_queues,
                    auto_delete=conf.amqp_auto_delete,
                    exclusive=False)
        opts.update(kwargs)
        super(TopicPublisher, self).__init__(channel, exchange_name, topic,
                                             type='topic', **opts)
1210 | 379 | |||
1211 | 380 | |||
class FanoutPublisher(Publisher):
    """Publisher class for 'fanout'."""

    def __init__(self, conf, channel, topic, **kwargs):
        """Init a 'fanout' publisher.

        Kombu options may be passed as keyword args to override defaults
        """
        opts = dict(durable=False, auto_delete=True, exclusive=False)
        opts.update(kwargs)
        # Fanout exchanges ignore the routing key, hence None.
        fanout_exchange = '%s_fanout' % topic
        super(FanoutPublisher, self).__init__(channel, fanout_exchange, None,
                                              type='fanout', **opts)
1225 | 394 | |||
1226 | 395 | |||
class NotifyPublisher(TopicPublisher):
    """Publisher class for 'notify'."""

    def __init__(self, conf, channel, exchange_name, topic, **kwargs):
        # Durability may be overridden per-publisher; pop it before
        # delegating so TopicPublisher doesn't see it twice.
        self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
        self.queue_arguments = _get_queue_arguments(conf)
        super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
                                              topic, **kwargs)

    def reconnect(self, channel):
        super(NotifyPublisher, self).reconnect(channel)

        # NOTE(jerdfelt): Normally the consumer would create the queue, but
        # we do this to ensure that messages don't get dropped if the
        # consumer is started after we do
        queue = kombu.entity.Queue(channel=channel,
                                   exchange=self.exchange,
                                   durable=self.durable,
                                   name=self.routing_key,
                                   routing_key=self.routing_key,
                                   queue_arguments=self.queue_arguments)
        queue.declare()
1249 | 418 | |||
1250 | 419 | |||
1251 | 420 | class Connection(object): | ||
1252 | 421 | """Connection object.""" | ||
1253 | 422 | |||
1254 | 423 | pools = {} | ||
1255 | 424 | |||
1256 | 425 | def __init__(self, conf, url): | ||
1257 | 426 | self.consumers = [] | ||
1258 | 427 | self.conf = conf | ||
1259 | 428 | self.max_retries = self.conf.rabbit_max_retries | ||
1260 | 429 | # Try forever? | ||
1261 | 430 | if self.max_retries <= 0: | ||
1262 | 431 | self.max_retries = None | ||
1263 | 432 | self.interval_start = self.conf.rabbit_retry_interval | ||
1264 | 433 | self.interval_stepping = self.conf.rabbit_retry_backoff | ||
1265 | 434 | # max retry-interval = 30 seconds | ||
1266 | 435 | self.interval_max = 30 | ||
1267 | 436 | self.memory_transport = False | ||
1268 | 437 | |||
1269 | 438 | ssl_params = self._fetch_ssl_params() | ||
1270 | 439 | |||
1271 | 440 | if url.virtual_host is not None: | ||
1272 | 441 | virtual_host = url.virtual_host | ||
1273 | 442 | else: | ||
1274 | 443 | virtual_host = self.conf.rabbit_virtual_host | ||
1275 | 444 | |||
1276 | 445 | self.brokers_params = [] | ||
1277 | 446 | if url.hosts: | ||
1278 | 447 | for host in url.hosts: | ||
1279 | 448 | params = { | ||
1280 | 449 | 'hostname': host.hostname, | ||
1281 | 450 | 'port': host.port or 5672, | ||
1282 | 451 | 'userid': host.username or '', | ||
1283 | 452 | 'password': host.password or '', | ||
1284 | 453 | 'login_method': self.conf.rabbit_login_method, | ||
1285 | 454 | 'virtual_host': virtual_host | ||
1286 | 455 | } | ||
1287 | 456 | if self.conf.fake_rabbit: | ||
1288 | 457 | params['transport'] = 'memory' | ||
1289 | 458 | if self.conf.rabbit_use_ssl: | ||
1290 | 459 | params['ssl'] = ssl_params | ||
1291 | 460 | |||
1292 | 461 | self.brokers_params.append(params) | ||
1293 | 462 | else: | ||
1294 | 463 | # Old configuration format | ||
1295 | 464 | for adr in self.conf.rabbit_hosts: | ||
1296 | 465 | hostname, port = netutils.parse_host_port( | ||
1297 | 466 | adr, default_port=self.conf.rabbit_port) | ||
1298 | 467 | |||
1299 | 468 | params = { | ||
1300 | 469 | 'hostname': hostname, | ||
1301 | 470 | 'port': port, | ||
1302 | 471 | 'userid': self.conf.rabbit_userid, | ||
1303 | 472 | 'password': self.conf.rabbit_password, | ||
1304 | 473 | 'login_method': self.conf.rabbit_login_method, | ||
1305 | 474 | 'virtual_host': virtual_host | ||
1306 | 475 | } | ||
1307 | 476 | |||
1308 | 477 | if self.conf.fake_rabbit: | ||
1309 | 478 | params['transport'] = 'memory' | ||
1310 | 479 | if self.conf.rabbit_use_ssl: | ||
1311 | 480 | params['ssl'] = ssl_params | ||
1312 | 481 | |||
1313 | 482 | self.brokers_params.append(params) | ||
1314 | 483 | |||
1315 | 484 | random.shuffle(self.brokers_params) | ||
1316 | 485 | self.brokers = itertools.cycle(self.brokers_params) | ||
1317 | 486 | |||
1318 | 487 | self.memory_transport = self.conf.fake_rabbit | ||
1319 | 488 | |||
1320 | 489 | self.connection = None | ||
1321 | 490 | self.do_consume = None | ||
1322 | 491 | self.reconnect() | ||
1323 | 492 | |||
1324 | 493 | # FIXME(markmc): use oslo sslutils when it is available as a library | ||
1325 | 494 | _SSL_PROTOCOLS = { | ||
1326 | 495 | "tlsv1": ssl.PROTOCOL_TLSv1, | ||
1327 | 496 | "sslv23": ssl.PROTOCOL_SSLv23, | ||
1328 | 497 | "sslv3": ssl.PROTOCOL_SSLv3 | ||
1329 | 498 | } | ||
1330 | 499 | |||
1331 | 500 | try: | ||
1332 | 501 | _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 | ||
1333 | 502 | except AttributeError: | ||
1334 | 503 | pass | ||
1335 | 504 | |||
1336 | 505 | @classmethod | ||
1337 | 506 | def validate_ssl_version(cls, version): | ||
1338 | 507 | key = version.lower() | ||
1339 | 508 | try: | ||
1340 | 509 | return cls._SSL_PROTOCOLS[key] | ||
1341 | 510 | except KeyError: | ||
1342 | 511 | raise RuntimeError(_("Invalid SSL version : %s") % version) | ||
1343 | 512 | |||
1344 | 513 | def _fetch_ssl_params(self): | ||
1345 | 514 | """Handles fetching what ssl params should be used for the connection | ||
1346 | 515 | (if any). | ||
1347 | 516 | """ | ||
1348 | 517 | ssl_params = dict() | ||
1349 | 518 | |||
1350 | 519 | # http://docs.python.org/library/ssl.html - ssl.wrap_socket | ||
1351 | 520 | if self.conf.kombu_ssl_version: | ||
1352 | 521 | ssl_params['ssl_version'] = self.validate_ssl_version( | ||
1353 | 522 | self.conf.kombu_ssl_version) | ||
1354 | 523 | if self.conf.kombu_ssl_keyfile: | ||
1355 | 524 | ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile | ||
1356 | 525 | if self.conf.kombu_ssl_certfile: | ||
1357 | 526 | ssl_params['certfile'] = self.conf.kombu_ssl_certfile | ||
1358 | 527 | if self.conf.kombu_ssl_ca_certs: | ||
1359 | 528 | ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs | ||
1360 | 529 | # We might want to allow variations in the | ||
1361 | 530 | # future with this? | ||
1362 | 531 | ssl_params['cert_reqs'] = ssl.CERT_REQUIRED | ||
1363 | 532 | |||
1364 | 533 | # Return the extended behavior or just have the default behavior | ||
1365 | 534 | return ssl_params or True | ||
1366 | 535 | |||
1367 | 536 | def _connect(self, broker): | ||
1368 | 537 | """Connect to rabbit. Re-establish any queues that may have | ||
1369 | 538 | been declared before if we are reconnecting. Exceptions should | ||
1370 | 539 | be handled by the caller. | ||
1371 | 540 | """ | ||
1372 | 541 | LOG.info(_("Connecting to AMQP server on " | ||
1373 | 542 | "%(hostname)s:%(port)d"), broker) | ||
1374 | 543 | self.connection = kombu.connection.BrokerConnection(**broker) | ||
1375 | 544 | self.connection_errors = self.connection.connection_errors | ||
1376 | 545 | self.channel_errors = self.connection.channel_errors | ||
1377 | 546 | if self.memory_transport: | ||
1378 | 547 | # Kludge to speed up tests. | ||
1379 | 548 | self.connection.transport.polling_interval = 0.0 | ||
1380 | 549 | self.do_consume = True | ||
1381 | 550 | self.consumer_num = itertools.count(1) | ||
1382 | 551 | self.connection.connect() | ||
1383 | 552 | self.channel = self.connection.channel() | ||
1384 | 553 | # work around 'memory' transport bug in 1.1.3 | ||
1385 | 554 | if self.memory_transport: | ||
1386 | 555 | self.channel._new_queue('ae.undeliver') | ||
1387 | 556 | for consumer in self.consumers: | ||
1388 | 557 | consumer.reconnect(self.channel) | ||
1389 | 558 | LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), | ||
1390 | 559 | broker) | ||
1391 | 560 | |||
1392 | 561 | def _disconnect(self): | ||
1393 | 562 | if self.connection: | ||
1394 | 563 | # XXX(nic): when reconnecting to a RabbitMQ cluster | ||
1395 | 564 | # with mirrored queues in use, the attempt to release the | ||
1396 | 565 | # connection can hang "indefinitely" somewhere deep down | ||
1397 | 566 | # in Kombu. Blocking the thread for a bit prior to | ||
1398 | 567 | # release seems to kludge around the problem where it is | ||
1399 | 568 | # otherwise reproduceable. | ||
1400 | 569 | if self.conf.kombu_reconnect_delay > 0: | ||
1401 | 570 | LOG.info(_("Delaying reconnect for %1.1f seconds...") % | ||
1402 | 571 | self.conf.kombu_reconnect_delay) | ||
1403 | 572 | time.sleep(self.conf.kombu_reconnect_delay) | ||
1404 | 573 | |||
1405 | 574 | try: | ||
1406 | 575 | self.connection.release() | ||
1407 | 576 | except self.connection_errors: | ||
1408 | 577 | pass | ||
1409 | 578 | self.connection = None | ||
1410 | 579 | |||
1411 | 580 | def reconnect(self, retry=None): | ||
1412 | 581 | """Handles reconnecting and re-establishing queues. | ||
1413 | 582 | Will retry up to retry number of times. | ||
1414 | 583 | retry = None means use the value of rabbit_max_retries | ||
1415 | 584 | retry = -1 means to retry forever | ||
1416 | 585 | retry = 0 means no retry | ||
1417 | 586 | retry = N means N retries | ||
1418 | 587 | Sleep between tries, starting at self.interval_start | ||
1419 | 588 | seconds, backing off self.interval_stepping number of seconds | ||
1420 | 589 | each attempt. | ||
1421 | 590 | """ | ||
1422 | 591 | |||
1423 | 592 | attempt = 0 | ||
1424 | 593 | loop_forever = False | ||
1425 | 594 | if retry is None: | ||
1426 | 595 | retry = self.max_retries | ||
1427 | 596 | if retry is None or retry < 0: | ||
1428 | 597 | loop_forever = True | ||
1429 | 598 | |||
1430 | 599 | while True: | ||
1431 | 600 | self._disconnect() | ||
1432 | 601 | |||
1433 | 602 | broker = six.next(self.brokers) | ||
1434 | 603 | attempt += 1 | ||
1435 | 604 | try: | ||
1436 | 605 | self._connect(broker) | ||
1437 | 606 | return | ||
1438 | 607 | except IOError as ex: | ||
1439 | 608 | e = ex | ||
1440 | 609 | except self.connection_errors as ex: | ||
1441 | 610 | e = ex | ||
1442 | 611 | except Exception as ex: | ||
1443 | 612 | # NOTE(comstud): Unfortunately it's possible for amqplib | ||
1444 | 613 | # to return an error not covered by its transport | ||
1445 | 614 | # connection_errors in the case of a timeout waiting for | ||
1446 | 615 | # a protocol response. (See paste link in LP888621) | ||
1447 | 616 | # So, we check all exceptions for 'timeout' in them | ||
1448 | 617 | # and try to reconnect in this case. | ||
1449 | 618 | if 'timeout' not in six.text_type(e): | ||
1450 | 619 | raise | ||
1451 | 620 | e = ex | ||
1452 | 621 | |||
1453 | 622 | log_info = {} | ||
1454 | 623 | log_info['err_str'] = e | ||
1455 | 624 | log_info['retry'] = retry or 0 | ||
1456 | 625 | log_info.update(broker) | ||
1457 | 626 | |||
1458 | 627 | if not loop_forever and attempt > retry: | ||
1459 | 628 | msg = _('Unable to connect to AMQP server on ' | ||
1460 | 629 | '%(hostname)s:%(port)d after %(retry)d ' | ||
1461 | 630 | 'tries: %(err_str)s') % log_info | ||
1462 | 631 | LOG.error(msg) | ||
1463 | 632 | raise exceptions.MessageDeliveryFailure(msg) | ||
1464 | 633 | else: | ||
1465 | 634 | if attempt == 1: | ||
1466 | 635 | sleep_time = self.interval_start or 1 | ||
1467 | 636 | elif attempt > 1: | ||
1468 | 637 | sleep_time += self.interval_stepping | ||
1469 | 638 | |||
1470 | 639 | sleep_time = min(sleep_time, self.interval_max) | ||
1471 | 640 | |||
1472 | 641 | log_info['sleep_time'] = sleep_time | ||
1473 | 642 | if 'Socket closed' in six.text_type(e): | ||
1474 | 643 | LOG.error(_('AMQP server %(hostname)s:%(port)d closed' | ||
1475 | 644 | ' the connection. Check login credentials:' | ||
1476 | 645 | ' %(err_str)s'), log_info) | ||
1477 | 646 | else: | ||
1478 | 647 | LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' | ||
1479 | 648 | 'unreachable: %(err_str)s. Trying again in ' | ||
1480 | 649 | '%(sleep_time)d seconds.'), log_info) | ||
1481 | 650 | time.sleep(sleep_time) | ||
1482 | 651 | |||
    def ensure(self, error_callback, method, retry=None):
        """Run method() until it succeeds, reconnecting after each failure.

        Recoverable errors (connection/channel errors, socket timeouts,
        IOError, and amqplib timeouts) trigger a reconnect and another
        attempt; anything else propagates.

        :param error_callback: optional callable invoked with the exception
            before each reconnect attempt
        :param method: zero-argument callable to execute
        :param retry: retry policy forwarded to reconnect()
        """
        while True:
            try:
                return method()
            except self.connection_errors as e:
                if error_callback:
                    error_callback(e)
            except self.channel_errors as e:
                if error_callback:
                    error_callback(e)
            except (socket.timeout, IOError) as e:
                if error_callback:
                    error_callback(e)
            except Exception as e:
                # NOTE(comstud): Unfortunately it's possible for amqplib
                # to return an error not covered by its transport
                # connection_errors in the case of a timeout waiting for
                # a protocol response.  (See paste link in LP888621)
                # So, we check all exceptions for 'timeout' in them
                # and try to reconnect in this case.
                if 'timeout' not in six.text_type(e):
                    raise
                if error_callback:
                    error_callback(e)
            # Recoverable failure: rebuild the connection and loop.
            self.reconnect(retry=retry)
1508 | 677 | |||
1509 | 678 | def get_channel(self): | ||
1510 | 679 | """Convenience call for bin/clear_rabbit_queues.""" | ||
1511 | 680 | return self.channel | ||
1512 | 681 | |||
1513 | 682 | def close(self): | ||
1514 | 683 | """Close/release this connection.""" | ||
1515 | 684 | if self.connection: | ||
1516 | 685 | self.connection.release() | ||
1517 | 686 | self.connection = None | ||
1518 | 687 | |||
1519 | 688 | def reset(self): | ||
1520 | 689 | """Reset a connection so it can be used again.""" | ||
1521 | 690 | self.channel.close() | ||
1522 | 691 | self.channel = self.connection.channel() | ||
1523 | 692 | # work around 'memory' transport bug in 1.1.3 | ||
1524 | 693 | if self.memory_transport: | ||
1525 | 694 | self.channel._new_queue('ae.undeliver') | ||
1526 | 695 | self.consumers = [] | ||
1527 | 696 | |||
1528 | 697 | def declare_consumer(self, consumer_cls, topic, callback): | ||
1529 | 698 | """Create a Consumer using the class that was passed in and | ||
1530 | 699 | add it to our list of consumers | ||
1531 | 700 | """ | ||
1532 | 701 | |||
1533 | 702 | def _connect_error(exc): | ||
1534 | 703 | log_info = {'topic': topic, 'err_str': exc} | ||
1535 | 704 | LOG.error(_("Failed to declare consumer for topic '%(topic)s': " | ||
1536 | 705 | "%(err_str)s"), log_info) | ||
1537 | 706 | |||
1538 | 707 | def _declare_consumer(): | ||
1539 | 708 | consumer = consumer_cls(self.conf, self.channel, topic, callback, | ||
1540 | 709 | six.next(self.consumer_num)) | ||
1541 | 710 | self.consumers.append(consumer) | ||
1542 | 711 | return consumer | ||
1543 | 712 | |||
1544 | 713 | return self.ensure(_connect_error, _declare_consumer) | ||
1545 | 714 | |||
1546 | 715 | def iterconsume(self, limit=None, timeout=None): | ||
1547 | 716 | """Return an iterator that will consume from all queues/consumers.""" | ||
1548 | 717 | |||
1549 | 718 | def _error_callback(exc): | ||
1550 | 719 | if isinstance(exc, socket.timeout): | ||
1551 | 720 | LOG.debug('Timed out waiting for RPC response: %s', exc) | ||
1552 | 721 | raise rpc_common.Timeout() | ||
1553 | 722 | else: | ||
1554 | 723 | LOG.exception(_('Failed to consume message from queue: %s'), | ||
1555 | 724 | exc) | ||
1556 | 725 | self.do_consume = True | ||
1557 | 726 | |||
1558 | 727 | def _consume(): | ||
1559 | 728 | if self.do_consume: | ||
1560 | 729 | queues_head = self.consumers[:-1] # not fanout. | ||
1561 | 730 | queues_tail = self.consumers[-1] # fanout | ||
1562 | 731 | for queue in queues_head: | ||
1563 | 732 | queue.consume(nowait=True) | ||
1564 | 733 | queues_tail.consume(nowait=False) | ||
1565 | 734 | self.do_consume = False | ||
1566 | 735 | return self.connection.drain_events(timeout=timeout) | ||
1567 | 736 | |||
1568 | 737 | for iteration in itertools.count(0): | ||
1569 | 738 | if limit and iteration >= limit: | ||
1570 | 739 | raise StopIteration | ||
1571 | 740 | yield self.ensure(_error_callback, _consume) | ||
1572 | 741 | |||
1573 | 742 | def publisher_send(self, cls, topic, msg, timeout=None, retry=None, | ||
1574 | 743 | **kwargs): | ||
1575 | 744 | """Send to a publisher based on the publisher class.""" | ||
1576 | 745 | |||
1577 | 746 | def _error_callback(exc): | ||
1578 | 747 | log_info = {'topic': topic, 'err_str': exc} | ||
1579 | 748 | LOG.exception(_("Failed to publish message to topic " | ||
1580 | 749 | "'%(topic)s': %(err_str)s"), log_info) | ||
1581 | 750 | |||
1582 | 751 | def _publish(): | ||
1583 | 752 | publisher = cls(self.conf, self.channel, topic=topic, **kwargs) | ||
1584 | 753 | publisher.send(msg, timeout) | ||
1585 | 754 | |||
1586 | 755 | self.ensure(_error_callback, _publish, retry=retry) | ||
1587 | 756 | |||
1588 | 757 | def declare_direct_consumer(self, topic, callback): | ||
1589 | 758 | """Create a 'direct' queue. | ||
1590 | 759 | In nova's use, this is generally a msg_id queue used for | ||
1591 | 760 | responses for call/multicall | ||
1592 | 761 | """ | ||
1593 | 762 | self.declare_consumer(DirectConsumer, topic, callback) | ||
1594 | 763 | |||
1595 | 764 | def declare_topic_consumer(self, exchange_name, topic, callback=None, | ||
1596 | 765 | queue_name=None): | ||
1597 | 766 | """Create a 'topic' consumer.""" | ||
1598 | 767 | self.declare_consumer(functools.partial(TopicConsumer, | ||
1599 | 768 | name=queue_name, | ||
1600 | 769 | exchange_name=exchange_name, | ||
1601 | 770 | ), | ||
1602 | 771 | topic, callback) | ||
1603 | 772 | |||
1604 | 773 | def declare_fanout_consumer(self, topic, callback): | ||
1605 | 774 | """Create a 'fanout' consumer.""" | ||
1606 | 775 | self.declare_consumer(FanoutConsumer, topic, callback) | ||
1607 | 776 | |||
1608 | 777 | def direct_send(self, msg_id, msg): | ||
1609 | 778 | """Send a 'direct' message.""" | ||
1610 | 779 | self.publisher_send(DirectPublisher, msg_id, msg) | ||
1611 | 780 | |||
1612 | 781 | def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): | ||
1613 | 782 | """Send a 'topic' message.""" | ||
1614 | 783 | self.publisher_send(TopicPublisher, topic, msg, timeout, | ||
1615 | 784 | exchange_name=exchange_name, retry=retry) | ||
1616 | 785 | |||
1617 | 786 | def fanout_send(self, topic, msg, retry=None): | ||
1618 | 787 | """Send a 'fanout' message.""" | ||
1619 | 788 | self.publisher_send(FanoutPublisher, topic, msg, retry=retry) | ||
1620 | 789 | |||
1621 | 790 | def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): | ||
1622 | 791 | """Send a notify message on a topic.""" | ||
1623 | 792 | self.publisher_send(NotifyPublisher, topic, msg, timeout=None, | ||
1624 | 793 | exchange_name=exchange_name, retry=retry, **kwargs) | ||
1625 | 794 | |||
1626 | 795 | def consume(self, limit=None, timeout=None): | ||
1627 | 796 | """Consume from all queues/consumers.""" | ||
1628 | 797 | it = self.iterconsume(limit=limit, timeout=timeout) | ||
1629 | 798 | while True: | ||
1630 | 799 | try: | ||
1631 | 800 | six.next(it) | ||
1632 | 801 | except StopIteration: | ||
1633 | 802 | return | ||
1634 | 803 | |||
1635 | 804 | |||
class RabbitDriver(amqpdriver.AMQPDriverBase):
    """AMQP driver backed by RabbitMQ via kombu connections."""

    def __init__(self, conf, url,
                 default_exchange=None,
                 allowed_remote_exmods=None):
        """Register rabbit/amqp options and build the connection pool."""
        conf.register_opts(rabbit_opts)
        conf.register_opts(rpc_amqp.amqp_opts)

        pool = rpc_amqp.get_connection_pool(conf, url, Connection)

        super(RabbitDriver, self).__init__(conf, url, pool,
                                           default_exchange,
                                           allowed_remote_exmods)

    def require_features(self, requeue=True):
        # Intentionally a no-op: nothing to validate for this driver.
        pass
1653 | 0 | 822 | ||
1654 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch' | |||
1655 | === added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/.timestamp' | |||
1656 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo' | |||
1657 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging' | |||
1658 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers' | |||
1659 | === added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py' | |||
1660 | --- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000 | |||
1661 | +++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000 | |||
1662 | @@ -0,0 +1,374 @@ | |||
1663 | 1 | # Copyright 2010 United States Government as represented by the | ||
1664 | 2 | # Administrator of the National Aeronautics and Space Administration. | ||
1665 | 3 | # All Rights Reserved. | ||
1666 | 4 | # Copyright 2011 Red Hat, Inc. | ||
1667 | 5 | # | ||
1668 | 6 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
1669 | 7 | # not use this file except in compliance with the License. You may obtain | ||
1670 | 8 | # a copy of the License at | ||
1671 | 9 | # | ||
1672 | 10 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
1673 | 11 | # | ||
1674 | 12 | # Unless required by applicable law or agreed to in writing, software | ||
1675 | 13 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
1676 | 14 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
1677 | 15 | # License for the specific language governing permissions and limitations | ||
1678 | 16 | # under the License. | ||
1679 | 17 | |||
1680 | 18 | import copy | ||
1681 | 19 | import logging | ||
1682 | 20 | import sys | ||
1683 | 21 | import time | ||
1684 | 22 | import traceback | ||
1685 | 23 | |||
1686 | 24 | import six | ||
1687 | 25 | |||
1688 | 26 | from oslo import messaging | ||
1689 | 27 | from oslo.messaging import _utils as utils | ||
1690 | 28 | from oslo.messaging.openstack.common.gettextutils import _ | ||
1691 | 29 | from oslo.messaging.openstack.common import jsonutils | ||
1692 | 30 | |||
# Module-level logger for this RPC helper module.
LOG = logging.getLogger(__name__)

# Module that holds the builtin exception types ('exceptions' on
# Python 2, 'builtins' on Python 3).
_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
1696 | 34 | |||
1697 | 35 | |||
1698 | 36 | '''RPC Envelope Version. | ||
1699 | 37 | |||
1700 | 38 | This version number applies to the top level structure of messages sent out. | ||
1701 | 39 | It does *not* apply to the message payload, which must be versioned | ||
1702 | 40 | independently. For example, when using rpc APIs, a version number is applied | ||
1703 | 41 | for changes to the API being exposed over rpc. This version number is handled | ||
1704 | 42 | in the rpc proxy and dispatcher modules. | ||
1705 | 43 | |||
1706 | 44 | This version number applies to the message envelope that is used in the | ||
1707 | 45 | serialization done inside the rpc layer. See serialize_msg() and | ||
1708 | 46 | deserialize_msg(). | ||
1709 | 47 | |||
1710 | 48 | The current message format (version 2.0) is very simple. It is: | ||
1711 | 49 | |||
1712 | 50 | { | ||
1713 | 51 | 'oslo.version': <RPC Envelope Version as a String>, | ||
1714 | 52 | 'oslo.message': <Application Message Payload, JSON encoded> | ||
1715 | 53 | } | ||
1716 | 54 | |||
1717 | 55 | Message format version '1.0' is just considered to be the messages we sent | ||
1718 | 56 | without a message envelope. | ||
1719 | 57 | |||
1720 | 58 | So, the current message envelope just includes the envelope version. It may | ||
1721 | 59 | eventually contain additional information, such as a signature for the message | ||
1722 | 60 | payload. | ||
1723 | 61 | |||
1724 | 62 | We will JSON encode the application message payload. The message envelope, | ||
1725 | 63 | which includes the JSON encoded application message body, will be passed down | ||
1726 | 64 | to the messaging libraries as a dict. | ||
1727 | 65 | ''' | ||
# Current envelope format version (see the module docstring above).
_RPC_ENVELOPE_VERSION = '2.0'

# Keys of the envelope dict produced by serialize_msg().
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'

# Suffix appended to class/module names of dynamically rebuilt remote
# exceptions (see deserialize_remote_exception).
_REMOTE_POSTFIX = '_Remote'
1734 | 72 | |||
1735 | 73 | |||
class RPCException(Exception):
    """Base class for exceptions raised by the RPC layer.

    Subclasses override msg_fmt; keyword arguments given to the
    constructor are interpolated into it when no explicit message is
    supplied.
    """
    msg_fmt = _("An unknown RPC related exception occurred.")

    def __init__(self, message=None, **kwargs):
        self.kwargs = kwargs

        if not message:
            try:
                message = self.msg_fmt % kwargs
            except Exception:
                # kwargs doesn't match a variable in the message;
                # log the issue and the kwargs.
                LOG.exception(_('Exception in string format operation'))
                for name, value in kwargs.items():
                    LOG.error("%s: %s", name, value)
                # At least get the core message out if something happened.
                message = self.msg_fmt

        super(RPCException, self).__init__(message)
1756 | 94 | |||
1757 | 95 | |||
class Timeout(RPCException):
    """Signifies that a timeout has occurred.

    This exception is raised if the rpc_response_timeout is reached while
    waiting for a response from the remote side.
    """
    msg_fmt = _('Timeout while waiting on RPC response - '
                'topic: "%(topic)s", RPC method: "%(method)s" '
                'info: "%(info)s"')

    def __init__(self, info=None, topic=None, method=None):
        """Initiates Timeout object.

        :param info: Extra info to convey to the user
        :param topic: The topic that the rpc call was sent to
        :param method: The name of the rpc method being called
        """
        self.info = info
        self.topic = topic
        self.method = method
        unknown = _('<unknown>')
        super(Timeout, self).__init__(None,
                                      info=info or unknown,
                                      topic=topic or unknown,
                                      method=method or unknown)
1784 | 122 | |||
1785 | 123 | |||
class DuplicateMessageError(RPCException):
    """Raised when a message with an already-seen msg_id is detected."""
    msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
1788 | 126 | |||
1789 | 127 | |||
class InvalidRPCConnectionReuse(RPCException):
    """Raised on an invalid attempt to reuse an RPC connection."""
    msg_fmt = _("Invalid reuse of an RPC connection.")
1792 | 130 | |||
1793 | 131 | |||
class UnsupportedRpcVersion(RPCException):
    """Raised when a request carries an RPC API version this side lacks."""
    msg_fmt = _("Specified RPC version, %(version)s, not supported by "
                "this endpoint.")
1797 | 135 | |||
1798 | 136 | |||
class UnsupportedRpcEnvelopeVersion(RPCException):
    """Raised by deserialize_msg() for an incompatible envelope version."""
    msg_fmt = _("Specified RPC envelope version, %(version)s, "
                "not supported by this endpoint.")
1802 | 140 | |||
1803 | 141 | |||
class RpcVersionCapError(RPCException):
    """Raised when a configured RPC version cap is below the required one."""
    msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
1806 | 144 | |||
1807 | 145 | |||
class Connection(object):
    """A connection, returned by rpc.create_connection().

    This class represents a connection to the message bus used for rpc.
    An instance of this class should never be created by users of the rpc API.
    Use rpc.create_connection() instead.
    """
    def close(self):
        """Close the connection.

        This method must be called when the connection will no longer be used.
        It will ensure that any resources associated with the connection, such
        as a network connection, are cleaned up.
        """
        raise NotImplementedError()
1823 | 161 | |||
1824 | 162 | |||
1825 | 163 | def _safe_log(log_func, msg, msg_data): | ||
1826 | 164 | """Sanitizes the msg_data field before logging.""" | ||
1827 | 165 | SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] | ||
1828 | 166 | |||
1829 | 167 | def _fix_passwords(d): | ||
1830 | 168 | """Sanitizes the password fields in the dictionary.""" | ||
1831 | 169 | for k in six.iterkeys(d): | ||
1832 | 170 | if k.lower().find('password') != -1: | ||
1833 | 171 | d[k] = '<SANITIZED>' | ||
1834 | 172 | elif k.lower() in SANITIZE: | ||
1835 | 173 | d[k] = '<SANITIZED>' | ||
1836 | 174 | elif isinstance(d[k], dict): | ||
1837 | 175 | _fix_passwords(d[k]) | ||
1838 | 176 | return d | ||
1839 | 177 | |||
1840 | 178 | return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) | ||
1841 | 179 | |||
1842 | 180 | |||
def serialize_remote_exception(failure_info, log_failure=True):
    """Prepares exception data to be sent over rpc.

    :param failure_info: a sys.exc_info() tuple
    :param log_failure: when True, log the exception before serializing
    :returns: a JSON string describing the failure
    """
    tb = traceback.format_exception(*failure_info)
    failure = failure_info[1]
    if log_failure:
        LOG.error(_("Returning exception %s to caller"),
                  six.text_type(failure))
        LOG.error(tb)

    kwargs = getattr(failure, 'kwargs', {})

    # NOTE(matiu): With cells, it's possible to re-raise remote, remote
    # exceptions. Lets turn it back into the original exception type.
    cls_name = six.text_type(failure.__class__.__name__)
    mod_name = six.text_type(failure.__class__.__module__)
    if (cls_name.endswith(_REMOTE_POSTFIX) and
            mod_name.endswith(_REMOTE_POSTFIX)):
        cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
        mod_name = mod_name[:-len(_REMOTE_POSTFIX)]

    payload = {
        'class': cls_name,
        'module': mod_name,
        'message': six.text_type(failure),
        'tb': tb,
        'args': failure.args,
        'kwargs': kwargs,
    }
    return jsonutils.dumps(payload)
1881 | 219 | |||
1882 | 220 | |||
def deserialize_remote_exception(data, allowed_remote_exmods):
    """Reconstruct an exception serialized by serialize_remote_exception.

    :param data: JSON string produced by the remote side
    :param allowed_remote_exmods: module names that may be imported in
        order to recreate the original exception type
    :returns: an exception instance (this function never raises it)
    """
    failure = jsonutils.loads(six.text_type(data))

    trace = failure.get('tb', [])
    # Append the remote traceback so it is visible in the local message.
    message = failure.get('message', "") + "\n" + "\n".join(trace)
    name = failure.get('class')
    module = failure.get('module')

    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
    # order to prevent arbitrary code execution.
    if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
        return messaging.RemoteError(name, failure.get('message'), trace)

    try:
        __import__(module)
        mod = sys.modules[module]
        klass = getattr(mod, name)
        if not issubclass(klass, Exception):
            raise TypeError("Can only deserialize Exceptions")

        failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
    except (AttributeError, TypeError, ImportError):
        # Fall back to a generic RemoteError when the original type
        # cannot be rebuilt.
        return messaging.RemoteError(name, failure.get('message'), trace)

    ex_type = type(failure)
    str_override = lambda self: message
    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
                       {'__str__': str_override, '__unicode__': str_override})
    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
    try:
        # NOTE(ameade): Dynamically create a new exception type and swap it in
        # as the new type for the exception. This only works on user defined
        # Exceptions and not core Python exceptions. This is important because
        # we cannot necessarily change an exception message so we must override
        # the __str__ method.
        failure.__class__ = new_ex_type
    except TypeError:
        # NOTE(ameade): If a core exception then just add the traceback to the
        # first exception argument.
        failure.args = (message,) + failure.args[1:]
    return failure
1924 | 262 | |||
1925 | 263 | |||
class CommonRpcContext(object):
    """Attribute-style wrapper around a dict of RPC context values."""

    def __init__(self, **kwargs):
        self.values = kwargs

    def __getattr__(self, key):
        # Only reached for names not found through normal lookup; expose
        # the stored values as attributes.
        if key in self.values:
            return self.values[key]
        raise AttributeError(key)

    def to_dict(self):
        """Return a deep copy of the stored values."""
        return copy.deepcopy(self.values)

    @classmethod
    def from_dict(cls, values):
        """Rebuild a context from a dict produced by to_dict()."""
        return cls(**values)

    def deepcopy(self):
        """Return an independent copy of this context."""
        return self.from_dict(self.to_dict())

    def update_store(self):
        # local.store.context = self
        pass
1949 | 287 | |||
1950 | 288 | |||
class ClientException(Exception):
    """Encapsulates actual exception expected to be hit by a RPC proxy object.

    Merely instantiating it records the current exception information, which
    will be passed back to the RPC client without exceptional logging.
    """

    def __init__(self):
        # Snapshot whatever exception is currently being handled (if any).
        self._exc_info = sys.exc_info()
1959 | 297 | |||
1960 | 298 | |||
def serialize_msg(raw_msg):
    """Wrap raw_msg in the versioned RPC envelope.

    See the _RPC_ENVELOPE_VERSION docstring for details of the format:
    the payload is JSON-encoded, the envelope itself stays a plain dict.
    """
    return {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
            _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
1968 | 306 | |||
1969 | 307 | |||
def deserialize_msg(msg):
    """Unwrap a message that may or may not carry the RPC envelope.

    Robustness Principle: "Be strict in what you send, liberal in what
    you accept."  What we just received is one of:

    1) A dict that looks like the standard message envelope: validate its
       version and return the JSON-decoded payload.
    2) A dict without the envelope keys: either a notification or a
       pre-envelope (version 1.0) message -- return it as-is.
    3) Any non-dict value: e.g. a return value from rpc.call() from
       before message envelopes were used -- return it as-is and hope
       for the best.

    :raises UnsupportedRpcEnvelopeVersion: when an envelope is present
        but its version is incompatible with ours
    """
    if not isinstance(msg, dict):
        # Case 3 above.
        return msg

    if any(key not in msg for key in (_VERSION_KEY, _MESSAGE_KEY)):
        # Case 2 above.
        return msg

    # Case 1: we appear to hold a proper envelope.
    if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
                                       msg[_VERSION_KEY]):
        raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])

    return jsonutils.loads(msg[_MESSAGE_KEY])
2013 | 351 | |||
2014 | 352 | |||
class DecayingTimer(object):
    """Tracks a deadline of `duration` seconds from when start() is called."""

    def __init__(self, duration=None):
        # duration None means "no timeout": check_return() yields None.
        self._duration = duration
        self._ends_at = None

    def start(self):
        """Arm the timer; a no-op when no duration was configured."""
        if self._duration is not None:
            self._ends_at = time.time() + max(0, self._duration)

    def check_return(self, timeout_callback, *args, **kwargs):
        """Return seconds remaining, invoking timeout_callback on expiry.

        :param maximum: optional keyword cap on the returned value
        :returns: None when no duration was configured, otherwise the
            time left (possibly negative), capped at `maximum` if given
        :raises RuntimeError: when the timer was never started
        """
        if self._duration is None:
            return None
        if self._ends_at is None:
            raise RuntimeError(_("Can not check/return a timeout from a timer"
                                 " that has not been started."))

        cap = kwargs.pop('maximum', None)
        remaining = self._ends_at - time.time()
        if remaining <= 0:
            timeout_callback(*args, **kwargs)

        if cap is not None:
            remaining = min(remaining, cap)
        return remaining
2037 | 0 | 375 | ||
2038 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests' | |||
2039 | === added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py' | |||
2040 | --- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000 | |||
2041 | +++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 2015-12-17 11:03:34 +0000 | |||
2042 | @@ -0,0 +1,49 @@ | |||
2043 | 1 | |||
2044 | 2 | # Copyright 2013 Red Hat, Inc. | ||
2045 | 3 | # | ||
2046 | 4 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
2047 | 5 | # not use this file except in compliance with the License. You may obtain | ||
2048 | 6 | # a copy of the License at | ||
2049 | 7 | # | ||
2050 | 8 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
2051 | 9 | # | ||
2052 | 10 | # Unless required by applicable law or agreed to in writing, software | ||
2053 | 11 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
2054 | 12 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
2055 | 13 | # License for the specific language governing permissions and limitations | ||
2056 | 14 | # under the License. | ||
2057 | 15 | |||
2058 | 16 | from oslo.messaging import _utils as utils | ||
2059 | 17 | from tests import utils as test_utils | ||
2060 | 18 | |||
2061 | 19 | |||
class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
    """Exercise utils.version_is_compatible(imp_version, version).

    The first argument is the version an endpoint implements; the second
    is the version being requested.  As these cases show, compatibility is
    one-directional: a newer minor/revision implementation can serve an
    older request, never the reverse, and major versions must match.
    """

    def test_version_is_compatible_same(self):
        self.assertTrue(utils.version_is_compatible('1.23', '1.23'))

    def test_version_is_compatible_newer_minor(self):
        # Implementation one minor version ahead can serve the request.
        self.assertTrue(utils.version_is_compatible('1.24', '1.23'))

    def test_version_is_compatible_older_minor(self):
        # An older implementation cannot satisfy a newer request.
        self.assertFalse(utils.version_is_compatible('1.22', '1.23'))

    def test_version_is_compatible_major_difference1(self):
        self.assertFalse(utils.version_is_compatible('2.23', '1.23'))

    def test_version_is_compatible_major_difference2(self):
        self.assertFalse(utils.version_is_compatible('1.23', '2.23'))

    def test_version_is_compatible_newer_rev(self):
        # Requested revision exceeds what the implementation provides.
        self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))

    def test_version_is_compatible_newer_rev_both(self):
        self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))

    def test_version_is_compatible_older_rev_both(self):
        self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))

    def test_version_is_compatible_older_rev(self):
        self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))

    def test_version_is_compatible_no_rev_is_zero(self):
        # A missing revision component is treated as ".0".
        self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
2092 | 0 | 50 | ||
2093 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch' | |||
2094 | === added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/.timestamp' | |||
2095 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo' | |||
2096 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging' | |||
2097 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers' | |||
2098 | === added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py' | |||
2099 | --- .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 1970-01-01 00:00:00 +0000 | |||
2100 | +++ .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 2015-12-17 11:03:34 +0000 | |||
2101 | @@ -0,0 +1,729 @@ | |||
2102 | 1 | # Copyright 2011 OpenStack Foundation | ||
2103 | 2 | # Copyright 2011 - 2012, Red Hat, Inc. | ||
2104 | 3 | # | ||
2105 | 4 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
2106 | 5 | # not use this file except in compliance with the License. You may obtain | ||
2107 | 6 | # a copy of the License at | ||
2108 | 7 | # | ||
2109 | 8 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
2110 | 9 | # | ||
2111 | 10 | # Unless required by applicable law or agreed to in writing, software | ||
2112 | 11 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
2113 | 12 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
2114 | 13 | # License for the specific language governing permissions and limitations | ||
2115 | 14 | # under the License. | ||
2116 | 15 | |||
2117 | 16 | import functools | ||
2118 | 17 | import itertools | ||
2119 | 18 | import logging | ||
2120 | 19 | import random | ||
2121 | 20 | import time | ||
2122 | 21 | |||
2123 | 22 | import six | ||
2124 | 23 | |||
2125 | 24 | from oslo.config import cfg | ||
2126 | 25 | from oslo.messaging._drivers import amqp as rpc_amqp | ||
2127 | 26 | from oslo.messaging._drivers import amqpdriver | ||
2128 | 27 | from oslo.messaging._drivers import common as rpc_common | ||
2129 | 28 | from oslo.messaging import exceptions | ||
2130 | 29 | from oslo.messaging.openstack.common.gettextutils import _ | ||
2131 | 30 | from oslo.messaging.openstack.common import jsonutils | ||
2132 | 31 | from oslo.utils import importutils | ||
2133 | 32 | from oslo.utils import netutils | ||
2134 | 33 | |||
2135 | 34 | qpid_codec = importutils.try_import("qpid.codec010") | ||
2136 | 35 | qpid_messaging = importutils.try_import("qpid.messaging") | ||
2137 | 36 | qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") | ||
2138 | 37 | |||
2139 | 38 | LOG = logging.getLogger(__name__) | ||
2140 | 39 | |||
# Configuration options understood by the Qpid driver; the defaults mirror
# a stock single-broker installation.
qpid_opts = [
    cfg.StrOpt('qpid_hostname',
               default='localhost',
               help='Qpid broker hostname.'),
    cfg.IntOpt('qpid_port',
               default=5672,
               help='Qpid broker port.'),
    cfg.ListOpt('qpid_hosts',
                default=['$qpid_hostname:$qpid_port'],
                help='Qpid HA cluster host:port pairs.'),
    cfg.StrOpt('qpid_username',
               default='',
               help='Username for Qpid connection.'),
    cfg.StrOpt('qpid_password',
               default='',
               help='Password for Qpid connection.',
               secret=True),
    cfg.StrOpt('qpid_sasl_mechanisms',
               default='',
               help='Space separated list of SASL mechanisms to use for '
                    'auth.'),
    cfg.IntOpt('qpid_heartbeat',
               default=60,
               help='Seconds between connection keepalive heartbeats.'),
    cfg.StrOpt('qpid_protocol',
               default='tcp',
               help="Transport to use, either 'tcp' or 'ssl'."),
    cfg.BoolOpt('qpid_tcp_nodelay',
                default=True,
                help='Whether to disable the Nagle algorithm.'),
    cfg.IntOpt('qpid_receiver_capacity',
               default=1,
               help='The number of prefetched messages held by receiver.'),
    # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
    # this file could probably use some additional refactoring so that the
    # differences between each version are split into different classes.
    cfg.IntOpt('qpid_topology_version',
               default=1,
               help="The qpid topology version to use. Version 1 is what "
                    "was originally used by impl_qpid. Version 2 includes "
                    "some backwards-incompatible changes that allow broker "
                    "federation to work. Users should update to version 2 "
                    "when they are able to take everything down, as it "
                    "requires a clean break."),
]

# Content type marking message bodies that were JSON-encoded by the sender.
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
2188 | 87 | |||
2189 | 88 | |||
def raise_invalid_topology_version(conf):
    """Log and raise for an unrecognized qpid_topology_version setting."""
    template = _("Invalid value for qpid_topology_version: %d")
    message = template % conf.qpid_topology_version
    LOG.error(message)
    raise Exception(message)
2195 | 94 | |||
2196 | 95 | |||
class QpidMessage(dict):
    """Dict wrapper pairing a decoded RPC payload with its raw Qpid message.

    The raw message content is passed through the RPC envelope
    deserializer and the result populates the dict itself.
    """

    def __init__(self, session, raw_message):
        payload = rpc_common.deserialize_msg(raw_message.content)
        super(QpidMessage, self).__init__(payload)
        self._raw_message = raw_message
        self._session = session

    def acknowledge(self):
        """Ack the underlying raw message on the owning session."""
        self._session.acknowledge(self._raw_message)

    def requeue(self):
        # Requeueing is not supported by this driver; treat as a no-op.
        pass
2209 | 108 | |||
2210 | 109 | |||
class ConsumerBase(object):
    """Shared plumbing for all Qpid consumer flavours."""

    def __init__(self, conf, session, callback, node_name, node_opts,
                 link_name, link_opts):
        """Declare a receiver for a queue on an amqp session.

        :param session: the amqp session to use
        :param callback: invoked with each received message
        :param node_name: first part of the Qpid address string (before ';')
        :param node_opts: merged into the "x-declare" section of "node"
        :param link_name: value for the "name" field of the "link" section
        :param link_opts: merged into the "x-declare" section of "link"
        """
        self.callback = callback
        self.receiver = None
        self.rcv_capacity = conf.qpid_receiver_capacity
        self.session = None

        # NOTE: key insertion order matters here -- it determines the JSON
        # rendering of the address string handed to Qpid.
        if conf.qpid_topology_version == 1:
            address_opts = {
                "create": "always",
                "node": {
                    "type": "topic",
                    "x-declare": {
                        "durable": True,
                        "auto-delete": True,
                    },
                },
                "link": {
                    "durable": True,
                    "x-declare": {
                        "durable": False,
                        "auto-delete": True,
                        "exclusive": False,
                    },
                },
            }
            address_opts["node"]["x-declare"].update(node_opts)
        elif conf.qpid_topology_version == 2:
            address_opts = {
                "link": {
                    "x-declare": {
                        "auto-delete": True,
                        "exclusive": False,
                    },
                },
            }
        else:
            raise_invalid_topology_version(conf)

        address_opts["link"]["x-declare"].update(link_opts)
        if link_name:
            address_opts["link"]["name"] = link_name

        self.address = "%s ; %s" % (node_name, jsonutils.dumps(address_opts))

        self.connect(session)

    def connect(self, session):
        """Declare the receiver on an initial connection."""
        self._declare_receiver(session)

    def reconnect(self, session):
        """Re-declare the receiver after a Qpid reconnect."""
        self._declare_receiver(session)

    def _declare_receiver(self, session):
        # Bind this consumer to the (possibly new) session and apply the
        # configured prefetch capacity.
        self.session = session
        self.receiver = session.receiver(self.address)
        self.receiver.capacity = self.rcv_capacity

    def _unpack_json_msg(self, msg):
        """Decode msg.content from JSON when its content type requires it.

        A Qpid Message containing a dict has a content_type of 'amqp/map';
        one holding a JSON string carries JSON_CONTENT_TYPE and is decoded
        back into a dict in place.

        :param msg: a Qpid Message object
        :returns: None
        """
        if msg.content_type != JSON_CONTENT_TYPE:
            return
        msg.content = jsonutils.loads(msg.content)
        msg.content_type = 'amqp/map'

    def consume(self):
        """Fetch one message and hand it to the registered callback."""
        message = self.receiver.fetch()
        try:
            self._unpack_json_msg(message)
            self.callback(QpidMessage(self.session, message))
        except Exception:
            # Ack even on failure so a poison message is not redelivered.
            LOG.exception(_("Failed to process message... skipping it."))
            self.session.acknowledge(message)

    def get_receiver(self):
        return self.receiver

    def get_node_name(self):
        # Everything before the first ';' is the node part of the address.
        return self.address.partition(';')[0]
2317 | 216 | |||
2318 | 217 | |||
class DirectConsumer(ConsumerBase):
    """Queue/consumer class for 'direct'."""

    def __init__(self, conf, session, msg_id, callback):
        """Init a 'direct' queue.

        :param session: the amqp session to use
        :param msg_id: the msg_id to listen on
        :param callback: invoked when messages are received
        """
        link_opts = {
            "auto-delete": conf.amqp_auto_delete,
            "exclusive": True,
            "durable": conf.amqp_durable_queues,
        }

        topology = conf.qpid_topology_version
        if topology == 1:
            node_name = "%s/%s" % (msg_id, msg_id)
            node_opts = {"type": "direct"}
        elif topology == 2:
            node_name = "amq.direct/%s" % msg_id
            node_opts = {}
        else:
            raise_invalid_topology_version(conf)

        super(DirectConsumer, self).__init__(conf, session, callback,
                                             node_name, node_opts, msg_id,
                                             link_opts)
2350 | 249 | |||
2351 | 250 | |||
class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'."""

    def __init__(self, conf, session, topic, callback, exchange_name,
                 name=None):
        """Init a 'topic' queue.

        :param session: the amqp session to use
        :param topic: the topic to listen on
        :paramtype topic: str
        :param callback: invoked when messages are received
        :param name: optional queue name, defaults to topic
        """
        link_opts = {
            "auto-delete": conf.amqp_auto_delete,
            "durable": conf.amqp_durable_queues,
        }

        topology = conf.qpid_topology_version
        if topology == 1:
            node_name = "%s/%s" % (exchange_name, topic)
        elif topology == 2:
            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
        else:
            raise_invalid_topology_version(conf)

        super(TopicConsumer, self).__init__(conf, session, callback, node_name,
                                            {}, name or topic, link_opts)
2380 | 279 | |||
2381 | 280 | |||
class FanoutConsumer(ConsumerBase):
    """Consumer class for 'fanout'."""

    def __init__(self, conf, session, topic, callback):
        """Init a 'fanout' queue.

        :param session: the amqp session to use
        :param topic: the topic to listen on
        :param callback: invoked when messages are received
        """
        # Retain the config handle; the other consumer types do not keep it.
        self.conf = conf

        link_opts = {"exclusive": True}

        topology = conf.qpid_topology_version
        if topology == 1:
            node_name = "%s_fanout" % topic
            node_opts = {"durable": False, "type": "fanout"}
        elif topology == 2:
            node_name = "amq.topic/fanout/%s" % topic
            node_opts = {}
        else:
            raise_invalid_topology_version(conf)

        super(FanoutConsumer, self).__init__(conf, session, callback,
                                             node_name, node_opts, None,
                                             link_opts)
2408 | 307 | |||
2409 | 308 | |||
class Publisher(object):
    """Base class for the different Qpid publisher flavours."""

    def __init__(self, conf, session, node_name, node_opts=None):
        """Build the target address and establish a sender on the session.

        :param node_name: Qpid node portion of the address string
        :param node_opts: extra "x-declare" options (topology v1 only)
        """
        self.sender = None
        self.session = session

        # NOTE: key insertion order matters here -- it determines the JSON
        # rendering of the address string handed to Qpid.
        if conf.qpid_topology_version == 1:
            address_opts = {
                "create": "always",
                "node": {
                    "type": "topic",
                    "x-declare": {
                        "durable": False,
                        # auto-delete isn't implemented for exchanges in qpid,
                        # but put in here anyway
                        "auto-delete": True,
                    },
                },
            }
            if node_opts:
                address_opts["node"]["x-declare"].update(node_opts)

            self.address = "%s ; %s" % (node_name,
                                        jsonutils.dumps(address_opts))
        elif conf.qpid_topology_version == 2:
            self.address = node_name
        else:
            raise_invalid_topology_version(conf)

        self.reconnect(session)

    def reconnect(self, session):
        """Re-establish the Sender after a reconnection."""
        self.sender = session.sender(self.address)

    def _pack_json_msg(self, msg):
        """JSON-encode a message that Qpid cannot serialize natively.

        Qpid cannot serialize dicts containing strings longer than 65535
        characters, so the content is dumped to a JSON string, which Qpid
        is able to handle.

        :param msg: a Qpid Message object or a bare dict
        :returns: a Qpid Message with its content field JSON encoded
        """
        try:
            msg.content = jsonutils.dumps(msg.content)
        except AttributeError:
            # Need to have a Qpid message so we can set the content_type.
            msg = qpid_messaging.Message(jsonutils.dumps(msg))
        msg.content_type = JSON_CONTENT_TYPE
        return msg

    def send(self, msg):
        """Send a message, falling back to JSON packing when required."""
        try:
            # Probe whether Qpid's codec can encode the message as-is.
            probe = msg
            if not hasattr(probe, 'content_type'):
                probe = qpid_messaging.Message(msg)
            enc, dec = qpid_messaging.message.get_codec(probe.content_type)
            enc(probe.content)
        except qpid_codec.CodecException:
            # The message couldn't be serialized as a dict; JSON-encode it.
            msg = self._pack_json_msg(msg)
        self.sender.send(msg)
2478 | 377 | |||
2479 | 378 | |||
class DirectPublisher(Publisher):
    """Publisher class for 'direct'."""

    def __init__(self, conf, session, topic):
        """Init a 'direct' publisher."""
        topology = conf.qpid_topology_version
        if topology == 1:
            node_name = "%s/%s" % (topic, topic)
            node_opts = {"type": "direct"}
        elif topology == 2:
            node_name = "amq.direct/%s" % topic
            node_opts = {}
        else:
            raise_invalid_topology_version(conf)

        super(DirectPublisher, self).__init__(conf, session, node_name,
                                              node_opts)
2496 | 395 | |||
2497 | 396 | |||
class TopicPublisher(Publisher):
    """Publisher class for 'topic'."""

    def __init__(self, conf, session, exchange_name, topic):
        """Init a 'topic' publisher."""
        topology = conf.qpid_topology_version
        if topology == 1:
            node_name = "%s/%s" % (exchange_name, topic)
        elif topology == 2:
            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
        else:
            raise_invalid_topology_version(conf)

        super(TopicPublisher, self).__init__(conf, session, node_name)
2511 | 410 | |||
2512 | 411 | |||
class FanoutPublisher(Publisher):
    """Publisher class for 'fanout'."""

    def __init__(self, conf, session, topic):
        """Init a 'fanout' publisher."""
        topology = conf.qpid_topology_version
        if topology == 1:
            node_name = "%s_fanout" % topic
            node_opts = {"type": "fanout"}
        elif topology == 2:
            node_name = "amq.topic/fanout/%s" % topic
            node_opts = {}
        else:
            raise_invalid_topology_version(conf)

        super(FanoutPublisher, self).__init__(conf, session, node_name,
                                              node_opts)
2530 | 429 | |||
2531 | 430 | |||
class NotifyPublisher(Publisher):
    """Publisher class for notifications (durable 'topic' exchange)."""

    def __init__(self, conf, session, exchange_name, topic):
        """Init a notification publisher."""
        node_opts = {"durable": True}

        topology = conf.qpid_topology_version
        if topology == 1:
            node_name = "%s/%s" % (exchange_name, topic)
        elif topology == 2:
            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
        else:
            raise_invalid_topology_version(conf)

        super(NotifyPublisher, self).__init__(conf, session, node_name,
                                              node_opts)
2548 | 447 | |||
2549 | 448 | |||
2550 | 449 | class Connection(object): | ||
2551 | 450 | """Connection object.""" | ||
2552 | 451 | |||
2553 | 452 | pools = {} | ||
2554 | 453 | |||
2555 | 454 | def __init__(self, conf, url): | ||
2556 | 455 | if not qpid_messaging: | ||
2557 | 456 | raise ImportError("Failed to import qpid.messaging") | ||
2558 | 457 | |||
2559 | 458 | self.connection = None | ||
2560 | 459 | self.session = None | ||
2561 | 460 | self.consumers = {} | ||
2562 | 461 | self.conf = conf | ||
2563 | 462 | |||
2564 | 463 | self.brokers_params = [] | ||
2565 | 464 | if url.hosts: | ||
2566 | 465 | for host in url.hosts: | ||
2567 | 466 | params = { | ||
2568 | 467 | 'username': host.username or '', | ||
2569 | 468 | 'password': host.password or '', | ||
2570 | 469 | } | ||
2571 | 470 | if host.port is not None: | ||
2572 | 471 | params['host'] = '%s:%d' % (host.hostname, host.port) | ||
2573 | 472 | else: | ||
2574 | 473 | params['host'] = host.hostname | ||
2575 | 474 | self.brokers_params.append(params) | ||
2576 | 475 | else: | ||
2577 | 476 | # Old configuration format | ||
2578 | 477 | for adr in self.conf.qpid_hosts: | ||
2579 | 478 | hostname, port = netutils.parse_host_port( | ||
2580 | 479 | adr, default_port=5672) | ||
2581 | 480 | |||
2582 | 481 | params = { | ||
2583 | 482 | 'host': '%s:%d' % (hostname, port), | ||
2584 | 483 | 'username': self.conf.qpid_username, | ||
2585 | 484 | 'password': self.conf.qpid_password, | ||
2586 | 485 | } | ||
2587 | 486 | self.brokers_params.append(params) | ||
2588 | 487 | |||
2589 | 488 | random.shuffle(self.brokers_params) | ||
2590 | 489 | self.brokers = itertools.cycle(self.brokers_params) | ||
2591 | 490 | |||
2592 | 491 | self.reconnect() | ||
2593 | 492 | |||
2594 | 493 | def _connect(self, broker): | ||
2595 | 494 | # Create the connection - this does not open the connection | ||
2596 | 495 | self.connection = qpid_messaging.Connection(broker['host']) | ||
2597 | 496 | |||
2598 | 497 | # Check if flags are set and if so set them for the connection | ||
2599 | 498 | # before we call open | ||
2600 | 499 | self.connection.username = broker['username'] | ||
2601 | 500 | self.connection.password = broker['password'] | ||
2602 | 501 | |||
2603 | 502 | self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms | ||
2604 | 503 | # Reconnection is done by self.reconnect() | ||
2605 | 504 | self.connection.reconnect = False | ||
2606 | 505 | self.connection.heartbeat = self.conf.qpid_heartbeat | ||
2607 | 506 | self.connection.transport = self.conf.qpid_protocol | ||
2608 | 507 | self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay | ||
2609 | 508 | self.connection.open() | ||
2610 | 509 | |||
2611 | 510 | def _register_consumer(self, consumer): | ||
2612 | 511 | self.consumers[six.text_type(consumer.get_receiver())] = consumer | ||
2613 | 512 | |||
2614 | 513 | def _lookup_consumer(self, receiver): | ||
2615 | 514 | return self.consumers[six.text_type(receiver)] | ||
2616 | 515 | |||
2617 | 516 | def _disconnect(self): | ||
2618 | 517 | # Close the session if necessary | ||
2619 | 518 | if self.connection is not None and self.connection.opened(): | ||
2620 | 519 | try: | ||
2621 | 520 | self.connection.close() | ||
2622 | 521 | except qpid_exceptions.MessagingError: | ||
2623 | 522 | pass | ||
2624 | 523 | self.connection = None | ||
2625 | 524 | |||
2626 | 525 | def reconnect(self, retry=None): | ||
2627 | 526 | """Handles reconnecting and re-establishing sessions and queues. | ||
2628 | 527 | Will retry up to retry number of times. | ||
2629 | 528 | retry = None or -1 means to retry forever | ||
2630 | 529 | retry = 0 means no retry | ||
2631 | 530 | retry = N means N retries | ||
2632 | 531 | """ | ||
2633 | 532 | delay = 1 | ||
2634 | 533 | attempt = 0 | ||
2635 | 534 | loop_forever = False | ||
2636 | 535 | if retry is None or retry < 0: | ||
2637 | 536 | loop_forever = True | ||
2638 | 537 | |||
2639 | 538 | while True: | ||
2640 | 539 | self._disconnect() | ||
2641 | 540 | |||
2642 | 541 | attempt += 1 | ||
2643 | 542 | broker = six.next(self.brokers) | ||
2644 | 543 | try: | ||
2645 | 544 | self._connect(broker) | ||
2646 | 545 | except qpid_exceptions.MessagingError as e: | ||
2647 | 546 | msg_dict = dict(e=e, | ||
2648 | 547 | delay=delay, | ||
2649 | 548 | retry=retry, | ||
2650 | 549 | broker=broker) | ||
2651 | 550 | if not loop_forever and attempt > retry: | ||
2652 | 551 | msg = _('Unable to connect to AMQP server on ' | ||
2653 | 552 | '%(broker)s after %(retry)d ' | ||
2654 | 553 | 'tries: %(e)s') % msg_dict | ||
2655 | 554 | LOG.error(msg) | ||
2656 | 555 | raise exceptions.MessageDeliveryFailure(msg) | ||
2657 | 556 | else: | ||
2658 | 557 | msg = _("Unable to connect to AMQP server on %(broker)s: " | ||
2659 | 558 | "%(e)s. Sleeping %(delay)s seconds") % msg_dict | ||
2660 | 559 | LOG.error(msg) | ||
2661 | 560 | time.sleep(delay) | ||
2662 | 561 | delay = min(delay + 1, 5) | ||
2663 | 562 | else: | ||
2664 | 563 | LOG.info(_('Connected to AMQP server on %s'), broker['host']) | ||
2665 | 564 | break | ||
2666 | 565 | |||
2667 | 566 | self.session = self.connection.session() | ||
2668 | 567 | |||
2669 | 568 | if self.consumers: | ||
2670 | 569 | consumers = self.consumers | ||
2671 | 570 | self.consumers = {} | ||
2672 | 571 | |||
2673 | 572 | for consumer in six.itervalues(consumers): | ||
2674 | 573 | consumer.reconnect(self.session) | ||
2675 | 574 | self._register_consumer(consumer) | ||
2676 | 575 | |||
2677 | 576 | LOG.debug("Re-established AMQP queues") | ||
2678 | 577 | |||
2679 | 578 | def ensure(self, error_callback, method, retry=None): | ||
2680 | 579 | while True: | ||
2681 | 580 | try: | ||
2682 | 581 | return method() | ||
2683 | 582 | except (qpid_exceptions.Empty, | ||
2684 | 583 | qpid_exceptions.MessagingError) as e: | ||
2685 | 584 | if error_callback: | ||
2686 | 585 | error_callback(e) | ||
2687 | 586 | self.reconnect(retry=retry) | ||
2688 | 587 | |||
2689 | 588 | def close(self): | ||
2690 | 589 | """Close/release this connection.""" | ||
2691 | 590 | try: | ||
2692 | 591 | self.connection.close() | ||
2693 | 592 | except Exception: | ||
2694 | 593 | # NOTE(dripton) Logging exceptions that happen during cleanup just | ||
2695 | 594 | # causes confusion; there's really nothing useful we can do with | ||
2696 | 595 | # them. | ||
2697 | 596 | pass | ||
2698 | 597 | self.connection = None | ||
2699 | 598 | |||
2700 | 599 | def reset(self): | ||
2701 | 600 | """Reset a connection so it can be used again.""" | ||
2702 | 601 | self.session.close() | ||
2703 | 602 | self.session = self.connection.session() | ||
2704 | 603 | self.consumers = {} | ||
2705 | 604 | |||
2706 | 605 | def declare_consumer(self, consumer_cls, topic, callback): | ||
2707 | 606 | """Create a Consumer using the class that was passed in and | ||
2708 | 607 | add it to our list of consumers | ||
2709 | 608 | """ | ||
2710 | 609 | def _connect_error(exc): | ||
2711 | 610 | log_info = {'topic': topic, 'err_str': exc} | ||
2712 | 611 | LOG.error(_("Failed to declare consumer for topic '%(topic)s': " | ||
2713 | 612 | "%(err_str)s"), log_info) | ||
2714 | 613 | |||
2715 | 614 | def _declare_consumer(): | ||
2716 | 615 | consumer = consumer_cls(self.conf, self.session, topic, callback) | ||
2717 | 616 | self._register_consumer(consumer) | ||
2718 | 617 | return consumer | ||
2719 | 618 | |||
2720 | 619 | return self.ensure(_connect_error, _declare_consumer) | ||
2721 | 620 | |||
2722 | 621 | def iterconsume(self, limit=None, timeout=None): | ||
2723 | 622 | """Return an iterator that will consume from all queues/consumers.""" | ||
2724 | 623 | |||
2725 | 624 | def _error_callback(exc): | ||
2726 | 625 | if isinstance(exc, qpid_exceptions.Empty): | ||
2727 | 626 | LOG.debug('Timed out waiting for RPC response: %s', exc) | ||
2728 | 627 | raise rpc_common.Timeout() | ||
2729 | 628 | else: | ||
2730 | 629 | LOG.exception(_('Failed to consume message from queue: %s'), | ||
2731 | 630 | exc) | ||
2732 | 631 | |||
2733 | 632 | def _consume(): | ||
2734 | 633 | nxt_receiver = self.session.next_receiver(timeout=timeout) | ||
2735 | 634 | try: | ||
2736 | 635 | self._lookup_consumer(nxt_receiver).consume() | ||
2737 | 636 | except Exception: | ||
2738 | 637 | LOG.exception(_("Error processing message. Skipping it.")) | ||
2739 | 638 | |||
2740 | 639 | for iteration in itertools.count(0): | ||
2741 | 640 | if limit and iteration >= limit: | ||
2742 | 641 | raise StopIteration | ||
2743 | 642 | yield self.ensure(_error_callback, _consume) | ||
2744 | 643 | |||
2745 | 644 | def publisher_send(self, cls, topic, msg, retry=None, **kwargs): | ||
2746 | 645 | """Send to a publisher based on the publisher class.""" | ||
2747 | 646 | |||
2748 | 647 | def _connect_error(exc): | ||
2749 | 648 | log_info = {'topic': topic, 'err_str': exc} | ||
2750 | 649 | LOG.exception(_("Failed to publish message to topic " | ||
2751 | 650 | "'%(topic)s': %(err_str)s"), log_info) | ||
2752 | 651 | |||
2753 | 652 | def _publisher_send(): | ||
2754 | 653 | publisher = cls(self.conf, self.session, topic=topic, **kwargs) | ||
2755 | 654 | publisher.send(msg) | ||
2756 | 655 | |||
2757 | 656 | return self.ensure(_connect_error, _publisher_send, retry=retry) | ||
2758 | 657 | |||
2759 | 658 | def declare_direct_consumer(self, topic, callback): | ||
2760 | 659 | """Create a 'direct' queue. | ||
2761 | 660 | In nova's use, this is generally a msg_id queue used for | ||
2762 | 661 | responses for call/multicall | ||
2763 | 662 | """ | ||
2764 | 663 | self.declare_consumer(DirectConsumer, topic, callback) | ||
2765 | 664 | |||
2766 | 665 | def declare_topic_consumer(self, exchange_name, topic, callback=None, | ||
2767 | 666 | queue_name=None): | ||
2768 | 667 | """Create a 'topic' consumer.""" | ||
2769 | 668 | self.declare_consumer(functools.partial(TopicConsumer, | ||
2770 | 669 | name=queue_name, | ||
2771 | 670 | exchange_name=exchange_name, | ||
2772 | 671 | ), | ||
2773 | 672 | topic, callback) | ||
2774 | 673 | |||
2775 | 674 | def declare_fanout_consumer(self, topic, callback): | ||
2776 | 675 | """Create a 'fanout' consumer.""" | ||
2777 | 676 | self.declare_consumer(FanoutConsumer, topic, callback) | ||
2778 | 677 | |||
2779 | 678 | def direct_send(self, msg_id, msg): | ||
2780 | 679 | """Send a 'direct' message.""" | ||
2781 | 680 | self.publisher_send(DirectPublisher, topic=msg_id, msg=msg) | ||
2782 | 681 | |||
2783 | 682 | def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): | ||
2784 | 683 | """Send a 'topic' message.""" | ||
2785 | 684 | # | ||
2786 | 685 | # We want to create a message with attributes, for example a TTL. We | ||
2787 | 686 | # don't really need to keep 'msg' in its JSON format any longer | ||
2788 | 687 | # so let's create an actual Qpid message here and get some | ||
2789 | 688 | # value-add on the go. | ||
2790 | 689 | # | ||
2791 | 690 | # WARNING: Request timeout happens to be in the same units as | ||
2792 | 691 | # Qpid's TTL (seconds). If this changes in the future, then this | ||
2793 | 692 | # will need to be altered accordingly. | ||
2794 | 693 | # | ||
2795 | 694 | qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) | ||
2796 | 695 | self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message, | ||
2797 | 696 | exchange_name=exchange_name, retry=retry) | ||
2798 | 697 | |||
2799 | 698 | def fanout_send(self, topic, msg, retry=None): | ||
2800 | 699 | """Send a 'fanout' message.""" | ||
2801 | 700 | self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry) | ||
2802 | 701 | |||
2803 | 702 | def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): | ||
2804 | 703 | """Send a notify message on a topic.""" | ||
2805 | 704 | self.publisher_send(NotifyPublisher, topic=topic, msg=msg, | ||
2806 | 705 | exchange_name=exchange_name, retry=retry) | ||
2807 | 706 | |||
2808 | 707 | def consume(self, limit=None, timeout=None): | ||
2809 | 708 | """Consume from all queues/consumers.""" | ||
2810 | 709 | it = self.iterconsume(limit=limit, timeout=timeout) | ||
2811 | 710 | while True: | ||
2812 | 711 | try: | ||
2813 | 712 | six.next(it) | ||
2814 | 713 | except StopIteration: | ||
2815 | 714 | return | ||
2816 | 715 | |||
2817 | 716 | |||
2818 | 717 | class QpidDriver(amqpdriver.AMQPDriverBase): | ||
2819 | 718 | |||
2820 | 719 | def __init__(self, conf, url, | ||
2821 | 720 | default_exchange=None, allowed_remote_exmods=None): | ||
2822 | 721 | conf.register_opts(qpid_opts) | ||
2823 | 722 | conf.register_opts(rpc_amqp.amqp_opts) | ||
2824 | 723 | |||
2825 | 724 | connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection) | ||
2826 | 725 | |||
2827 | 726 | super(QpidDriver, self).__init__(conf, url, | ||
2828 | 727 | connection_pool, | ||
2829 | 728 | default_exchange, | ||
2830 | 729 | allowed_remote_exmods) | ||
2831 | 0 | 730 | ||
2832 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch' | |||
2833 | === added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/.timestamp' | |||
2834 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo' | |||
2835 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging' | |||
2836 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers' | |||
2837 | === added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py' | |||
2838 | --- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000 | |||
2839 | +++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000 | |||
2840 | @@ -0,0 +1,832 @@ | |||
2841 | 1 | # Copyright 2011 OpenStack Foundation | ||
2842 | 2 | # | ||
2843 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
2844 | 4 | # not use this file except in compliance with the License. You may obtain | ||
2845 | 5 | # a copy of the License at | ||
2846 | 6 | # | ||
2847 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
2848 | 8 | # | ||
2849 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
2850 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
2851 | 11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
2852 | 12 | # License for the specific language governing permissions and limitations | ||
2853 | 13 | # under the License. | ||
2854 | 14 | |||
2855 | 15 | import functools | ||
2856 | 16 | import itertools | ||
2857 | 17 | import logging | ||
2858 | 18 | import random | ||
2859 | 19 | import socket | ||
2860 | 20 | import ssl | ||
2861 | 21 | import time | ||
2862 | 22 | import uuid | ||
2863 | 23 | |||
2864 | 24 | import kombu | ||
2865 | 25 | import kombu.connection | ||
2866 | 26 | import kombu.entity | ||
2867 | 27 | import kombu.messaging | ||
2868 | 28 | import six | ||
2869 | 29 | |||
2870 | 30 | from oslo.config import cfg | ||
2871 | 31 | from oslo.messaging._drivers import amqp as rpc_amqp | ||
2872 | 32 | from oslo.messaging._drivers import amqpdriver | ||
2873 | 33 | from oslo.messaging._drivers import common as rpc_common | ||
2874 | 34 | from oslo.messaging import exceptions | ||
2875 | 35 | from oslo.messaging.openstack.common.gettextutils import _ | ||
2876 | 36 | from oslo.utils import netutils | ||
2877 | 37 | |||
2878 | 38 | rabbit_opts = [ | ||
2879 | 39 | cfg.StrOpt('kombu_ssl_version', | ||
2880 | 40 | default='', | ||
2881 | 41 | help='SSL version to use (valid only if SSL enabled). ' | ||
2882 | 42 | 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' | ||
2883 | 43 | 'be available on some distributions.' | ||
2884 | 44 | ), | ||
2885 | 45 | cfg.StrOpt('kombu_ssl_keyfile', | ||
2886 | 46 | default='', | ||
2887 | 47 | help='SSL key file (valid only if SSL enabled).'), | ||
2888 | 48 | cfg.StrOpt('kombu_ssl_certfile', | ||
2889 | 49 | default='', | ||
2890 | 50 | help='SSL cert file (valid only if SSL enabled).'), | ||
2891 | 51 | cfg.StrOpt('kombu_ssl_ca_certs', | ||
2892 | 52 | default='', | ||
2893 | 53 | help='SSL certification authority file ' | ||
2894 | 54 | '(valid only if SSL enabled).'), | ||
2895 | 55 | cfg.FloatOpt('kombu_reconnect_delay', | ||
2896 | 56 | default=1.0, | ||
2897 | 57 | help='How long to wait before reconnecting in response to an ' | ||
2898 | 58 | 'AMQP consumer cancel notification.'), | ||
2899 | 59 | cfg.StrOpt('rabbit_host', | ||
2900 | 60 | default='localhost', | ||
2901 | 61 | help='The RabbitMQ broker address where a single node is ' | ||
2902 | 62 | 'used.'), | ||
2903 | 63 | cfg.IntOpt('rabbit_port', | ||
2904 | 64 | default=5672, | ||
2905 | 65 | help='The RabbitMQ broker port where a single node is used.'), | ||
2906 | 66 | cfg.ListOpt('rabbit_hosts', | ||
2907 | 67 | default=['$rabbit_host:$rabbit_port'], | ||
2908 | 68 | help='RabbitMQ HA cluster host:port pairs.'), | ||
2909 | 69 | cfg.BoolOpt('rabbit_use_ssl', | ||
2910 | 70 | default=False, | ||
2911 | 71 | help='Connect over SSL for RabbitMQ.'), | ||
2912 | 72 | cfg.StrOpt('rabbit_userid', | ||
2913 | 73 | default='guest', | ||
2914 | 74 | help='The RabbitMQ userid.'), | ||
2915 | 75 | cfg.StrOpt('rabbit_password', | ||
2916 | 76 | default='guest', | ||
2917 | 77 | help='The RabbitMQ password.', | ||
2918 | 78 | secret=True), | ||
2919 | 79 | cfg.StrOpt('rabbit_login_method', | ||
2920 | 80 | default='AMQPLAIN', | ||
2921 | 81 | help='the RabbitMQ login method'), | ||
2922 | 82 | cfg.StrOpt('rabbit_virtual_host', | ||
2923 | 83 | default='/', | ||
2924 | 84 | help='The RabbitMQ virtual host.'), | ||
2925 | 85 | cfg.IntOpt('rabbit_retry_interval', | ||
2926 | 86 | default=1, | ||
2927 | 87 | help='How frequently to retry connecting with RabbitMQ.'), | ||
2928 | 88 | cfg.IntOpt('rabbit_retry_backoff', | ||
2929 | 89 | default=2, | ||
2930 | 90 | help='How long to backoff for between retries when connecting ' | ||
2931 | 91 | 'to RabbitMQ.'), | ||
2932 | 92 | cfg.IntOpt('rabbit_max_retries', | ||
2933 | 93 | default=0, | ||
2934 | 94 | help='Maximum number of RabbitMQ connection retries. ' | ||
2935 | 95 | 'Default is 0 (infinite retry count).'), | ||
2936 | 96 | cfg.BoolOpt('rabbit_ha_queues', | ||
2937 | 97 | default=False, | ||
2938 | 98 | help='Use HA queues in RabbitMQ (x-ha-policy: all). ' | ||
2939 | 99 | 'If you change this option, you must wipe the ' | ||
2940 | 100 | 'RabbitMQ database.'), | ||
2941 | 101 | |||
2942 | 102 | # FIXME(markmc): this was toplevel in openstack.common.rpc | ||
2943 | 103 | cfg.BoolOpt('fake_rabbit', | ||
2944 | 104 | default=False, | ||
2945 | 105 | help='If passed, use a fake RabbitMQ provider.'), | ||
2946 | 106 | ] | ||
2947 | 107 | |||
2948 | 108 | LOG = logging.getLogger(__name__) | ||
2949 | 109 | |||
2950 | 110 | |||
2951 | 111 | def _get_queue_arguments(conf): | ||
2952 | 112 | """Construct the arguments for declaring a queue. | ||
2953 | 113 | |||
2954 | 114 | If the rabbit_ha_queues option is set, we declare a mirrored queue | ||
2955 | 115 | as described here: | ||
2956 | 116 | |||
2957 | 117 | http://www.rabbitmq.com/ha.html | ||
2958 | 118 | |||
2959 | 119 | Setting x-ha-policy to all means that the queue will be mirrored | ||
2960 | 120 | to all nodes in the cluster. | ||
2961 | 121 | """ | ||
2962 | 122 | return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} | ||
2963 | 123 | |||
2964 | 124 | |||
2965 | 125 | class RabbitMessage(dict): | ||
2966 | 126 | def __init__(self, raw_message): | ||
2967 | 127 | super(RabbitMessage, self).__init__( | ||
2968 | 128 | rpc_common.deserialize_msg(raw_message.payload)) | ||
2969 | 129 | self._raw_message = raw_message | ||
2970 | 130 | |||
2971 | 131 | def acknowledge(self): | ||
2972 | 132 | self._raw_message.ack() | ||
2973 | 133 | |||
2974 | 134 | def requeue(self): | ||
2975 | 135 | self._raw_message.requeue() | ||
2976 | 136 | |||
2977 | 137 | |||
2978 | 138 | class ConsumerBase(object): | ||
2979 | 139 | """Consumer base class.""" | ||
2980 | 140 | |||
2981 | 141 | def __init__(self, channel, callback, tag, **kwargs): | ||
2982 | 142 | """Declare a queue on an amqp channel. | ||
2983 | 143 | |||
2984 | 144 | 'channel' is the amqp channel to use | ||
2985 | 145 | 'callback' is the callback to call when messages are received | ||
2986 | 146 | 'tag' is a unique ID for the consumer on the channel | ||
2987 | 147 | |||
2988 | 148 | queue name, exchange name, and other kombu options are | ||
2989 | 149 | passed in here as a dictionary. | ||
2990 | 150 | """ | ||
2991 | 151 | self.callback = callback | ||
2992 | 152 | self.tag = six.text_type(tag) | ||
2993 | 153 | self.kwargs = kwargs | ||
2994 | 154 | self.queue = None | ||
2995 | 155 | self.reconnect(channel) | ||
2996 | 156 | |||
2997 | 157 | def reconnect(self, channel): | ||
2998 | 158 | """Re-declare the queue after a rabbit reconnect.""" | ||
2999 | 159 | self.channel = channel | ||
3000 | 160 | self.kwargs['channel'] = channel | ||
3001 | 161 | self.queue = kombu.entity.Queue(**self.kwargs) | ||
3002 | 162 | self.queue.declare() | ||
3003 | 163 | |||
3004 | 164 | def _callback_handler(self, message, callback): | ||
3005 | 165 | """Call callback with deserialized message. | ||
3006 | 166 | |||
3007 | 167 | Messages that are processed and ack'ed. | ||
3008 | 168 | """ | ||
3009 | 169 | |||
3010 | 170 | try: | ||
3011 | 171 | callback(RabbitMessage(message)) | ||
3012 | 172 | except Exception: | ||
3013 | 173 | LOG.exception(_("Failed to process message" | ||
3014 | 174 | " ... skipping it.")) | ||
3015 | 175 | message.ack() | ||
3016 | 176 | |||
3017 | 177 | def consume(self, *args, **kwargs): | ||
3018 | 178 | """Actually declare the consumer on the amqp channel. This will | ||
3019 | 179 | start the flow of messages from the queue. Using the | ||
3020 | 180 | Connection.iterconsume() iterator will process the messages, | ||
3021 | 181 | calling the appropriate callback. | ||
3022 | 182 | |||
3023 | 183 | If a callback is specified in kwargs, use that. Otherwise, | ||
3024 | 184 | use the callback passed during __init__() | ||
3025 | 185 | |||
3026 | 186 | If kwargs['nowait'] is True, then this call will block until | ||
3027 | 187 | a message is read. | ||
3028 | 188 | |||
3029 | 189 | """ | ||
3030 | 190 | |||
3031 | 191 | options = {'consumer_tag': self.tag} | ||
3032 | 192 | options['nowait'] = kwargs.get('nowait', False) | ||
3033 | 193 | callback = kwargs.get('callback', self.callback) | ||
3034 | 194 | if not callback: | ||
3035 | 195 | raise ValueError("No callback defined") | ||
3036 | 196 | |||
3037 | 197 | def _callback(raw_message): | ||
3038 | 198 | message = self.channel.message_to_python(raw_message) | ||
3039 | 199 | self._callback_handler(message, callback) | ||
3040 | 200 | |||
3041 | 201 | self.queue.consume(*args, callback=_callback, **options) | ||
3042 | 202 | |||
3043 | 203 | def cancel(self): | ||
3044 | 204 | """Cancel the consuming from the queue, if it has started.""" | ||
3045 | 205 | try: | ||
3046 | 206 | self.queue.cancel(self.tag) | ||
3047 | 207 | except KeyError as e: | ||
3048 | 208 | # NOTE(comstud): Kludge to get around a amqplib bug | ||
3049 | 209 | if six.text_type(e) != "u'%s'" % self.tag: | ||
3050 | 210 | raise | ||
3051 | 211 | self.queue = None | ||
3052 | 212 | |||
3053 | 213 | |||
3054 | 214 | class DirectConsumer(ConsumerBase): | ||
3055 | 215 | """Queue/consumer class for 'direct'.""" | ||
3056 | 216 | |||
3057 | 217 | def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): | ||
3058 | 218 | """Init a 'direct' queue. | ||
3059 | 219 | |||
3060 | 220 | 'channel' is the amqp channel to use | ||
3061 | 221 | 'msg_id' is the msg_id to listen on | ||
3062 | 222 | 'callback' is the callback to call when messages are received | ||
3063 | 223 | 'tag' is a unique ID for the consumer on the channel | ||
3064 | 224 | |||
3065 | 225 | Other kombu options may be passed | ||
3066 | 226 | """ | ||
3067 | 227 | # Default options | ||
3068 | 228 | options = {'durable': False, | ||
3069 | 229 | 'queue_arguments': _get_queue_arguments(conf), | ||
3070 | 230 | 'auto_delete': True, | ||
3071 | 231 | 'exclusive': False} | ||
3072 | 232 | options.update(kwargs) | ||
3073 | 233 | exchange = kombu.entity.Exchange(name=msg_id, | ||
3074 | 234 | type='direct', | ||
3075 | 235 | durable=options['durable'], | ||
3076 | 236 | auto_delete=options['auto_delete']) | ||
3077 | 237 | super(DirectConsumer, self).__init__(channel, | ||
3078 | 238 | callback, | ||
3079 | 239 | tag, | ||
3080 | 240 | name=msg_id, | ||
3081 | 241 | exchange=exchange, | ||
3082 | 242 | routing_key=msg_id, | ||
3083 | 243 | **options) | ||
3084 | 244 | |||
3085 | 245 | |||
3086 | 246 | class TopicConsumer(ConsumerBase): | ||
3087 | 247 | """Consumer class for 'topic'.""" | ||
3088 | 248 | |||
3089 | 249 | def __init__(self, conf, channel, topic, callback, tag, exchange_name, | ||
3090 | 250 | name=None, **kwargs): | ||
3091 | 251 | """Init a 'topic' queue. | ||
3092 | 252 | |||
3093 | 253 | :param channel: the amqp channel to use | ||
3094 | 254 | :param topic: the topic to listen on | ||
3095 | 255 | :paramtype topic: str | ||
3096 | 256 | :param callback: the callback to call when messages are received | ||
3097 | 257 | :param tag: a unique ID for the consumer on the channel | ||
3098 | 258 | :param exchange_name: the exchange name to use | ||
3099 | 259 | :param name: optional queue name, defaults to topic | ||
3100 | 260 | :paramtype name: str | ||
3101 | 261 | |||
3102 | 262 | Other kombu options may be passed as keyword arguments | ||
3103 | 263 | """ | ||
3104 | 264 | # Default options | ||
3105 | 265 | options = {'durable': conf.amqp_durable_queues, | ||
3106 | 266 | 'queue_arguments': _get_queue_arguments(conf), | ||
3107 | 267 | 'auto_delete': conf.amqp_auto_delete, | ||
3108 | 268 | 'exclusive': False} | ||
3109 | 269 | options.update(kwargs) | ||
3110 | 270 | exchange = kombu.entity.Exchange(name=exchange_name, | ||
3111 | 271 | type='topic', | ||
3112 | 272 | durable=options['durable'], | ||
3113 | 273 | auto_delete=options['auto_delete']) | ||
3114 | 274 | super(TopicConsumer, self).__init__(channel, | ||
3115 | 275 | callback, | ||
3116 | 276 | tag, | ||
3117 | 277 | name=name or topic, | ||
3118 | 278 | exchange=exchange, | ||
3119 | 279 | routing_key=topic, | ||
3120 | 280 | **options) | ||
3121 | 281 | |||
3122 | 282 | |||
3123 | 283 | class FanoutConsumer(ConsumerBase): | ||
3124 | 284 | """Consumer class for 'fanout'.""" | ||
3125 | 285 | |||
3126 | 286 | def __init__(self, conf, channel, topic, callback, tag, **kwargs): | ||
3127 | 287 | """Init a 'fanout' queue. | ||
3128 | 288 | |||
3129 | 289 | 'channel' is the amqp channel to use | ||
3130 | 290 | 'topic' is the topic to listen on | ||
3131 | 291 | 'callback' is the callback to call when messages are received | ||
3132 | 292 | 'tag' is a unique ID for the consumer on the channel | ||
3133 | 293 | |||
3134 | 294 | Other kombu options may be passed | ||
3135 | 295 | """ | ||
3136 | 296 | unique = uuid.uuid4().hex | ||
3137 | 297 | exchange_name = '%s_fanout' % topic | ||
3138 | 298 | queue_name = '%s_fanout_%s' % (topic, unique) | ||
3139 | 299 | |||
3140 | 300 | # Default options | ||
3141 | 301 | options = {'durable': False, | ||
3142 | 302 | 'queue_arguments': _get_queue_arguments(conf), | ||
3143 | 303 | 'auto_delete': True, | ||
3144 | 304 | 'exclusive': False} | ||
3145 | 305 | options.update(kwargs) | ||
3146 | 306 | exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', | ||
3147 | 307 | durable=options['durable'], | ||
3148 | 308 | auto_delete=options['auto_delete']) | ||
3149 | 309 | super(FanoutConsumer, self).__init__(channel, callback, tag, | ||
3150 | 310 | name=queue_name, | ||
3151 | 311 | exchange=exchange, | ||
3152 | 312 | routing_key=topic, | ||
3153 | 313 | **options) | ||
3154 | 314 | |||
3155 | 315 | |||
3156 | 316 | class Publisher(object): | ||
3157 | 317 | """Base Publisher class.""" | ||
3158 | 318 | |||
3159 | 319 | def __init__(self, channel, exchange_name, routing_key, **kwargs): | ||
3160 | 320 | """Init the Publisher class with the exchange_name, routing_key, | ||
3161 | 321 | and other options | ||
3162 | 322 | """ | ||
3163 | 323 | self.exchange_name = exchange_name | ||
3164 | 324 | self.routing_key = routing_key | ||
3165 | 325 | self.kwargs = kwargs | ||
3166 | 326 | self.reconnect(channel) | ||
3167 | 327 | |||
3168 | 328 | def reconnect(self, channel): | ||
3169 | 329 | """Re-establish the Producer after a rabbit reconnection.""" | ||
3170 | 330 | self.exchange = kombu.entity.Exchange(name=self.exchange_name, | ||
3171 | 331 | **self.kwargs) | ||
3172 | 332 | self.producer = kombu.messaging.Producer(exchange=self.exchange, | ||
3173 | 333 | channel=channel, | ||
3174 | 334 | routing_key=self.routing_key) | ||
3175 | 335 | |||
3176 | 336 | def send(self, msg, timeout=None): | ||
3177 | 337 | """Send a message.""" | ||
3178 | 338 | if timeout: | ||
3179 | 339 | # | ||
3180 | 340 | # AMQP TTL is in milliseconds when set in the header. | ||
3181 | 341 | # | ||
3182 | 342 | self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) | ||
3183 | 343 | else: | ||
3184 | 344 | self.producer.publish(msg) | ||
3185 | 345 | |||
3186 | 346 | |||
3187 | 347 | class DirectPublisher(Publisher): | ||
3188 | 348 | """Publisher class for 'direct'.""" | ||
3189 | 349 | def __init__(self, conf, channel, topic, **kwargs): | ||
3190 | 350 | """Init a 'direct' publisher. | ||
3191 | 351 | |||
3192 | 352 | Kombu options may be passed as keyword args to override defaults | ||
3193 | 353 | """ | ||
3194 | 354 | |||
3195 | 355 | options = {'durable': False, | ||
3196 | 356 | 'auto_delete': True, | ||
3197 | 357 | 'exclusive': False} | ||
3198 | 358 | options.update(kwargs) | ||
3199 | 359 | super(DirectPublisher, self).__init__(channel, topic, topic, | ||
3200 | 360 | type='direct', **options) | ||
3201 | 361 | |||
3202 | 362 | |||
3203 | 363 | class TopicPublisher(Publisher): | ||
3204 | 364 | """Publisher class for 'topic'.""" | ||
3205 | 365 | def __init__(self, conf, channel, exchange_name, topic, **kwargs): | ||
3206 | 366 | """Init a 'topic' publisher. | ||
3207 | 367 | |||
3208 | 368 | Kombu options may be passed as keyword args to override defaults | ||
3209 | 369 | """ | ||
3210 | 370 | options = {'durable': conf.amqp_durable_queues, | ||
3211 | 371 | 'auto_delete': conf.amqp_auto_delete, | ||
3212 | 372 | 'exclusive': False} | ||
3213 | 373 | options.update(kwargs) | ||
3214 | 374 | super(TopicPublisher, self).__init__(channel, | ||
3215 | 375 | exchange_name, | ||
3216 | 376 | topic, | ||
3217 | 377 | type='topic', | ||
3218 | 378 | **options) | ||
3219 | 379 | |||
3220 | 380 | |||
3221 | 381 | class FanoutPublisher(Publisher): | ||
3222 | 382 | """Publisher class for 'fanout'.""" | ||
3223 | 383 | def __init__(self, conf, channel, topic, **kwargs): | ||
3224 | 384 | """Init a 'fanout' publisher. | ||
3225 | 385 | |||
3226 | 386 | Kombu options may be passed as keyword args to override defaults | ||
3227 | 387 | """ | ||
3228 | 388 | options = {'durable': False, | ||
3229 | 389 | 'auto_delete': True, | ||
3230 | 390 | 'exclusive': False} | ||
3231 | 391 | options.update(kwargs) | ||
3232 | 392 | super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, | ||
3233 | 393 | None, type='fanout', **options) | ||
3234 | 394 | |||
3235 | 395 | |||
3236 | 396 | class NotifyPublisher(TopicPublisher): | ||
3237 | 397 | """Publisher class for 'notify'.""" | ||
3238 | 398 | |||
3239 | 399 | def __init__(self, conf, channel, exchange_name, topic, **kwargs): | ||
3240 | 400 | self.durable = kwargs.pop('durable', conf.amqp_durable_queues) | ||
3241 | 401 | self.queue_arguments = _get_queue_arguments(conf) | ||
3242 | 402 | super(NotifyPublisher, self).__init__(conf, channel, exchange_name, | ||
3243 | 403 | topic, **kwargs) | ||
3244 | 404 | |||
3245 | 405 | def reconnect(self, channel): | ||
3246 | 406 | super(NotifyPublisher, self).reconnect(channel) | ||
3247 | 407 | |||
3248 | 408 | # NOTE(jerdfelt): Normally the consumer would create the queue, but | ||
3249 | 409 | # we do this to ensure that messages don't get dropped if the | ||
3250 | 410 | # consumer is started after we do | ||
3251 | 411 | queue = kombu.entity.Queue(channel=channel, | ||
3252 | 412 | exchange=self.exchange, | ||
3253 | 413 | durable=self.durable, | ||
3254 | 414 | name=self.routing_key, | ||
3255 | 415 | routing_key=self.routing_key, | ||
3256 | 416 | queue_arguments=self.queue_arguments) | ||
3257 | 417 | queue.declare() | ||
3258 | 418 | |||
3259 | 419 | |||
3260 | 420 | class Connection(object): | ||
3261 | 421 | """Connection object.""" | ||
3262 | 422 | |||
3263 | 423 | pools = {} | ||
3264 | 424 | |||
3265 | 425 | def __init__(self, conf, url): | ||
3266 | 426 | self.consumers = [] | ||
3267 | 427 | self.conf = conf | ||
3268 | 428 | self.max_retries = self.conf.rabbit_max_retries | ||
3269 | 429 | # Try forever? | ||
3270 | 430 | if self.max_retries <= 0: | ||
3271 | 431 | self.max_retries = None | ||
3272 | 432 | self.interval_start = self.conf.rabbit_retry_interval | ||
3273 | 433 | self.interval_stepping = self.conf.rabbit_retry_backoff | ||
3274 | 434 | # max retry-interval = 30 seconds | ||
3275 | 435 | self.interval_max = 30 | ||
3276 | 436 | self.memory_transport = False | ||
3277 | 437 | |||
3278 | 438 | ssl_params = self._fetch_ssl_params() | ||
3279 | 439 | |||
3280 | 440 | if url.virtual_host is not None: | ||
3281 | 441 | virtual_host = url.virtual_host | ||
3282 | 442 | else: | ||
3283 | 443 | virtual_host = self.conf.rabbit_virtual_host | ||
3284 | 444 | |||
3285 | 445 | self.brokers_params = [] | ||
3286 | 446 | if url.hosts: | ||
3287 | 447 | for host in url.hosts: | ||
3288 | 448 | params = { | ||
3289 | 449 | 'hostname': host.hostname, | ||
3290 | 450 | 'port': host.port or 5672, | ||
3291 | 451 | 'userid': host.username or '', | ||
3292 | 452 | 'password': host.password or '', | ||
3293 | 453 | 'login_method': self.conf.rabbit_login_method, | ||
3294 | 454 | 'virtual_host': virtual_host | ||
3295 | 455 | } | ||
3296 | 456 | if self.conf.fake_rabbit: | ||
3297 | 457 | params['transport'] = 'memory' | ||
3298 | 458 | if self.conf.rabbit_use_ssl: | ||
3299 | 459 | params['ssl'] = ssl_params | ||
3300 | 460 | |||
3301 | 461 | self.brokers_params.append(params) | ||
3302 | 462 | else: | ||
3303 | 463 | # Old configuration format | ||
3304 | 464 | for adr in self.conf.rabbit_hosts: | ||
3305 | 465 | hostname, port = netutils.parse_host_port( | ||
3306 | 466 | adr, default_port=self.conf.rabbit_port) | ||
3307 | 467 | |||
3308 | 468 | params = { | ||
3309 | 469 | 'hostname': hostname, | ||
3310 | 470 | 'port': port, | ||
3311 | 471 | 'userid': self.conf.rabbit_userid, | ||
3312 | 472 | 'password': self.conf.rabbit_password, | ||
3313 | 473 | 'login_method': self.conf.rabbit_login_method, | ||
3314 | 474 | 'virtual_host': virtual_host | ||
3315 | 475 | } | ||
3316 | 476 | |||
3317 | 477 | if self.conf.fake_rabbit: | ||
3318 | 478 | params['transport'] = 'memory' | ||
3319 | 479 | if self.conf.rabbit_use_ssl: | ||
3320 | 480 | params['ssl'] = ssl_params | ||
3321 | 481 | |||
3322 | 482 | self.brokers_params.append(params) | ||
3323 | 483 | |||
3324 | 484 | random.shuffle(self.brokers_params) | ||
3325 | 485 | self.brokers = itertools.cycle(self.brokers_params) | ||
3326 | 486 | |||
3327 | 487 | self.memory_transport = self.conf.fake_rabbit | ||
3328 | 488 | |||
3329 | 489 | self.connection = None | ||
3330 | 490 | self.do_consume = None | ||
3331 | 491 | self.reconnect() | ||
3332 | 492 | |||
3333 | 493 | # FIXME(markmc): use oslo sslutils when it is available as a library | ||
3334 | 494 | _SSL_PROTOCOLS = { | ||
3335 | 495 | "tlsv1": ssl.PROTOCOL_TLSv1, | ||
3336 | 496 | "sslv23": ssl.PROTOCOL_SSLv23, | ||
3337 | 497 | "sslv3": ssl.PROTOCOL_SSLv3 | ||
3338 | 498 | } | ||
3339 | 499 | |||
3340 | 500 | try: | ||
3341 | 501 | _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 | ||
3342 | 502 | except AttributeError: | ||
3343 | 503 | pass | ||
3344 | 504 | |||
3345 | 505 | @classmethod | ||
3346 | 506 | def validate_ssl_version(cls, version): | ||
3347 | 507 | key = version.lower() | ||
3348 | 508 | try: | ||
3349 | 509 | return cls._SSL_PROTOCOLS[key] | ||
3350 | 510 | except KeyError: | ||
3351 | 511 | raise RuntimeError(_("Invalid SSL version : %s") % version) | ||
3352 | 512 | |||
3353 | 513 | def _fetch_ssl_params(self): | ||
3354 | 514 | """Handles fetching what ssl params should be used for the connection | ||
3355 | 515 | (if any). | ||
3356 | 516 | """ | ||
3357 | 517 | ssl_params = dict() | ||
3358 | 518 | |||
3359 | 519 | # http://docs.python.org/library/ssl.html - ssl.wrap_socket | ||
3360 | 520 | if self.conf.kombu_ssl_version: | ||
3361 | 521 | ssl_params['ssl_version'] = self.validate_ssl_version( | ||
3362 | 522 | self.conf.kombu_ssl_version) | ||
3363 | 523 | if self.conf.kombu_ssl_keyfile: | ||
3364 | 524 | ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile | ||
3365 | 525 | if self.conf.kombu_ssl_certfile: | ||
3366 | 526 | ssl_params['certfile'] = self.conf.kombu_ssl_certfile | ||
3367 | 527 | if self.conf.kombu_ssl_ca_certs: | ||
3368 | 528 | ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs | ||
3369 | 529 | # We might want to allow variations in the | ||
3370 | 530 | # future with this? | ||
3371 | 531 | ssl_params['cert_reqs'] = ssl.CERT_REQUIRED | ||
3372 | 532 | |||
3373 | 533 | # Return the extended behavior or just have the default behavior | ||
3374 | 534 | return ssl_params or True | ||
3375 | 535 | |||
3376 | 536 | def _connect(self, broker): | ||
3377 | 537 | """Connect to rabbit. Re-establish any queues that may have | ||
3378 | 538 | been declared before if we are reconnecting. Exceptions should | ||
3379 | 539 | be handled by the caller. | ||
3380 | 540 | """ | ||
3381 | 541 | LOG.info(_("Connecting to AMQP server on " | ||
3382 | 542 | "%(hostname)s:%(port)d"), broker) | ||
3383 | 543 | self.connection = kombu.connection.BrokerConnection(**broker) | ||
3384 | 544 | self.connection_errors = self.connection.connection_errors | ||
3385 | 545 | self.channel_errors = self.connection.channel_errors | ||
3386 | 546 | if self.memory_transport: | ||
3387 | 547 | # Kludge to speed up tests. | ||
3388 | 548 | self.connection.transport.polling_interval = 0.0 | ||
3389 | 549 | self.do_consume = True | ||
3390 | 550 | self.consumer_num = itertools.count(1) | ||
3391 | 551 | self.connection.connect() | ||
3392 | 552 | self.channel = self.connection.channel() | ||
3393 | 553 | # work around 'memory' transport bug in 1.1.3 | ||
3394 | 554 | if self.memory_transport: | ||
3395 | 555 | self.channel._new_queue('ae.undeliver') | ||
3396 | 556 | for consumer in self.consumers: | ||
3397 | 557 | consumer.reconnect(self.channel) | ||
3398 | 558 | LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), | ||
3399 | 559 | broker) | ||
3400 | 560 | |||
3401 | 561 | def _disconnect(self): | ||
3402 | 562 | if self.connection: | ||
3403 | 563 | # XXX(nic): when reconnecting to a RabbitMQ cluster | ||
3404 | 564 | # with mirrored queues in use, the attempt to release the | ||
3405 | 565 | # connection can hang "indefinitely" somewhere deep down | ||
3406 | 566 | # in Kombu. Blocking the thread for a bit prior to | ||
3407 | 567 | # release seems to kludge around the problem where it is | ||
3408 | 568 | # otherwise reproduceable. | ||
3409 | 569 | if self.conf.kombu_reconnect_delay > 0: | ||
3410 | 570 | LOG.info(_("Delaying reconnect for %1.1f seconds...") % | ||
3411 | 571 | self.conf.kombu_reconnect_delay) | ||
3412 | 572 | time.sleep(self.conf.kombu_reconnect_delay) | ||
3413 | 573 | |||
3414 | 574 | try: | ||
3415 | 575 | self.connection.release() | ||
3416 | 576 | except self.connection_errors: | ||
3417 | 577 | pass | ||
3418 | 578 | self.connection = None | ||
3419 | 579 | |||
3420 | 580 | def reconnect(self, retry=None): | ||
3421 | 581 | """Handles reconnecting and re-establishing queues. | ||
3422 | 582 | Will retry up to retry number of times. | ||
3423 | 583 | retry = None means use the value of rabbit_max_retries | ||
3424 | 584 | retry = -1 means to retry forever | ||
3425 | 585 | retry = 0 means no retry | ||
3426 | 586 | retry = N means N retries | ||
3427 | 587 | Sleep between tries, starting at self.interval_start | ||
3428 | 588 | seconds, backing off self.interval_stepping number of seconds | ||
3429 | 589 | each attempt. | ||
3430 | 590 | """ | ||
3431 | 591 | |||
3432 | 592 | attempt = 0 | ||
3433 | 593 | loop_forever = False | ||
3434 | 594 | if retry is None: | ||
3435 | 595 | retry = self.max_retries | ||
3436 | 596 | if retry is None or retry < 0: | ||
3437 | 597 | loop_forever = True | ||
3438 | 598 | |||
3439 | 599 | while True: | ||
3440 | 600 | self._disconnect() | ||
3441 | 601 | |||
3442 | 602 | broker = six.next(self.brokers) | ||
3443 | 603 | attempt += 1 | ||
3444 | 604 | try: | ||
3445 | 605 | self._connect(broker) | ||
3446 | 606 | return | ||
3447 | 607 | except IOError as ex: | ||
3448 | 608 | e = ex | ||
3449 | 609 | except self.connection_errors as ex: | ||
3450 | 610 | e = ex | ||
3451 | 611 | except Exception as ex: | ||
3452 | 612 | # NOTE(comstud): Unfortunately it's possible for amqplib | ||
3453 | 613 | # to return an error not covered by its transport | ||
3454 | 614 | # connection_errors in the case of a timeout waiting for | ||
3455 | 615 | # a protocol response. (See paste link in LP888621) | ||
3456 | 616 | # So, we check all exceptions for 'timeout' in them | ||
3457 | 617 | # and try to reconnect in this case. | ||
3458 | 618 | if 'timeout' not in six.text_type(e): | ||
3459 | 619 | raise | ||
3460 | 620 | e = ex | ||
3461 | 621 | |||
3462 | 622 | log_info = {} | ||
3463 | 623 | log_info['err_str'] = e | ||
3464 | 624 | log_info['retry'] = retry or 0 | ||
3465 | 625 | log_info.update(broker) | ||
3466 | 626 | |||
3467 | 627 | if not loop_forever and attempt > retry: | ||
3468 | 628 | msg = _('Unable to connect to AMQP server on ' | ||
3469 | 629 | '%(hostname)s:%(port)d after %(retry)d ' | ||
3470 | 630 | 'tries: %(err_str)s') % log_info | ||
3471 | 631 | LOG.error(msg) | ||
3472 | 632 | raise exceptions.MessageDeliveryFailure(msg) | ||
3473 | 633 | else: | ||
3474 | 634 | if attempt == 1: | ||
3475 | 635 | sleep_time = self.interval_start or 1 | ||
3476 | 636 | elif attempt > 1: | ||
3477 | 637 | sleep_time += self.interval_stepping | ||
3478 | 638 | |||
3479 | 639 | sleep_time = min(sleep_time, self.interval_max) | ||
3480 | 640 | |||
3481 | 641 | log_info['sleep_time'] = sleep_time | ||
3482 | 642 | if 'Socket closed' in six.text_type(e): | ||
3483 | 643 | LOG.error(_('AMQP server %(hostname)s:%(port)d closed' | ||
3484 | 644 | ' the connection. Check login credentials:' | ||
3485 | 645 | ' %(err_str)s'), log_info) | ||
3486 | 646 | else: | ||
3487 | 647 | LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' | ||
3488 | 648 | 'unreachable: %(err_str)s. Trying again in ' | ||
3489 | 649 | '%(sleep_time)d seconds.'), log_info) | ||
3490 | 650 | time.sleep(sleep_time) | ||
3491 | 651 | |||
3492 | 652 | def ensure(self, error_callback, method, retry=None): | ||
3493 | 653 | while True: | ||
3494 | 654 | try: | ||
3495 | 655 | return method() | ||
3496 | 656 | except self.connection_errors as e: | ||
3497 | 657 | if error_callback: | ||
3498 | 658 | error_callback(e) | ||
3499 | 659 | except self.channel_errors as e: | ||
3500 | 660 | if error_callback: | ||
3501 | 661 | error_callback(e) | ||
3502 | 662 | except (socket.timeout, IOError) as e: | ||
3503 | 663 | if error_callback: | ||
3504 | 664 | error_callback(e) | ||
3505 | 665 | except Exception as e: | ||
3506 | 666 | # NOTE(comstud): Unfortunately it's possible for amqplib | ||
3507 | 667 | # to return an error not covered by its transport | ||
3508 | 668 | # connection_errors in the case of a timeout waiting for | ||
3509 | 669 | # a protocol response. (See paste link in LP888621) | ||
3510 | 670 | # So, we check all exceptions for 'timeout' in them | ||
3511 | 671 | # and try to reconnect in this case. | ||
3512 | 672 | if 'timeout' not in six.text_type(e): | ||
3513 | 673 | raise | ||
3514 | 674 | if error_callback: | ||
3515 | 675 | error_callback(e) | ||
3516 | 676 | self.reconnect(retry=retry) | ||
3517 | 677 | |||
3518 | 678 | def get_channel(self): | ||
3519 | 679 | """Convenience call for bin/clear_rabbit_queues.""" | ||
3520 | 680 | return self.channel | ||
3521 | 681 | |||
3522 | 682 | def close(self): | ||
3523 | 683 | """Close/release this connection.""" | ||
3524 | 684 | if self.connection: | ||
3525 | 685 | self.connection.release() | ||
3526 | 686 | self.connection = None | ||
3527 | 687 | |||
3528 | 688 | def reset(self): | ||
3529 | 689 | """Reset a connection so it can be used again.""" | ||
3530 | 690 | self.channel.close() | ||
3531 | 691 | self.channel = self.connection.channel() | ||
3532 | 692 | # work around 'memory' transport bug in 1.1.3 | ||
3533 | 693 | if self.memory_transport: | ||
3534 | 694 | self.channel._new_queue('ae.undeliver') | ||
3535 | 695 | self.consumers = [] | ||
3536 | 696 | |||
3537 | 697 | def declare_consumer(self, consumer_cls, topic, callback): | ||
3538 | 698 | """Create a Consumer using the class that was passed in and | ||
3539 | 699 | add it to our list of consumers | ||
3540 | 700 | """ | ||
3541 | 701 | |||
3542 | 702 | def _connect_error(exc): | ||
3543 | 703 | log_info = {'topic': topic, 'err_str': exc} | ||
3544 | 704 | LOG.error(_("Failed to declare consumer for topic '%(topic)s': " | ||
3545 | 705 | "%(err_str)s"), log_info) | ||
3546 | 706 | |||
3547 | 707 | def _declare_consumer(): | ||
3548 | 708 | consumer = consumer_cls(self.conf, self.channel, topic, callback, | ||
3549 | 709 | six.next(self.consumer_num)) | ||
3550 | 710 | self.consumers.append(consumer) | ||
3551 | 711 | return consumer | ||
3552 | 712 | |||
3553 | 713 | return self.ensure(_connect_error, _declare_consumer) | ||
3554 | 714 | |||
3555 | 715 | def iterconsume(self, limit=None, timeout=None): | ||
3556 | 716 | """Return an iterator that will consume from all queues/consumers.""" | ||
3557 | 717 | |||
3558 | 718 | timer = rpc_common.DecayingTimer(duration=timeout) | ||
3559 | 719 | timer.start() | ||
3560 | 720 | |||
3561 | 721 | def _raise_timeout(exc): | ||
3562 | 722 | LOG.debug('Timed out waiting for RPC response: %s', exc) | ||
3563 | 723 | raise rpc_common.Timeout() | ||
3564 | 724 | |||
3565 | 725 | def _error_callback(exc): | ||
3566 | 726 | timer.check_return(_raise_timeout, exc) | ||
3567 | 727 | LOG.exception(_('Failed to consume message from queue: %s'), | ||
3568 | 728 | exc) | ||
3569 | 729 | self.do_consume = True | ||
3570 | 730 | |||
3571 | 731 | def _consume(): | ||
3572 | 732 | if self.do_consume: | ||
3573 | 733 | queues_head = self.consumers[:-1] # not fanout. | ||
3574 | 734 | queues_tail = self.consumers[-1] # fanout | ||
3575 | 735 | for queue in queues_head: | ||
3576 | 736 | queue.consume(nowait=True) | ||
3577 | 737 | queues_tail.consume(nowait=False) | ||
3578 | 738 | self.do_consume = False | ||
3579 | 739 | |||
3580 | 740 | poll_timeout = 1 if timeout is None else min(timeout, 1) | ||
3581 | 741 | while True: | ||
3582 | 742 | try: | ||
3583 | 743 | return self.connection.drain_events(timeout=poll_timeout) | ||
3584 | 744 | except socket.timeout as exc: | ||
3585 | 745 | poll_timeout = timer.check_return(_raise_timeout, exc, | ||
3586 | 746 | maximum=1) | ||
3587 | 747 | |||
3588 | 748 | for iteration in itertools.count(0): | ||
3589 | 749 | if limit and iteration >= limit: | ||
3590 | 750 | raise StopIteration | ||
3591 | 751 | yield self.ensure(_error_callback, _consume) | ||
3592 | 752 | |||
3593 | 753 | def publisher_send(self, cls, topic, msg, timeout=None, retry=None, | ||
3594 | 754 | **kwargs): | ||
3595 | 755 | """Send to a publisher based on the publisher class.""" | ||
3596 | 756 | |||
3597 | 757 | def _error_callback(exc): | ||
3598 | 758 | log_info = {'topic': topic, 'err_str': exc} | ||
3599 | 759 | LOG.exception(_("Failed to publish message to topic " | ||
3600 | 760 | "'%(topic)s': %(err_str)s"), log_info) | ||
3601 | 761 | |||
3602 | 762 | def _publish(): | ||
3603 | 763 | publisher = cls(self.conf, self.channel, topic=topic, **kwargs) | ||
3604 | 764 | publisher.send(msg, timeout) | ||
3605 | 765 | |||
3606 | 766 | self.ensure(_error_callback, _publish, retry=retry) | ||
3607 | 767 | |||
3608 | 768 | def declare_direct_consumer(self, topic, callback): | ||
3609 | 769 | """Create a 'direct' queue. | ||
3610 | 770 | In nova's use, this is generally a msg_id queue used for | ||
3611 | 771 | responses for call/multicall | ||
3612 | 772 | """ | ||
3613 | 773 | self.declare_consumer(DirectConsumer, topic, callback) | ||
3614 | 774 | |||
3615 | 775 | def declare_topic_consumer(self, exchange_name, topic, callback=None, | ||
3616 | 776 | queue_name=None): | ||
3617 | 777 | """Create a 'topic' consumer.""" | ||
3618 | 778 | self.declare_consumer(functools.partial(TopicConsumer, | ||
3619 | 779 | name=queue_name, | ||
3620 | 780 | exchange_name=exchange_name, | ||
3621 | 781 | ), | ||
3622 | 782 | topic, callback) | ||
3623 | 783 | |||
3624 | 784 | def declare_fanout_consumer(self, topic, callback): | ||
3625 | 785 | """Create a 'fanout' consumer.""" | ||
3626 | 786 | self.declare_consumer(FanoutConsumer, topic, callback) | ||
3627 | 787 | |||
3628 | 788 | def direct_send(self, msg_id, msg): | ||
3629 | 789 | """Send a 'direct' message.""" | ||
3630 | 790 | self.publisher_send(DirectPublisher, msg_id, msg) | ||
3631 | 791 | |||
3632 | 792 | def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): | ||
3633 | 793 | """Send a 'topic' message.""" | ||
3634 | 794 | self.publisher_send(TopicPublisher, topic, msg, timeout, | ||
3635 | 795 | exchange_name=exchange_name, retry=retry) | ||
3636 | 796 | |||
3637 | 797 | def fanout_send(self, topic, msg, retry=None): | ||
3638 | 798 | """Send a 'fanout' message.""" | ||
3639 | 799 | self.publisher_send(FanoutPublisher, topic, msg, retry=retry) | ||
3640 | 800 | |||
3641 | 801 | def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): | ||
3642 | 802 | """Send a notify message on a topic.""" | ||
3643 | 803 | self.publisher_send(NotifyPublisher, topic, msg, timeout=None, | ||
3644 | 804 | exchange_name=exchange_name, retry=retry, **kwargs) | ||
3645 | 805 | |||
3646 | 806 | def consume(self, limit=None, timeout=None): | ||
3647 | 807 | """Consume from all queues/consumers.""" | ||
3648 | 808 | it = self.iterconsume(limit=limit, timeout=timeout) | ||
3649 | 809 | while True: | ||
3650 | 810 | try: | ||
3651 | 811 | six.next(it) | ||
3652 | 812 | except StopIteration: | ||
3653 | 813 | return | ||
3654 | 814 | |||
3655 | 815 | |||
class RabbitDriver(amqpdriver.AMQPDriverBase):
    """AMQP driver backed by kombu/RabbitMQ connections."""

    def __init__(self, conf, url,
                 default_exchange=None,
                 allowed_remote_exmods=None):
        # Register driver options before anything reads them.
        conf.register_opts(rabbit_opts)
        conf.register_opts(rpc_amqp.amqp_opts)

        pool = rpc_amqp.get_connection_pool(conf, url, Connection)

        super(RabbitDriver, self).__init__(conf, url,
                                           pool,
                                           default_exchange,
                                           allowed_remote_exmods)

    def require_features(self, requeue=True):
        """Rabbit supports ack/requeue natively; nothing to verify."""
        pass
3673 | 0 | 833 | ||
3674 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests' | |||
3675 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers' | |||
3676 | === added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py' | |||
3677 | --- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 1970-01-01 00:00:00 +0000 | |||
3678 | +++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 2015-12-17 11:03:34 +0000 | |||
3679 | @@ -0,0 +1,735 @@ | |||
3680 | 1 | # Copyright 2013 Red Hat, Inc. | ||
3681 | 2 | # | ||
3682 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
3683 | 4 | # not use this file except in compliance with the License. You may obtain | ||
3684 | 5 | # a copy of the License at | ||
3685 | 6 | # | ||
3686 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
3687 | 8 | # | ||
3688 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
3689 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
3690 | 11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
3691 | 12 | # License for the specific language governing permissions and limitations | ||
3692 | 13 | # under the License. | ||
3693 | 14 | |||
3694 | 15 | import datetime | ||
3695 | 16 | import operator | ||
3696 | 17 | import sys | ||
3697 | 18 | import threading | ||
3698 | 19 | import uuid | ||
3699 | 20 | |||
3700 | 21 | import fixtures | ||
3701 | 22 | import kombu | ||
3702 | 23 | import mock | ||
3703 | 24 | import testscenarios | ||
3704 | 25 | |||
3705 | 26 | from oslo import messaging | ||
3706 | 27 | from oslo.messaging._drivers import amqpdriver | ||
3707 | 28 | from oslo.messaging._drivers import common as driver_common | ||
3708 | 29 | from oslo.messaging._drivers import impl_rabbit as rabbit_driver | ||
3709 | 30 | from oslo.messaging.openstack.common import jsonutils | ||
3710 | 31 | from tests import utils as test_utils | ||
3711 | 32 | |||
3712 | 33 | load_tests = testscenarios.load_tests_apply_scenarios | ||
3713 | 34 | |||
3714 | 35 | |||
class TestRabbitDriverLoad(test_utils.BaseTestCase):
    """The 'rabbit' transport name must resolve to RabbitDriver."""

    def setUp(self):
        super(TestRabbitDriverLoad, self).setUp()
        # Use the in-memory kombu transport so no broker is needed.
        self.messaging_conf.transport_driver = 'rabbit'
        self.messaging_conf.in_memory = True

    def test_driver_load(self):
        transport = messaging.get_transport(self.conf)
        self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
3725 | 46 | |||
3726 | 47 | |||
class TestRabbitTransportURL(test_utils.BaseTestCase):
    """Transport URLs must be parsed into the expected per-broker
    connection parameters (host, port, credentials, vhost)."""

    scenarios = [
        ('none', dict(url=None,
                      expected=[dict(hostname='localhost',
                                     port=5672,
                                     userid='guest',
                                     password='guest',
                                     virtual_host='/')])),
        ('empty',
         dict(url='rabbit:///',
              expected=[dict(hostname='localhost',
                             port=5672,
                             userid='guest',
                             password='guest',
                             virtual_host='')])),
        ('localhost',
         dict(url='rabbit://localhost/',
              expected=[dict(hostname='localhost',
                             port=5672,
                             userid='',
                             password='',
                             virtual_host='')])),
        ('virtual_host',
         dict(url='rabbit:///vhost',
              expected=[dict(hostname='localhost',
                             port=5672,
                             userid='guest',
                             password='guest',
                             virtual_host='vhost')])),
        ('no_creds',
         dict(url='rabbit://host/virtual_host',
              expected=[dict(hostname='host',
                             port=5672,
                             userid='',
                             password='',
                             virtual_host='virtual_host')])),
        ('no_port',
         dict(url='rabbit://user:password@host/virtual_host',
              expected=[dict(hostname='host',
                             port=5672,
                             userid='user',
                             password='password',
                             virtual_host='virtual_host')])),
        ('full_url',
         dict(url='rabbit://user:password@host:10/virtual_host',
              expected=[dict(hostname='host',
                             port=10,
                             userid='user',
                             password='password',
                             virtual_host='virtual_host')])),
        ('full_two_url',
         dict(url='rabbit://user:password@host:10,'
                  'user2:password2@host2:12/virtual_host',
              expected=[dict(hostname='host',
                             port=10,
                             userid='user',
                             password='password',
                             virtual_host='virtual_host'),
                        dict(hostname='host2',
                             port=12,
                             userid='user2',
                             password='password2',
                             virtual_host='virtual_host')
                        ]
              )),

    ]

    def test_transport_url(self):
        self.messaging_conf.in_memory = True

        transport = messaging.get_transport(self.conf, self.url)
        self.addCleanup(transport.cleanup)
        driver = transport._driver

        # Strip fields not covered by the expectations before comparing.
        ignored = ('transport', 'login_method')
        observed = [dict((key, value) for key, value in params.items()
                         if key not in ignored)
                    for params in driver._get_connection().brokers_params[:]]

        by_host = operator.itemgetter('hostname')
        self.assertEqual(sorted(self.expected, key=by_host),
                         sorted(observed, key=by_host))
3812 | 133 | |||
3813 | 134 | |||
class TestSendReceive(test_utils.BaseTestCase):
    """End-to-end send/receive over the in-memory rabbit driver,
    multiplied across sender counts, contexts, reply payloads,
    failure modes and timeouts."""

    _n_senders = [
        ('single_sender', dict(n_senders=1)),
        ('multiple_senders', dict(n_senders=10)),
    ]

    _context = [
        ('empty_context', dict(ctxt={})),
        ('with_context', dict(ctxt={'user': 'mark'})),
    ]

    _reply = [
        ('rx_id', dict(rx_id=True, reply=None)),
        ('none', dict(rx_id=False, reply=None)),
        ('empty_list', dict(rx_id=False, reply=[])),
        ('empty_dict', dict(rx_id=False, reply={})),
        ('false', dict(rx_id=False, reply=False)),
        ('zero', dict(rx_id=False, reply=0)),
    ]

    _failure = [
        ('success', dict(failure=False)),
        ('failure', dict(failure=True, expected=False)),
        ('expected_failure', dict(failure=True, expected=True)),
    ]

    _timeout = [
        ('no_timeout', dict(timeout=None)),
        ('timeout', dict(timeout=0.01)),  # FIXME(markmc): timeout=0 is broken?
    ]

    @classmethod
    def generate_scenarios(cls):
        cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
                                                         cls._context,
                                                         cls._reply,
                                                         cls._failure,
                                                         cls._timeout)

    def setUp(self):
        super(TestSendReceive, self).setUp()
        self.messaging_conf.transport_driver = 'rabbit'
        self.messaging_conf.in_memory = True

    def test_send_receive(self):
        transport = messaging.get_transport(self.conf)
        self.addCleanup(transport.cleanup)

        driver = transport._driver

        target = messaging.Target(topic='testtopic')

        listener = driver.listen(target)

        senders = []
        replies = []
        msgs = []
        errors = []

        def stub_error(msg, *a, **kw):
            # Mimic LOG.error's %-interpolation so assertions can look at
            # the final message text.
            if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
                a = a[0]
            errors.append(str(msg) % a)

        self.stubs.Set(driver_common.LOG, 'error', stub_error)

        def send_and_wait_for_reply(i):
            try:
                replies.append(driver.send(target,
                                           self.ctxt,
                                           {'tx_id': i},
                                           wait_for_reply=True,
                                           timeout=self.timeout))
                # A normal reply is only legal in non-failure/no-timeout
                # scenarios.
                self.assertFalse(self.failure)
                self.assertIsNone(self.timeout)
            except (ZeroDivisionError, messaging.MessagingTimeout) as e:
                replies.append(e)
                self.assertTrue(self.failure or self.timeout is not None)

        while len(senders) < self.n_senders:
            senders.append(threading.Thread(target=send_and_wait_for_reply,
                                            args=(len(senders), )))

        # Start each sender and immediately receive its request.
        for i in range(len(senders)):
            senders[i].start()

            received = listener.poll()
            self.assertIsNotNone(received)
            self.assertEqual(self.ctxt, received.ctxt)
            self.assertEqual({'tx_id': i}, received.message)
            msgs.append(received)

        # reply in reverse, except reply to the first guy second from last
        order = list(range(len(senders) - 1, -1, -1))
        if len(order) > 1:
            order[-1], order[-2] = order[-2], order[-1]

        for i in order:
            if self.timeout is None:
                if self.failure:
                    try:
                        raise ZeroDivisionError
                    except Exception:
                        failure = sys.exc_info()
                    msgs[i].reply(failure=failure,
                                  log_failure=not self.expected)
                elif self.rx_id:
                    msgs[i].reply({'rx_id': i})
                else:
                    msgs[i].reply(self.reply)
            senders[i].join()

        # Every sender must have produced exactly one reply/exception.
        self.assertEqual(len(senders), len(replies))
        for i, reply in enumerate(replies):
            if self.timeout is not None:
                self.assertIsInstance(reply, messaging.MessagingTimeout)
            elif self.failure:
                self.assertIsInstance(reply, ZeroDivisionError)
            elif self.rx_id:
                self.assertEqual({'rx_id': order[i]}, reply)
            else:
                self.assertEqual(self.reply, reply)

        # Unexpected failures must be logged; everything else must not be.
        if not self.timeout and self.failure and not self.expected:
            self.assertTrue(len(errors) > 0, errors)
        else:
            self.assertEqual(0, len(errors), errors)


TestSendReceive.generate_scenarios()
3945 | 266 | |||
3946 | 267 | |||
class TestPollAsync(test_utils.BaseTestCase):
    """A listener poll with a timeout and no traffic must return None."""

    def setUp(self):
        super(TestPollAsync, self).setUp()
        self.messaging_conf.transport_driver = 'rabbit'
        self.messaging_conf.in_memory = True

    def test_poll_timeout(self):
        transport = messaging.get_transport(self.conf)
        self.addCleanup(transport.cleanup)
        listener = transport._driver.listen(
            messaging.Target(topic='testtopic'))
        self.assertIsNone(listener.poll(timeout=0.050))
3962 | 283 | |||
3963 | 284 | |||
class TestRacyWaitForReply(test_utils.BaseTestCase):
    """Replies arriving while another thread is not yet polling must be
    queued and delivered to the right waiter (out-of-order delivery)."""

    def setUp(self):
        super(TestRacyWaitForReply, self).setUp()
        self.messaging_conf.transport_driver = 'rabbit'
        self.messaging_conf.in_memory = True

    def test_send_receive(self):
        transport = messaging.get_transport(self.conf)
        self.addCleanup(transport.cleanup)

        driver = transport._driver

        target = messaging.Target(topic='testtopic')

        listener = driver.listen(target)

        senders = []
        replies = []
        msgs = []

        wait_conditions = []
        orig_reply_waiter = amqpdriver.ReplyWaiter.wait

        def reply_waiter(self, msg_id, timeout):
            # Park the first waiter on a condition so its reply arrives
            # before it ever starts polling.
            if wait_conditions:
                cond = wait_conditions.pop()
                with cond:
                    cond.notify()
                with cond:
                    cond.wait()
            return orig_reply_waiter(self, msg_id, timeout)

        self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)

        def send_and_wait_for_reply(i, wait_for_reply):
            replies.append(driver.send(target,
                                       {},
                                       {'tx_id': i},
                                       wait_for_reply=wait_for_reply,
                                       timeout=None))

        while len(senders) < 2:
            t = threading.Thread(target=send_and_wait_for_reply,
                                 args=(len(senders), True))
            t.daemon = True
            senders.append(t)

        # test the case then msg_id is not set
        t = threading.Thread(target=send_and_wait_for_reply,
                             args=(len(senders), False))
        t.daemon = True
        senders.append(t)

        # Start the first guy, receive his message, but delay his polling
        notify_condition = threading.Condition()
        wait_conditions.append(notify_condition)
        with notify_condition:
            senders[0].start()
            notify_condition.wait()

        msgs.append(listener.poll())
        self.assertEqual({'tx_id': 0}, msgs[-1].message)

        # Start the second guy, receive his message
        senders[1].start()

        msgs.append(listener.poll())
        self.assertEqual({'tx_id': 1}, msgs[-1].message)

        # Reply to both in order, making the second thread queue
        # the reply meant for the first thread
        msgs[0].reply({'rx_id': 0})
        msgs[1].reply({'rx_id': 1})

        # Wait for the second thread to finish
        senders[1].join()

        # Start the 3rd guy, receive his message
        senders[2].start()

        msgs.append(listener.poll())
        self.assertEqual({'tx_id': 2}, msgs[-1].message)

        # Verify the _send_reply was not invoked by driver:
        with mock.patch.object(msgs[2], '_send_reply') as method:
            msgs[2].reply({'rx_id': 2})
            self.assertEqual(method.call_count, 0)

        # Wait for the 3rd thread to finish
        senders[2].join()

        # Let the first thread continue
        with notify_condition:
            notify_condition.notify()

        # Wait for the first thread to finish
        senders[0].join()

        # Verify replies were received out of order
        self.assertEqual(len(senders), len(replies))
        self.assertEqual({'rx_id': 1}, replies[0])
        self.assertIsNone(replies[1])
        self.assertEqual({'rx_id': 0}, replies[2])
4068 | 389 | |||
4069 | 390 | |||
def _declare_queue(target):
    """Declare, on an in-memory kombu broker, the queue the driver is
    expected to publish to for ``target``, and return
    ``(connection, channel, queue)`` so tests can read raw messages.

    Queue shape depends on the target: fanout targets get a
    '<topic>_fanout' exchange, server targets a '<topic>.<server>'
    routed queue, plain topic targets a '<topic>' routed queue — all on
    the shared 'openstack' topic exchange for the non-fanout cases.
    """
    connection = kombu.connection.BrokerConnection(transport='memory')

    # Kludge to speed up tests.
    connection.transport.polling_interval = 0.0

    connection.connect()
    channel = connection.channel()

    # work around 'memory' transport bug in 1.1.3
    channel._new_queue('ae.undeliver')

    if target.fanout:
        exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
                                         type='fanout',
                                         durable=False,
                                         auto_delete=True)
        queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
                                   channel=channel,
                                   exchange=exchange,
                                   routing_key=target.topic)
    # BUGFIX: 'elif', not 'if' — with a plain 'if', a fanout target with
    # no server fell through to the final else and had its fanout queue
    # clobbered by a topic queue.
    elif target.server:
        exchange = kombu.entity.Exchange(name='openstack',
                                         type='topic',
                                         durable=False,
                                         auto_delete=False)
        topic = '%s.%s' % (target.topic, target.server)
        queue = kombu.entity.Queue(name=topic,
                                   channel=channel,
                                   exchange=exchange,
                                   routing_key=topic)
    else:
        exchange = kombu.entity.Exchange(name='openstack',
                                         type='topic',
                                         durable=False,
                                         auto_delete=False)
        queue = kombu.entity.Queue(name=target.topic,
                                   channel=channel,
                                   exchange=exchange,
                                   routing_key=target.topic)

    queue.declare()

    return connection, channel, queue
4114 | 435 | |||
4115 | 436 | |||
4116 | 437 | class TestRequestWireFormat(test_utils.BaseTestCase): | ||
4117 | 438 | |||
4118 | 439 | _target = [ | ||
4119 | 440 | ('topic_target', | ||
4120 | 441 | dict(topic='testtopic', server=None, fanout=False)), | ||
4121 | 442 | ('server_target', | ||
4122 | 443 | dict(topic='testtopic', server='testserver', fanout=False)), | ||
4123 | 444 | # NOTE(markmc): https://github.com/celery/kombu/issues/195 | ||
4124 | 445 | ('fanout_target', | ||
4125 | 446 | dict(topic='testtopic', server=None, fanout=True, | ||
4126 | 447 | skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), | ||
4127 | 448 | ] | ||
4128 | 449 | |||
4129 | 450 | _msg = [ | ||
4130 | 451 | ('empty_msg', | ||
4131 | 452 | dict(msg={}, expected={})), | ||
4132 | 453 | ('primitive_msg', | ||
4133 | 454 | dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})), | ||
4134 | 455 | ('complex_msg', | ||
4135 | 456 | dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}}, | ||
4136 | 457 | expected={'a': {'b': '1920-02-03T04:05:06.000007'}})), | ||
4137 | 458 | ] | ||
4138 | 459 | |||
4139 | 460 | _context = [ | ||
4140 | 461 | ('empty_ctxt', dict(ctxt={}, expected_ctxt={})), | ||
4141 | 462 | ('user_project_ctxt', | ||
4142 | 463 | dict(ctxt={'user': 'mark', 'project': 'snarkybunch'}, | ||
4143 | 464 | expected_ctxt={'_context_user': 'mark', | ||
4144 | 465 | '_context_project': 'snarkybunch'})), | ||
4145 | 466 | ] | ||
4146 | 467 | |||
4147 | 468 | @classmethod | ||
4148 | 469 | def generate_scenarios(cls): | ||
4149 | 470 | cls.scenarios = testscenarios.multiply_scenarios(cls._msg, | ||
4150 | 471 | cls._context, | ||
4151 | 472 | cls._target) | ||
4152 | 473 | |||
4153 | 474 | def setUp(self): | ||
4154 | 475 | super(TestRequestWireFormat, self).setUp() | ||
4155 | 476 | self.messaging_conf.transport_driver = 'rabbit' | ||
4156 | 477 | self.messaging_conf.in_memory = True | ||
4157 | 478 | |||
4158 | 479 | self.uuids = [] | ||
4159 | 480 | self.orig_uuid4 = uuid.uuid4 | ||
4160 | 481 | self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4)) | ||
4161 | 482 | |||
4162 | 483 | def mock_uuid4(self): | ||
4163 | 484 | self.uuids.append(self.orig_uuid4()) | ||
4164 | 485 | return self.uuids[-1] | ||
4165 | 486 | |||
4166 | 487 | def test_request_wire_format(self): | ||
4167 | 488 | if hasattr(self, 'skip_msg'): | ||
4168 | 489 | self.skipTest(self.skip_msg) | ||
4169 | 490 | |||
4170 | 491 | transport = messaging.get_transport(self.conf) | ||
4171 | 492 | self.addCleanup(transport.cleanup) | ||
4172 | 493 | |||
4173 | 494 | driver = transport._driver | ||
4174 | 495 | |||
4175 | 496 | target = messaging.Target(topic=self.topic, | ||
4176 | 497 | server=self.server, | ||
4177 | 498 | fanout=self.fanout) | ||
4178 | 499 | |||
4179 | 500 | connection, channel, queue = _declare_queue(target) | ||
4180 | 501 | self.addCleanup(connection.release) | ||
4181 | 502 | |||
4182 | 503 | driver.send(target, self.ctxt, self.msg) | ||
4183 | 504 | |||
4184 | 505 | msgs = [] | ||
4185 | 506 | |||
4186 | 507 | def callback(msg): | ||
4187 | 508 | msg = channel.message_to_python(msg) | ||
4188 | 509 | msg.ack() | ||
4189 | 510 | msgs.append(msg.payload) | ||
4190 | 511 | |||
4191 | 512 | queue.consume(callback=callback, | ||
4192 | 513 | consumer_tag='1', | ||
4193 | 514 | nowait=False) | ||
4194 | 515 | |||
4195 | 516 | connection.drain_events() | ||
4196 | 517 | |||
4197 | 518 | self.assertEqual(1, len(msgs)) | ||
4198 | 519 | self.assertIn('oslo.message', msgs[0]) | ||
4199 | 520 | |||
4200 | 521 | received = msgs[0] | ||
4201 | 522 | received['oslo.message'] = jsonutils.loads(received['oslo.message']) | ||
4202 | 523 | |||
4203 | 524 | # FIXME(markmc): add _msg_id and _reply_q check | ||
4204 | 525 | expected_msg = { | ||
4205 | 526 | '_unique_id': self.uuids[0].hex, | ||
4206 | 527 | } | ||
4207 | 528 | expected_msg.update(self.expected) | ||
4208 | 529 | expected_msg.update(self.expected_ctxt) | ||
4209 | 530 | |||
4210 | 531 | expected = { | ||
4211 | 532 | 'oslo.version': '2.0', | ||
4212 | 533 | 'oslo.message': expected_msg, | ||
4213 | 534 | } | ||
4214 | 535 | |||
4215 | 536 | self.assertEqual(expected, received) | ||
4216 | 537 | |||
4217 | 538 | |||
4218 | 539 | TestRequestWireFormat.generate_scenarios() | ||
4219 | 540 | |||
4220 | 541 | |||
4221 | 542 | def _create_producer(target): | ||
4222 | 543 | connection = kombu.connection.BrokerConnection(transport='memory') | ||
4223 | 544 | |||
4224 | 545 | # Kludge to speed up tests. | ||
4225 | 546 | connection.transport.polling_interval = 0.0 | ||
4226 | 547 | |||
4227 | 548 | connection.connect() | ||
4228 | 549 | channel = connection.channel() | ||
4229 | 550 | |||
4230 | 551 | # work around 'memory' transport bug in 1.1.3 | ||
4231 | 552 | channel._new_queue('ae.undeliver') | ||
4232 | 553 | |||
4233 | 554 | if target.fanout: | ||
4234 | 555 | exchange = kombu.entity.Exchange(name=target.topic + '_fanout', | ||
4235 | 556 | type='fanout', | ||
4236 | 557 | durable=False, | ||
4237 | 558 | auto_delete=True) | ||
4238 | 559 | producer = kombu.messaging.Producer(exchange=exchange, | ||
4239 | 560 | channel=channel, | ||
4240 | 561 | routing_key=target.topic) | ||
4241 | 562 | elif target.server: | ||
4242 | 563 | exchange = kombu.entity.Exchange(name='openstack', | ||
4243 | 564 | type='topic', | ||
4244 | 565 | durable=False, | ||
4245 | 566 | auto_delete=False) | ||
4246 | 567 | topic = '%s.%s' % (target.topic, target.server) | ||
4247 | 568 | producer = kombu.messaging.Producer(exchange=exchange, | ||
4248 | 569 | channel=channel, | ||
4249 | 570 | routing_key=topic) | ||
4250 | 571 | else: | ||
4251 | 572 | exchange = kombu.entity.Exchange(name='openstack', | ||
4252 | 573 | type='topic', | ||
4253 | 574 | durable=False, | ||
4254 | 575 | auto_delete=False) | ||
4255 | 576 | producer = kombu.messaging.Producer(exchange=exchange, | ||
4256 | 577 | channel=channel, | ||
4257 | 578 | routing_key=target.topic) | ||
4258 | 579 | |||
4259 | 580 | return connection, producer | ||
4260 | 581 | |||
4261 | 582 | |||
4262 | 583 | class TestReplyWireFormat(test_utils.BaseTestCase): | ||
4263 | 584 | |||
4264 | 585 | _target = [ | ||
4265 | 586 | ('topic_target', | ||
4266 | 587 | dict(topic='testtopic', server=None, fanout=False)), | ||
4267 | 588 | ('server_target', | ||
4268 | 589 | dict(topic='testtopic', server='testserver', fanout=False)), | ||
4269 | 590 | # NOTE(markmc): https://github.com/celery/kombu/issues/195 | ||
4270 | 591 | ('fanout_target', | ||
4271 | 592 | dict(topic='testtopic', server=None, fanout=True, | ||
4272 | 593 | skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), | ||
4273 | 594 | ] | ||
4274 | 595 | |||
4275 | 596 | _msg = [ | ||
4276 | 597 | ('empty_msg', | ||
4277 | 598 | dict(msg={}, expected={})), | ||
4278 | 599 | ('primitive_msg', | ||
4279 | 600 | dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})), | ||
4280 | 601 | ('complex_msg', | ||
4281 | 602 | dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}}, | ||
4282 | 603 | expected={'a': {'b': '1920-02-03T04:05:06.000007'}})), | ||
4283 | 604 | ] | ||
4284 | 605 | |||
4285 | 606 | _context = [ | ||
4286 | 607 | ('empty_ctxt', dict(ctxt={}, expected_ctxt={})), | ||
4287 | 608 | ('user_project_ctxt', | ||
4288 | 609 | dict(ctxt={'_context_user': 'mark', | ||
4289 | 610 | '_context_project': 'snarkybunch'}, | ||
4290 | 611 | expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})), | ||
4291 | 612 | ] | ||
4292 | 613 | |||
4293 | 614 | @classmethod | ||
4294 | 615 | def generate_scenarios(cls): | ||
4295 | 616 | cls.scenarios = testscenarios.multiply_scenarios(cls._msg, | ||
4296 | 617 | cls._context, | ||
4297 | 618 | cls._target) | ||
4298 | 619 | |||
4299 | 620 | def setUp(self): | ||
4300 | 621 | super(TestReplyWireFormat, self).setUp() | ||
4301 | 622 | self.messaging_conf.transport_driver = 'rabbit' | ||
4302 | 623 | self.messaging_conf.in_memory = True | ||
4303 | 624 | |||
4304 | 625 | def test_reply_wire_format(self): | ||
4305 | 626 | if hasattr(self, 'skip_msg'): | ||
4306 | 627 | self.skipTest(self.skip_msg) | ||
4307 | 628 | |||
4308 | 629 | transport = messaging.get_transport(self.conf) | ||
4309 | 630 | self.addCleanup(transport.cleanup) | ||
4310 | 631 | |||
4311 | 632 | driver = transport._driver | ||
4312 | 633 | |||
4313 | 634 | target = messaging.Target(topic=self.topic, | ||
4314 | 635 | server=self.server, | ||
4315 | 636 | fanout=self.fanout) | ||
4316 | 637 | |||
4317 | 638 | listener = driver.listen(target) | ||
4318 | 639 | |||
4319 | 640 | connection, producer = _create_producer(target) | ||
4320 | 641 | self.addCleanup(connection.release) | ||
4321 | 642 | |||
4322 | 643 | msg = { | ||
4323 | 644 | 'oslo.version': '2.0', | ||
4324 | 645 | 'oslo.message': {} | ||
4325 | 646 | } | ||
4326 | 647 | |||
4327 | 648 | msg['oslo.message'].update(self.msg) | ||
4328 | 649 | msg['oslo.message'].update(self.ctxt) | ||
4329 | 650 | |||
4330 | 651 | msg['oslo.message'].update({ | ||
4331 | 652 | '_msg_id': uuid.uuid4().hex, | ||
4332 | 653 | '_unique_id': uuid.uuid4().hex, | ||
4333 | 654 | '_reply_q': 'reply_' + uuid.uuid4().hex, | ||
4334 | 655 | }) | ||
4335 | 656 | |||
4336 | 657 | msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) | ||
4337 | 658 | |||
4338 | 659 | producer.publish(msg) | ||
4339 | 660 | |||
4340 | 661 | received = listener.poll() | ||
4341 | 662 | self.assertIsNotNone(received) | ||
4342 | 663 | self.assertEqual(self.expected_ctxt, received.ctxt) | ||
4343 | 664 | self.assertEqual(self.expected, received.message) | ||
4344 | 665 | |||
4345 | 666 | |||
4346 | 667 | TestReplyWireFormat.generate_scenarios() | ||
4347 | 668 | |||
4348 | 669 | |||
4349 | 670 | class RpcKombuHATestCase(test_utils.BaseTestCase): | ||
4350 | 671 | |||
4351 | 672 | def setUp(self): | ||
4352 | 673 | super(RpcKombuHATestCase, self).setUp() | ||
4353 | 674 | self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] | ||
4354 | 675 | self.config(rabbit_hosts=self.brokers) | ||
4355 | 676 | |||
4356 | 677 | hostname_sets = set() | ||
4357 | 678 | self.info = {'attempt': 0, | ||
4358 | 679 | 'fail': False} | ||
4359 | 680 | |||
4360 | 681 | def _connect(myself, params): | ||
4361 | 682 | # do as little work that is enough to pass connection attempt | ||
4362 | 683 | myself.connection = kombu.connection.BrokerConnection(**params) | ||
4363 | 684 | myself.connection_errors = myself.connection.connection_errors | ||
4364 | 685 | |||
4365 | 686 | hostname = params['hostname'] | ||
4366 | 687 | self.assertNotIn(hostname, hostname_sets) | ||
4367 | 688 | hostname_sets.add(hostname) | ||
4368 | 689 | |||
4369 | 690 | self.info['attempt'] += 1 | ||
4370 | 691 | if self.info['fail']: | ||
4371 | 692 | raise IOError('fake fail') | ||
4372 | 693 | |||
4373 | 694 | # just make sure connection instantiation does not fail with an | ||
4374 | 695 | # exception | ||
4375 | 696 | self.stubs.Set(rabbit_driver.Connection, '_connect', _connect) | ||
4376 | 697 | |||
4377 | 698 | # starting from the first broker in the list | ||
4378 | 699 | url = messaging.TransportURL.parse(self.conf, None) | ||
4379 | 700 | self.connection = rabbit_driver.Connection(self.conf, url) | ||
4380 | 701 | self.addCleanup(self.connection.close) | ||
4381 | 702 | |||
4382 | 703 | self.info.update({'attempt': 0, | ||
4383 | 704 | 'fail': True}) | ||
4384 | 705 | hostname_sets.clear() | ||
4385 | 706 | |||
4386 | 707 | def test_reconnect_order(self): | ||
4387 | 708 | self.assertRaises(messaging.MessageDeliveryFailure, | ||
4388 | 709 | self.connection.reconnect, | ||
4389 | 710 | retry=len(self.brokers) - 1) | ||
4390 | 711 | self.assertEqual(len(self.brokers), self.info['attempt']) | ||
4391 | 712 | |||
4392 | 713 | def test_ensure_four_retry(self): | ||
4393 | 714 | mock_callback = mock.Mock(side_effect=IOError) | ||
4394 | 715 | self.assertRaises(messaging.MessageDeliveryFailure, | ||
4395 | 716 | self.connection.ensure, None, mock_callback, | ||
4396 | 717 | retry=4) | ||
4397 | 718 | self.assertEqual(5, self.info['attempt']) | ||
4398 | 719 | self.assertEqual(1, mock_callback.call_count) | ||
4399 | 720 | |||
4400 | 721 | def test_ensure_one_retry(self): | ||
4401 | 722 | mock_callback = mock.Mock(side_effect=IOError) | ||
4402 | 723 | self.assertRaises(messaging.MessageDeliveryFailure, | ||
4403 | 724 | self.connection.ensure, None, mock_callback, | ||
4404 | 725 | retry=1) | ||
4405 | 726 | self.assertEqual(2, self.info['attempt']) | ||
4406 | 727 | self.assertEqual(1, mock_callback.call_count) | ||
4407 | 728 | |||
4408 | 729 | def test_ensure_no_retry(self): | ||
4409 | 730 | mock_callback = mock.Mock(side_effect=IOError) | ||
4410 | 731 | self.assertRaises(messaging.MessageDeliveryFailure, | ||
4411 | 732 | self.connection.ensure, None, mock_callback, | ||
4412 | 733 | retry=0) | ||
4413 | 734 | self.assertEqual(1, self.info['attempt']) | ||
4414 | 735 | self.assertEqual(1, mock_callback.call_count) | ||
4415 | 0 | 736 | ||
4416 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch' | |||
4417 | === added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/.timestamp' | |||
4418 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo' | |||
4419 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging' | |||
4420 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers' | |||
4421 | === added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py' | |||
4422 | --- .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000 | |||
4423 | +++ .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000 | |||
4424 | @@ -0,0 +1,839 @@ | |||
4425 | 1 | # Copyright 2011 OpenStack Foundation | ||
4426 | 2 | # | ||
4427 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
4428 | 4 | # not use this file except in compliance with the License. You may obtain | ||
4429 | 5 | # a copy of the License at | ||
4430 | 6 | # | ||
4431 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
4432 | 8 | # | ||
4433 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
4434 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
4435 | 11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
4436 | 12 | # License for the specific language governing permissions and limitations | ||
4437 | 13 | # under the License. | ||
4438 | 14 | |||
4439 | 15 | import functools | ||
4440 | 16 | import itertools | ||
4441 | 17 | import logging | ||
4442 | 18 | import random | ||
4443 | 19 | import socket | ||
4444 | 20 | import ssl | ||
4445 | 21 | import time | ||
4446 | 22 | import uuid | ||
4447 | 23 | |||
4448 | 24 | import kombu | ||
4449 | 25 | import kombu.connection | ||
4450 | 26 | import kombu.entity | ||
4451 | 27 | import kombu.messaging | ||
4452 | 28 | import six | ||
4453 | 29 | |||
4454 | 30 | from oslo.config import cfg | ||
4455 | 31 | from oslo.messaging._drivers import amqp as rpc_amqp | ||
4456 | 32 | from oslo.messaging._drivers import amqpdriver | ||
4457 | 33 | from oslo.messaging._drivers import common as rpc_common | ||
4458 | 34 | from oslo.messaging import exceptions | ||
4459 | 35 | from oslo.messaging.openstack.common.gettextutils import _ | ||
4460 | 36 | from oslo.utils import netutils | ||
4461 | 37 | |||
4462 | 38 | rabbit_opts = [ | ||
4463 | 39 | cfg.StrOpt('kombu_ssl_version', | ||
4464 | 40 | default='', | ||
4465 | 41 | help='SSL version to use (valid only if SSL enabled). ' | ||
4466 | 42 | 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' | ||
4467 | 43 | 'be available on some distributions.' | ||
4468 | 44 | ), | ||
4469 | 45 | cfg.StrOpt('kombu_ssl_keyfile', | ||
4470 | 46 | default='', | ||
4471 | 47 | help='SSL key file (valid only if SSL enabled).'), | ||
4472 | 48 | cfg.StrOpt('kombu_ssl_certfile', | ||
4473 | 49 | default='', | ||
4474 | 50 | help='SSL cert file (valid only if SSL enabled).'), | ||
4475 | 51 | cfg.StrOpt('kombu_ssl_ca_certs', | ||
4476 | 52 | default='', | ||
4477 | 53 | help='SSL certification authority file ' | ||
4478 | 54 | '(valid only if SSL enabled).'), | ||
4479 | 55 | cfg.FloatOpt('kombu_reconnect_delay', | ||
4480 | 56 | default=1.0, | ||
4481 | 57 | help='How long to wait before reconnecting in response to an ' | ||
4482 | 58 | 'AMQP consumer cancel notification.'), | ||
4483 | 59 | cfg.StrOpt('rabbit_host', | ||
4484 | 60 | default='localhost', | ||
4485 | 61 | help='The RabbitMQ broker address where a single node is ' | ||
4486 | 62 | 'used.'), | ||
4487 | 63 | cfg.IntOpt('rabbit_port', | ||
4488 | 64 | default=5672, | ||
4489 | 65 | help='The RabbitMQ broker port where a single node is used.'), | ||
4490 | 66 | cfg.ListOpt('rabbit_hosts', | ||
4491 | 67 | default=['$rabbit_host:$rabbit_port'], | ||
4492 | 68 | help='RabbitMQ HA cluster host:port pairs.'), | ||
4493 | 69 | cfg.BoolOpt('rabbit_use_ssl', | ||
4494 | 70 | default=False, | ||
4495 | 71 | help='Connect over SSL for RabbitMQ.'), | ||
4496 | 72 | cfg.StrOpt('rabbit_userid', | ||
4497 | 73 | default='guest', | ||
4498 | 74 | help='The RabbitMQ userid.'), | ||
4499 | 75 | cfg.StrOpt('rabbit_password', | ||
4500 | 76 | default='guest', | ||
4501 | 77 | help='The RabbitMQ password.', | ||
4502 | 78 | secret=True), | ||
4503 | 79 | cfg.StrOpt('rabbit_login_method', | ||
4504 | 80 | default='AMQPLAIN', | ||
4505 | 81 | help='the RabbitMQ login method'), | ||
4506 | 82 | cfg.StrOpt('rabbit_virtual_host', | ||
4507 | 83 | default='/', | ||
4508 | 84 | help='The RabbitMQ virtual host.'), | ||
4509 | 85 | cfg.IntOpt('rabbit_retry_interval', | ||
4510 | 86 | default=1, | ||
4511 | 87 | help='How frequently to retry connecting with RabbitMQ.'), | ||
4512 | 88 | cfg.IntOpt('rabbit_retry_backoff', | ||
4513 | 89 | default=2, | ||
4514 | 90 | help='How long to backoff for between retries when connecting ' | ||
4515 | 91 | 'to RabbitMQ.'), | ||
4516 | 92 | cfg.IntOpt('rabbit_max_retries', | ||
4517 | 93 | default=0, | ||
4518 | 94 | help='Maximum number of RabbitMQ connection retries. ' | ||
4519 | 95 | 'Default is 0 (infinite retry count).'), | ||
4520 | 96 | cfg.BoolOpt('rabbit_ha_queues', | ||
4521 | 97 | default=False, | ||
4522 | 98 | help='Use HA queues in RabbitMQ (x-ha-policy: all). ' | ||
4523 | 99 | 'If you change this option, you must wipe the ' | ||
4524 | 100 | 'RabbitMQ database.'), | ||
4525 | 101 | |||
4526 | 102 | # FIXME(markmc): this was toplevel in openstack.common.rpc | ||
4527 | 103 | cfg.BoolOpt('fake_rabbit', | ||
4528 | 104 | default=False, | ||
4529 | 105 | help='If passed, use a fake RabbitMQ provider.'), | ||
4530 | 106 | ] | ||
4531 | 107 | |||
4532 | 108 | LOG = logging.getLogger(__name__) | ||
4533 | 109 | |||
4534 | 110 | |||
4535 | 111 | def _get_queue_arguments(conf): | ||
4536 | 112 | """Construct the arguments for declaring a queue. | ||
4537 | 113 | |||
4538 | 114 | If the rabbit_ha_queues option is set, we declare a mirrored queue | ||
4539 | 115 | as described here: | ||
4540 | 116 | |||
4541 | 117 | http://www.rabbitmq.com/ha.html | ||
4542 | 118 | |||
4543 | 119 | Setting x-ha-policy to all means that the queue will be mirrored | ||
4544 | 120 | to all nodes in the cluster. | ||
4545 | 121 | """ | ||
4546 | 122 | return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} | ||
4547 | 123 | |||
4548 | 124 | |||
4549 | 125 | class RabbitMessage(dict): | ||
4550 | 126 | def __init__(self, raw_message): | ||
4551 | 127 | super(RabbitMessage, self).__init__( | ||
4552 | 128 | rpc_common.deserialize_msg(raw_message.payload)) | ||
4553 | 129 | self._raw_message = raw_message | ||
4554 | 130 | |||
4555 | 131 | def acknowledge(self): | ||
4556 | 132 | self._raw_message.ack() | ||
4557 | 133 | |||
4558 | 134 | def requeue(self): | ||
4559 | 135 | self._raw_message.requeue() | ||
4560 | 136 | |||
4561 | 137 | |||
4562 | 138 | class ConsumerBase(object): | ||
4563 | 139 | """Consumer base class.""" | ||
4564 | 140 | |||
4565 | 141 | def __init__(self, channel, callback, tag, **kwargs): | ||
4566 | 142 | """Declare a queue on an amqp channel. | ||
4567 | 143 | |||
4568 | 144 | 'channel' is the amqp channel to use | ||
4569 | 145 | 'callback' is the callback to call when messages are received | ||
4570 | 146 | 'tag' is a unique ID for the consumer on the channel | ||
4571 | 147 | |||
4572 | 148 | queue name, exchange name, and other kombu options are | ||
4573 | 149 | passed in here as a dictionary. | ||
4574 | 150 | """ | ||
4575 | 151 | self.callback = callback | ||
4576 | 152 | self.tag = six.text_type(tag) | ||
4577 | 153 | self.kwargs = kwargs | ||
4578 | 154 | self.queue = None | ||
4579 | 155 | self.reconnect(channel) | ||
4580 | 156 | |||
4581 | 157 | def reconnect(self, channel): | ||
4582 | 158 | """Re-declare the queue after a rabbit reconnect.""" | ||
4583 | 159 | self.channel = channel | ||
4584 | 160 | self.kwargs['channel'] = channel | ||
4585 | 161 | self.queue = kombu.entity.Queue(**self.kwargs) | ||
4586 | 162 | self.queue.declare() | ||
4587 | 163 | |||
4588 | 164 | def _callback_handler(self, message, callback): | ||
4589 | 165 | """Call callback with deserialized message. | ||
4590 | 166 | |||
4591 | 167 | Messages that are processed and ack'ed. | ||
4592 | 168 | """ | ||
4593 | 169 | |||
4594 | 170 | try: | ||
4595 | 171 | callback(RabbitMessage(message)) | ||
4596 | 172 | except Exception: | ||
4597 | 173 | LOG.exception(_("Failed to process message" | ||
4598 | 174 | " ... skipping it.")) | ||
4599 | 175 | message.ack() | ||
4600 | 176 | |||
4601 | 177 | def consume(self, *args, **kwargs): | ||
4602 | 178 | """Actually declare the consumer on the amqp channel. This will | ||
4603 | 179 | start the flow of messages from the queue. Using the | ||
4604 | 180 | Connection.iterconsume() iterator will process the messages, | ||
4605 | 181 | calling the appropriate callback. | ||
4606 | 182 | |||
4607 | 183 | If a callback is specified in kwargs, use that. Otherwise, | ||
4608 | 184 | use the callback passed during __init__() | ||
4609 | 185 | |||
4610 | 186 | If kwargs['nowait'] is True, then this call will block until | ||
4611 | 187 | a message is read. | ||
4612 | 188 | |||
4613 | 189 | """ | ||
4614 | 190 | |||
4615 | 191 | options = {'consumer_tag': self.tag} | ||
4616 | 192 | options['nowait'] = kwargs.get('nowait', False) | ||
4617 | 193 | callback = kwargs.get('callback', self.callback) | ||
4618 | 194 | if not callback: | ||
4619 | 195 | raise ValueError("No callback defined") | ||
4620 | 196 | |||
4621 | 197 | def _callback(raw_message): | ||
4622 | 198 | message = self.channel.message_to_python(raw_message) | ||
4623 | 199 | self._callback_handler(message, callback) | ||
4624 | 200 | |||
4625 | 201 | self.queue.consume(*args, callback=_callback, **options) | ||
4626 | 202 | |||
4627 | 203 | def cancel(self): | ||
4628 | 204 | """Cancel the consuming from the queue, if it has started.""" | ||
4629 | 205 | try: | ||
4630 | 206 | self.queue.cancel(self.tag) | ||
4631 | 207 | except KeyError as e: | ||
4632 | 208 | # NOTE(comstud): Kludge to get around a amqplib bug | ||
4633 | 209 | if six.text_type(e) != "u'%s'" % self.tag: | ||
4634 | 210 | raise | ||
4635 | 211 | self.queue = None | ||
4636 | 212 | |||
4637 | 213 | |||
4638 | 214 | class DirectConsumer(ConsumerBase): | ||
4639 | 215 | """Queue/consumer class for 'direct'.""" | ||
4640 | 216 | |||
4641 | 217 | def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): | ||
4642 | 218 | """Init a 'direct' queue. | ||
4643 | 219 | |||
4644 | 220 | 'channel' is the amqp channel to use | ||
4645 | 221 | 'msg_id' is the msg_id to listen on | ||
4646 | 222 | 'callback' is the callback to call when messages are received | ||
4647 | 223 | 'tag' is a unique ID for the consumer on the channel | ||
4648 | 224 | |||
4649 | 225 | Other kombu options may be passed | ||
4650 | 226 | """ | ||
4651 | 227 | # Default options | ||
4652 | 228 | options = {'durable': False, | ||
4653 | 229 | 'queue_arguments': _get_queue_arguments(conf), | ||
4654 | 230 | 'auto_delete': True, | ||
4655 | 231 | 'exclusive': False} | ||
4656 | 232 | options.update(kwargs) | ||
4657 | 233 | exchange = kombu.entity.Exchange(name=msg_id, | ||
4658 | 234 | type='direct', | ||
4659 | 235 | durable=options['durable'], | ||
4660 | 236 | auto_delete=options['auto_delete']) | ||
4661 | 237 | super(DirectConsumer, self).__init__(channel, | ||
4662 | 238 | callback, | ||
4663 | 239 | tag, | ||
4664 | 240 | name=msg_id, | ||
4665 | 241 | exchange=exchange, | ||
4666 | 242 | routing_key=msg_id, | ||
4667 | 243 | **options) | ||
4668 | 244 | |||
4669 | 245 | |||
4670 | 246 | class TopicConsumer(ConsumerBase): | ||
4671 | 247 | """Consumer class for 'topic'.""" | ||
4672 | 248 | |||
4673 | 249 | def __init__(self, conf, channel, topic, callback, tag, exchange_name, | ||
4674 | 250 | name=None, **kwargs): | ||
4675 | 251 | """Init a 'topic' queue. | ||
4676 | 252 | |||
4677 | 253 | :param channel: the amqp channel to use | ||
4678 | 254 | :param topic: the topic to listen on | ||
4679 | 255 | :paramtype topic: str | ||
4680 | 256 | :param callback: the callback to call when messages are received | ||
4681 | 257 | :param tag: a unique ID for the consumer on the channel | ||
4682 | 258 | :param exchange_name: the exchange name to use | ||
4683 | 259 | :param name: optional queue name, defaults to topic | ||
4684 | 260 | :paramtype name: str | ||
4685 | 261 | |||
4686 | 262 | Other kombu options may be passed as keyword arguments | ||
4687 | 263 | """ | ||
4688 | 264 | # Default options | ||
4689 | 265 | options = {'durable': conf.amqp_durable_queues, | ||
4690 | 266 | 'queue_arguments': _get_queue_arguments(conf), | ||
4691 | 267 | 'auto_delete': conf.amqp_auto_delete, | ||
4692 | 268 | 'exclusive': False} | ||
4693 | 269 | options.update(kwargs) | ||
4694 | 270 | exchange = kombu.entity.Exchange(name=exchange_name, | ||
4695 | 271 | type='topic', | ||
4696 | 272 | durable=options['durable'], | ||
4697 | 273 | auto_delete=options['auto_delete']) | ||
4698 | 274 | super(TopicConsumer, self).__init__(channel, | ||
4699 | 275 | callback, | ||
4700 | 276 | tag, | ||
4701 | 277 | name=name or topic, | ||
4702 | 278 | exchange=exchange, | ||
4703 | 279 | routing_key=topic, | ||
4704 | 280 | **options) | ||
4705 | 281 | |||
4706 | 282 | |||
4707 | 283 | class FanoutConsumer(ConsumerBase): | ||
4708 | 284 | """Consumer class for 'fanout'.""" | ||
4709 | 285 | |||
4710 | 286 | def __init__(self, conf, channel, topic, callback, tag, **kwargs): | ||
4711 | 287 | """Init a 'fanout' queue. | ||
4712 | 288 | |||
4713 | 289 | 'channel' is the amqp channel to use | ||
4714 | 290 | 'topic' is the topic to listen on | ||
4715 | 291 | 'callback' is the callback to call when messages are received | ||
4716 | 292 | 'tag' is a unique ID for the consumer on the channel | ||
4717 | 293 | |||
4718 | 294 | Other kombu options may be passed | ||
4719 | 295 | """ | ||
4720 | 296 | unique = uuid.uuid4().hex | ||
4721 | 297 | exchange_name = '%s_fanout' % topic | ||
4722 | 298 | queue_name = '%s_fanout_%s' % (topic, unique) | ||
4723 | 299 | |||
4724 | 300 | # Default options | ||
4725 | 301 | options = {'durable': False, | ||
4726 | 302 | 'queue_arguments': _get_queue_arguments(conf), | ||
4727 | 303 | 'auto_delete': True, | ||
4728 | 304 | 'exclusive': False} | ||
4729 | 305 | options.update(kwargs) | ||
4730 | 306 | exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', | ||
4731 | 307 | durable=options['durable'], | ||
4732 | 308 | auto_delete=options['auto_delete']) | ||
4733 | 309 | super(FanoutConsumer, self).__init__(channel, callback, tag, | ||
4734 | 310 | name=queue_name, | ||
4735 | 311 | exchange=exchange, | ||
4736 | 312 | routing_key=topic, | ||
4737 | 313 | **options) | ||
4738 | 314 | |||
4739 | 315 | |||
4740 | 316 | class Publisher(object): | ||
4741 | 317 | """Base Publisher class.""" | ||
4742 | 318 | |||
4743 | 319 | def __init__(self, channel, exchange_name, routing_key, **kwargs): | ||
4744 | 320 | """Init the Publisher class with the exchange_name, routing_key, | ||
4745 | 321 | and other options | ||
4746 | 322 | """ | ||
4747 | 323 | self.exchange_name = exchange_name | ||
4748 | 324 | self.routing_key = routing_key | ||
4749 | 325 | self.kwargs = kwargs | ||
4750 | 326 | self.reconnect(channel) | ||
4751 | 327 | |||
4752 | 328 | def reconnect(self, channel): | ||
4753 | 329 | """Re-establish the Producer after a rabbit reconnection.""" | ||
4754 | 330 | self.exchange = kombu.entity.Exchange(name=self.exchange_name, | ||
4755 | 331 | **self.kwargs) | ||
4756 | 332 | self.producer = kombu.messaging.Producer(exchange=self.exchange, | ||
4757 | 333 | channel=channel, | ||
4758 | 334 | routing_key=self.routing_key) | ||
4759 | 335 | |||
4760 | 336 | def send(self, msg, timeout=None): | ||
4761 | 337 | """Send a message.""" | ||
4762 | 338 | if timeout: | ||
4763 | 339 | # | ||
4764 | 340 | # AMQP TTL is in milliseconds when set in the header. | ||
4765 | 341 | # | ||
4766 | 342 | self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) | ||
4767 | 343 | else: | ||
4768 | 344 | self.producer.publish(msg) | ||
4769 | 345 | |||
4770 | 346 | |||
4771 | 347 | class DirectPublisher(Publisher): | ||
4772 | 348 | """Publisher class for 'direct'.""" | ||
4773 | 349 | def __init__(self, conf, channel, topic, **kwargs): | ||
4774 | 350 | """Init a 'direct' publisher. | ||
4775 | 351 | |||
4776 | 352 | Kombu options may be passed as keyword args to override defaults | ||
4777 | 353 | """ | ||
4778 | 354 | |||
4779 | 355 | options = {'durable': False, | ||
4780 | 356 | 'auto_delete': True, | ||
4781 | 357 | 'exclusive': False} | ||
4782 | 358 | options.update(kwargs) | ||
4783 | 359 | super(DirectPublisher, self).__init__(channel, topic, topic, | ||
4784 | 360 | type='direct', **options) | ||
4785 | 361 | |||
4786 | 362 | |||
4787 | 363 | class TopicPublisher(Publisher): | ||
4788 | 364 | """Publisher class for 'topic'.""" | ||
4789 | 365 | def __init__(self, conf, channel, exchange_name, topic, **kwargs): | ||
4790 | 366 | """Init a 'topic' publisher. | ||
4791 | 367 | |||
4792 | 368 | Kombu options may be passed as keyword args to override defaults | ||
4793 | 369 | """ | ||
4794 | 370 | options = {'durable': conf.amqp_durable_queues, | ||
4795 | 371 | 'auto_delete': conf.amqp_auto_delete, | ||
4796 | 372 | 'exclusive': False} | ||
4797 | 373 | options.update(kwargs) | ||
4798 | 374 | super(TopicPublisher, self).__init__(channel, | ||
4799 | 375 | exchange_name, | ||
4800 | 376 | topic, | ||
4801 | 377 | type='topic', | ||
4802 | 378 | **options) | ||
4803 | 379 | |||
4804 | 380 | |||
4805 | 381 | class FanoutPublisher(Publisher): | ||
4806 | 382 | """Publisher class for 'fanout'.""" | ||
4807 | 383 | def __init__(self, conf, channel, topic, **kwargs): | ||
4808 | 384 | """Init a 'fanout' publisher. | ||
4809 | 385 | |||
4810 | 386 | Kombu options may be passed as keyword args to override defaults | ||
4811 | 387 | """ | ||
4812 | 388 | options = {'durable': False, | ||
4813 | 389 | 'auto_delete': True, | ||
4814 | 390 | 'exclusive': False} | ||
4815 | 391 | options.update(kwargs) | ||
4816 | 392 | super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, | ||
4817 | 393 | None, type='fanout', **options) | ||
4818 | 394 | |||
4819 | 395 | |||
4820 | 396 | class NotifyPublisher(TopicPublisher): | ||
4821 | 397 | """Publisher class for 'notify'.""" | ||
4822 | 398 | |||
4823 | 399 | def __init__(self, conf, channel, exchange_name, topic, **kwargs): | ||
4824 | 400 | self.durable = kwargs.pop('durable', conf.amqp_durable_queues) | ||
4825 | 401 | self.queue_arguments = _get_queue_arguments(conf) | ||
4826 | 402 | super(NotifyPublisher, self).__init__(conf, channel, exchange_name, | ||
4827 | 403 | topic, **kwargs) | ||
4828 | 404 | |||
4829 | 405 | def reconnect(self, channel): | ||
4830 | 406 | super(NotifyPublisher, self).reconnect(channel) | ||
4831 | 407 | |||
4832 | 408 | # NOTE(jerdfelt): Normally the consumer would create the queue, but | ||
4833 | 409 | # we do this to ensure that messages don't get dropped if the | ||
4834 | 410 | # consumer is started after we do | ||
4835 | 411 | queue = kombu.entity.Queue(channel=channel, | ||
4836 | 412 | exchange=self.exchange, | ||
4837 | 413 | durable=self.durable, | ||
4838 | 414 | name=self.routing_key, | ||
4839 | 415 | routing_key=self.routing_key, | ||
4840 | 416 | queue_arguments=self.queue_arguments) | ||
4841 | 417 | queue.declare() | ||
4842 | 418 | |||
4843 | 419 | |||
4844 | 420 | class Connection(object): | ||
4845 | 421 | """Connection object.""" | ||
4846 | 422 | |||
4847 | 423 | pools = {} | ||
4848 | 424 | |||
4849 | 425 | def __init__(self, conf, url): | ||
4850 | 426 | self.consumers = [] | ||
4851 | 427 | self.conf = conf | ||
4852 | 428 | self.max_retries = self.conf.rabbit_max_retries | ||
4853 | 429 | # Try forever? | ||
4854 | 430 | if self.max_retries <= 0: | ||
4855 | 431 | self.max_retries = None | ||
4856 | 432 | self.interval_start = self.conf.rabbit_retry_interval | ||
4857 | 433 | self.interval_stepping = self.conf.rabbit_retry_backoff | ||
4858 | 434 | # max retry-interval = 30 seconds | ||
4859 | 435 | self.interval_max = 30 | ||
4860 | 436 | self.memory_transport = False | ||
4861 | 437 | |||
4862 | 438 | ssl_params = self._fetch_ssl_params() | ||
4863 | 439 | |||
4864 | 440 | if url.virtual_host is not None: | ||
4865 | 441 | virtual_host = url.virtual_host | ||
4866 | 442 | else: | ||
4867 | 443 | virtual_host = self.conf.rabbit_virtual_host | ||
4868 | 444 | |||
4869 | 445 | self.brokers_params = [] | ||
4870 | 446 | if url.hosts: | ||
4871 | 447 | for host in url.hosts: | ||
4872 | 448 | params = { | ||
4873 | 449 | 'hostname': host.hostname, | ||
4874 | 450 | 'port': host.port or 5672, | ||
4875 | 451 | 'userid': host.username or '', | ||
4876 | 452 | 'password': host.password or '', | ||
4877 | 453 | 'login_method': self.conf.rabbit_login_method, | ||
4878 | 454 | 'virtual_host': virtual_host | ||
4879 | 455 | } | ||
4880 | 456 | if self.conf.fake_rabbit: | ||
4881 | 457 | params['transport'] = 'memory' | ||
4882 | 458 | if self.conf.rabbit_use_ssl: | ||
4883 | 459 | params['ssl'] = ssl_params | ||
4884 | 460 | |||
4885 | 461 | self.brokers_params.append(params) | ||
4886 | 462 | else: | ||
4887 | 463 | # Old configuration format | ||
4888 | 464 | for adr in self.conf.rabbit_hosts: | ||
4889 | 465 | hostname, port = netutils.parse_host_port( | ||
4890 | 466 | adr, default_port=self.conf.rabbit_port) | ||
4891 | 467 | |||
4892 | 468 | params = { | ||
4893 | 469 | 'hostname': hostname, | ||
4894 | 470 | 'port': port, | ||
4895 | 471 | 'userid': self.conf.rabbit_userid, | ||
4896 | 472 | 'password': self.conf.rabbit_password, | ||
4897 | 473 | 'login_method': self.conf.rabbit_login_method, | ||
4898 | 474 | 'virtual_host': virtual_host | ||
4899 | 475 | } | ||
4900 | 476 | |||
4901 | 477 | if self.conf.fake_rabbit: | ||
4902 | 478 | params['transport'] = 'memory' | ||
4903 | 479 | if self.conf.rabbit_use_ssl: | ||
4904 | 480 | params['ssl'] = ssl_params | ||
4905 | 481 | |||
4906 | 482 | self.brokers_params.append(params) | ||
4907 | 483 | |||
4908 | 484 | random.shuffle(self.brokers_params) | ||
4909 | 485 | self.brokers = itertools.cycle(self.brokers_params) | ||
4910 | 486 | |||
4911 | 487 | self.memory_transport = self.conf.fake_rabbit | ||
4912 | 488 | |||
4913 | 489 | self.connection = None | ||
4914 | 490 | self.do_consume = None | ||
4915 | 491 | self.reconnect() | ||
4916 | 492 | |||
    # FIXME(markmc): use oslo sslutils when it is available as a library
    # Map of lowercase config names to the ssl module's protocol constants,
    # used by validate_ssl_version() below.
    _SSL_PROTOCOLS = {
        "tlsv1": ssl.PROTOCOL_TLSv1,
        "sslv23": ssl.PROTOCOL_SSLv23,
        "sslv3": ssl.PROTOCOL_SSLv3
    }

    # SSLv2 is absent from some Python builds; register it only when the
    # interpreter's ssl module still exposes the constant.
    try:
        _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
    except AttributeError:
        pass
4928 | 504 | |||
4929 | 505 | @classmethod | ||
4930 | 506 | def validate_ssl_version(cls, version): | ||
4931 | 507 | key = version.lower() | ||
4932 | 508 | try: | ||
4933 | 509 | return cls._SSL_PROTOCOLS[key] | ||
4934 | 510 | except KeyError: | ||
4935 | 511 | raise RuntimeError(_("Invalid SSL version : %s") % version) | ||
4936 | 512 | |||
4937 | 513 | def _fetch_ssl_params(self): | ||
4938 | 514 | """Handles fetching what ssl params should be used for the connection | ||
4939 | 515 | (if any). | ||
4940 | 516 | """ | ||
4941 | 517 | ssl_params = dict() | ||
4942 | 518 | |||
4943 | 519 | # http://docs.python.org/library/ssl.html - ssl.wrap_socket | ||
4944 | 520 | if self.conf.kombu_ssl_version: | ||
4945 | 521 | ssl_params['ssl_version'] = self.validate_ssl_version( | ||
4946 | 522 | self.conf.kombu_ssl_version) | ||
4947 | 523 | if self.conf.kombu_ssl_keyfile: | ||
4948 | 524 | ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile | ||
4949 | 525 | if self.conf.kombu_ssl_certfile: | ||
4950 | 526 | ssl_params['certfile'] = self.conf.kombu_ssl_certfile | ||
4951 | 527 | if self.conf.kombu_ssl_ca_certs: | ||
4952 | 528 | ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs | ||
4953 | 529 | # We might want to allow variations in the | ||
4954 | 530 | # future with this? | ||
4955 | 531 | ssl_params['cert_reqs'] = ssl.CERT_REQUIRED | ||
4956 | 532 | |||
4957 | 533 | # Return the extended behavior or just have the default behavior | ||
4958 | 534 | return ssl_params or True | ||
4959 | 535 | |||
    def _connect(self, broker):
        """Connect to rabbit. Re-establish any queues that may have
        been declared before if we are reconnecting. Exceptions should
        be handled by the caller.

        :param broker: dict of kombu BrokerConnection keyword arguments
                       (one entry of self.brokers_params).
        """
        LOG.info(_("Connecting to AMQP server on "
                   "%(hostname)s:%(port)d"), broker)
        self.connection = kombu.connection.BrokerConnection(**broker)
        # Cache the transport-specific exception classes so callers can
        # catch them without knowing which transport is in use.
        self.connection_errors = self.connection.connection_errors
        self.channel_errors = self.connection.channel_errors
        if self.memory_transport:
            # Kludge to speed up tests.
            self.connection.transport.polling_interval = 0.0
        # Force the consume loop to (re)register its consumers on the next
        # poll, and restart consumer tag numbering for the new channel.
        self.do_consume = True
        self.consumer_num = itertools.count(1)
        self.connection.connect()
        self.channel = self.connection.channel()
        # work around 'memory' transport bug in 1.1.3
        if self.memory_transport:
            self.channel._new_queue('ae.undeliver')
        # Re-attach every consumer declared on the previous connection.
        for consumer in self.consumers:
            consumer.reconnect(self.channel)
        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
                 broker)
4984 | 560 | |||
4985 | 561 | def _disconnect(self): | ||
4986 | 562 | if self.connection: | ||
4987 | 563 | # XXX(nic): when reconnecting to a RabbitMQ cluster | ||
4988 | 564 | # with mirrored queues in use, the attempt to release the | ||
4989 | 565 | # connection can hang "indefinitely" somewhere deep down | ||
4990 | 566 | # in Kombu. Blocking the thread for a bit prior to | ||
4991 | 567 | # release seems to kludge around the problem where it is | ||
4992 | 568 | # otherwise reproduceable. | ||
4993 | 569 | if self.conf.kombu_reconnect_delay > 0: | ||
4994 | 570 | LOG.info(_("Delaying reconnect for %1.1f seconds...") % | ||
4995 | 571 | self.conf.kombu_reconnect_delay) | ||
4996 | 572 | time.sleep(self.conf.kombu_reconnect_delay) | ||
4997 | 573 | |||
4998 | 574 | try: | ||
4999 | 575 | self.connection.release() | ||
5000 | 576 | except self.connection_errors: |
The diff has been truncated for viewing.