Merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 into lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno
- Trusty (14.04)
- juno-lp1318721
- Merge into juno
Proposed by
Xiang Hui
Status: | Needs review | ||||
---|---|---|---|---|---|
Proposed branch: | lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 | ||||
Merge into: | lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno | ||||
Diff against target: |
9657 lines (+9295/-23) 29 files modified
.pc/.quilt_patches (+1/-0) .pc/.quilt_series (+1/-0) .pc/.version (+1/-0) .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py (+444/-0) .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py (+349/-0) .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py (+821/-0) .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py (+374/-0) .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py (+49/-0) .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py (+729/-0) .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py (+832/-0) .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py (+735/-0) .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py (+374/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/drivers/test_impl_rabbit.py (+756/-0) .pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py (+64/-0) .pc/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py (+865/-0) .pc/applied-patches (+8/-0) .pc/zmq-server-routing.patch/oslo/messaging/_drivers/impl_zmq.py (+995/-0) debian/changelog (+8/-0) debian/patches/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+24/-0) debian/patches/series (+1/-0) oslo/messaging/_drivers/amqpdriver.py (+16/-8) oslo/messaging/_drivers/common.py (+26/-0) oslo/messaging/_drivers/impl_qpid.py (+0/-1) oslo/messaging/_drivers/impl_rabbit.py (+69/-12) 
oslo/messaging/_drivers/impl_zmq.py (+2/-0) tests/drivers/test_impl_rabbit.py (+41/-2) tests/test_utils.py (+32/-0) |
||||
To merge this branch: | bzr merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Cloud Archive Team | Pending | ||
Review via email: mp+280822@code.launchpad.net |
Commit message
Description of the change
* Backport of upstream release. (LP: #1318721):-
- d/p/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
Redeclare if exception is caught after self.queue.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added directory '.pc' |
2 | === added file '.pc/.quilt_patches' |
3 | --- .pc/.quilt_patches 1970-01-01 00:00:00 +0000 |
4 | +++ .pc/.quilt_patches 2015-12-17 11:03:34 +0000 |
5 | @@ -0,0 +1,1 @@ |
6 | +debian/patches |
7 | |
8 | === added file '.pc/.quilt_series' |
9 | --- .pc/.quilt_series 1970-01-01 00:00:00 +0000 |
10 | +++ .pc/.quilt_series 2015-12-17 11:03:34 +0000 |
11 | @@ -0,0 +1,1 @@ |
12 | +series |
13 | |
14 | === added file '.pc/.version' |
15 | --- .pc/.version 1970-01-01 00:00:00 +0000 |
16 | +++ .pc/.version 2015-12-17 11:03:34 +0000 |
17 | @@ -0,0 +1,1 @@ |
18 | +2 |
19 | |
20 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch' |
21 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/.timestamp' |
22 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo' |
23 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging' |
24 | === added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers' |
25 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py' |
26 | --- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 1970-01-01 00:00:00 +0000 |
27 | +++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 2015-12-17 11:03:34 +0000 |
28 | @@ -0,0 +1,444 @@ |
29 | + |
30 | +# Copyright 2013 Red Hat, Inc. |
31 | +# |
32 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
33 | +# not use this file except in compliance with the License. You may obtain |
34 | +# a copy of the License at |
35 | +# |
36 | +# http://www.apache.org/licenses/LICENSE-2.0 |
37 | +# |
38 | +# Unless required by applicable law or agreed to in writing, software |
39 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
40 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
41 | +# License for the specific language governing permissions and limitations |
42 | +# under the License. |
43 | + |
44 | +__all__ = ['AMQPDriverBase'] |
45 | + |
46 | +import logging |
47 | +import threading |
48 | +import time |
49 | +import uuid |
50 | + |
51 | +from six import moves |
52 | + |
53 | +from oslo import messaging |
54 | +from oslo.messaging._drivers import amqp as rpc_amqp |
55 | +from oslo.messaging._drivers import base |
56 | +from oslo.messaging._drivers import common as rpc_common |
57 | + |
58 | +LOG = logging.getLogger(__name__) |
59 | + |
60 | + |
61 | +class AMQPIncomingMessage(base.IncomingMessage): |
62 | + |
63 | + def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q): |
64 | + super(AMQPIncomingMessage, self).__init__(listener, ctxt, |
65 | + dict(message)) |
66 | + |
67 | + self.unique_id = unique_id |
68 | + self.msg_id = msg_id |
69 | + self.reply_q = reply_q |
70 | + self.acknowledge_callback = message.acknowledge |
71 | + self.requeue_callback = message.requeue |
72 | + |
73 | + def _send_reply(self, conn, reply=None, failure=None, |
74 | + ending=False, log_failure=True): |
75 | + if failure: |
76 | + failure = rpc_common.serialize_remote_exception(failure, |
77 | + log_failure) |
78 | + |
79 | + msg = {'result': reply, 'failure': failure} |
80 | + if ending: |
81 | + msg['ending'] = True |
82 | + |
83 | + rpc_amqp._add_unique_id(msg) |
84 | + |
85 | + # If a reply_q exists, add the msg_id to the reply and pass the |
86 | + # reply_q to direct_send() to use it as the response queue. |
87 | + # Otherwise use the msg_id for backward compatibility. |
88 | + if self.reply_q: |
89 | + msg['_msg_id'] = self.msg_id |
90 | + conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) |
91 | + else: |
92 | + conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) |
93 | + |
94 | + def reply(self, reply=None, failure=None, log_failure=True): |
95 | + if not self.msg_id: |
96 | + # NOTE(Alexei_987) not sending reply, if msg_id is empty |
97 | + # because reply should not be expected by caller side |
98 | + return |
99 | + with self.listener.driver._get_connection() as conn: |
100 | + self._send_reply(conn, reply, failure, log_failure=log_failure) |
101 | + self._send_reply(conn, ending=True) |
102 | + |
103 | + def acknowledge(self): |
104 | + self.listener.msg_id_cache.add(self.unique_id) |
105 | + self.acknowledge_callback() |
106 | + |
107 | + def requeue(self): |
108 | + # NOTE(sileht): In case of the connection is lost between receiving the |
109 | + # message and requeing it, this requeue call fail |
110 | + # but because the message is not acknowledged and not added to the |
111 | + # msg_id_cache, the message will be reconsumed, the only difference is |
112 | + # the message stay at the beginning of the queue instead of moving to |
113 | + # the end. |
114 | + self.requeue_callback() |
115 | + |
116 | + |
117 | +class AMQPListener(base.Listener): |
118 | + |
119 | + def __init__(self, driver, conn): |
120 | + super(AMQPListener, self).__init__(driver) |
121 | + self.conn = conn |
122 | + self.msg_id_cache = rpc_amqp._MsgIdCache() |
123 | + self.incoming = [] |
124 | + |
125 | + def __call__(self, message): |
126 | + # FIXME(markmc): logging isn't driver specific |
127 | + rpc_common._safe_log(LOG.debug, 'received %s', dict(message)) |
128 | + |
129 | + unique_id = self.msg_id_cache.check_duplicate_message(message) |
130 | + ctxt = rpc_amqp.unpack_context(self.conf, message) |
131 | + |
132 | + self.incoming.append(AMQPIncomingMessage(self, |
133 | + ctxt.to_dict(), |
134 | + message, |
135 | + unique_id, |
136 | + ctxt.msg_id, |
137 | + ctxt.reply_q)) |
138 | + |
139 | + def poll(self, timeout=None): |
140 | + if timeout is not None: |
141 | + deadline = time.time() + timeout |
142 | + else: |
143 | + deadline = None |
144 | + while True: |
145 | + if self.incoming: |
146 | + return self.incoming.pop(0) |
147 | + if deadline is not None: |
148 | + timeout = deadline - time.time() |
149 | + if timeout < 0: |
150 | + return None |
151 | + try: |
152 | + self.conn.consume(limit=1, timeout=timeout) |
153 | + except rpc_common.Timeout: |
154 | + return None |
155 | + else: |
156 | + self.conn.consume(limit=1) |
157 | + |
158 | + |
159 | +class ReplyWaiters(object): |
160 | + |
161 | + WAKE_UP = object() |
162 | + |
163 | + def __init__(self): |
164 | + self._queues = {} |
165 | + self._wrn_threshold = 10 |
166 | + |
167 | + def get(self, msg_id, timeout): |
168 | + try: |
169 | + return self._queues[msg_id].get(block=True, timeout=timeout) |
170 | + except moves.queue.Empty: |
171 | + raise messaging.MessagingTimeout('Timed out waiting for a reply ' |
172 | + 'to message ID %s' % msg_id) |
173 | + |
174 | + def check(self, msg_id): |
175 | + try: |
176 | + return self._queues[msg_id].get(block=False) |
177 | + except moves.queue.Empty: |
178 | + return None |
179 | + |
180 | + def put(self, msg_id, message_data): |
181 | + queue = self._queues.get(msg_id) |
182 | + if not queue: |
183 | + LOG.warn('No calling threads waiting for msg_id : %(msg_id)s' |
184 | + ', message : %(data)s', {'msg_id': msg_id, |
185 | + 'data': message_data}) |
186 | + LOG.warn('_queues: %s', self._queues) |
187 | + else: |
188 | + queue.put(message_data) |
189 | + |
190 | + def wake_all(self, except_id): |
191 | + msg_ids = [i for i in self._queues.keys() if i != except_id] |
192 | + for msg_id in msg_ids: |
193 | + self.put(msg_id, self.WAKE_UP) |
194 | + |
195 | + def add(self, msg_id, queue): |
196 | + self._queues[msg_id] = queue |
197 | + if len(self._queues) > self._wrn_threshold: |
198 | + LOG.warn('Number of call queues is greater than warning ' |
199 | + 'threshold: %d. There could be a leak.', |
200 | + self._wrn_threshold) |
201 | + self._wrn_threshold *= 2 |
202 | + |
203 | + def remove(self, msg_id): |
204 | + del self._queues[msg_id] |
205 | + |
206 | + |
207 | +class ReplyWaiter(object): |
208 | + |
209 | + def __init__(self, conf, reply_q, conn, allowed_remote_exmods): |
210 | + self.conf = conf |
211 | + self.conn = conn |
212 | + self.reply_q = reply_q |
213 | + self.allowed_remote_exmods = allowed_remote_exmods |
214 | + |
215 | + self.conn_lock = threading.Lock() |
216 | + self.incoming = [] |
217 | + self.msg_id_cache = rpc_amqp._MsgIdCache() |
218 | + self.waiters = ReplyWaiters() |
219 | + |
220 | + conn.declare_direct_consumer(reply_q, self) |
221 | + |
222 | + def __call__(self, message): |
223 | + message.acknowledge() |
224 | + self.incoming.append(message) |
225 | + |
226 | + def listen(self, msg_id): |
227 | + queue = moves.queue.Queue() |
228 | + self.waiters.add(msg_id, queue) |
229 | + |
230 | + def unlisten(self, msg_id): |
231 | + self.waiters.remove(msg_id) |
232 | + |
233 | + def _process_reply(self, data): |
234 | + result = None |
235 | + ending = False |
236 | + self.msg_id_cache.check_duplicate_message(data) |
237 | + if data['failure']: |
238 | + failure = data['failure'] |
239 | + result = rpc_common.deserialize_remote_exception( |
240 | + failure, self.allowed_remote_exmods) |
241 | + elif data.get('ending', False): |
242 | + ending = True |
243 | + else: |
244 | + result = data['result'] |
245 | + return result, ending |
246 | + |
247 | + def _poll_connection(self, msg_id, timeout): |
248 | + while True: |
249 | + while self.incoming: |
250 | + message_data = self.incoming.pop(0) |
251 | + |
252 | + incoming_msg_id = message_data.pop('_msg_id', None) |
253 | + if incoming_msg_id == msg_id: |
254 | + return self._process_reply(message_data) |
255 | + |
256 | + self.waiters.put(incoming_msg_id, message_data) |
257 | + |
258 | + try: |
259 | + self.conn.consume(limit=1, timeout=timeout) |
260 | + except rpc_common.Timeout: |
261 | + raise messaging.MessagingTimeout('Timed out waiting for a ' |
262 | + 'reply to message ID %s' |
263 | + % msg_id) |
264 | + |
265 | + def _poll_queue(self, msg_id, timeout): |
266 | + message = self.waiters.get(msg_id, timeout) |
267 | + if message is self.waiters.WAKE_UP: |
268 | + return None, None, True # lock was released |
269 | + |
270 | + reply, ending = self._process_reply(message) |
271 | + return reply, ending, False |
272 | + |
273 | + def _check_queue(self, msg_id): |
274 | + while True: |
275 | + message = self.waiters.check(msg_id) |
276 | + if message is self.waiters.WAKE_UP: |
277 | + continue |
278 | + if message is None: |
279 | + return None, None, True # queue is empty |
280 | + |
281 | + reply, ending = self._process_reply(message) |
282 | + return reply, ending, False |
283 | + |
284 | + def wait(self, msg_id, timeout): |
285 | + # |
286 | + # NOTE(markmc): we're waiting for a reply for msg_id to come in for on |
287 | + # the reply_q, but there may be other threads also waiting for replies |
288 | + # to other msg_ids |
289 | + # |
290 | + # Only one thread can be consuming from the queue using this connection |
291 | + # and we don't want to hold open a connection per thread, so instead we |
292 | + # have the first thread take responsibility for passing replies not |
293 | + # intended for itself to the appropriate thread. |
294 | + # |
295 | + final_reply = None |
296 | + while True: |
297 | + if self.conn_lock.acquire(False): |
298 | + # Ok, we're the thread responsible for polling the connection |
299 | + try: |
300 | + # Check the queue to see if a previous lock-holding thread |
301 | + # queued up a reply already |
302 | + while True: |
303 | + reply, ending, empty = self._check_queue(msg_id) |
304 | + if empty: |
305 | + break |
306 | + if not ending: |
307 | + final_reply = reply |
308 | + else: |
309 | + return final_reply |
310 | + |
311 | + # Now actually poll the connection |
312 | + while True: |
313 | + reply, ending = self._poll_connection(msg_id, timeout) |
314 | + if not ending: |
315 | + final_reply = reply |
316 | + else: |
317 | + return final_reply |
318 | + finally: |
319 | + self.conn_lock.release() |
320 | + # We've got our reply, tell the other threads to wake up |
321 | + # so that one of them will take over the responsibility for |
322 | + # polling the connection |
323 | + self.waiters.wake_all(msg_id) |
324 | + else: |
325 | + # We're going to wait for the first thread to pass us our reply |
326 | + reply, ending, trylock = self._poll_queue(msg_id, timeout) |
327 | + if trylock: |
328 | + # The first thread got its reply, let's try and take over |
329 | + # the responsibility for polling |
330 | + continue |
331 | + if not ending: |
332 | + final_reply = reply |
333 | + else: |
334 | + return final_reply |
335 | + |
336 | + |
337 | +class AMQPDriverBase(base.BaseDriver): |
338 | + |
339 | + def __init__(self, conf, url, connection_pool, |
340 | + default_exchange=None, allowed_remote_exmods=None): |
341 | + super(AMQPDriverBase, self).__init__(conf, url, default_exchange, |
342 | + allowed_remote_exmods) |
343 | + |
344 | + self._default_exchange = default_exchange |
345 | + |
346 | + self._connection_pool = connection_pool |
347 | + |
348 | + self._reply_q_lock = threading.Lock() |
349 | + self._reply_q = None |
350 | + self._reply_q_conn = None |
351 | + self._waiter = None |
352 | + |
353 | + def _get_exchange(self, target): |
354 | + return target.exchange or self._default_exchange |
355 | + |
356 | + def _get_connection(self, pooled=True): |
357 | + return rpc_amqp.ConnectionContext(self.conf, |
358 | + self._url, |
359 | + self._connection_pool, |
360 | + pooled=pooled) |
361 | + |
362 | + def _get_reply_q(self): |
363 | + with self._reply_q_lock: |
364 | + if self._reply_q is not None: |
365 | + return self._reply_q |
366 | + |
367 | + reply_q = 'reply_' + uuid.uuid4().hex |
368 | + |
369 | + conn = self._get_connection(pooled=False) |
370 | + |
371 | + self._waiter = ReplyWaiter(self.conf, reply_q, conn, |
372 | + self._allowed_remote_exmods) |
373 | + |
374 | + self._reply_q = reply_q |
375 | + self._reply_q_conn = conn |
376 | + |
377 | + return self._reply_q |
378 | + |
379 | + def _send(self, target, ctxt, message, |
380 | + wait_for_reply=None, timeout=None, |
381 | + envelope=True, notify=False, retry=None): |
382 | + |
383 | + # FIXME(markmc): remove this temporary hack |
384 | + class Context(object): |
385 | + def __init__(self, d): |
386 | + self.d = d |
387 | + |
388 | + def to_dict(self): |
389 | + return self.d |
390 | + |
391 | + context = Context(ctxt) |
392 | + msg = message |
393 | + |
394 | + if wait_for_reply: |
395 | + msg_id = uuid.uuid4().hex |
396 | + msg.update({'_msg_id': msg_id}) |
397 | + LOG.debug('MSG_ID is %s', msg_id) |
398 | + msg.update({'_reply_q': self._get_reply_q()}) |
399 | + |
400 | + rpc_amqp._add_unique_id(msg) |
401 | + rpc_amqp.pack_context(msg, context) |
402 | + |
403 | + if envelope: |
404 | + msg = rpc_common.serialize_msg(msg) |
405 | + |
406 | + if wait_for_reply: |
407 | + self._waiter.listen(msg_id) |
408 | + |
409 | + try: |
410 | + with self._get_connection() as conn: |
411 | + if notify: |
412 | + conn.notify_send(self._get_exchange(target), |
413 | + target.topic, msg, retry=retry) |
414 | + elif target.fanout: |
415 | + conn.fanout_send(target.topic, msg, retry=retry) |
416 | + else: |
417 | + topic = target.topic |
418 | + if target.server: |
419 | + topic = '%s.%s' % (target.topic, target.server) |
420 | + conn.topic_send(exchange_name=self._get_exchange(target), |
421 | + topic=topic, msg=msg, timeout=timeout, |
422 | + retry=retry) |
423 | + |
424 | + if wait_for_reply: |
425 | + result = self._waiter.wait(msg_id, timeout) |
426 | + if isinstance(result, Exception): |
427 | + raise result |
428 | + return result |
429 | + finally: |
430 | + if wait_for_reply: |
431 | + self._waiter.unlisten(msg_id) |
432 | + |
433 | + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, |
434 | + retry=None): |
435 | + return self._send(target, ctxt, message, wait_for_reply, timeout, |
436 | + retry=retry) |
437 | + |
438 | + def send_notification(self, target, ctxt, message, version, retry=None): |
439 | + return self._send(target, ctxt, message, |
440 | + envelope=(version == 2.0), notify=True, retry=retry) |
441 | + |
442 | + def listen(self, target): |
443 | + conn = self._get_connection(pooled=False) |
444 | + |
445 | + listener = AMQPListener(self, conn) |
446 | + |
447 | + conn.declare_topic_consumer(exchange_name=self._get_exchange(target), |
448 | + topic=target.topic, |
449 | + callback=listener) |
450 | + conn.declare_topic_consumer(exchange_name=self._get_exchange(target), |
451 | + topic='%s.%s' % (target.topic, |
452 | + target.server), |
453 | + callback=listener) |
454 | + conn.declare_fanout_consumer(target.topic, listener) |
455 | + |
456 | + return listener |
457 | + |
458 | + def listen_for_notifications(self, targets_and_priorities): |
459 | + conn = self._get_connection(pooled=False) |
460 | + |
461 | + listener = AMQPListener(self, conn) |
462 | + for target, priority in targets_and_priorities: |
463 | + conn.declare_topic_consumer( |
464 | + exchange_name=self._get_exchange(target), |
465 | + topic='%s.%s' % (target.topic, priority), |
466 | + callback=listener) |
467 | + return listener |
468 | + |
469 | + def cleanup(self): |
470 | + if self._connection_pool: |
471 | + self._connection_pool.empty() |
472 | + self._connection_pool = None |
473 | |
474 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py' |
475 | --- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000 |
476 | +++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000 |
477 | @@ -0,0 +1,349 @@ |
478 | +# Copyright 2010 United States Government as represented by the |
479 | +# Administrator of the National Aeronautics and Space Administration. |
480 | +# All Rights Reserved. |
481 | +# Copyright 2011 Red Hat, Inc. |
482 | +# |
483 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
484 | +# not use this file except in compliance with the License. You may obtain |
485 | +# a copy of the License at |
486 | +# |
487 | +# http://www.apache.org/licenses/LICENSE-2.0 |
488 | +# |
489 | +# Unless required by applicable law or agreed to in writing, software |
490 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
491 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
492 | +# License for the specific language governing permissions and limitations |
493 | +# under the License. |
494 | + |
495 | +import copy |
496 | +import logging |
497 | +import sys |
498 | +import traceback |
499 | + |
500 | +import six |
501 | + |
502 | +from oslo import messaging |
503 | +from oslo.messaging import _utils as utils |
504 | +from oslo.messaging.openstack.common.gettextutils import _ |
505 | +from oslo.messaging.openstack.common import jsonutils |
506 | + |
507 | +LOG = logging.getLogger(__name__) |
508 | + |
509 | +_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' |
510 | + |
511 | + |
512 | +'''RPC Envelope Version. |
513 | + |
514 | +This version number applies to the top level structure of messages sent out. |
515 | +It does *not* apply to the message payload, which must be versioned |
516 | +independently. For example, when using rpc APIs, a version number is applied |
517 | +for changes to the API being exposed over rpc. This version number is handled |
518 | +in the rpc proxy and dispatcher modules. |
519 | + |
520 | +This version number applies to the message envelope that is used in the |
521 | +serialization done inside the rpc layer. See serialize_msg() and |
522 | +deserialize_msg(). |
523 | + |
524 | +The current message format (version 2.0) is very simple. It is: |
525 | + |
526 | + { |
527 | + 'oslo.version': <RPC Envelope Version as a String>, |
528 | + 'oslo.message': <Application Message Payload, JSON encoded> |
529 | + } |
530 | + |
531 | +Message format version '1.0' is just considered to be the messages we sent |
532 | +without a message envelope. |
533 | + |
534 | +So, the current message envelope just includes the envelope version. It may |
535 | +eventually contain additional information, such as a signature for the message |
536 | +payload. |
537 | + |
538 | +We will JSON encode the application message payload. The message envelope, |
539 | +which includes the JSON encoded application message body, will be passed down |
540 | +to the messaging libraries as a dict. |
541 | +''' |
542 | +_RPC_ENVELOPE_VERSION = '2.0' |
543 | + |
544 | +_VERSION_KEY = 'oslo.version' |
545 | +_MESSAGE_KEY = 'oslo.message' |
546 | + |
547 | +_REMOTE_POSTFIX = '_Remote' |
548 | + |
549 | + |
550 | +class RPCException(Exception): |
551 | + msg_fmt = _("An unknown RPC related exception occurred.") |
552 | + |
553 | + def __init__(self, message=None, **kwargs): |
554 | + self.kwargs = kwargs |
555 | + |
556 | + if not message: |
557 | + try: |
558 | + message = self.msg_fmt % kwargs |
559 | + |
560 | + except Exception: |
561 | + # kwargs doesn't match a variable in the message |
562 | + # log the issue and the kwargs |
563 | + LOG.exception(_('Exception in string format operation')) |
564 | + for name, value in six.iteritems(kwargs): |
565 | + LOG.error("%s: %s", name, value) |
566 | + # at least get the core message out if something happened |
567 | + message = self.msg_fmt |
568 | + |
569 | + super(RPCException, self).__init__(message) |
570 | + |
571 | + |
572 | +class Timeout(RPCException): |
573 | + """Signifies that a timeout has occurred. |
574 | + |
575 | + This exception is raised if the rpc_response_timeout is reached while |
576 | + waiting for a response from the remote side. |
577 | + """ |
578 | + msg_fmt = _('Timeout while waiting on RPC response - ' |
579 | + 'topic: "%(topic)s", RPC method: "%(method)s" ' |
580 | + 'info: "%(info)s"') |
581 | + |
582 | + def __init__(self, info=None, topic=None, method=None): |
583 | + """Initiates Timeout object. |
584 | + |
585 | + :param info: Extra info to convey to the user |
586 | + :param topic: The topic that the rpc call was sent to |
587 | + :param rpc_method_name: The name of the rpc method being |
588 | + called |
589 | + """ |
590 | + self.info = info |
591 | + self.topic = topic |
592 | + self.method = method |
593 | + super(Timeout, self).__init__( |
594 | + None, |
595 | + info=info or _('<unknown>'), |
596 | + topic=topic or _('<unknown>'), |
597 | + method=method or _('<unknown>')) |
598 | + |
599 | + |
600 | +class DuplicateMessageError(RPCException): |
601 | + msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.") |
602 | + |
603 | + |
604 | +class InvalidRPCConnectionReuse(RPCException): |
605 | + msg_fmt = _("Invalid reuse of an RPC connection.") |
606 | + |
607 | + |
608 | +class UnsupportedRpcVersion(RPCException): |
609 | + msg_fmt = _("Specified RPC version, %(version)s, not supported by " |
610 | + "this endpoint.") |
611 | + |
612 | + |
613 | +class UnsupportedRpcEnvelopeVersion(RPCException): |
614 | + msg_fmt = _("Specified RPC envelope version, %(version)s, " |
615 | + "not supported by this endpoint.") |
616 | + |
617 | + |
618 | +class RpcVersionCapError(RPCException): |
619 | + msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") |
620 | + |
621 | + |
622 | +class Connection(object): |
623 | + """A connection, returned by rpc.create_connection(). |
624 | + |
625 | + This class represents a connection to the message bus used for rpc. |
626 | + An instance of this class should never be created by users of the rpc API. |
627 | + Use rpc.create_connection() instead. |
628 | + """ |
629 | + def close(self): |
630 | + """Close the connection. |
631 | + |
632 | + This method must be called when the connection will no longer be used. |
633 | + It will ensure that any resources associated with the connection, such |
634 | + as a network connection, and cleaned up. |
635 | + """ |
636 | + raise NotImplementedError() |
637 | + |
638 | + |
639 | +def _safe_log(log_func, msg, msg_data): |
640 | + """Sanitizes the msg_data field before logging.""" |
641 | + SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] |
642 | + |
643 | + def _fix_passwords(d): |
644 | + """Sanitizes the password fields in the dictionary.""" |
645 | + for k in six.iterkeys(d): |
646 | + if k.lower().find('password') != -1: |
647 | + d[k] = '<SANITIZED>' |
648 | + elif k.lower() in SANITIZE: |
649 | + d[k] = '<SANITIZED>' |
650 | + elif isinstance(d[k], dict): |
651 | + _fix_passwords(d[k]) |
652 | + return d |
653 | + |
654 | + return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) |
655 | + |
656 | + |
657 | +def serialize_remote_exception(failure_info, log_failure=True): |
658 | + """Prepares exception data to be sent over rpc. |
659 | + |
660 | + Failure_info should be a sys.exc_info() tuple. |
661 | + |
662 | + """ |
663 | + tb = traceback.format_exception(*failure_info) |
664 | + failure = failure_info[1] |
665 | + if log_failure: |
666 | + LOG.error(_("Returning exception %s to caller"), |
667 | + six.text_type(failure)) |
668 | + LOG.error(tb) |
669 | + |
670 | + kwargs = {} |
671 | + if hasattr(failure, 'kwargs'): |
672 | + kwargs = failure.kwargs |
673 | + |
674 | + # NOTE(matiu): With cells, it's possible to re-raise remote, remote |
675 | + # exceptions. Lets turn it back into the original exception type. |
676 | + cls_name = six.text_type(failure.__class__.__name__) |
677 | + mod_name = six.text_type(failure.__class__.__module__) |
678 | + if (cls_name.endswith(_REMOTE_POSTFIX) and |
679 | + mod_name.endswith(_REMOTE_POSTFIX)): |
680 | + cls_name = cls_name[:-len(_REMOTE_POSTFIX)] |
681 | + mod_name = mod_name[:-len(_REMOTE_POSTFIX)] |
682 | + |
683 | + data = { |
684 | + 'class': cls_name, |
685 | + 'module': mod_name, |
686 | + 'message': six.text_type(failure), |
687 | + 'tb': tb, |
688 | + 'args': failure.args, |
689 | + 'kwargs': kwargs |
690 | + } |
691 | + |
692 | + json_data = jsonutils.dumps(data) |
693 | + |
694 | + return json_data |
695 | + |
696 | + |
697 | +def deserialize_remote_exception(data, allowed_remote_exmods): |
698 | + failure = jsonutils.loads(six.text_type(data)) |
699 | + |
700 | + trace = failure.get('tb', []) |
701 | + message = failure.get('message', "") + "\n" + "\n".join(trace) |
702 | + name = failure.get('class') |
703 | + module = failure.get('module') |
704 | + |
705 | + # NOTE(ameade): We DO NOT want to allow just any module to be imported, in |
706 | + # order to prevent arbitrary code execution. |
707 | + if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods: |
708 | + return messaging.RemoteError(name, failure.get('message'), trace) |
709 | + |
710 | + try: |
711 | + __import__(module) |
712 | + mod = sys.modules[module] |
713 | + klass = getattr(mod, name) |
714 | + if not issubclass(klass, Exception): |
715 | + raise TypeError("Can only deserialize Exceptions") |
716 | + |
717 | + failure = klass(*failure.get('args', []), **failure.get('kwargs', {})) |
718 | + except (AttributeError, TypeError, ImportError): |
719 | + return messaging.RemoteError(name, failure.get('message'), trace) |
720 | + |
721 | + ex_type = type(failure) |
722 | + str_override = lambda self: message |
723 | + new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,), |
724 | + {'__str__': str_override, '__unicode__': str_override}) |
725 | + new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX) |
726 | + try: |
727 | + # NOTE(ameade): Dynamically create a new exception type and swap it in |
728 | + # as the new type for the exception. This only works on user defined |
729 | + # Exceptions and not core Python exceptions. This is important because |
730 | + # we cannot necessarily change an exception message so we must override |
731 | + # the __str__ method. |
732 | + failure.__class__ = new_ex_type |
733 | + except TypeError: |
734 | + # NOTE(ameade): If a core exception then just add the traceback to the |
735 | + # first exception argument. |
736 | + failure.args = (message,) + failure.args[1:] |
737 | + return failure |
738 | + |
739 | + |
740 | +class CommonRpcContext(object): |
741 | + def __init__(self, **kwargs): |
742 | + self.values = kwargs |
743 | + |
744 | + def __getattr__(self, key): |
745 | + try: |
746 | + return self.values[key] |
747 | + except KeyError: |
748 | + raise AttributeError(key) |
749 | + |
750 | + def to_dict(self): |
751 | + return copy.deepcopy(self.values) |
752 | + |
753 | + @classmethod |
754 | + def from_dict(cls, values): |
755 | + return cls(**values) |
756 | + |
757 | + def deepcopy(self): |
758 | + return self.from_dict(self.to_dict()) |
759 | + |
760 | + def update_store(self): |
761 | + # local.store.context = self |
762 | + pass |
763 | + |
764 | + |
class ClientException(Exception):
    """Wrap the exception currently being handled for an RPC proxy object.

    Creating an instance captures ``sys.exc_info()`` at that moment so the
    information can be handed back to the RPC client without exceptional
    logging.
    """

    def __init__(self):
        exc_type, exc_value, exc_traceback = sys.exc_info()
        self._exc_info = (exc_type, exc_value, exc_traceback)
773 | + |
774 | + |
def serialize_msg(raw_msg):
    """Wrap ``raw_msg`` in the versioned RPC envelope.

    The envelope is a dict holding the envelope version under _VERSION_KEY
    and the JSON-encoded message under _MESSAGE_KEY.
    """
    # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
    # information about this format.
    envelope = {}
    envelope[_VERSION_KEY] = _RPC_ENVELOPE_VERSION
    envelope[_MESSAGE_KEY] = jsonutils.dumps(raw_msg)
    return envelope
782 | + |
783 | + |
def deserialize_msg(msg):
    """Unwrap a received message, tolerating un-enveloped input.

    Three cases are handled ("be strict in what you send, liberal in what
    you accept"):

    * ``msg`` is not a dict: returned unchanged. This covers return values
      of rpc.call() from before envelopes were used.
    * ``msg`` is a dict lacking one of the envelope keys: returned
      unchanged. It may be a notification or a pre-envelope (1.0) message.
    * ``msg`` is a dict carrying both envelope keys: the version is checked
      and the JSON-decoded payload is returned.

    Raises UnsupportedRpcEnvelopeVersion when the envelope version is not
    compatible with _RPC_ENVELOPE_VERSION.
    """
    if not isinstance(msg, dict):
        return msg

    for envelope_key in (_VERSION_KEY, _MESSAGE_KEY):
        if envelope_key not in msg:
            # Not a standard envelope; hand it back as-is.
            return msg

    # From here on we believe we hold a standard message envelope.
    sender_version = msg[_VERSION_KEY]
    if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
                                       sender_version):
        raise UnsupportedRpcEnvelopeVersion(version=sender_version)

    return jsonutils.loads(msg[_MESSAGE_KEY])
827 | |
828 | === added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py' |
829 | --- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000 |
830 | +++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000 |
831 | @@ -0,0 +1,821 @@ |
832 | +# Copyright 2011 OpenStack Foundation |
833 | +# |
834 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
835 | +# not use this file except in compliance with the License. You may obtain |
836 | +# a copy of the License at |
837 | +# |
838 | +# http://www.apache.org/licenses/LICENSE-2.0 |
839 | +# |
840 | +# Unless required by applicable law or agreed to in writing, software |
841 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
842 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
843 | +# License for the specific language governing permissions and limitations |
844 | +# under the License. |
845 | + |
846 | +import functools |
847 | +import itertools |
848 | +import logging |
849 | +import random |
850 | +import socket |
851 | +import ssl |
852 | +import time |
853 | +import uuid |
854 | + |
855 | +import kombu |
856 | +import kombu.connection |
857 | +import kombu.entity |
858 | +import kombu.messaging |
859 | +import six |
860 | + |
861 | +from oslo.config import cfg |
862 | +from oslo.messaging._drivers import amqp as rpc_amqp |
863 | +from oslo.messaging._drivers import amqpdriver |
864 | +from oslo.messaging._drivers import common as rpc_common |
865 | +from oslo.messaging import exceptions |
866 | +from oslo.messaging.openstack.common.gettextutils import _ |
867 | +from oslo.utils import netutils |
868 | + |
869 | +rabbit_opts = [ |
870 | + cfg.StrOpt('kombu_ssl_version', |
871 | + default='', |
872 | + help='SSL version to use (valid only if SSL enabled). ' |
873 | + 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' |
874 | + 'be available on some distributions.' |
875 | + ), |
876 | + cfg.StrOpt('kombu_ssl_keyfile', |
877 | + default='', |
878 | + help='SSL key file (valid only if SSL enabled).'), |
879 | + cfg.StrOpt('kombu_ssl_certfile', |
880 | + default='', |
881 | + help='SSL cert file (valid only if SSL enabled).'), |
882 | + cfg.StrOpt('kombu_ssl_ca_certs', |
883 | + default='', |
884 | + help='SSL certification authority file ' |
885 | + '(valid only if SSL enabled).'), |
886 | + cfg.FloatOpt('kombu_reconnect_delay', |
887 | + default=1.0, |
888 | + help='How long to wait before reconnecting in response to an ' |
889 | + 'AMQP consumer cancel notification.'), |
890 | + cfg.StrOpt('rabbit_host', |
891 | + default='localhost', |
892 | + help='The RabbitMQ broker address where a single node is ' |
893 | + 'used.'), |
894 | + cfg.IntOpt('rabbit_port', |
895 | + default=5672, |
896 | + help='The RabbitMQ broker port where a single node is used.'), |
897 | + cfg.ListOpt('rabbit_hosts', |
898 | + default=['$rabbit_host:$rabbit_port'], |
899 | + help='RabbitMQ HA cluster host:port pairs.'), |
900 | + cfg.BoolOpt('rabbit_use_ssl', |
901 | + default=False, |
902 | + help='Connect over SSL for RabbitMQ.'), |
903 | + cfg.StrOpt('rabbit_userid', |
904 | + default='guest', |
905 | + help='The RabbitMQ userid.'), |
906 | + cfg.StrOpt('rabbit_password', |
907 | + default='guest', |
908 | + help='The RabbitMQ password.', |
909 | + secret=True), |
910 | + cfg.StrOpt('rabbit_login_method', |
911 | + default='AMQPLAIN', |
912 | + help='the RabbitMQ login method'), |
913 | + cfg.StrOpt('rabbit_virtual_host', |
914 | + default='/', |
915 | + help='The RabbitMQ virtual host.'), |
916 | + cfg.IntOpt('rabbit_retry_interval', |
917 | + default=1, |
918 | + help='How frequently to retry connecting with RabbitMQ.'), |
919 | + cfg.IntOpt('rabbit_retry_backoff', |
920 | + default=2, |
921 | + help='How long to backoff for between retries when connecting ' |
922 | + 'to RabbitMQ.'), |
923 | + cfg.IntOpt('rabbit_max_retries', |
924 | + default=0, |
925 | + help='Maximum number of RabbitMQ connection retries. ' |
926 | + 'Default is 0 (infinite retry count).'), |
927 | + cfg.BoolOpt('rabbit_ha_queues', |
928 | + default=False, |
929 | + help='Use HA queues in RabbitMQ (x-ha-policy: all). ' |
930 | + 'If you change this option, you must wipe the ' |
931 | + 'RabbitMQ database.'), |
932 | + |
933 | + # FIXME(markmc): this was toplevel in openstack.common.rpc |
934 | + cfg.BoolOpt('fake_rabbit', |
935 | + default=False, |
936 | + help='If passed, use a fake RabbitMQ provider.'), |
937 | +] |
938 | + |
939 | +LOG = logging.getLogger(__name__) |
940 | + |
941 | + |
942 | +def _get_queue_arguments(conf): |
943 | + """Construct the arguments for declaring a queue. |
944 | + |
945 | + If the rabbit_ha_queues option is set, we declare a mirrored queue |
946 | + as described here: |
947 | + |
948 | + http://www.rabbitmq.com/ha.html |
949 | + |
950 | + Setting x-ha-policy to all means that the queue will be mirrored |
951 | + to all nodes in the cluster. |
952 | + """ |
953 | + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} |
954 | + |
955 | + |
956 | +class RabbitMessage(dict): |
957 | + def __init__(self, raw_message): |
958 | + super(RabbitMessage, self).__init__( |
959 | + rpc_common.deserialize_msg(raw_message.payload)) |
960 | + self._raw_message = raw_message |
961 | + |
962 | + def acknowledge(self): |
963 | + self._raw_message.ack() |
964 | + |
965 | + def requeue(self): |
966 | + self._raw_message.requeue() |
967 | + |
968 | + |
969 | +class ConsumerBase(object): |
970 | + """Consumer base class.""" |
971 | + |
972 | + def __init__(self, channel, callback, tag, **kwargs): |
973 | + """Declare a queue on an amqp channel. |
974 | + |
975 | + 'channel' is the amqp channel to use |
976 | + 'callback' is the callback to call when messages are received |
977 | + 'tag' is a unique ID for the consumer on the channel |
978 | + |
979 | + queue name, exchange name, and other kombu options are |
980 | + passed in here as a dictionary. |
981 | + """ |
982 | + self.callback = callback |
983 | + self.tag = six.text_type(tag) |
984 | + self.kwargs = kwargs |
985 | + self.queue = None |
986 | + self.reconnect(channel) |
987 | + |
988 | + def reconnect(self, channel): |
989 | + """Re-declare the queue after a rabbit reconnect.""" |
990 | + self.channel = channel |
991 | + self.kwargs['channel'] = channel |
992 | + self.queue = kombu.entity.Queue(**self.kwargs) |
993 | + self.queue.declare() |
994 | + |
995 | + def _callback_handler(self, message, callback): |
996 | + """Call callback with deserialized message. |
997 | + |
998 | + Messages that are processed and ack'ed. |
999 | + """ |
1000 | + |
1001 | + try: |
1002 | + callback(RabbitMessage(message)) |
1003 | + except Exception: |
1004 | + LOG.exception(_("Failed to process message" |
1005 | + " ... skipping it.")) |
1006 | + message.ack() |
1007 | + |
1008 | + def consume(self, *args, **kwargs): |
1009 | + """Actually declare the consumer on the amqp channel. This will |
1010 | + start the flow of messages from the queue. Using the |
1011 | + Connection.iterconsume() iterator will process the messages, |
1012 | + calling the appropriate callback. |
1013 | + |
1014 | + If a callback is specified in kwargs, use that. Otherwise, |
1015 | + use the callback passed during __init__() |
1016 | + |
1017 | + If kwargs['nowait'] is True, then this call will block until |
1018 | + a message is read. |
1019 | + |
1020 | + """ |
1021 | + |
1022 | + options = {'consumer_tag': self.tag} |
1023 | + options['nowait'] = kwargs.get('nowait', False) |
1024 | + callback = kwargs.get('callback', self.callback) |
1025 | + if not callback: |
1026 | + raise ValueError("No callback defined") |
1027 | + |
1028 | + def _callback(raw_message): |
1029 | + message = self.channel.message_to_python(raw_message) |
1030 | + self._callback_handler(message, callback) |
1031 | + |
1032 | + self.queue.consume(*args, callback=_callback, **options) |
1033 | + |
1034 | + def cancel(self): |
1035 | + """Cancel the consuming from the queue, if it has started.""" |
1036 | + try: |
1037 | + self.queue.cancel(self.tag) |
1038 | + except KeyError as e: |
1039 | + # NOTE(comstud): Kludge to get around a amqplib bug |
1040 | + if six.text_type(e) != "u'%s'" % self.tag: |
1041 | + raise |
1042 | + self.queue = None |
1043 | + |
1044 | + |
1045 | +class DirectConsumer(ConsumerBase): |
1046 | + """Queue/consumer class for 'direct'.""" |
1047 | + |
1048 | + def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): |
1049 | + """Init a 'direct' queue. |
1050 | + |
1051 | + 'channel' is the amqp channel to use |
1052 | + 'msg_id' is the msg_id to listen on |
1053 | + 'callback' is the callback to call when messages are received |
1054 | + 'tag' is a unique ID for the consumer on the channel |
1055 | + |
1056 | + Other kombu options may be passed |
1057 | + """ |
1058 | + # Default options |
1059 | + options = {'durable': False, |
1060 | + 'queue_arguments': _get_queue_arguments(conf), |
1061 | + 'auto_delete': True, |
1062 | + 'exclusive': False} |
1063 | + options.update(kwargs) |
1064 | + exchange = kombu.entity.Exchange(name=msg_id, |
1065 | + type='direct', |
1066 | + durable=options['durable'], |
1067 | + auto_delete=options['auto_delete']) |
1068 | + super(DirectConsumer, self).__init__(channel, |
1069 | + callback, |
1070 | + tag, |
1071 | + name=msg_id, |
1072 | + exchange=exchange, |
1073 | + routing_key=msg_id, |
1074 | + **options) |
1075 | + |
1076 | + |
1077 | +class TopicConsumer(ConsumerBase): |
1078 | + """Consumer class for 'topic'.""" |
1079 | + |
1080 | + def __init__(self, conf, channel, topic, callback, tag, exchange_name, |
1081 | + name=None, **kwargs): |
1082 | + """Init a 'topic' queue. |
1083 | + |
1084 | + :param channel: the amqp channel to use |
1085 | + :param topic: the topic to listen on |
1086 | + :paramtype topic: str |
1087 | + :param callback: the callback to call when messages are received |
1088 | + :param tag: a unique ID for the consumer on the channel |
1089 | + :param exchange_name: the exchange name to use |
1090 | + :param name: optional queue name, defaults to topic |
1091 | + :paramtype name: str |
1092 | + |
1093 | + Other kombu options may be passed as keyword arguments |
1094 | + """ |
1095 | + # Default options |
1096 | + options = {'durable': conf.amqp_durable_queues, |
1097 | + 'queue_arguments': _get_queue_arguments(conf), |
1098 | + 'auto_delete': conf.amqp_auto_delete, |
1099 | + 'exclusive': False} |
1100 | + options.update(kwargs) |
1101 | + exchange = kombu.entity.Exchange(name=exchange_name, |
1102 | + type='topic', |
1103 | + durable=options['durable'], |
1104 | + auto_delete=options['auto_delete']) |
1105 | + super(TopicConsumer, self).__init__(channel, |
1106 | + callback, |
1107 | + tag, |
1108 | + name=name or topic, |
1109 | + exchange=exchange, |
1110 | + routing_key=topic, |
1111 | + **options) |
1112 | + |
1113 | + |
1114 | +class FanoutConsumer(ConsumerBase): |
1115 | + """Consumer class for 'fanout'.""" |
1116 | + |
1117 | + def __init__(self, conf, channel, topic, callback, tag, **kwargs): |
1118 | + """Init a 'fanout' queue. |
1119 | + |
1120 | + 'channel' is the amqp channel to use |
1121 | + 'topic' is the topic to listen on |
1122 | + 'callback' is the callback to call when messages are received |
1123 | + 'tag' is a unique ID for the consumer on the channel |
1124 | + |
1125 | + Other kombu options may be passed |
1126 | + """ |
1127 | + unique = uuid.uuid4().hex |
1128 | + exchange_name = '%s_fanout' % topic |
1129 | + queue_name = '%s_fanout_%s' % (topic, unique) |
1130 | + |
1131 | + # Default options |
1132 | + options = {'durable': False, |
1133 | + 'queue_arguments': _get_queue_arguments(conf), |
1134 | + 'auto_delete': True, |
1135 | + 'exclusive': False} |
1136 | + options.update(kwargs) |
1137 | + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', |
1138 | + durable=options['durable'], |
1139 | + auto_delete=options['auto_delete']) |
1140 | + super(FanoutConsumer, self).__init__(channel, callback, tag, |
1141 | + name=queue_name, |
1142 | + exchange=exchange, |
1143 | + routing_key=topic, |
1144 | + **options) |
1145 | + |
1146 | + |
1147 | +class Publisher(object): |
1148 | + """Base Publisher class.""" |
1149 | + |
1150 | + def __init__(self, channel, exchange_name, routing_key, **kwargs): |
1151 | + """Init the Publisher class with the exchange_name, routing_key, |
1152 | + and other options |
1153 | + """ |
1154 | + self.exchange_name = exchange_name |
1155 | + self.routing_key = routing_key |
1156 | + self.kwargs = kwargs |
1157 | + self.reconnect(channel) |
1158 | + |
1159 | + def reconnect(self, channel): |
1160 | + """Re-establish the Producer after a rabbit reconnection.""" |
1161 | + self.exchange = kombu.entity.Exchange(name=self.exchange_name, |
1162 | + **self.kwargs) |
1163 | + self.producer = kombu.messaging.Producer(exchange=self.exchange, |
1164 | + channel=channel, |
1165 | + routing_key=self.routing_key) |
1166 | + |
1167 | + def send(self, msg, timeout=None): |
1168 | + """Send a message.""" |
1169 | + if timeout: |
1170 | + # |
1171 | + # AMQP TTL is in milliseconds when set in the header. |
1172 | + # |
1173 | + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) |
1174 | + else: |
1175 | + self.producer.publish(msg) |
1176 | + |
1177 | + |
1178 | +class DirectPublisher(Publisher): |
1179 | + """Publisher class for 'direct'.""" |
1180 | + def __init__(self, conf, channel, topic, **kwargs): |
1181 | + """Init a 'direct' publisher. |
1182 | + |
1183 | + Kombu options may be passed as keyword args to override defaults |
1184 | + """ |
1185 | + |
1186 | + options = {'durable': False, |
1187 | + 'auto_delete': True, |
1188 | + 'exclusive': False} |
1189 | + options.update(kwargs) |
1190 | + super(DirectPublisher, self).__init__(channel, topic, topic, |
1191 | + type='direct', **options) |
1192 | + |
1193 | + |
1194 | +class TopicPublisher(Publisher): |
1195 | + """Publisher class for 'topic'.""" |
1196 | + def __init__(self, conf, channel, exchange_name, topic, **kwargs): |
1197 | + """Init a 'topic' publisher. |
1198 | + |
1199 | + Kombu options may be passed as keyword args to override defaults |
1200 | + """ |
1201 | + options = {'durable': conf.amqp_durable_queues, |
1202 | + 'auto_delete': conf.amqp_auto_delete, |
1203 | + 'exclusive': False} |
1204 | + options.update(kwargs) |
1205 | + super(TopicPublisher, self).__init__(channel, |
1206 | + exchange_name, |
1207 | + topic, |
1208 | + type='topic', |
1209 | + **options) |
1210 | + |
1211 | + |
1212 | +class FanoutPublisher(Publisher): |
1213 | + """Publisher class for 'fanout'.""" |
1214 | + def __init__(self, conf, channel, topic, **kwargs): |
1215 | + """Init a 'fanout' publisher. |
1216 | + |
1217 | + Kombu options may be passed as keyword args to override defaults |
1218 | + """ |
1219 | + options = {'durable': False, |
1220 | + 'auto_delete': True, |
1221 | + 'exclusive': False} |
1222 | + options.update(kwargs) |
1223 | + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, |
1224 | + None, type='fanout', **options) |
1225 | + |
1226 | + |
1227 | +class NotifyPublisher(TopicPublisher): |
1228 | + """Publisher class for 'notify'.""" |
1229 | + |
1230 | + def __init__(self, conf, channel, exchange_name, topic, **kwargs): |
1231 | + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) |
1232 | + self.queue_arguments = _get_queue_arguments(conf) |
1233 | + super(NotifyPublisher, self).__init__(conf, channel, exchange_name, |
1234 | + topic, **kwargs) |
1235 | + |
1236 | + def reconnect(self, channel): |
1237 | + super(NotifyPublisher, self).reconnect(channel) |
1238 | + |
1239 | + # NOTE(jerdfelt): Normally the consumer would create the queue, but |
1240 | + # we do this to ensure that messages don't get dropped if the |
1241 | + # consumer is started after we do |
1242 | + queue = kombu.entity.Queue(channel=channel, |
1243 | + exchange=self.exchange, |
1244 | + durable=self.durable, |
1245 | + name=self.routing_key, |
1246 | + routing_key=self.routing_key, |
1247 | + queue_arguments=self.queue_arguments) |
1248 | + queue.declare() |
1249 | + |
1250 | + |
1251 | +class Connection(object): |
1252 | + """Connection object.""" |
1253 | + |
1254 | + pools = {} |
1255 | + |
def __init__(self, conf, url):
    """Collect the broker parameters and open the first connection.

    :param conf: configuration object providing the rabbit_* options
    :param url: transport URL object; ``url.virtual_host`` and
                ``url.hosts`` (entries with hostname / port / username /
                password) are read. If ``url.hosts`` is empty the brokers
                come from ``conf.rabbit_hosts`` instead.
    """
    self.consumers = []
    self.conf = conf

    # A non-positive rabbit_max_retries means "try forever" (None).
    configured_retries = self.conf.rabbit_max_retries
    self.max_retries = configured_retries if configured_retries > 0 else None

    self.interval_start = self.conf.rabbit_retry_interval
    self.interval_stepping = self.conf.rabbit_retry_backoff
    # max retry-interval = 30 seconds
    self.interval_max = 30
    self.memory_transport = False

    ssl_params = self._fetch_ssl_params()

    virtual_host = (url.virtual_host if url.virtual_host is not None
                    else self.conf.rabbit_virtual_host)

    self.brokers_params = []

    def _register(params):
        # Options shared by both configuration formats.
        if self.conf.fake_rabbit:
            params['transport'] = 'memory'
        if self.conf.rabbit_use_ssl:
            params['ssl'] = ssl_params
        self.brokers_params.append(params)

    if url.hosts:
        for host in url.hosts:
            _register({
                'hostname': host.hostname,
                'port': host.port or 5672,
                'userid': host.username or '',
                'password': host.password or '',
                'login_method': self.conf.rabbit_login_method,
                'virtual_host': virtual_host,
            })
    else:
        # Old configuration format
        for address in self.conf.rabbit_hosts:
            hostname, port = netutils.parse_host_port(
                address, default_port=self.conf.rabbit_port)
            _register({
                'hostname': hostname,
                'port': port,
                'userid': self.conf.rabbit_userid,
                'password': self.conf.rabbit_password,
                'login_method': self.conf.rabbit_login_method,
                'virtual_host': virtual_host,
            })

    # Brokers are tried in a random order, cycling endlessly.
    random.shuffle(self.brokers_params)
    self.brokers = itertools.cycle(self.brokers_params)

    self.memory_transport = self.conf.fake_rabbit

    self.connection = None
    self.do_consume = None
    self.reconnect()
1293 | + else: |
1294 | + # Old configuration format |
1295 | + for adr in self.conf.rabbit_hosts: |
1296 | + hostname, port = netutils.parse_host_port( |
1297 | + adr, default_port=self.conf.rabbit_port) |
1298 | + |
1299 | + params = { |
1300 | + 'hostname': hostname, |
1301 | + 'port': port, |
1302 | + 'userid': self.conf.rabbit_userid, |
1303 | + 'password': self.conf.rabbit_password, |
1304 | + 'login_method': self.conf.rabbit_login_method, |
1305 | + 'virtual_host': virtual_host |
1306 | + } |
1307 | + |
1308 | + if self.conf.fake_rabbit: |
1309 | + params['transport'] = 'memory' |
1310 | + if self.conf.rabbit_use_ssl: |
1311 | + params['ssl'] = ssl_params |
1312 | + |
1313 | + self.brokers_params.append(params) |
1314 | + |
1315 | + random.shuffle(self.brokers_params) |
1316 | + self.brokers = itertools.cycle(self.brokers_params) |
1317 | + |
1318 | + self.memory_transport = self.conf.fake_rabbit |
1319 | + |
1320 | + self.connection = None |
1321 | + self.do_consume = None |
1322 | + self.reconnect() |
1323 | + |
1324 | + # FIXME(markmc): use oslo sslutils when it is available as a library |
1325 | + _SSL_PROTOCOLS = { |
1326 | + "tlsv1": ssl.PROTOCOL_TLSv1, |
1327 | + "sslv23": ssl.PROTOCOL_SSLv23, |
1328 | + "sslv3": ssl.PROTOCOL_SSLv3 |
1329 | + } |
1330 | + |
1331 | + try: |
1332 | + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 |
1333 | + except AttributeError: |
1334 | + pass |
1335 | + |
@classmethod
def validate_ssl_version(cls, version):
    """Translate an SSL version name into its protocol constant.

    The lookup in ``cls._SSL_PROTOCOLS`` is case-insensitive.

    :raises RuntimeError: if the version name is not known
    """
    known_protocols = cls._SSL_PROTOCOLS
    wanted = version.lower()
    if wanted not in known_protocols:
        raise RuntimeError(_("Invalid SSL version : %s") % version)
    return known_protocols[wanted]
1343 | + |
1344 | + def _fetch_ssl_params(self): |
1345 | + """Handles fetching what ssl params should be used for the connection |
1346 | + (if any). |
1347 | + """ |
1348 | + ssl_params = dict() |
1349 | + |
1350 | + # http://docs.python.org/library/ssl.html - ssl.wrap_socket |
1351 | + if self.conf.kombu_ssl_version: |
1352 | + ssl_params['ssl_version'] = self.validate_ssl_version( |
1353 | + self.conf.kombu_ssl_version) |
1354 | + if self.conf.kombu_ssl_keyfile: |
1355 | + ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile |
1356 | + if self.conf.kombu_ssl_certfile: |
1357 | + ssl_params['certfile'] = self.conf.kombu_ssl_certfile |
1358 | + if self.conf.kombu_ssl_ca_certs: |
1359 | + ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs |
1360 | + # We might want to allow variations in the |
1361 | + # future with this? |
1362 | + ssl_params['cert_reqs'] = ssl.CERT_REQUIRED |
1363 | + |
1364 | + # Return the extended behavior or just have the default behavior |
1365 | + return ssl_params or True |
1366 | + |
def _connect(self, broker):
    """Open a connection to ``broker`` and re-establish the consumers.

    ``broker`` is a dict of BrokerConnection keyword arguments (it must
    contain 'hostname' and 'port' for the log messages). Every consumer
    already registered is re-declared on the new channel. Exceptions are
    left to the caller.
    """
    LOG.info(_("Connecting to AMQP server on %(hostname)s:%(port)d"),
             broker)

    conn = kombu.connection.BrokerConnection(**broker)
    self.connection = conn
    self.connection_errors = conn.connection_errors
    self.channel_errors = conn.channel_errors

    using_memory_transport = self.memory_transport
    if using_memory_transport:
        # Kludge to speed up tests.
        conn.transport.polling_interval = 0.0

    self.do_consume = True
    self.consumer_num = itertools.count(1)
    conn.connect()

    new_channel = conn.channel()
    self.channel = new_channel
    # work around 'memory' transport bug in 1.1.3
    if using_memory_transport:
        new_channel._new_queue('ae.undeliver')

    for registered in self.consumers:
        registered.reconnect(new_channel)

    LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
             broker)
1391 | + |
1392 | + def _disconnect(self): |
1393 | + if self.connection: |
1394 | + # XXX(nic): when reconnecting to a RabbitMQ cluster |
1395 | + # with mirrored queues in use, the attempt to release the |
1396 | + # connection can hang "indefinitely" somewhere deep down |
1397 | + # in Kombu. Blocking the thread for a bit prior to |
1398 | + # release seems to kludge around the problem where it is |
1399 | + # otherwise reproduceable. |
1400 | + if self.conf.kombu_reconnect_delay > 0: |
1401 | + LOG.info(_("Delaying reconnect for %1.1f seconds...") % |
1402 | + self.conf.kombu_reconnect_delay) |
1403 | + time.sleep(self.conf.kombu_reconnect_delay) |
1404 | + |
1405 | + try: |
1406 | + self.connection.release() |
1407 | + except self.connection_errors: |
1408 | + pass |
1409 | + self.connection = None |
1410 | + |
def reconnect(self, retry=None):
    """Handle reconnecting and re-establishing queues.

    The current connection is dropped and the next broker in the cycle is
    tried, repeatedly, until one connects or the retry budget is used up.

    :param retry: None means use the value of rabbit_max_retries,
                  a negative value means retry forever,
                  0 means no retry,
                  N means N retries
    :raises exceptions.MessageDeliveryFailure: when the retries run out

    Sleeps between tries, starting at self.interval_start seconds and
    backing off self.interval_stepping seconds each attempt, capped at
    self.interval_max.
    """

    attempt = 0
    loop_forever = False
    if retry is None:
        retry = self.max_retries
    if retry is None or retry < 0:
        loop_forever = True

    while True:
        self._disconnect()

        broker = six.next(self.brokers)
        attempt += 1
        try:
            self._connect(broker)
            return
        except IOError as ex:
            e = ex
        except self.connection_errors as ex:
            e = ex
        except Exception as ex:
            # NOTE(comstud): Unfortunately it's possible for amqplib
            # to return an error not covered by its transport
            # connection_errors in the case of a timeout waiting for
            # a protocol response.  (See paste link in LP888621)
            # So, we check all exceptions for 'timeout' in them
            # and try to reconnect in this case.
            #
            # The check must look at the exception just caught (ex):
            # 'e' is unbound on the first failure and would otherwise
            # hold the error from an earlier loop iteration.
            if 'timeout' not in six.text_type(ex):
                raise
            e = ex

        log_info = {}
        log_info['err_str'] = e
        log_info['retry'] = retry or 0
        log_info.update(broker)

        if not loop_forever and attempt > retry:
            msg = _('Unable to connect to AMQP server on '
                    '%(hostname)s:%(port)d after %(retry)d '
                    'tries: %(err_str)s') % log_info
            LOG.error(msg)
            raise exceptions.MessageDeliveryFailure(msg)
        else:
            # attempt is 1 on the first pass, so sleep_time is always
            # initialised before it is incremented.
            if attempt == 1:
                sleep_time = self.interval_start or 1
            elif attempt > 1:
                sleep_time += self.interval_stepping

            sleep_time = min(sleep_time, self.interval_max)

            log_info['sleep_time'] = sleep_time
            if 'Socket closed' in six.text_type(e):
                LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
                            ' the connection. Check login credentials:'
                            ' %(err_str)s'), log_info)
            else:
                LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
                            'unreachable: %(err_str)s. Trying again in '
                            '%(sleep_time)d seconds.'), log_info)
            time.sleep(sleep_time)
1482 | + |
def ensure(self, error_callback, method, retry=None):
    """Call ``method`` until it succeeds, reconnecting after failures.

    Connection errors, channel errors, socket timeouts, IOErrors and any
    other exception whose text contains 'timeout' are reported to
    ``error_callback`` (when one is given) and followed by
    ``self.reconnect(retry=retry)``. Any other exception propagates.

    :returns: whatever ``method`` returns
    """

    def _report(failure):
        if error_callback:
            error_callback(failure)

    while True:
        try:
            return method()
        except self.connection_errors as err:
            _report(err)
        except self.channel_errors as err:
            _report(err)
        except (socket.timeout, IOError) as err:
            _report(err)
        except Exception as err:
            # NOTE(comstud): Unfortunately it's possible for amqplib
            # to return an error not covered by its transport
            # connection_errors in the case of a timeout waiting for
            # a protocol response.  (See paste link in LP888621)
            # So, we check all exceptions for 'timeout' in them
            # and try to reconnect in this case.
            if 'timeout' not in six.text_type(err):
                raise
            _report(err)
        self.reconnect(retry=retry)
1508 | + |
def get_channel(self):
    """Return the current channel (convenience call for
    bin/clear_rabbit_queues).
    """
    current_channel = self.channel
    return current_channel
1512 | + |
def close(self):
    """Release this connection, if any, and forget it."""
    active = self.connection
    if active:
        active.release()
        self.connection = None
1518 | + |
def reset(self):
    """Reset the connection so it can be used again.

    The current channel is closed, a fresh one is opened on the same
    connection and the list of consumers is emptied.
    """
    self.channel.close()
    fresh_channel = self.connection.channel()
    self.channel = fresh_channel
    # work around 'memory' transport bug in 1.1.3
    if self.memory_transport:
        fresh_channel._new_queue('ae.undeliver')
    self.consumers = []
1527 | + |
def declare_consumer(self, consumer_cls, topic, callback):
    """Instantiate ``consumer_cls`` and add it to our list of consumers.

    The work is run through ``self.ensure``, so it is retried after a
    reconnect when the declaration fails with a recoverable error.

    :returns: the new consumer
    """

    def _log_failure(exc):
        details = {'topic': topic, 'err_str': exc}
        LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                    "%(err_str)s"), details)

    def _create_and_register():
        new_consumer = consumer_cls(self.conf, self.channel, topic,
                                    callback, six.next(self.consumer_num))
        self.consumers.append(new_consumer)
        return new_consumer

    return self.ensure(_log_failure, _create_and_register)
1545 | + |
def iterconsume(self, limit=None, timeout=None):
    """Return an iterator that will consume from all queues/consumers.

    Each step drains events from the connection (through ``self.ensure``)
    and yields the result.

    :param limit: stop after this many steps; a falsy value means no limit
    :param timeout: passed to ``drain_events`` on every step
    :raises rpc_common.Timeout: from the error callback, when draining
                                fails with ``socket.timeout``
    """

    def _error_callback(exc):
        if isinstance(exc, socket.timeout):
            LOG.debug('Timed out waiting for RPC response: %s', exc)
            raise rpc_common.Timeout()
        else:
            LOG.exception(_('Failed to consume message from queue: %s'),
                          exc)
            # Make the next _consume() call start the consumers again.
            self.do_consume = True

    def _consume():
        if self.do_consume:
            # Every consumer but the last is started with nowait=True,
            # the last one with nowait=False.
            queues_head = self.consumers[:-1]  # not fanout.
            queues_tail = self.consumers[-1]  # fanout
            for queue in queues_head:
                queue.consume(nowait=True)
            queues_tail.consume(nowait=False)
            self.do_consume = False
        return self.connection.drain_events(timeout=timeout)

    for iteration in itertools.count(0):
        if limit and iteration >= limit:
            # A plain return ends the generator. Raising StopIteration
            # here is turned into RuntimeError under PEP 479
            # (Python 3.7+), which would break consume().
            return
        yield self.ensure(_error_callback, _consume)
1572 | + |
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
                   **kwargs):
    """Send ``msg`` through a freshly built publisher of class ``cls``.

    The publisher is created with the configuration, the current channel,
    ``topic`` and any extra keyword arguments. Creation and sending run
    through ``self.ensure`` with the given ``retry``.
    """

    def _log_failure(exc):
        details = {'topic': topic, 'err_str': exc}
        LOG.exception(_("Failed to publish message to topic "
                        "'%(topic)s': %(err_str)s"), details)

    def _build_and_send():
        sender = cls(self.conf, self.channel, topic=topic, **kwargs)
        sender.send(msg, timeout)

    self.ensure(_log_failure, _build_and_send, retry=retry)
1587 | + |
def declare_direct_consumer(self, topic, callback):
    """Create a 'direct' queue.

    In nova's use, this is generally a msg_id queue used for
    responses for call/multicall.

    :param topic: topic passed on to ``declare_consumer``
    :param callback: callback passed on to ``declare_consumer``
    """
    self.declare_consumer(DirectConsumer, topic, callback)
1594 | + |
def declare_topic_consumer(self, exchange_name, topic, callback=None,
                           queue_name=None):
    """Create a 'topic' consumer.

    ``TopicConsumer`` is pre-bound with ``name=queue_name`` and
    ``exchange_name`` via ``functools.partial`` so that it can be handed
    to ``declare_consumer`` like any other consumer class.
    """
    self.declare_consumer(functools.partial(TopicConsumer,
                                            name=queue_name,
                                            exchange_name=exchange_name,
                                            ),
                          topic, callback)
1603 | + |
def declare_fanout_consumer(self, topic, callback):
    """Create a 'fanout' consumer.

    Delegates to ``declare_consumer`` with ``FanoutConsumer``.
    """
    self.declare_consumer(FanoutConsumer, topic, callback)
1607 | + |
def direct_send(self, msg_id, msg):
    """Send a 'direct' message.

    ``msg_id`` is used as the topic of the ``DirectPublisher``.
    """
    self.publisher_send(DirectPublisher, msg_id, msg)
1611 | + |
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
    """Send a 'topic' message.

    ``exchange_name`` travels through ``publisher_send``'s ``**kwargs``
    to the ``TopicPublisher`` constructor.
    """
    self.publisher_send(TopicPublisher, topic, msg, timeout,
                        exchange_name=exchange_name, retry=retry)
1616 | + |
def fanout_send(self, topic, msg, retry=None):
    """Send a 'fanout' message.

    Delegates to ``publisher_send`` with ``FanoutPublisher``.
    """
    self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
1620 | + |
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
    """Send a notify message on a topic.

    The send timeout is always ``None``; ``exchange_name`` and any extra
    keyword arguments are forwarded to the ``NotifyPublisher``
    constructor through ``publisher_send``.
    """
    self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
                        exchange_name=exchange_name, retry=retry, **kwargs)
1625 | + |
def consume(self, limit=None, timeout=None):
    """Consume from all queues/consumers.

    Runs the ``iterconsume`` iterator to exhaustion and discards the
    values it yields.
    """
    for _event in self.iterconsume(limit=limit, timeout=timeout):
        pass
1634 | + |
1635 | + |
class RabbitDriver(amqpdriver.AMQPDriverBase):
    """AMQP driver whose connection pool is built from ``Connection``."""

    def __init__(self, conf, url,
                 default_exchange=None,
                 allowed_remote_exmods=None):
        # Register the driver and shared AMQP options before the pool is
        # created.
        conf.register_opts(rabbit_opts)
        conf.register_opts(rpc_amqp.amqp_opts)

        connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)

        super(RabbitDriver, self).__init__(conf, url,
                                           connection_pool,
                                           default_exchange,
                                           allowed_remote_exmods)

    def require_features(self, requeue=True):
        # No check is performed here; the method simply accepts the
        # request.
        pass
1653 | |
1654 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch' |
1655 | === added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/.timestamp' |
1656 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo' |
1657 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging' |
1658 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers' |
1659 | === added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py' |
1660 | --- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000 |
1661 | +++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000 |
1662 | @@ -0,0 +1,374 @@ |
1663 | +# Copyright 2010 United States Government as represented by the |
1664 | +# Administrator of the National Aeronautics and Space Administration. |
1665 | +# All Rights Reserved. |
1666 | +# Copyright 2011 Red Hat, Inc. |
1667 | +# |
1668 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
1669 | +# not use this file except in compliance with the License. You may obtain |
1670 | +# a copy of the License at |
1671 | +# |
1672 | +# http://www.apache.org/licenses/LICENSE-2.0 |
1673 | +# |
1674 | +# Unless required by applicable law or agreed to in writing, software |
1675 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
1676 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
1677 | +# License for the specific language governing permissions and limitations |
1678 | +# under the License. |
1679 | + |
1680 | +import copy |
1681 | +import logging |
1682 | +import sys |
1683 | +import time |
1684 | +import traceback |
1685 | + |
1686 | +import six |
1687 | + |
1688 | +from oslo import messaging |
1689 | +from oslo.messaging import _utils as utils |
1690 | +from oslo.messaging.openstack.common.gettextutils import _ |
1691 | +from oslo.messaging.openstack.common import jsonutils |
1692 | + |
LOG = logging.getLogger(__name__)

# Module name allowed for remote exceptions without being whitelisted:
# 'exceptions' when running on Python 2, 'builtins' otherwise.
_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
1696 | + |
1697 | + |
1698 | +'''RPC Envelope Version. |
1699 | + |
1700 | +This version number applies to the top level structure of messages sent out. |
1701 | +It does *not* apply to the message payload, which must be versioned |
1702 | +independently. For example, when using rpc APIs, a version number is applied |
1703 | +for changes to the API being exposed over rpc. This version number is handled |
1704 | +in the rpc proxy and dispatcher modules. |
1705 | + |
1706 | +This version number applies to the message envelope that is used in the |
1707 | +serialization done inside the rpc layer. See serialize_msg() and |
1708 | +deserialize_msg(). |
1709 | + |
1710 | +The current message format (version 2.0) is very simple. It is: |
1711 | + |
1712 | + { |
1713 | + 'oslo.version': <RPC Envelope Version as a String>, |
1714 | + 'oslo.message': <Application Message Payload, JSON encoded> |
1715 | + } |
1716 | + |
1717 | +Message format version '1.0' is just considered to be the messages we sent |
1718 | +without a message envelope. |
1719 | + |
1720 | +So, the current message envelope just includes the envelope version. It may |
1721 | +eventually contain additional information, such as a signature for the message |
1722 | +payload. |
1723 | + |
1724 | +We will JSON encode the application message payload. The message envelope, |
1725 | +which includes the JSON encoded application message body, will be passed down |
1726 | +to the messaging libraries as a dict. |
1727 | +''' |
# Version stamped on outgoing envelopes by serialize_msg().
_RPC_ENVELOPE_VERSION = '2.0'

# Keys of the message envelope dict.
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'

# Suffix appended to the class and module name of exceptions rebuilt by
# deserialize_remote_exception().
_REMOTE_POSTFIX = '_Remote'
1734 | + |
1735 | + |
class RPCException(Exception):
    """Base RPC exception whose message is built from ``msg_fmt``.

    When no explicit message is given, ``msg_fmt`` is %-formatted with the
    keyword arguments; if that formatting fails, the problem is logged and
    the unformatted ``msg_fmt`` is used instead.
    """
    msg_fmt = _("An unknown RPC related exception occurred.")

    def __init__(self, message=None, **kwargs):
        self.kwargs = kwargs

        final_message = message
        if not final_message:
            try:
                final_message = self.msg_fmt % kwargs
            except Exception:
                # The kwargs do not match the placeholders in msg_fmt:
                # record the failure together with every kwarg.
                LOG.exception(_('Exception in string format operation'))
                for key, val in kwargs.items():
                    LOG.error("%s: %s", key, val)
                # Fall back to the raw template so something is reported.
                final_message = self.msg_fmt

        super(RPCException, self).__init__(final_message)
1756 | + |
1757 | + |
class Timeout(RPCException):
    """Signifies that a timeout has occurred.

    This exception is raised if the rpc_response_timeout is reached while
    waiting for a response from the remote side.
    """
    msg_fmt = _('Timeout while waiting on RPC response - '
                'topic: "%(topic)s", RPC method: "%(method)s" '
                'info: "%(info)s"')

    def __init__(self, info=None, topic=None, method=None):
        """Initiates Timeout object.

        :param info: Extra info to convey to the user
        :param topic: The topic that the rpc call was sent to
        :param method: The name of the rpc method being called
        """
        # The attributes keep the raw values; only the formatted message
        # substitutes '<unknown>' for falsy ones.
        self.info = info
        self.topic = topic
        self.method = method
        super(Timeout, self).__init__(
            None,
            info=info or _('<unknown>'),
            topic=topic or _('<unknown>'),
            method=method or _('<unknown>'))
1784 | + |
1785 | + |
class DuplicateMessageError(RPCException):
    """RPC error reporting a duplicate message; expects a ``msg_id`` kwarg."""
    msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
1788 | + |
1789 | + |
class InvalidRPCConnectionReuse(RPCException):
    """RPC error reporting an invalid reuse of an RPC connection."""
    msg_fmt = _("Invalid reuse of an RPC connection.")
1792 | + |
1793 | + |
class UnsupportedRpcVersion(RPCException):
    """RPC error for an unsupported RPC version; expects a ``version`` kwarg."""
    msg_fmt = _("Specified RPC version, %(version)s, not supported by "
                "this endpoint.")
1797 | + |
1798 | + |
class UnsupportedRpcEnvelopeVersion(RPCException):
    """RPC error for an unsupported envelope version (``version`` kwarg)."""
    msg_fmt = _("Specified RPC envelope version, %(version)s, "
                "not supported by this endpoint.")
1802 | + |
1803 | + |
class RpcVersionCapError(RPCException):
    """RPC error for a too-low version cap; expects a ``version_cap`` kwarg."""
    msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
1806 | + |
1807 | + |
class Connection(object):
    """A connection, returned by rpc.create_connection().

    This class represents a connection to the message bus used for rpc.
    An instance of this class should never be created by users of the rpc API.
    Use rpc.create_connection() instead.
    """
    def close(self):
        """Close the connection.

        This method must be called when the connection will no longer be used.
        It will ensure that any resources associated with the connection, such
        as a network connection, are cleaned up.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
1823 | + |
1824 | + |
1825 | +def _safe_log(log_func, msg, msg_data): |
1826 | + """Sanitizes the msg_data field before logging.""" |
1827 | + SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] |
1828 | + |
1829 | + def _fix_passwords(d): |
1830 | + """Sanitizes the password fields in the dictionary.""" |
1831 | + for k in six.iterkeys(d): |
1832 | + if k.lower().find('password') != -1: |
1833 | + d[k] = '<SANITIZED>' |
1834 | + elif k.lower() in SANITIZE: |
1835 | + d[k] = '<SANITIZED>' |
1836 | + elif isinstance(d[k], dict): |
1837 | + _fix_passwords(d[k]) |
1838 | + return d |
1839 | + |
1840 | + return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) |
1841 | + |
1842 | + |
def serialize_remote_exception(failure_info, log_failure=True):
    """Prepares exception data to be sent over rpc.

    :param failure_info: a sys.exc_info() tuple
    :param log_failure: when true, the exception and its formatted
                        traceback are logged at error level
    :returns: JSON string holding class, module, message, traceback,
              args and kwargs of the exception
    """
    formatted_tb = traceback.format_exception(*failure_info)
    exc = failure_info[1]
    if log_failure:
        LOG.error(_("Returning exception %s to caller"),
                  six.text_type(exc))
        LOG.error(formatted_tb)

    exc_kwargs = getattr(exc, 'kwargs', {})

    # NOTE(matiu): With cells, it's possible to re-raise remote, remote
    # exceptions. Lets turn it back into the original exception type.
    exc_type = exc.__class__
    cls_name = six.text_type(exc_type.__name__)
    mod_name = six.text_type(exc_type.__module__)
    is_remote = (cls_name.endswith(_REMOTE_POSTFIX) and
                 mod_name.endswith(_REMOTE_POSTFIX))
    if is_remote:
        strip = len(_REMOTE_POSTFIX)
        cls_name = cls_name[:-strip]
        mod_name = mod_name[:-strip]

    return jsonutils.dumps({
        'class': cls_name,
        'module': mod_name,
        'message': six.text_type(exc),
        'tb': formatted_tb,
        'args': exc.args,
        'kwargs': exc_kwargs,
    })
1881 | + |
1882 | + |
def deserialize_remote_exception(data, allowed_remote_exmods):
    """Rebuild an exception object from its JSON serialization.

    :param data: JSON text with 'class', 'module', 'message', 'tb',
                 'args' and 'kwargs' entries
    :param allowed_remote_exmods: module names from which exception
                                  classes may be imported
    :returns: an exception instance (returned, not raised);
              ``messaging.RemoteError`` when the module is not allowed or
              the class cannot be imported/instantiated
    """
    failure = jsonutils.loads(six.text_type(data))

    trace = failure.get('tb', [])
    message = failure.get('message', "") + "\n" + "\n".join(trace)
    name = failure.get('class')
    module = failure.get('module')

    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
    # order to prevent arbitrary code execution.
    if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
        return messaging.RemoteError(name, failure.get('message'), trace)

    try:
        __import__(module)
        mod = sys.modules[module]
        klass = getattr(mod, name)
        if not issubclass(klass, Exception):
            raise TypeError("Can only deserialize Exceptions")

        # From here on 'failure' is the exception instance; if the
        # constructor raises, it is still the decoded dict used below.
        failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
    except (AttributeError, TypeError, ImportError):
        return messaging.RemoteError(name, failure.get('message'), trace)

    ex_type = type(failure)
    # Both string conversions return the message with the remote traceback.
    str_override = lambda self: message
    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
                       {'__str__': str_override, '__unicode__': str_override})
    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
    try:
        # NOTE(ameade): Dynamically create a new exception type and swap it in
        # as the new type for the exception. This only works on user defined
        # Exceptions and not core Python exceptions. This is important because
        # we cannot necessarily change an exception message so we must override
        # the __str__ method.
        failure.__class__ = new_ex_type
    except TypeError:
        # NOTE(ameade): If a core exception then just add the traceback to the
        # first exception argument.
        failure.args = (message,) + failure.args[1:]
    return failure
1924 | + |
1925 | + |
class CommonRpcContext(object):
    """Context object exposing its keyword arguments as attributes.

    The keyword arguments given at construction are stored in ``values``
    and can be read as attributes of the instance.
    """

    def __init__(self, **kwargs):
        self.values = kwargs

    def __getattr__(self, key):
        """Look ``key`` up in ``values``.

        :raises AttributeError: if ``key`` is not stored, or if ``values``
                                itself has not been set yet
        """
        # Read 'values' straight from the instance dict. Using
        # self.values would re-enter __getattr__ without bound on an
        # instance that has not been initialised yet, which is the state
        # copy.copy(), copy.deepcopy() and unpickling probe it in.
        try:
            return self.__dict__['values'][key]
        except KeyError:
            raise AttributeError(key)

    def to_dict(self):
        """Return a deep copy of the stored values."""
        return copy.deepcopy(self.values)

    @classmethod
    def from_dict(cls, values):
        """Build a context from a dict of values."""
        return cls(**values)

    def deepcopy(self):
        """Return a new context built from a deep copy of the values."""
        return self.from_dict(self.to_dict())

    def update_store(self):
        # local.store.context = self
        pass
1949 | + |
1950 | + |
class ClientException(Exception):
    """Encapsulates actual exception expected to be hit by a RPC proxy object.

    Merely instantiating it records the current exception information, which
    will be passed back to the RPC client without exceptional logging.
    """
    def __init__(self):
        # Capture whatever exception is being handled at construction
        # time, so instantiate this inside an ``except`` block.
        self._exc_info = sys.exc_info()
1959 | + |
1960 | + |
def serialize_msg(raw_msg):
    """Wrap ``raw_msg`` in the RPC message envelope.

    The payload is JSON encoded and stored next to the envelope version.
    """
    # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
    # information about this format.
    return {
        _VERSION_KEY: _RPC_ENVELOPE_VERSION,
        _MESSAGE_KEY: jsonutils.dumps(raw_msg),
    }
1968 | + |
1969 | + |
def deserialize_msg(msg):
    """Unwrap a received message, tolerating un-enveloped input.

    Three cases are handled:

    * ``msg`` is not a dict: returned unchanged (covers return values
      from rpc.call() that predate message envelopes).
    * ``msg`` is a dict without both envelope keys: returned unchanged
      (a notification, or a pre-envelope "version 1.0" message).
    * ``msg`` is an envelope: its version is checked and the JSON
      payload is decoded and returned.

    :raises UnsupportedRpcEnvelopeVersion: if the envelope version is not
                                           compatible with ours
    """
    # Robustness Principle:
    #   "Be strict in what you send, liberal in what you accept."
    if not isinstance(msg, dict):
        return msg

    has_envelope = all(key in msg for key in (_VERSION_KEY, _MESSAGE_KEY))
    if not has_envelope:
        return msg

    # At this point we think we have the message envelope format we were
    # expecting.
    envelope_version = msg[_VERSION_KEY]
    if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
                                       envelope_version):
        raise UnsupportedRpcEnvelopeVersion(version=envelope_version)

    return jsonutils.loads(msg[_MESSAGE_KEY])
2013 | + |
2014 | + |
class DecayingTimer(object):
    """Countdown that reports how much of a duration is left.

    With ``duration=None`` the timer is inert: ``start`` does nothing and
    ``check_return`` always returns ``None``.
    """

    def __init__(self, duration=None):
        self._duration = duration
        self._ends_at = None

    def start(self):
        """Fix the deadline at now plus the (non-negative) duration."""
        if self._duration is None:
            return
        self._ends_at = time.time() + max(0, self._duration)

    def check_return(self, timeout_callback, *args, **kwargs):
        """Return the time left, invoking the callback once it has run out.

        :param timeout_callback: called with ``*args``/``**kwargs`` when
                                 no time is left
        :param maximum: optional keyword; upper bound for the returned
                        value (not passed to the callback)
        :raises RuntimeError: if the timer has a duration but was never
                              started
        """
        if self._duration is None:
            return None
        if self._ends_at is None:
            raise RuntimeError(_("Can not check/return a timeout from a timer"
                                 " that has not been started."))

        cap = kwargs.pop('maximum', None)
        remaining = self._ends_at - time.time()
        if remaining <= 0:
            timeout_callback(*args, **kwargs)

        if cap is None:
            return remaining
        return min(remaining, cap)
2037 | |
2038 | === added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests' |
2039 | === added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py' |
2040 | --- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000 |
2041 | +++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 2015-12-17 11:03:34 +0000 |
2042 | @@ -0,0 +1,49 @@ |
2043 | + |
2044 | +# Copyright 2013 Red Hat, Inc. |
2045 | +# |
2046 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
2047 | +# not use this file except in compliance with the License. You may obtain |
2048 | +# a copy of the License at |
2049 | +# |
2050 | +# http://www.apache.org/licenses/LICENSE-2.0 |
2051 | +# |
2052 | +# Unless required by applicable law or agreed to in writing, software |
2053 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
2054 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
2055 | +# License for the specific language governing permissions and limitations |
2056 | +# under the License. |
2057 | + |
2058 | +from oslo.messaging import _utils as utils |
2059 | +from tests import utils as test_utils |
2060 | + |
2061 | + |
class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
    """Checks ``utils.version_is_compatible`` over version pairs.

    The assertions below expect True when the first version has the same
    major number and is equal to or newer than the second, and False
    otherwise; a missing revision component is expected to count as zero.
    """

    def test_version_is_compatible_same(self):
        self.assertTrue(utils.version_is_compatible('1.23', '1.23'))

    def test_version_is_compatible_newer_minor(self):
        self.assertTrue(utils.version_is_compatible('1.24', '1.23'))

    def test_version_is_compatible_older_minor(self):
        self.assertFalse(utils.version_is_compatible('1.22', '1.23'))

    def test_version_is_compatible_major_difference1(self):
        self.assertFalse(utils.version_is_compatible('2.23', '1.23'))

    def test_version_is_compatible_major_difference2(self):
        self.assertFalse(utils.version_is_compatible('1.23', '2.23'))

    def test_version_is_compatible_newer_rev(self):
        self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))

    def test_version_is_compatible_newer_rev_both(self):
        self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))

    def test_version_is_compatible_older_rev_both(self):
        self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))

    def test_version_is_compatible_older_rev(self):
        self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))

    def test_version_is_compatible_no_rev_is_zero(self):
        self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
2092 | |
2093 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch' |
2094 | === added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/.timestamp' |
2095 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo' |
2096 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging' |
2097 | === added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers' |
2098 | === added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py' |
2099 | --- .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 1970-01-01 00:00:00 +0000 |
2100 | +++ .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 2015-12-17 11:03:34 +0000 |
2101 | @@ -0,0 +1,729 @@ |
2102 | +# Copyright 2011 OpenStack Foundation |
2103 | +# Copyright 2011 - 2012, Red Hat, Inc. |
2104 | +# |
2105 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
2106 | +# not use this file except in compliance with the License. You may obtain |
2107 | +# a copy of the License at |
2108 | +# |
2109 | +# http://www.apache.org/licenses/LICENSE-2.0 |
2110 | +# |
2111 | +# Unless required by applicable law or agreed to in writing, software |
2112 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
2113 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
2114 | +# License for the specific language governing permissions and limitations |
2115 | +# under the License. |
2116 | + |
2117 | +import functools |
2118 | +import itertools |
2119 | +import logging |
2120 | +import random |
2121 | +import time |
2122 | + |
2123 | +import six |
2124 | + |
2125 | +from oslo.config import cfg |
2126 | +from oslo.messaging._drivers import amqp as rpc_amqp |
2127 | +from oslo.messaging._drivers import amqpdriver |
2128 | +from oslo.messaging._drivers import common as rpc_common |
2129 | +from oslo.messaging import exceptions |
2130 | +from oslo.messaging.openstack.common.gettextutils import _ |
2131 | +from oslo.messaging.openstack.common import jsonutils |
2132 | +from oslo.utils import importutils |
2133 | +from oslo.utils import netutils |
2134 | + |
2135 | +qpid_codec = importutils.try_import("qpid.codec010") |
2136 | +qpid_messaging = importutils.try_import("qpid.messaging") |
2137 | +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") |
2138 | + |
2139 | +LOG = logging.getLogger(__name__) |
2140 | + |
# Configuration options defined by this Qpid driver module.
qpid_opts = [
    cfg.StrOpt('qpid_hostname',
               default='localhost',
               help='Qpid broker hostname.'),
    cfg.IntOpt('qpid_port',
               default=5672,
               help='Qpid broker port.'),
    cfg.ListOpt('qpid_hosts',
                default=['$qpid_hostname:$qpid_port'],
                help='Qpid HA cluster host:port pairs.'),
    cfg.StrOpt('qpid_username',
               default='',
               help='Username for Qpid connection.'),
    cfg.StrOpt('qpid_password',
               default='',
               help='Password for Qpid connection.',
               secret=True),
    cfg.StrOpt('qpid_sasl_mechanisms',
               default='',
               help='Space separated list of SASL mechanisms to use for '
                    'auth.'),
    cfg.IntOpt('qpid_heartbeat',
               default=60,
               help='Seconds between connection keepalive heartbeats.'),
    cfg.StrOpt('qpid_protocol',
               default='tcp',
               help="Transport to use, either 'tcp' or 'ssl'."),
    cfg.BoolOpt('qpid_tcp_nodelay',
                default=True,
                help='Whether to disable the Nagle algorithm.'),
    cfg.IntOpt('qpid_receiver_capacity',
               default=1,
               help='The number of prefetched messages held by receiver.'),
    # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
    # this file could probably use some additional refactoring so that the
    # differences between each version are split into different classes.
    cfg.IntOpt('qpid_topology_version',
               default=1,
               help="The qpid topology version to use.  Version 1 is what "
                    "was originally used by impl_qpid.  Version 2 includes "
                    "some backwards-incompatible changes that allow broker "
                    "federation to work.  Users should update to version 2 "
                    "when they are able to take everything down, as it "
                    "requires a clean break."),
]

# Content type marking a message body that must be decoded from JSON
# (see ConsumerBase._unpack_json_msg).
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
2188 | + |
2189 | + |
def raise_invalid_topology_version(conf):
    """Log and raise an error for an unsupported qpid_topology_version.

    :raises Exception: always, carrying the logged message
    """
    template = _("Invalid value for qpid_topology_version: %d")
    error_text = template % conf.qpid_topology_version
    LOG.error(error_text)
    raise Exception(error_text)
2195 | + |
2196 | + |
class QpidMessage(dict):
    """Dict view of a received Qpid message's deserialized content.

    Keeps the raw message and its session so the message can be
    acknowledged later.
    """

    def __init__(self, session, raw_message):
        super(QpidMessage, self).__init__(
            rpc_common.deserialize_msg(raw_message.content))
        self._raw_message = raw_message
        self._session = session

    def acknowledge(self):
        """Acknowledge the underlying raw message on its session."""
        self._session.acknowledge(self._raw_message)

    def requeue(self):
        # Intentionally a no-op in this class.
        pass
2209 | + |
2210 | + |
class ConsumerBase(object):
    """Consumer base class."""

    def __init__(self, conf, session, callback, node_name, node_opts,
                 link_name, link_opts):
        """Declare a queue on an amqp session.

        'session' is the amqp session to use
        'callback' is the callback to call when messages are received
        'node_name' is the first part of the Qpid address string, before ';'
        'node_opts' will be applied to the "x-declare" section of "node"
                    in the address string.
        'link_name' goes into the "name" field of the "link" in the address
                    string
        'link_opts' will be applied to the "x-declare" section of "link"
                    in the address string.
        """
        self.callback = callback
        self.receiver = None
        self.rcv_capacity = conf.qpid_receiver_capacity
        self.session = None

        if conf.qpid_topology_version == 1:
            addr_opts = {
                "create": "always",
                "node": {
                    "type": "topic",
                    "x-declare": {
                        "durable": True,
                        "auto-delete": True,
                    },
                },
                "link": {
                    "durable": True,
                    "x-declare": {
                        "durable": False,
                        "auto-delete": True,
                        "exclusive": False,
                    },
                },
            }
            # node_opts are only honoured by topology version 1; version 2
            # declares no "node" section at all.
            addr_opts["node"]["x-declare"].update(node_opts)
        elif conf.qpid_topology_version == 2:
            addr_opts = {
                "link": {
                    "x-declare": {
                        "auto-delete": True,
                        "exclusive": False,
                    },
                },
            }
        else:
            raise_invalid_topology_version(conf)

        # Caller-supplied link options override the defaults set above.
        addr_opts["link"]["x-declare"].update(link_opts)
        if link_name:
            addr_opts["link"]["name"] = link_name

        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))

        self.connect(session)

    def connect(self, session):
        """Declare the receiver on connect."""
        self._declare_receiver(session)

    def reconnect(self, session):
        """Re-declare the receiver after a Qpid reconnect."""
        self._declare_receiver(session)

    def _declare_receiver(self, session):
        # Remember the session and open a receiver on this consumer's
        # address with the configured prefetch capacity.
        self.session = session
        self.receiver = session.receiver(self.address)
        self.receiver.capacity = self.rcv_capacity

    def _unpack_json_msg(self, msg):
        """Load the JSON data in msg if msg.content_type indicates that it
           is necessary.  Put the loaded data back into msg.content and
           update msg.content_type appropriately.

        A Qpid Message containing a dict will have a content_type of
        'amqp/map', whereas one containing a string that needs to be converted
        back from JSON will have a content_type of JSON_CONTENT_TYPE.

        :param msg: a Qpid Message object
        :returns: None
        """
        if msg.content_type == JSON_CONTENT_TYPE:
            msg.content = jsonutils.loads(msg.content)
            msg.content_type = 'amqp/map'

    def consume(self):
        """Fetch the message and pass it to the callback object."""
        message = self.receiver.fetch()
        try:
            self._unpack_json_msg(message)
            self.callback(QpidMessage(self.session, message))
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
            # The message is acknowledged here, on the failure path, so
            # it is skipped.
            self.session.acknowledge(message)

    def get_receiver(self):
        """Return the current receiver (None before the first connect)."""
        return self.receiver

    def get_node_name(self):
        """Return the part of the address before the first ';'.

        The address is built as "<node_name> ; <options>", so the value
        returned keeps the space that precedes the ';'.
        """
        return self.address.split(';')[0]
2317 | + |
2318 | + |
class DirectConsumer(ConsumerBase):
    """Queue/consumer class for 'direct'."""

    def __init__(self, conf, session, msg_id, callback):
        """Init a 'direct' queue.

        'session' is the amqp session to use
        'msg_id' is the msg_id to listen on
        'callback' is the callback to call when messages are received
        """

        link_opts = {
            # A direct queue is named after a single msg_id, so it must
            # always be auto-deleted. Tying this to conf.amqp_auto_delete
            # leaves one queue per msg_id on the broker whenever that
            # option is false.
            "auto-delete": True,
            "exclusive": True,
            "durable": conf.amqp_durable_queues,
        }

        if conf.qpid_topology_version == 1:
            node_name = "%s/%s" % (msg_id, msg_id)
            node_opts = {"type": "direct"}
            link_name = msg_id
        elif conf.qpid_topology_version == 2:
            node_name = "amq.direct/%s" % msg_id
            node_opts = {}
            link_name = msg_id
        else:
            raise_invalid_topology_version(conf)

        super(DirectConsumer, self).__init__(conf, session, callback,
                                             node_name, node_opts, link_name,
                                             link_opts)
2350 | + |
2351 | + |
class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'."""

    def __init__(self, conf, session, topic, callback, exchange_name,
                 name=None):
        """Init a 'topic' queue.

        :param session: the amqp session to use
        :param topic: the topic to listen on
        :param callback: the callback to call when messages are received
        :param exchange_name: exchange name used to build the node name
        :param name: optional queue name, defaults to topic
        """
        queue_opts = {
            "auto-delete": conf.amqp_auto_delete,
            "durable": conf.amqp_durable_queues,
        }

        version = conf.qpid_topology_version
        if version not in (1, 2):
            raise_invalid_topology_version(conf)

        if version == 1:
            node_template = "%s/%s"
        else:
            node_template = "amq.topic/topic/%s/%s"
        node_name = node_template % (exchange_name, topic)

        link_name = name or topic
        super(TopicConsumer, self).__init__(conf, session, callback, node_name,
                                            {}, link_name, queue_opts)
2380 | + |
2381 | + |
2382 | +class FanoutConsumer(ConsumerBase): |
2383 | + """Consumer class for 'fanout'.""" |
2384 | + |
2385 | + def __init__(self, conf, session, topic, callback): |
2386 | + """Init a 'fanout' queue. |
2387 | + |
2388 | + 'session' is the amqp session to use |
2389 | + 'topic' is the topic to listen on |
2390 | + 'callback' is the callback to call when messages are received |
2391 | + """ |
2392 | + self.conf = conf |
2393 | + |
2394 | + link_opts = {"exclusive": True} |
2395 | + |
2396 | + if conf.qpid_topology_version == 1: |
2397 | + node_name = "%s_fanout" % topic |
2398 | + node_opts = {"durable": False, "type": "fanout"} |
2399 | + elif conf.qpid_topology_version == 2: |
2400 | + node_name = "amq.topic/fanout/%s" % topic |
2401 | + node_opts = {} |
2402 | + else: |
2403 | + raise_invalid_topology_version(conf) |
2404 | + |
2405 | + super(FanoutConsumer, self).__init__(conf, session, callback, |
2406 | + node_name, node_opts, None, |
2407 | + link_opts) |
2408 | + |
2409 | + |
2410 | +class Publisher(object): |
2411 | + """Base Publisher class.""" |
2412 | + |
2413 | + def __init__(self, conf, session, node_name, node_opts=None): |
2414 | + """Init the Publisher class with the exchange_name, routing_key, |
2415 | + and other options |
2416 | + """ |
2417 | + self.sender = None |
2418 | + self.session = session |
2419 | + |
2420 | + if conf.qpid_topology_version == 1: |
2421 | + addr_opts = { |
2422 | + "create": "always", |
2423 | + "node": { |
2424 | + "type": "topic", |
2425 | + "x-declare": { |
2426 | + "durable": False, |
2427 | + # auto-delete isn't implemented for exchanges in qpid, |
2428 | + # but put in here anyway |
2429 | + "auto-delete": True, |
2430 | + }, |
2431 | + }, |
2432 | + } |
2433 | + if node_opts: |
2434 | + addr_opts["node"]["x-declare"].update(node_opts) |
2435 | + |
2436 | + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) |
2437 | + elif conf.qpid_topology_version == 2: |
2438 | + self.address = node_name |
2439 | + else: |
2440 | + raise_invalid_topology_version(conf) |
2441 | + |
2442 | + self.reconnect(session) |
2443 | + |
2444 | + def reconnect(self, session): |
2445 | + """Re-establish the Sender after a reconnection.""" |
2446 | + self.sender = session.sender(self.address) |
2447 | + |
2448 | + def _pack_json_msg(self, msg): |
2449 | + """Qpid cannot serialize dicts containing strings longer than 65535 |
2450 | + characters. This function dumps the message content to a JSON |
2451 | + string, which Qpid is able to handle. |
2452 | + |
2453 | + :param msg: May be either a Qpid Message object or a bare dict. |
2454 | + :returns: A Qpid Message with its content field JSON encoded. |
2455 | + """ |
2456 | + try: |
2457 | + msg.content = jsonutils.dumps(msg.content) |
2458 | + except AttributeError: |
2459 | + # Need to have a Qpid message so we can set the content_type. |
2460 | + msg = qpid_messaging.Message(jsonutils.dumps(msg)) |
2461 | + msg.content_type = JSON_CONTENT_TYPE |
2462 | + return msg |
2463 | + |
2464 | + def send(self, msg): |
2465 | + """Send a message.""" |
2466 | + try: |
2467 | + # Check if Qpid can encode the message |
2468 | + check_msg = msg |
2469 | + if not hasattr(check_msg, 'content_type'): |
2470 | + check_msg = qpid_messaging.Message(msg) |
2471 | + content_type = check_msg.content_type |
2472 | + enc, dec = qpid_messaging.message.get_codec(content_type) |
2473 | + enc(check_msg.content) |
2474 | + except qpid_codec.CodecException: |
2475 | + # This means the message couldn't be serialized as a dict. |
2476 | + msg = self._pack_json_msg(msg) |
2477 | + self.sender.send(msg) |
2478 | + |
2479 | + |
2480 | +class DirectPublisher(Publisher): |
2481 | + """Publisher class for 'direct'.""" |
2482 | + def __init__(self, conf, session, topic): |
2483 | + """Init a 'direct' publisher.""" |
2484 | + |
2485 | + if conf.qpid_topology_version == 1: |
2486 | + node_name = "%s/%s" % (topic, topic) |
2487 | + node_opts = {"type": "direct"} |
2488 | + elif conf.qpid_topology_version == 2: |
2489 | + node_name = "amq.direct/%s" % topic |
2490 | + node_opts = {} |
2491 | + else: |
2492 | + raise_invalid_topology_version(conf) |
2493 | + |
2494 | + super(DirectPublisher, self).__init__(conf, session, node_name, |
2495 | + node_opts) |
2496 | + |
2497 | + |
2498 | +class TopicPublisher(Publisher): |
2499 | + """Publisher class for 'topic'.""" |
2500 | + def __init__(self, conf, session, exchange_name, topic): |
2501 | + """Init a 'topic' publisher. |
2502 | + """ |
2503 | + if conf.qpid_topology_version == 1: |
2504 | + node_name = "%s/%s" % (exchange_name, topic) |
2505 | + elif conf.qpid_topology_version == 2: |
2506 | + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) |
2507 | + else: |
2508 | + raise_invalid_topology_version(conf) |
2509 | + |
2510 | + super(TopicPublisher, self).__init__(conf, session, node_name) |
2511 | + |
2512 | + |
2513 | +class FanoutPublisher(Publisher): |
2514 | + """Publisher class for 'fanout'.""" |
2515 | + def __init__(self, conf, session, topic): |
2516 | + """Init a 'fanout' publisher. |
2517 | + """ |
2518 | + |
2519 | + if conf.qpid_topology_version == 1: |
2520 | + node_name = "%s_fanout" % topic |
2521 | + node_opts = {"type": "fanout"} |
2522 | + elif conf.qpid_topology_version == 2: |
2523 | + node_name = "amq.topic/fanout/%s" % topic |
2524 | + node_opts = {} |
2525 | + else: |
2526 | + raise_invalid_topology_version(conf) |
2527 | + |
2528 | + super(FanoutPublisher, self).__init__(conf, session, node_name, |
2529 | + node_opts) |
2530 | + |
2531 | + |
2532 | +class NotifyPublisher(Publisher): |
2533 | + """Publisher class for notifications.""" |
2534 | + def __init__(self, conf, session, exchange_name, topic): |
2535 | + """Init a 'topic' publisher. |
2536 | + """ |
2537 | + node_opts = {"durable": True} |
2538 | + |
2539 | + if conf.qpid_topology_version == 1: |
2540 | + node_name = "%s/%s" % (exchange_name, topic) |
2541 | + elif conf.qpid_topology_version == 2: |
2542 | + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) |
2543 | + else: |
2544 | + raise_invalid_topology_version(conf) |
2545 | + |
2546 | + super(NotifyPublisher, self).__init__(conf, session, node_name, |
2547 | + node_opts) |
2548 | + |
2549 | + |
2550 | +class Connection(object): |
2551 | + """Connection object.""" |
2552 | + |
2553 | + pools = {} |
2554 | + |
2555 | + def __init__(self, conf, url): |
2556 | + if not qpid_messaging: |
2557 | + raise ImportError("Failed to import qpid.messaging") |
2558 | + |
2559 | + self.connection = None |
2560 | + self.session = None |
2561 | + self.consumers = {} |
2562 | + self.conf = conf |
2563 | + |
2564 | + self.brokers_params = [] |
2565 | + if url.hosts: |
2566 | + for host in url.hosts: |
2567 | + params = { |
2568 | + 'username': host.username or '', |
2569 | + 'password': host.password or '', |
2570 | + } |
2571 | + if host.port is not None: |
2572 | + params['host'] = '%s:%d' % (host.hostname, host.port) |
2573 | + else: |
2574 | + params['host'] = host.hostname |
2575 | + self.brokers_params.append(params) |
2576 | + else: |
2577 | + # Old configuration format |
2578 | + for adr in self.conf.qpid_hosts: |
2579 | + hostname, port = netutils.parse_host_port( |
2580 | + adr, default_port=5672) |
2581 | + |
2582 | + params = { |
2583 | + 'host': '%s:%d' % (hostname, port), |
2584 | + 'username': self.conf.qpid_username, |
2585 | + 'password': self.conf.qpid_password, |
2586 | + } |
2587 | + self.brokers_params.append(params) |
2588 | + |
2589 | + random.shuffle(self.brokers_params) |
2590 | + self.brokers = itertools.cycle(self.brokers_params) |
2591 | + |
2592 | + self.reconnect() |
2593 | + |
2594 | + def _connect(self, broker): |
2595 | + # Create the connection - this does not open the connection |
2596 | + self.connection = qpid_messaging.Connection(broker['host']) |
2597 | + |
2598 | + # Check if flags are set and if so set them for the connection |
2599 | + # before we call open |
2600 | + self.connection.username = broker['username'] |
2601 | + self.connection.password = broker['password'] |
2602 | + |
2603 | + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms |
2604 | + # Reconnection is done by self.reconnect() |
2605 | + self.connection.reconnect = False |
2606 | + self.connection.heartbeat = self.conf.qpid_heartbeat |
2607 | + self.connection.transport = self.conf.qpid_protocol |
2608 | + self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay |
2609 | + self.connection.open() |
2610 | + |
2611 | + def _register_consumer(self, consumer): |
2612 | + self.consumers[six.text_type(consumer.get_receiver())] = consumer |
2613 | + |
2614 | + def _lookup_consumer(self, receiver): |
2615 | + return self.consumers[six.text_type(receiver)] |
2616 | + |
2617 | + def _disconnect(self): |
2618 | + # Close the session if necessary |
2619 | + if self.connection is not None and self.connection.opened(): |
2620 | + try: |
2621 | + self.connection.close() |
2622 | + except qpid_exceptions.MessagingError: |
2623 | + pass |
2624 | + self.connection = None |
2625 | + |
2626 | + def reconnect(self, retry=None): |
2627 | + """Handles reconnecting and re-establishing sessions and queues. |
2628 | + Will retry up to retry number of times. |
2629 | + retry = None or -1 means to retry forever |
2630 | + retry = 0 means no retry |
2631 | + retry = N means N retries |
2632 | + """ |
2633 | + delay = 1 |
2634 | + attempt = 0 |
2635 | + loop_forever = False |
2636 | + if retry is None or retry < 0: |
2637 | + loop_forever = True |
2638 | + |
2639 | + while True: |
2640 | + self._disconnect() |
2641 | + |
2642 | + attempt += 1 |
2643 | + broker = six.next(self.brokers) |
2644 | + try: |
2645 | + self._connect(broker) |
2646 | + except qpid_exceptions.MessagingError as e: |
2647 | + msg_dict = dict(e=e, |
2648 | + delay=delay, |
2649 | + retry=retry, |
2650 | + broker=broker) |
2651 | + if not loop_forever and attempt > retry: |
2652 | + msg = _('Unable to connect to AMQP server on ' |
2653 | + '%(broker)s after %(retry)d ' |
2654 | + 'tries: %(e)s') % msg_dict |
2655 | + LOG.error(msg) |
2656 | + raise exceptions.MessageDeliveryFailure(msg) |
2657 | + else: |
2658 | + msg = _("Unable to connect to AMQP server on %(broker)s: " |
2659 | + "%(e)s. Sleeping %(delay)s seconds") % msg_dict |
2660 | + LOG.error(msg) |
2661 | + time.sleep(delay) |
2662 | + delay = min(delay + 1, 5) |
2663 | + else: |
2664 | + LOG.info(_('Connected to AMQP server on %s'), broker['host']) |
2665 | + break |
2666 | + |
2667 | + self.session = self.connection.session() |
2668 | + |
2669 | + if self.consumers: |
2670 | + consumers = self.consumers |
2671 | + self.consumers = {} |
2672 | + |
2673 | + for consumer in six.itervalues(consumers): |
2674 | + consumer.reconnect(self.session) |
2675 | + self._register_consumer(consumer) |
2676 | + |
2677 | + LOG.debug("Re-established AMQP queues") |
2678 | + |
2679 | + def ensure(self, error_callback, method, retry=None): |
2680 | + while True: |
2681 | + try: |
2682 | + return method() |
2683 | + except (qpid_exceptions.Empty, |
2684 | + qpid_exceptions.MessagingError) as e: |
2685 | + if error_callback: |
2686 | + error_callback(e) |
2687 | + self.reconnect(retry=retry) |
2688 | + |
2689 | + def close(self): |
2690 | + """Close/release this connection.""" |
2691 | + try: |
2692 | + self.connection.close() |
2693 | + except Exception: |
2694 | + # NOTE(dripton) Logging exceptions that happen during cleanup just |
2695 | + # causes confusion; there's really nothing useful we can do with |
2696 | + # them. |
2697 | + pass |
2698 | + self.connection = None |
2699 | + |
2700 | + def reset(self): |
2701 | + """Reset a connection so it can be used again.""" |
2702 | + self.session.close() |
2703 | + self.session = self.connection.session() |
2704 | + self.consumers = {} |
2705 | + |
2706 | + def declare_consumer(self, consumer_cls, topic, callback): |
2707 | + """Create a Consumer using the class that was passed in and |
2708 | + add it to our list of consumers |
2709 | + """ |
2710 | + def _connect_error(exc): |
2711 | + log_info = {'topic': topic, 'err_str': exc} |
2712 | + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " |
2713 | + "%(err_str)s"), log_info) |
2714 | + |
2715 | + def _declare_consumer(): |
2716 | + consumer = consumer_cls(self.conf, self.session, topic, callback) |
2717 | + self._register_consumer(consumer) |
2718 | + return consumer |
2719 | + |
2720 | + return self.ensure(_connect_error, _declare_consumer) |
2721 | + |
2722 | + def iterconsume(self, limit=None, timeout=None): |
2723 | + """Return an iterator that will consume from all queues/consumers.""" |
2724 | + |
2725 | + def _error_callback(exc): |
2726 | + if isinstance(exc, qpid_exceptions.Empty): |
2727 | + LOG.debug('Timed out waiting for RPC response: %s', exc) |
2728 | + raise rpc_common.Timeout() |
2729 | + else: |
2730 | + LOG.exception(_('Failed to consume message from queue: %s'), |
2731 | + exc) |
2732 | + |
2733 | + def _consume(): |
2734 | + nxt_receiver = self.session.next_receiver(timeout=timeout) |
2735 | + try: |
2736 | + self._lookup_consumer(nxt_receiver).consume() |
2737 | + except Exception: |
2738 | + LOG.exception(_("Error processing message. Skipping it.")) |
2739 | + |
2740 | + for iteration in itertools.count(0): |
2741 | + if limit and iteration >= limit: |
2742 | + raise StopIteration |
2743 | + yield self.ensure(_error_callback, _consume) |
2744 | + |
2745 | + def publisher_send(self, cls, topic, msg, retry=None, **kwargs): |
2746 | + """Send to a publisher based on the publisher class.""" |
2747 | + |
2748 | + def _connect_error(exc): |
2749 | + log_info = {'topic': topic, 'err_str': exc} |
2750 | + LOG.exception(_("Failed to publish message to topic " |
2751 | + "'%(topic)s': %(err_str)s"), log_info) |
2752 | + |
2753 | + def _publisher_send(): |
2754 | + publisher = cls(self.conf, self.session, topic=topic, **kwargs) |
2755 | + publisher.send(msg) |
2756 | + |
2757 | + return self.ensure(_connect_error, _publisher_send, retry=retry) |
2758 | + |
2759 | + def declare_direct_consumer(self, topic, callback): |
2760 | + """Create a 'direct' queue. |
2761 | + In nova's use, this is generally a msg_id queue used for |
2762 | + responses for call/multicall |
2763 | + """ |
2764 | + self.declare_consumer(DirectConsumer, topic, callback) |
2765 | + |
2766 | + def declare_topic_consumer(self, exchange_name, topic, callback=None, |
2767 | + queue_name=None): |
2768 | + """Create a 'topic' consumer.""" |
2769 | + self.declare_consumer(functools.partial(TopicConsumer, |
2770 | + name=queue_name, |
2771 | + exchange_name=exchange_name, |
2772 | + ), |
2773 | + topic, callback) |
2774 | + |
2775 | + def declare_fanout_consumer(self, topic, callback): |
2776 | + """Create a 'fanout' consumer.""" |
2777 | + self.declare_consumer(FanoutConsumer, topic, callback) |
2778 | + |
2779 | + def direct_send(self, msg_id, msg): |
2780 | + """Send a 'direct' message.""" |
2781 | + self.publisher_send(DirectPublisher, topic=msg_id, msg=msg) |
2782 | + |
2783 | + def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): |
2784 | + """Send a 'topic' message.""" |
2785 | + # |
2786 | + # We want to create a message with attributes, for example a TTL. We |
2787 | + # don't really need to keep 'msg' in its JSON format any longer |
2788 | + # so let's create an actual Qpid message here and get some |
2789 | + # value-add on the go. |
2790 | + # |
2791 | + # WARNING: Request timeout happens to be in the same units as |
2792 | + # Qpid's TTL (seconds). If this changes in the future, then this |
2793 | + # will need to be altered accordingly. |
2794 | + # |
2795 | + qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) |
2796 | + self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message, |
2797 | + exchange_name=exchange_name, retry=retry) |
2798 | + |
2799 | + def fanout_send(self, topic, msg, retry=None): |
2800 | + """Send a 'fanout' message.""" |
2801 | + self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry) |
2802 | + |
2803 | + def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): |
2804 | + """Send a notify message on a topic.""" |
2805 | + self.publisher_send(NotifyPublisher, topic=topic, msg=msg, |
2806 | + exchange_name=exchange_name, retry=retry) |
2807 | + |
2808 | + def consume(self, limit=None, timeout=None): |
2809 | + """Consume from all queues/consumers.""" |
2810 | + it = self.iterconsume(limit=limit, timeout=timeout) |
2811 | + while True: |
2812 | + try: |
2813 | + six.next(it) |
2814 | + except StopIteration: |
2815 | + return |
2816 | + |
2817 | + |
2818 | +class QpidDriver(amqpdriver.AMQPDriverBase): |
2819 | + |
2820 | + def __init__(self, conf, url, |
2821 | + default_exchange=None, allowed_remote_exmods=None): |
2822 | + conf.register_opts(qpid_opts) |
2823 | + conf.register_opts(rpc_amqp.amqp_opts) |
2824 | + |
2825 | + connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection) |
2826 | + |
2827 | + super(QpidDriver, self).__init__(conf, url, |
2828 | + connection_pool, |
2829 | + default_exchange, |
2830 | + allowed_remote_exmods) |
2831 | |
2832 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch' |
2833 | === added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/.timestamp' |
2834 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo' |
2835 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging' |
2836 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers' |
2837 | === added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py' |
2838 | --- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000 |
2839 | +++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000 |
2840 | @@ -0,0 +1,832 @@ |
2841 | +# Copyright 2011 OpenStack Foundation |
2842 | +# |
2843 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
2844 | +# not use this file except in compliance with the License. You may obtain |
2845 | +# a copy of the License at |
2846 | +# |
2847 | +# http://www.apache.org/licenses/LICENSE-2.0 |
2848 | +# |
2849 | +# Unless required by applicable law or agreed to in writing, software |
2850 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
2851 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
2852 | +# License for the specific language governing permissions and limitations |
2853 | +# under the License. |
2854 | + |
2855 | +import functools |
2856 | +import itertools |
2857 | +import logging |
2858 | +import random |
2859 | +import socket |
2860 | +import ssl |
2861 | +import time |
2862 | +import uuid |
2863 | + |
2864 | +import kombu |
2865 | +import kombu.connection |
2866 | +import kombu.entity |
2867 | +import kombu.messaging |
2868 | +import six |
2869 | + |
2870 | +from oslo.config import cfg |
2871 | +from oslo.messaging._drivers import amqp as rpc_amqp |
2872 | +from oslo.messaging._drivers import amqpdriver |
2873 | +from oslo.messaging._drivers import common as rpc_common |
2874 | +from oslo.messaging import exceptions |
2875 | +from oslo.messaging.openstack.common.gettextutils import _ |
2876 | +from oslo.utils import netutils |
2877 | + |
2878 | +rabbit_opts = [ |
2879 | + cfg.StrOpt('kombu_ssl_version', |
2880 | + default='', |
2881 | + help='SSL version to use (valid only if SSL enabled). ' |
2882 | + 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' |
2883 | + 'be available on some distributions.' |
2884 | + ), |
2885 | + cfg.StrOpt('kombu_ssl_keyfile', |
2886 | + default='', |
2887 | + help='SSL key file (valid only if SSL enabled).'), |
2888 | + cfg.StrOpt('kombu_ssl_certfile', |
2889 | + default='', |
2890 | + help='SSL cert file (valid only if SSL enabled).'), |
2891 | + cfg.StrOpt('kombu_ssl_ca_certs', |
2892 | + default='', |
2893 | + help='SSL certification authority file ' |
2894 | + '(valid only if SSL enabled).'), |
2895 | + cfg.FloatOpt('kombu_reconnect_delay', |
2896 | + default=1.0, |
2897 | + help='How long to wait before reconnecting in response to an ' |
2898 | + 'AMQP consumer cancel notification.'), |
2899 | + cfg.StrOpt('rabbit_host', |
2900 | + default='localhost', |
2901 | + help='The RabbitMQ broker address where a single node is ' |
2902 | + 'used.'), |
2903 | + cfg.IntOpt('rabbit_port', |
2904 | + default=5672, |
2905 | + help='The RabbitMQ broker port where a single node is used.'), |
2906 | + cfg.ListOpt('rabbit_hosts', |
2907 | + default=['$rabbit_host:$rabbit_port'], |
2908 | + help='RabbitMQ HA cluster host:port pairs.'), |
2909 | + cfg.BoolOpt('rabbit_use_ssl', |
2910 | + default=False, |
2911 | + help='Connect over SSL for RabbitMQ.'), |
2912 | + cfg.StrOpt('rabbit_userid', |
2913 | + default='guest', |
2914 | + help='The RabbitMQ userid.'), |
2915 | + cfg.StrOpt('rabbit_password', |
2916 | + default='guest', |
2917 | + help='The RabbitMQ password.', |
2918 | + secret=True), |
2919 | + cfg.StrOpt('rabbit_login_method', |
2920 | + default='AMQPLAIN', |
2921 | + help='the RabbitMQ login method'), |
2922 | + cfg.StrOpt('rabbit_virtual_host', |
2923 | + default='/', |
2924 | + help='The RabbitMQ virtual host.'), |
2925 | + cfg.IntOpt('rabbit_retry_interval', |
2926 | + default=1, |
2927 | + help='How frequently to retry connecting with RabbitMQ.'), |
2928 | + cfg.IntOpt('rabbit_retry_backoff', |
2929 | + default=2, |
2930 | + help='How long to backoff for between retries when connecting ' |
2931 | + 'to RabbitMQ.'), |
2932 | + cfg.IntOpt('rabbit_max_retries', |
2933 | + default=0, |
2934 | + help='Maximum number of RabbitMQ connection retries. ' |
2935 | + 'Default is 0 (infinite retry count).'), |
2936 | + cfg.BoolOpt('rabbit_ha_queues', |
2937 | + default=False, |
2938 | + help='Use HA queues in RabbitMQ (x-ha-policy: all). ' |
2939 | + 'If you change this option, you must wipe the ' |
2940 | + 'RabbitMQ database.'), |
2941 | + |
2942 | + # FIXME(markmc): this was toplevel in openstack.common.rpc |
2943 | + cfg.BoolOpt('fake_rabbit', |
2944 | + default=False, |
2945 | + help='If passed, use a fake RabbitMQ provider.'), |
2946 | +] |
2947 | + |
2948 | +LOG = logging.getLogger(__name__) |
2949 | + |
2950 | + |
2951 | +def _get_queue_arguments(conf): |
2952 | + """Construct the arguments for declaring a queue. |
2953 | + |
2954 | + If the rabbit_ha_queues option is set, we declare a mirrored queue |
2955 | + as described here: |
2956 | + |
2957 | + http://www.rabbitmq.com/ha.html |
2958 | + |
2959 | + Setting x-ha-policy to all means that the queue will be mirrored |
2960 | + to all nodes in the cluster. |
2961 | + """ |
2962 | + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} |
2963 | + |
2964 | + |
2965 | +class RabbitMessage(dict): |
2966 | + def __init__(self, raw_message): |
2967 | + super(RabbitMessage, self).__init__( |
2968 | + rpc_common.deserialize_msg(raw_message.payload)) |
2969 | + self._raw_message = raw_message |
2970 | + |
2971 | + def acknowledge(self): |
2972 | + self._raw_message.ack() |
2973 | + |
2974 | + def requeue(self): |
2975 | + self._raw_message.requeue() |
2976 | + |
2977 | + |
2978 | +class ConsumerBase(object): |
2979 | + """Consumer base class.""" |
2980 | + |
2981 | + def __init__(self, channel, callback, tag, **kwargs): |
2982 | + """Declare a queue on an amqp channel. |
2983 | + |
2984 | + 'channel' is the amqp channel to use |
2985 | + 'callback' is the callback to call when messages are received |
2986 | + 'tag' is a unique ID for the consumer on the channel |
2987 | + |
2988 | + queue name, exchange name, and other kombu options are |
2989 | + passed in here as a dictionary. |
2990 | + """ |
2991 | + self.callback = callback |
2992 | + self.tag = six.text_type(tag) |
2993 | + self.kwargs = kwargs |
2994 | + self.queue = None |
2995 | + self.reconnect(channel) |
2996 | + |
2997 | + def reconnect(self, channel): |
2998 | + """Re-declare the queue after a rabbit reconnect.""" |
2999 | + self.channel = channel |
3000 | + self.kwargs['channel'] = channel |
3001 | + self.queue = kombu.entity.Queue(**self.kwargs) |
3002 | + self.queue.declare() |
3003 | + |
3004 | + def _callback_handler(self, message, callback): |
3005 | + """Call callback with deserialized message. |
3006 | + |
3007 | + Messages that are processed and ack'ed. |
3008 | + """ |
3009 | + |
3010 | + try: |
3011 | + callback(RabbitMessage(message)) |
3012 | + except Exception: |
3013 | + LOG.exception(_("Failed to process message" |
3014 | + " ... skipping it.")) |
3015 | + message.ack() |
3016 | + |
3017 | + def consume(self, *args, **kwargs): |
3018 | + """Actually declare the consumer on the amqp channel. This will |
3019 | + start the flow of messages from the queue. Using the |
3020 | + Connection.iterconsume() iterator will process the messages, |
3021 | + calling the appropriate callback. |
3022 | + |
3023 | + If a callback is specified in kwargs, use that. Otherwise, |
3024 | + use the callback passed during __init__() |
3025 | + |
3026 | + If kwargs['nowait'] is True, then this call will block until |
3027 | + a message is read. |
3028 | + |
3029 | + """ |
3030 | + |
3031 | + options = {'consumer_tag': self.tag} |
3032 | + options['nowait'] = kwargs.get('nowait', False) |
3033 | + callback = kwargs.get('callback', self.callback) |
3034 | + if not callback: |
3035 | + raise ValueError("No callback defined") |
3036 | + |
3037 | + def _callback(raw_message): |
3038 | + message = self.channel.message_to_python(raw_message) |
3039 | + self._callback_handler(message, callback) |
3040 | + |
3041 | + self.queue.consume(*args, callback=_callback, **options) |
3042 | + |
3043 | + def cancel(self): |
3044 | + """Cancel the consuming from the queue, if it has started.""" |
3045 | + try: |
3046 | + self.queue.cancel(self.tag) |
3047 | + except KeyError as e: |
3048 | + # NOTE(comstud): Kludge to get around a amqplib bug |
3049 | + if six.text_type(e) != "u'%s'" % self.tag: |
3050 | + raise |
3051 | + self.queue = None |
3052 | + |
3053 | + |
3054 | +class DirectConsumer(ConsumerBase): |
3055 | + """Queue/consumer class for 'direct'.""" |
3056 | + |
3057 | + def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): |
3058 | + """Init a 'direct' queue. |
3059 | + |
3060 | + 'channel' is the amqp channel to use |
3061 | + 'msg_id' is the msg_id to listen on |
3062 | + 'callback' is the callback to call when messages are received |
3063 | + 'tag' is a unique ID for the consumer on the channel |
3064 | + |
3065 | + Other kombu options may be passed |
3066 | + """ |
3067 | + # Default options |
3068 | + options = {'durable': False, |
3069 | + 'queue_arguments': _get_queue_arguments(conf), |
3070 | + 'auto_delete': True, |
3071 | + 'exclusive': False} |
3072 | + options.update(kwargs) |
3073 | + exchange = kombu.entity.Exchange(name=msg_id, |
3074 | + type='direct', |
3075 | + durable=options['durable'], |
3076 | + auto_delete=options['auto_delete']) |
3077 | + super(DirectConsumer, self).__init__(channel, |
3078 | + callback, |
3079 | + tag, |
3080 | + name=msg_id, |
3081 | + exchange=exchange, |
3082 | + routing_key=msg_id, |
3083 | + **options) |
3084 | + |
3085 | + |
3086 | +class TopicConsumer(ConsumerBase): |
3087 | + """Consumer class for 'topic'.""" |
3088 | + |
3089 | + def __init__(self, conf, channel, topic, callback, tag, exchange_name, |
3090 | + name=None, **kwargs): |
3091 | + """Init a 'topic' queue. |
3092 | + |
3093 | + :param channel: the amqp channel to use |
3094 | + :param topic: the topic to listen on |
3095 | + :paramtype topic: str |
3096 | + :param callback: the callback to call when messages are received |
3097 | + :param tag: a unique ID for the consumer on the channel |
3098 | + :param exchange_name: the exchange name to use |
3099 | + :param name: optional queue name, defaults to topic |
3100 | + :paramtype name: str |
3101 | + |
3102 | + Other kombu options may be passed as keyword arguments |
3103 | + """ |
3104 | + # Default options |
3105 | + options = {'durable': conf.amqp_durable_queues, |
3106 | + 'queue_arguments': _get_queue_arguments(conf), |
3107 | + 'auto_delete': conf.amqp_auto_delete, |
3108 | + 'exclusive': False} |
3109 | + options.update(kwargs) |
3110 | + exchange = kombu.entity.Exchange(name=exchange_name, |
3111 | + type='topic', |
3112 | + durable=options['durable'], |
3113 | + auto_delete=options['auto_delete']) |
3114 | + super(TopicConsumer, self).__init__(channel, |
3115 | + callback, |
3116 | + tag, |
3117 | + name=name or topic, |
3118 | + exchange=exchange, |
3119 | + routing_key=topic, |
3120 | + **options) |
3121 | + |
3122 | + |
3123 | +class FanoutConsumer(ConsumerBase): |
3124 | + """Consumer class for 'fanout'.""" |
3125 | + |
3126 | + def __init__(self, conf, channel, topic, callback, tag, **kwargs): |
3127 | + """Init a 'fanout' queue. |
3128 | + |
3129 | + 'channel' is the amqp channel to use |
3130 | + 'topic' is the topic to listen on |
3131 | + 'callback' is the callback to call when messages are received |
3132 | + 'tag' is a unique ID for the consumer on the channel |
3133 | + |
3134 | + Other kombu options may be passed |
3135 | + """ |
3136 | + unique = uuid.uuid4().hex |
3137 | + exchange_name = '%s_fanout' % topic |
3138 | + queue_name = '%s_fanout_%s' % (topic, unique) |
3139 | + |
3140 | + # Default options |
3141 | + options = {'durable': False, |
3142 | + 'queue_arguments': _get_queue_arguments(conf), |
3143 | + 'auto_delete': True, |
3144 | + 'exclusive': False} |
3145 | + options.update(kwargs) |
3146 | + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', |
3147 | + durable=options['durable'], |
3148 | + auto_delete=options['auto_delete']) |
3149 | + super(FanoutConsumer, self).__init__(channel, callback, tag, |
3150 | + name=queue_name, |
3151 | + exchange=exchange, |
3152 | + routing_key=topic, |
3153 | + **options) |
3154 | + |
3155 | + |
3156 | +class Publisher(object): |
3157 | + """Base Publisher class.""" |
3158 | + |
3159 | + def __init__(self, channel, exchange_name, routing_key, **kwargs): |
3160 | + """Init the Publisher class with the exchange_name, routing_key, |
3161 | + and other options |
3162 | + """ |
3163 | + self.exchange_name = exchange_name |
3164 | + self.routing_key = routing_key |
3165 | + self.kwargs = kwargs |
3166 | + self.reconnect(channel) |
3167 | + |
3168 | + def reconnect(self, channel): |
3169 | + """Re-establish the Producer after a rabbit reconnection.""" |
3170 | + self.exchange = kombu.entity.Exchange(name=self.exchange_name, |
3171 | + **self.kwargs) |
3172 | + self.producer = kombu.messaging.Producer(exchange=self.exchange, |
3173 | + channel=channel, |
3174 | + routing_key=self.routing_key) |
3175 | + |
3176 | + def send(self, msg, timeout=None): |
3177 | + """Send a message.""" |
3178 | + if timeout: |
3179 | + # |
3180 | + # AMQP TTL is in milliseconds when set in the header. |
3181 | + # |
3182 | + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) |
3183 | + else: |
3184 | + self.producer.publish(msg) |
3185 | + |
3186 | + |
3187 | +class DirectPublisher(Publisher): |
3188 | + """Publisher class for 'direct'.""" |
3189 | + def __init__(self, conf, channel, topic, **kwargs): |
3190 | + """Init a 'direct' publisher. |
3191 | + |
3192 | + Kombu options may be passed as keyword args to override defaults |
3193 | + """ |
3194 | + |
3195 | + options = {'durable': False, |
3196 | + 'auto_delete': True, |
3197 | + 'exclusive': False} |
3198 | + options.update(kwargs) |
3199 | + super(DirectPublisher, self).__init__(channel, topic, topic, |
3200 | + type='direct', **options) |
3201 | + |
3202 | + |
3203 | +class TopicPublisher(Publisher): |
3204 | + """Publisher class for 'topic'.""" |
3205 | + def __init__(self, conf, channel, exchange_name, topic, **kwargs): |
3206 | + """Init a 'topic' publisher. |
3207 | + |
3208 | + Kombu options may be passed as keyword args to override defaults |
3209 | + """ |
3210 | + options = {'durable': conf.amqp_durable_queues, |
3211 | + 'auto_delete': conf.amqp_auto_delete, |
3212 | + 'exclusive': False} |
3213 | + options.update(kwargs) |
3214 | + super(TopicPublisher, self).__init__(channel, |
3215 | + exchange_name, |
3216 | + topic, |
3217 | + type='topic', |
3218 | + **options) |
3219 | + |
3220 | + |
3221 | +class FanoutPublisher(Publisher): |
3222 | + """Publisher class for 'fanout'.""" |
3223 | + def __init__(self, conf, channel, topic, **kwargs): |
3224 | + """Init a 'fanout' publisher. |
3225 | + |
3226 | + Kombu options may be passed as keyword args to override defaults |
3227 | + """ |
3228 | + options = {'durable': False, |
3229 | + 'auto_delete': True, |
3230 | + 'exclusive': False} |
3231 | + options.update(kwargs) |
3232 | + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, |
3233 | + None, type='fanout', **options) |
3234 | + |
3235 | + |
3236 | +class NotifyPublisher(TopicPublisher): |
3237 | + """Publisher class for 'notify'.""" |
3238 | + |
3239 | + def __init__(self, conf, channel, exchange_name, topic, **kwargs): |
3240 | + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) |
3241 | + self.queue_arguments = _get_queue_arguments(conf) |
3242 | + super(NotifyPublisher, self).__init__(conf, channel, exchange_name, |
3243 | + topic, **kwargs) |
3244 | + |
3245 | + def reconnect(self, channel): |
3246 | + super(NotifyPublisher, self).reconnect(channel) |
3247 | + |
3248 | + # NOTE(jerdfelt): Normally the consumer would create the queue, but |
3249 | + # we do this to ensure that messages don't get dropped if the |
3250 | + # consumer is started after we do |
3251 | + queue = kombu.entity.Queue(channel=channel, |
3252 | + exchange=self.exchange, |
3253 | + durable=self.durable, |
3254 | + name=self.routing_key, |
3255 | + routing_key=self.routing_key, |
3256 | + queue_arguments=self.queue_arguments) |
3257 | + queue.declare() |
3258 | + |
3259 | + |
3260 | +class Connection(object): |
3261 | + """Connection object.""" |
3262 | + |
3263 | + pools = {} |
3264 | + |
3265 | + def __init__(self, conf, url): |
3266 | + self.consumers = [] |
3267 | + self.conf = conf |
3268 | + self.max_retries = self.conf.rabbit_max_retries |
3269 | + # Try forever? |
3270 | + if self.max_retries <= 0: |
3271 | + self.max_retries = None |
3272 | + self.interval_start = self.conf.rabbit_retry_interval |
3273 | + self.interval_stepping = self.conf.rabbit_retry_backoff |
3274 | + # max retry-interval = 30 seconds |
3275 | + self.interval_max = 30 |
3276 | + self.memory_transport = False |
3277 | + |
3278 | + ssl_params = self._fetch_ssl_params() |
3279 | + |
3280 | + if url.virtual_host is not None: |
3281 | + virtual_host = url.virtual_host |
3282 | + else: |
3283 | + virtual_host = self.conf.rabbit_virtual_host |
3284 | + |
3285 | + self.brokers_params = [] |
3286 | + if url.hosts: |
3287 | + for host in url.hosts: |
3288 | + params = { |
3289 | + 'hostname': host.hostname, |
3290 | + 'port': host.port or 5672, |
3291 | + 'userid': host.username or '', |
3292 | + 'password': host.password or '', |
3293 | + 'login_method': self.conf.rabbit_login_method, |
3294 | + 'virtual_host': virtual_host |
3295 | + } |
3296 | + if self.conf.fake_rabbit: |
3297 | + params['transport'] = 'memory' |
3298 | + if self.conf.rabbit_use_ssl: |
3299 | + params['ssl'] = ssl_params |
3300 | + |
3301 | + self.brokers_params.append(params) |
3302 | + else: |
3303 | + # Old configuration format |
3304 | + for adr in self.conf.rabbit_hosts: |
3305 | + hostname, port = netutils.parse_host_port( |
3306 | + adr, default_port=self.conf.rabbit_port) |
3307 | + |
3308 | + params = { |
3309 | + 'hostname': hostname, |
3310 | + 'port': port, |
3311 | + 'userid': self.conf.rabbit_userid, |
3312 | + 'password': self.conf.rabbit_password, |
3313 | + 'login_method': self.conf.rabbit_login_method, |
3314 | + 'virtual_host': virtual_host |
3315 | + } |
3316 | + |
3317 | + if self.conf.fake_rabbit: |
3318 | + params['transport'] = 'memory' |
3319 | + if self.conf.rabbit_use_ssl: |
3320 | + params['ssl'] = ssl_params |
3321 | + |
3322 | + self.brokers_params.append(params) |
3323 | + |
3324 | + random.shuffle(self.brokers_params) |
3325 | + self.brokers = itertools.cycle(self.brokers_params) |
3326 | + |
3327 | + self.memory_transport = self.conf.fake_rabbit |
3328 | + |
3329 | + self.connection = None |
3330 | + self.do_consume = None |
3331 | + self.reconnect() |
3332 | + |
3333 | + # FIXME(markmc): use oslo sslutils when it is available as a library |
3334 | + _SSL_PROTOCOLS = { |
3335 | + "tlsv1": ssl.PROTOCOL_TLSv1, |
3336 | + "sslv23": ssl.PROTOCOL_SSLv23, |
3337 | + "sslv3": ssl.PROTOCOL_SSLv3 |
3338 | + } |
3339 | + |
3340 | + try: |
3341 | + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 |
3342 | + except AttributeError: |
3343 | + pass |
3344 | + |
3345 | + @classmethod |
3346 | + def validate_ssl_version(cls, version): |
3347 | + key = version.lower() |
3348 | + try: |
3349 | + return cls._SSL_PROTOCOLS[key] |
3350 | + except KeyError: |
3351 | + raise RuntimeError(_("Invalid SSL version : %s") % version) |
3352 | + |
3353 | + def _fetch_ssl_params(self): |
3354 | + """Handles fetching what ssl params should be used for the connection |
3355 | + (if any). |
3356 | + """ |
3357 | + ssl_params = dict() |
3358 | + |
3359 | + # http://docs.python.org/library/ssl.html - ssl.wrap_socket |
3360 | + if self.conf.kombu_ssl_version: |
3361 | + ssl_params['ssl_version'] = self.validate_ssl_version( |
3362 | + self.conf.kombu_ssl_version) |
3363 | + if self.conf.kombu_ssl_keyfile: |
3364 | + ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile |
3365 | + if self.conf.kombu_ssl_certfile: |
3366 | + ssl_params['certfile'] = self.conf.kombu_ssl_certfile |
3367 | + if self.conf.kombu_ssl_ca_certs: |
3368 | + ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs |
3369 | + # We might want to allow variations in the |
3370 | + # future with this? |
3371 | + ssl_params['cert_reqs'] = ssl.CERT_REQUIRED |
3372 | + |
3373 | + # Return the extended behavior or just have the default behavior |
3374 | + return ssl_params or True |
3375 | + |
3376 | + def _connect(self, broker): |
3377 | + """Connect to rabbit. Re-establish any queues that may have |
3378 | + been declared before if we are reconnecting. Exceptions should |
3379 | + be handled by the caller. |
3380 | + """ |
3381 | + LOG.info(_("Connecting to AMQP server on " |
3382 | + "%(hostname)s:%(port)d"), broker) |
3383 | + self.connection = kombu.connection.BrokerConnection(**broker) |
3384 | + self.connection_errors = self.connection.connection_errors |
3385 | + self.channel_errors = self.connection.channel_errors |
3386 | + if self.memory_transport: |
3387 | + # Kludge to speed up tests. |
3388 | + self.connection.transport.polling_interval = 0.0 |
3389 | + self.do_consume = True |
3390 | + self.consumer_num = itertools.count(1) |
3391 | + self.connection.connect() |
3392 | + self.channel = self.connection.channel() |
3393 | + # work around 'memory' transport bug in 1.1.3 |
3394 | + if self.memory_transport: |
3395 | + self.channel._new_queue('ae.undeliver') |
3396 | + for consumer in self.consumers: |
3397 | + consumer.reconnect(self.channel) |
3398 | + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), |
3399 | + broker) |
3400 | + |
3401 | + def _disconnect(self): |
3402 | + if self.connection: |
3403 | + # XXX(nic): when reconnecting to a RabbitMQ cluster |
3404 | + # with mirrored queues in use, the attempt to release the |
3405 | + # connection can hang "indefinitely" somewhere deep down |
3406 | + # in Kombu. Blocking the thread for a bit prior to |
3407 | + # release seems to kludge around the problem where it is |
3408 | + # otherwise reproduceable. |
3409 | + if self.conf.kombu_reconnect_delay > 0: |
3410 | + LOG.info(_("Delaying reconnect for %1.1f seconds...") % |
3411 | + self.conf.kombu_reconnect_delay) |
3412 | + time.sleep(self.conf.kombu_reconnect_delay) |
3413 | + |
3414 | + try: |
3415 | + self.connection.release() |
3416 | + except self.connection_errors: |
3417 | + pass |
3418 | + self.connection = None |
3419 | + |
3420 | + def reconnect(self, retry=None): |
3421 | + """Handles reconnecting and re-establishing queues. |
3422 | + Will retry up to retry number of times. |
3423 | + retry = None means use the value of rabbit_max_retries |
3424 | + retry = -1 means to retry forever |
3425 | + retry = 0 means no retry |
3426 | + retry = N means N retries |
3427 | + Sleep between tries, starting at self.interval_start |
3428 | + seconds, backing off self.interval_stepping number of seconds |
3429 | + each attempt. |
3430 | + """ |
3431 | + |
3432 | + attempt = 0 |
3433 | + loop_forever = False |
3434 | + if retry is None: |
3435 | + retry = self.max_retries |
3436 | + if retry is None or retry < 0: |
3437 | + loop_forever = True |
3438 | + |
3439 | + while True: |
3440 | + self._disconnect() |
3441 | + |
3442 | + broker = six.next(self.brokers) |
3443 | + attempt += 1 |
3444 | + try: |
3445 | + self._connect(broker) |
3446 | + return |
3447 | + except IOError as ex: |
3448 | + e = ex |
3449 | + except self.connection_errors as ex: |
3450 | + e = ex |
3451 | + except Exception as ex: |
3452 | + # NOTE(comstud): Unfortunately it's possible for amqplib |
3453 | + # to return an error not covered by its transport |
3454 | + # connection_errors in the case of a timeout waiting for |
3455 | + # a protocol response. (See paste link in LP888621) |
3456 | + # So, we check all exceptions for 'timeout' in them |
3457 | + # and try to reconnect in this case. |
3458 | + if 'timeout' not in six.text_type(e): |
3459 | + raise |
3460 | + e = ex |
3461 | + |
3462 | + log_info = {} |
3463 | + log_info['err_str'] = e |
3464 | + log_info['retry'] = retry or 0 |
3465 | + log_info.update(broker) |
3466 | + |
3467 | + if not loop_forever and attempt > retry: |
3468 | + msg = _('Unable to connect to AMQP server on ' |
3469 | + '%(hostname)s:%(port)d after %(retry)d ' |
3470 | + 'tries: %(err_str)s') % log_info |
3471 | + LOG.error(msg) |
3472 | + raise exceptions.MessageDeliveryFailure(msg) |
3473 | + else: |
3474 | + if attempt == 1: |
3475 | + sleep_time = self.interval_start or 1 |
3476 | + elif attempt > 1: |
3477 | + sleep_time += self.interval_stepping |
3478 | + |
3479 | + sleep_time = min(sleep_time, self.interval_max) |
3480 | + |
3481 | + log_info['sleep_time'] = sleep_time |
3482 | + if 'Socket closed' in six.text_type(e): |
3483 | + LOG.error(_('AMQP server %(hostname)s:%(port)d closed' |
3484 | + ' the connection. Check login credentials:' |
3485 | + ' %(err_str)s'), log_info) |
3486 | + else: |
3487 | + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' |
3488 | + 'unreachable: %(err_str)s. Trying again in ' |
3489 | + '%(sleep_time)d seconds.'), log_info) |
3490 | + time.sleep(sleep_time) |
3491 | + |
3492 | + def ensure(self, error_callback, method, retry=None): |
3493 | + while True: |
3494 | + try: |
3495 | + return method() |
3496 | + except self.connection_errors as e: |
3497 | + if error_callback: |
3498 | + error_callback(e) |
3499 | + except self.channel_errors as e: |
3500 | + if error_callback: |
3501 | + error_callback(e) |
3502 | + except (socket.timeout, IOError) as e: |
3503 | + if error_callback: |
3504 | + error_callback(e) |
3505 | + except Exception as e: |
3506 | + # NOTE(comstud): Unfortunately it's possible for amqplib |
3507 | + # to return an error not covered by its transport |
3508 | + # connection_errors in the case of a timeout waiting for |
3509 | + # a protocol response. (See paste link in LP888621) |
3510 | + # So, we check all exceptions for 'timeout' in them |
3511 | + # and try to reconnect in this case. |
3512 | + if 'timeout' not in six.text_type(e): |
3513 | + raise |
3514 | + if error_callback: |
3515 | + error_callback(e) |
3516 | + self.reconnect(retry=retry) |
3517 | + |
3518 | + def get_channel(self): |
3519 | + """Convenience call for bin/clear_rabbit_queues.""" |
3520 | + return self.channel |
3521 | + |
3522 | + def close(self): |
3523 | + """Close/release this connection.""" |
3524 | + if self.connection: |
3525 | + self.connection.release() |
3526 | + self.connection = None |
3527 | + |
3528 | + def reset(self): |
3529 | + """Reset a connection so it can be used again.""" |
3530 | + self.channel.close() |
3531 | + self.channel = self.connection.channel() |
3532 | + # work around 'memory' transport bug in 1.1.3 |
3533 | + if self.memory_transport: |
3534 | + self.channel._new_queue('ae.undeliver') |
3535 | + self.consumers = [] |
3536 | + |
3537 | + def declare_consumer(self, consumer_cls, topic, callback): |
3538 | + """Create a Consumer using the class that was passed in and |
3539 | + add it to our list of consumers |
3540 | + """ |
3541 | + |
3542 | + def _connect_error(exc): |
3543 | + log_info = {'topic': topic, 'err_str': exc} |
3544 | + LOG.error(_("Failed to declare consumer for topic '%(topic)s': " |
3545 | + "%(err_str)s"), log_info) |
3546 | + |
3547 | + def _declare_consumer(): |
3548 | + consumer = consumer_cls(self.conf, self.channel, topic, callback, |
3549 | + six.next(self.consumer_num)) |
3550 | + self.consumers.append(consumer) |
3551 | + return consumer |
3552 | + |
3553 | + return self.ensure(_connect_error, _declare_consumer) |
3554 | + |
3555 | + def iterconsume(self, limit=None, timeout=None): |
3556 | + """Return an iterator that will consume from all queues/consumers.""" |
3557 | + |
3558 | + timer = rpc_common.DecayingTimer(duration=timeout) |
3559 | + timer.start() |
3560 | + |
3561 | + def _raise_timeout(exc): |
3562 | + LOG.debug('Timed out waiting for RPC response: %s', exc) |
3563 | + raise rpc_common.Timeout() |
3564 | + |
3565 | + def _error_callback(exc): |
3566 | + timer.check_return(_raise_timeout, exc) |
3567 | + LOG.exception(_('Failed to consume message from queue: %s'), |
3568 | + exc) |
3569 | + self.do_consume = True |
3570 | + |
3571 | + def _consume(): |
3572 | + if self.do_consume: |
3573 | + queues_head = self.consumers[:-1] # not fanout. |
3574 | + queues_tail = self.consumers[-1] # fanout |
3575 | + for queue in queues_head: |
3576 | + queue.consume(nowait=True) |
3577 | + queues_tail.consume(nowait=False) |
3578 | + self.do_consume = False |
3579 | + |
3580 | + poll_timeout = 1 if timeout is None else min(timeout, 1) |
3581 | + while True: |
3582 | + try: |
3583 | + return self.connection.drain_events(timeout=poll_timeout) |
3584 | + except socket.timeout as exc: |
3585 | + poll_timeout = timer.check_return(_raise_timeout, exc, |
3586 | + maximum=1) |
3587 | + |
3588 | + for iteration in itertools.count(0): |
3589 | + if limit and iteration >= limit: |
3590 | + raise StopIteration |
3591 | + yield self.ensure(_error_callback, _consume) |
3592 | + |
3593 | + def publisher_send(self, cls, topic, msg, timeout=None, retry=None, |
3594 | + **kwargs): |
3595 | + """Send to a publisher based on the publisher class.""" |
3596 | + |
3597 | + def _error_callback(exc): |
3598 | + log_info = {'topic': topic, 'err_str': exc} |
3599 | + LOG.exception(_("Failed to publish message to topic " |
3600 | + "'%(topic)s': %(err_str)s"), log_info) |
3601 | + |
3602 | + def _publish(): |
3603 | + publisher = cls(self.conf, self.channel, topic=topic, **kwargs) |
3604 | + publisher.send(msg, timeout) |
3605 | + |
3606 | + self.ensure(_error_callback, _publish, retry=retry) |
3607 | + |
3608 | + def declare_direct_consumer(self, topic, callback): |
3609 | + """Create a 'direct' queue. |
3610 | + In nova's use, this is generally a msg_id queue used for |
3611 | + responses for call/multicall |
3612 | + """ |
3613 | + self.declare_consumer(DirectConsumer, topic, callback) |
3614 | + |
3615 | + def declare_topic_consumer(self, exchange_name, topic, callback=None, |
3616 | + queue_name=None): |
3617 | + """Create a 'topic' consumer.""" |
3618 | + self.declare_consumer(functools.partial(TopicConsumer, |
3619 | + name=queue_name, |
3620 | + exchange_name=exchange_name, |
3621 | + ), |
3622 | + topic, callback) |
3623 | + |
3624 | + def declare_fanout_consumer(self, topic, callback): |
3625 | + """Create a 'fanout' consumer.""" |
3626 | + self.declare_consumer(FanoutConsumer, topic, callback) |
3627 | + |
3628 | + def direct_send(self, msg_id, msg): |
3629 | + """Send a 'direct' message.""" |
3630 | + self.publisher_send(DirectPublisher, msg_id, msg) |
3631 | + |
3632 | + def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): |
3633 | + """Send a 'topic' message.""" |
3634 | + self.publisher_send(TopicPublisher, topic, msg, timeout, |
3635 | + exchange_name=exchange_name, retry=retry) |
3636 | + |
3637 | + def fanout_send(self, topic, msg, retry=None): |
3638 | + """Send a 'fanout' message.""" |
3639 | + self.publisher_send(FanoutPublisher, topic, msg, retry=retry) |
3640 | + |
3641 | + def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): |
3642 | + """Send a notify message on a topic.""" |
3643 | + self.publisher_send(NotifyPublisher, topic, msg, timeout=None, |
3644 | + exchange_name=exchange_name, retry=retry, **kwargs) |
3645 | + |
3646 | + def consume(self, limit=None, timeout=None): |
3647 | + """Consume from all queues/consumers.""" |
3648 | + it = self.iterconsume(limit=limit, timeout=timeout) |
3649 | + while True: |
3650 | + try: |
3651 | + six.next(it) |
3652 | + except StopIteration: |
3653 | + return |
3654 | + |
3655 | + |
3656 | +class RabbitDriver(amqpdriver.AMQPDriverBase): |
3657 | + |
3658 | + def __init__(self, conf, url, |
3659 | + default_exchange=None, |
3660 | + allowed_remote_exmods=None): |
3661 | + conf.register_opts(rabbit_opts) |
3662 | + conf.register_opts(rpc_amqp.amqp_opts) |
3663 | + |
3664 | + connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection) |
3665 | + |
3666 | + super(RabbitDriver, self).__init__(conf, url, |
3667 | + connection_pool, |
3668 | + default_exchange, |
3669 | + allowed_remote_exmods) |
3670 | + |
3671 | + def require_features(self, requeue=True): |
3672 | + pass |
3673 | |
3674 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests' |
3675 | === added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers' |
3676 | === added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py' |
3677 | --- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 1970-01-01 00:00:00 +0000 |
3678 | +++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 2015-12-17 11:03:34 +0000 |
3679 | @@ -0,0 +1,735 @@ |
3680 | +# Copyright 2013 Red Hat, Inc. |
3681 | +# |
3682 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
3683 | +# not use this file except in compliance with the License. You may obtain |
3684 | +# a copy of the License at |
3685 | +# |
3686 | +# http://www.apache.org/licenses/LICENSE-2.0 |
3687 | +# |
3688 | +# Unless required by applicable law or agreed to in writing, software |
3689 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
3690 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
3691 | +# License for the specific language governing permissions and limitations |
3692 | +# under the License. |
3693 | + |
3694 | +import datetime |
3695 | +import operator |
3696 | +import sys |
3697 | +import threading |
3698 | +import uuid |
3699 | + |
3700 | +import fixtures |
3701 | +import kombu |
3702 | +import mock |
3703 | +import testscenarios |
3704 | + |
3705 | +from oslo import messaging |
3706 | +from oslo.messaging._drivers import amqpdriver |
3707 | +from oslo.messaging._drivers import common as driver_common |
3708 | +from oslo.messaging._drivers import impl_rabbit as rabbit_driver |
3709 | +from oslo.messaging.openstack.common import jsonutils |
3710 | +from tests import utils as test_utils |
3711 | + |
3712 | +load_tests = testscenarios.load_tests_apply_scenarios |
3713 | + |
3714 | + |
3715 | +class TestRabbitDriverLoad(test_utils.BaseTestCase): |
3716 | + |
3717 | + def setUp(self): |
3718 | + super(TestRabbitDriverLoad, self).setUp() |
3719 | + self.messaging_conf.transport_driver = 'rabbit' |
3720 | + self.messaging_conf.in_memory = True |
3721 | + |
3722 | + def test_driver_load(self): |
3723 | + transport = messaging.get_transport(self.conf) |
3724 | + self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver) |
3725 | + |
3726 | + |
3727 | +class TestRabbitTransportURL(test_utils.BaseTestCase): |
3728 | + |
3729 | + scenarios = [ |
3730 | + ('none', dict(url=None, |
3731 | + expected=[dict(hostname='localhost', |
3732 | + port=5672, |
3733 | + userid='guest', |
3734 | + password='guest', |
3735 | + virtual_host='/')])), |
3736 | + ('empty', |
3737 | + dict(url='rabbit:///', |
3738 | + expected=[dict(hostname='localhost', |
3739 | + port=5672, |
3740 | + userid='guest', |
3741 | + password='guest', |
3742 | + virtual_host='')])), |
3743 | + ('localhost', |
3744 | + dict(url='rabbit://localhost/', |
3745 | + expected=[dict(hostname='localhost', |
3746 | + port=5672, |
3747 | + userid='', |
3748 | + password='', |
3749 | + virtual_host='')])), |
3750 | + ('virtual_host', |
3751 | + dict(url='rabbit:///vhost', |
3752 | + expected=[dict(hostname='localhost', |
3753 | + port=5672, |
3754 | + userid='guest', |
3755 | + password='guest', |
3756 | + virtual_host='vhost')])), |
3757 | + ('no_creds', |
3758 | + dict(url='rabbit://host/virtual_host', |
3759 | + expected=[dict(hostname='host', |
3760 | + port=5672, |
3761 | + userid='', |
3762 | + password='', |
3763 | + virtual_host='virtual_host')])), |
3764 | + ('no_port', |
3765 | + dict(url='rabbit://user:password@host/virtual_host', |
3766 | + expected=[dict(hostname='host', |
3767 | + port=5672, |
3768 | + userid='user', |
3769 | + password='password', |
3770 | + virtual_host='virtual_host')])), |
3771 | + ('full_url', |
3772 | + dict(url='rabbit://user:password@host:10/virtual_host', |
3773 | + expected=[dict(hostname='host', |
3774 | + port=10, |
3775 | + userid='user', |
3776 | + password='password', |
3777 | + virtual_host='virtual_host')])), |
3778 | + ('full_two_url', |
3779 | + dict(url='rabbit://user:password@host:10,' |
3780 | + 'user2:password2@host2:12/virtual_host', |
3781 | + expected=[dict(hostname='host', |
3782 | + port=10, |
3783 | + userid='user', |
3784 | + password='password', |
3785 | + virtual_host='virtual_host'), |
3786 | + dict(hostname='host2', |
3787 | + port=12, |
3788 | + userid='user2', |
3789 | + password='password2', |
3790 | + virtual_host='virtual_host') |
3791 | + ] |
3792 | + )), |
3793 | + |
3794 | + ] |
3795 | + |
3796 | + def test_transport_url(self): |
3797 | + self.messaging_conf.in_memory = True |
3798 | + |
3799 | + transport = messaging.get_transport(self.conf, self.url) |
3800 | + self.addCleanup(transport.cleanup) |
3801 | + driver = transport._driver |
3802 | + |
3803 | + brokers_params = driver._get_connection().brokers_params[:] |
3804 | + brokers_params = [dict((k, v) for k, v in broker.items() |
3805 | + if k not in ['transport', 'login_method']) |
3806 | + for broker in brokers_params] |
3807 | + |
3808 | + self.assertEqual(sorted(self.expected, |
3809 | + key=operator.itemgetter('hostname')), |
3810 | + sorted(brokers_params, |
3811 | + key=operator.itemgetter('hostname'))) |
3812 | + |
3813 | + |
3814 | +class TestSendReceive(test_utils.BaseTestCase): |
3815 | + |
3816 | + _n_senders = [ |
3817 | + ('single_sender', dict(n_senders=1)), |
3818 | + ('multiple_senders', dict(n_senders=10)), |
3819 | + ] |
3820 | + |
3821 | + _context = [ |
3822 | + ('empty_context', dict(ctxt={})), |
3823 | + ('with_context', dict(ctxt={'user': 'mark'})), |
3824 | + ] |
3825 | + |
3826 | + _reply = [ |
3827 | + ('rx_id', dict(rx_id=True, reply=None)), |
3828 | + ('none', dict(rx_id=False, reply=None)), |
3829 | + ('empty_list', dict(rx_id=False, reply=[])), |
3830 | + ('empty_dict', dict(rx_id=False, reply={})), |
3831 | + ('false', dict(rx_id=False, reply=False)), |
3832 | + ('zero', dict(rx_id=False, reply=0)), |
3833 | + ] |
3834 | + |
3835 | + _failure = [ |
3836 | + ('success', dict(failure=False)), |
3837 | + ('failure', dict(failure=True, expected=False)), |
3838 | + ('expected_failure', dict(failure=True, expected=True)), |
3839 | + ] |
3840 | + |
3841 | + _timeout = [ |
3842 | + ('no_timeout', dict(timeout=None)), |
3843 | + ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken? |
3844 | + ] |
3845 | + |
3846 | + @classmethod |
3847 | + def generate_scenarios(cls): |
3848 | + cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders, |
3849 | + cls._context, |
3850 | + cls._reply, |
3851 | + cls._failure, |
3852 | + cls._timeout) |
3853 | + |
3854 | + def setUp(self): |
3855 | + super(TestSendReceive, self).setUp() |
3856 | + self.messaging_conf.transport_driver = 'rabbit' |
3857 | + self.messaging_conf.in_memory = True |
3858 | + |
3859 | + def test_send_receive(self): |
3860 | + transport = messaging.get_transport(self.conf) |
3861 | + self.addCleanup(transport.cleanup) |
3862 | + |
3863 | + driver = transport._driver |
3864 | + |
3865 | + target = messaging.Target(topic='testtopic') |
3866 | + |
3867 | + listener = driver.listen(target) |
3868 | + |
3869 | + senders = [] |
3870 | + replies = [] |
3871 | + msgs = [] |
3872 | + errors = [] |
3873 | + |
3874 | + def stub_error(msg, *a, **kw): |
3875 | + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): |
3876 | + a = a[0] |
3877 | + errors.append(str(msg) % a) |
3878 | + |
3879 | + self.stubs.Set(driver_common.LOG, 'error', stub_error) |
3880 | + |
3881 | + def send_and_wait_for_reply(i): |
3882 | + try: |
3883 | + replies.append(driver.send(target, |
3884 | + self.ctxt, |
3885 | + {'tx_id': i}, |
3886 | + wait_for_reply=True, |
3887 | + timeout=self.timeout)) |
3888 | + self.assertFalse(self.failure) |
3889 | + self.assertIsNone(self.timeout) |
3890 | + except (ZeroDivisionError, messaging.MessagingTimeout) as e: |
3891 | + replies.append(e) |
3892 | + self.assertTrue(self.failure or self.timeout is not None) |
3893 | + |
3894 | + while len(senders) < self.n_senders: |
3895 | + senders.append(threading.Thread(target=send_and_wait_for_reply, |
3896 | + args=(len(senders), ))) |
3897 | + |
3898 | + for i in range(len(senders)): |
3899 | + senders[i].start() |
3900 | + |
3901 | + received = listener.poll() |
3902 | + self.assertIsNotNone(received) |
3903 | + self.assertEqual(self.ctxt, received.ctxt) |
3904 | + self.assertEqual({'tx_id': i}, received.message) |
3905 | + msgs.append(received) |
3906 | + |
3907 | + # reply in reverse, except reply to the first guy second from last |
3908 | + order = list(range(len(senders) - 1, -1, -1)) |
3909 | + if len(order) > 1: |
3910 | + order[-1], order[-2] = order[-2], order[-1] |
3911 | + |
3912 | + for i in order: |
3913 | + if self.timeout is None: |
3914 | + if self.failure: |
3915 | + try: |
3916 | + raise ZeroDivisionError |
3917 | + except Exception: |
3918 | + failure = sys.exc_info() |
3919 | + msgs[i].reply(failure=failure, |
3920 | + log_failure=not self.expected) |
3921 | + elif self.rx_id: |
3922 | + msgs[i].reply({'rx_id': i}) |
3923 | + else: |
3924 | + msgs[i].reply(self.reply) |
3925 | + senders[i].join() |
3926 | + |
3927 | + self.assertEqual(len(senders), len(replies)) |
3928 | + for i, reply in enumerate(replies): |
3929 | + if self.timeout is not None: |
3930 | + self.assertIsInstance(reply, messaging.MessagingTimeout) |
3931 | + elif self.failure: |
3932 | + self.assertIsInstance(reply, ZeroDivisionError) |
3933 | + elif self.rx_id: |
3934 | + self.assertEqual({'rx_id': order[i]}, reply) |
3935 | + else: |
3936 | + self.assertEqual(self.reply, reply) |
3937 | + |
3938 | + if not self.timeout and self.failure and not self.expected: |
3939 | + self.assertTrue(len(errors) > 0, errors) |
3940 | + else: |
3941 | + self.assertEqual(0, len(errors), errors) |
3942 | + |
3943 | + |
3944 | +TestSendReceive.generate_scenarios() |
3945 | + |
3946 | + |
3947 | +class TestPollAsync(test_utils.BaseTestCase): |
3948 | + |
3949 | + def setUp(self): |
3950 | + super(TestPollAsync, self).setUp() |
3951 | + self.messaging_conf.transport_driver = 'rabbit' |
3952 | + self.messaging_conf.in_memory = True |
3953 | + |
3954 | + def test_poll_timeout(self): |
3955 | + transport = messaging.get_transport(self.conf) |
3956 | + self.addCleanup(transport.cleanup) |
3957 | + driver = transport._driver |
3958 | + target = messaging.Target(topic='testtopic') |
3959 | + listener = driver.listen(target) |
3960 | + received = listener.poll(timeout=0.050) |
3961 | + self.assertIsNone(received) |
3962 | + |
3963 | + |
3964 | +class TestRacyWaitForReply(test_utils.BaseTestCase): |
3965 | + |
3966 | + def setUp(self): |
3967 | + super(TestRacyWaitForReply, self).setUp() |
3968 | + self.messaging_conf.transport_driver = 'rabbit' |
3969 | + self.messaging_conf.in_memory = True |
3970 | + |
3971 | + def test_send_receive(self): |
3972 | + transport = messaging.get_transport(self.conf) |
3973 | + self.addCleanup(transport.cleanup) |
3974 | + |
3975 | + driver = transport._driver |
3976 | + |
3977 | + target = messaging.Target(topic='testtopic') |
3978 | + |
3979 | + listener = driver.listen(target) |
3980 | + |
3981 | + senders = [] |
3982 | + replies = [] |
3983 | + msgs = [] |
3984 | + |
3985 | + wait_conditions = [] |
3986 | + orig_reply_waiter = amqpdriver.ReplyWaiter.wait |
3987 | + |
3988 | + def reply_waiter(self, msg_id, timeout): |
3989 | + if wait_conditions: |
3990 | + cond = wait_conditions.pop() |
3991 | + with cond: |
3992 | + cond.notify() |
3993 | + with cond: |
3994 | + cond.wait() |
3995 | + return orig_reply_waiter(self, msg_id, timeout) |
3996 | + |
3997 | + self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter) |
3998 | + |
3999 | + def send_and_wait_for_reply(i, wait_for_reply): |
4000 | + replies.append(driver.send(target, |
4001 | + {}, |
4002 | + {'tx_id': i}, |
4003 | + wait_for_reply=wait_for_reply, |
4004 | + timeout=None)) |
4005 | + |
4006 | + while len(senders) < 2: |
4007 | + t = threading.Thread(target=send_and_wait_for_reply, |
4008 | + args=(len(senders), True)) |
4009 | + t.daemon = True |
4010 | + senders.append(t) |
4011 | + |
4012 | + # test the case then msg_id is not set |
4013 | + t = threading.Thread(target=send_and_wait_for_reply, |
4014 | + args=(len(senders), False)) |
4015 | + t.daemon = True |
4016 | + senders.append(t) |
4017 | + |
4018 | + # Start the first guy, receive his message, but delay his polling |
4019 | + notify_condition = threading.Condition() |
4020 | + wait_conditions.append(notify_condition) |
4021 | + with notify_condition: |
4022 | + senders[0].start() |
4023 | + notify_condition.wait() |
4024 | + |
4025 | + msgs.append(listener.poll()) |
4026 | + self.assertEqual({'tx_id': 0}, msgs[-1].message) |
4027 | + |
4028 | + # Start the second guy, receive his message |
4029 | + senders[1].start() |
4030 | + |
4031 | + msgs.append(listener.poll()) |
4032 | + self.assertEqual({'tx_id': 1}, msgs[-1].message) |
4033 | + |
4034 | + # Reply to both in order, making the second thread queue |
4035 | + # the reply meant for the first thread |
4036 | + msgs[0].reply({'rx_id': 0}) |
4037 | + msgs[1].reply({'rx_id': 1}) |
4038 | + |
4039 | + # Wait for the second thread to finish |
4040 | + senders[1].join() |
4041 | + |
4042 | + # Start the 3rd guy, receive his message |
4043 | + senders[2].start() |
4044 | + |
4045 | + msgs.append(listener.poll()) |
4046 | + self.assertEqual({'tx_id': 2}, msgs[-1].message) |
4047 | + |
4048 | + # Verify the _send_reply was not invoked by driver: |
4049 | + with mock.patch.object(msgs[2], '_send_reply') as method: |
4050 | + msgs[2].reply({'rx_id': 2}) |
4051 | + self.assertEqual(method.call_count, 0) |
4052 | + |
4053 | + # Wait for the 3rd thread to finish |
4054 | + senders[2].join() |
4055 | + |
4056 | + # Let the first thread continue |
4057 | + with notify_condition: |
4058 | + notify_condition.notify() |
4059 | + |
4060 | + # Wait for the first thread to finish |
4061 | + senders[0].join() |
4062 | + |
4063 | + # Verify replies were received out of order |
4064 | + self.assertEqual(len(senders), len(replies)) |
4065 | + self.assertEqual({'rx_id': 1}, replies[0]) |
4066 | + self.assertIsNone(replies[1]) |
4067 | + self.assertEqual({'rx_id': 0}, replies[2]) |
4068 | + |
4069 | + |
4070 | +def _declare_queue(target): |
4071 | + connection = kombu.connection.BrokerConnection(transport='memory') |
4072 | + |
4073 | + # Kludge to speed up tests. |
4074 | + connection.transport.polling_interval = 0.0 |
4075 | + |
4076 | + connection.connect() |
4077 | + channel = connection.channel() |
4078 | + |
4079 | + # work around 'memory' transport bug in 1.1.3 |
4080 | + channel._new_queue('ae.undeliver') |
4081 | + |
4082 | + if target.fanout: |
4083 | + exchange = kombu.entity.Exchange(name=target.topic + '_fanout', |
4084 | + type='fanout', |
4085 | + durable=False, |
4086 | + auto_delete=True) |
4087 | + queue = kombu.entity.Queue(name=target.topic + '_fanout_12345', |
4088 | + channel=channel, |
4089 | + exchange=exchange, |
4090 | + routing_key=target.topic) |
4091 | + if target.server: |
4092 | + exchange = kombu.entity.Exchange(name='openstack', |
4093 | + type='topic', |
4094 | + durable=False, |
4095 | + auto_delete=False) |
4096 | + topic = '%s.%s' % (target.topic, target.server) |
4097 | + queue = kombu.entity.Queue(name=topic, |
4098 | + channel=channel, |
4099 | + exchange=exchange, |
4100 | + routing_key=topic) |
4101 | + else: |
4102 | + exchange = kombu.entity.Exchange(name='openstack', |
4103 | + type='topic', |
4104 | + durable=False, |
4105 | + auto_delete=False) |
4106 | + queue = kombu.entity.Queue(name=target.topic, |
4107 | + channel=channel, |
4108 | + exchange=exchange, |
4109 | + routing_key=target.topic) |
4110 | + |
4111 | + queue.declare() |
4112 | + |
4113 | + return connection, channel, queue |
4114 | + |
4115 | + |
4116 | +class TestRequestWireFormat(test_utils.BaseTestCase): |
4117 | + |
4118 | + _target = [ |
4119 | + ('topic_target', |
4120 | + dict(topic='testtopic', server=None, fanout=False)), |
4121 | + ('server_target', |
4122 | + dict(topic='testtopic', server='testserver', fanout=False)), |
4123 | + # NOTE(markmc): https://github.com/celery/kombu/issues/195 |
4124 | + ('fanout_target', |
4125 | + dict(topic='testtopic', server=None, fanout=True, |
4126 | + skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), |
4127 | + ] |
4128 | + |
4129 | + _msg = [ |
4130 | + ('empty_msg', |
4131 | + dict(msg={}, expected={})), |
4132 | + ('primitive_msg', |
4133 | + dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})), |
4134 | + ('complex_msg', |
4135 | + dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}}, |
4136 | + expected={'a': {'b': '1920-02-03T04:05:06.000007'}})), |
4137 | + ] |
4138 | + |
4139 | + _context = [ |
4140 | + ('empty_ctxt', dict(ctxt={}, expected_ctxt={})), |
4141 | + ('user_project_ctxt', |
4142 | + dict(ctxt={'user': 'mark', 'project': 'snarkybunch'}, |
4143 | + expected_ctxt={'_context_user': 'mark', |
4144 | + '_context_project': 'snarkybunch'})), |
4145 | + ] |
4146 | + |
4147 | + @classmethod |
4148 | + def generate_scenarios(cls): |
4149 | + cls.scenarios = testscenarios.multiply_scenarios(cls._msg, |
4150 | + cls._context, |
4151 | + cls._target) |
4152 | + |
4153 | + def setUp(self): |
4154 | + super(TestRequestWireFormat, self).setUp() |
4155 | + self.messaging_conf.transport_driver = 'rabbit' |
4156 | + self.messaging_conf.in_memory = True |
4157 | + |
4158 | + self.uuids = [] |
4159 | + self.orig_uuid4 = uuid.uuid4 |
4160 | + self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4)) |
4161 | + |
4162 | + def mock_uuid4(self): |
4163 | + self.uuids.append(self.orig_uuid4()) |
4164 | + return self.uuids[-1] |
4165 | + |
4166 | + def test_request_wire_format(self): |
4167 | + if hasattr(self, 'skip_msg'): |
4168 | + self.skipTest(self.skip_msg) |
4169 | + |
4170 | + transport = messaging.get_transport(self.conf) |
4171 | + self.addCleanup(transport.cleanup) |
4172 | + |
4173 | + driver = transport._driver |
4174 | + |
4175 | + target = messaging.Target(topic=self.topic, |
4176 | + server=self.server, |
4177 | + fanout=self.fanout) |
4178 | + |
4179 | + connection, channel, queue = _declare_queue(target) |
4180 | + self.addCleanup(connection.release) |
4181 | + |
4182 | + driver.send(target, self.ctxt, self.msg) |
4183 | + |
4184 | + msgs = [] |
4185 | + |
4186 | + def callback(msg): |
4187 | + msg = channel.message_to_python(msg) |
4188 | + msg.ack() |
4189 | + msgs.append(msg.payload) |
4190 | + |
4191 | + queue.consume(callback=callback, |
4192 | + consumer_tag='1', |
4193 | + nowait=False) |
4194 | + |
4195 | + connection.drain_events() |
4196 | + |
4197 | + self.assertEqual(1, len(msgs)) |
4198 | + self.assertIn('oslo.message', msgs[0]) |
4199 | + |
4200 | + received = msgs[0] |
4201 | + received['oslo.message'] = jsonutils.loads(received['oslo.message']) |
4202 | + |
4203 | + # FIXME(markmc): add _msg_id and _reply_q check |
4204 | + expected_msg = { |
4205 | + '_unique_id': self.uuids[0].hex, |
4206 | + } |
4207 | + expected_msg.update(self.expected) |
4208 | + expected_msg.update(self.expected_ctxt) |
4209 | + |
4210 | + expected = { |
4211 | + 'oslo.version': '2.0', |
4212 | + 'oslo.message': expected_msg, |
4213 | + } |
4214 | + |
4215 | + self.assertEqual(expected, received) |
4216 | + |
4217 | + |
4218 | +TestRequestWireFormat.generate_scenarios() |
4219 | + |
4220 | + |
4221 | +def _create_producer(target): |
4222 | + connection = kombu.connection.BrokerConnection(transport='memory') |
4223 | + |
4224 | + # Kludge to speed up tests. |
4225 | + connection.transport.polling_interval = 0.0 |
4226 | + |
4227 | + connection.connect() |
4228 | + channel = connection.channel() |
4229 | + |
4230 | + # work around 'memory' transport bug in 1.1.3 |
4231 | + channel._new_queue('ae.undeliver') |
4232 | + |
4233 | + if target.fanout: |
4234 | + exchange = kombu.entity.Exchange(name=target.topic + '_fanout', |
4235 | + type='fanout', |
4236 | + durable=False, |
4237 | + auto_delete=True) |
4238 | + producer = kombu.messaging.Producer(exchange=exchange, |
4239 | + channel=channel, |
4240 | + routing_key=target.topic) |
4241 | + elif target.server: |
4242 | + exchange = kombu.entity.Exchange(name='openstack', |
4243 | + type='topic', |
4244 | + durable=False, |
4245 | + auto_delete=False) |
4246 | + topic = '%s.%s' % (target.topic, target.server) |
4247 | + producer = kombu.messaging.Producer(exchange=exchange, |
4248 | + channel=channel, |
4249 | + routing_key=topic) |
4250 | + else: |
4251 | + exchange = kombu.entity.Exchange(name='openstack', |
4252 | + type='topic', |
4253 | + durable=False, |
4254 | + auto_delete=False) |
4255 | + producer = kombu.messaging.Producer(exchange=exchange, |
4256 | + channel=channel, |
4257 | + routing_key=target.topic) |
4258 | + |
4259 | + return connection, producer |
4260 | + |
4261 | + |
4262 | +class TestReplyWireFormat(test_utils.BaseTestCase): |
4263 | + |
4264 | + _target = [ |
4265 | + ('topic_target', |
4266 | + dict(topic='testtopic', server=None, fanout=False)), |
4267 | + ('server_target', |
4268 | + dict(topic='testtopic', server='testserver', fanout=False)), |
4269 | + # NOTE(markmc): https://github.com/celery/kombu/issues/195 |
4270 | + ('fanout_target', |
4271 | + dict(topic='testtopic', server=None, fanout=True, |
4272 | + skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), |
4273 | + ] |
4274 | + |
4275 | + _msg = [ |
4276 | + ('empty_msg', |
4277 | + dict(msg={}, expected={})), |
4278 | + ('primitive_msg', |
4279 | + dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})), |
4280 | + ('complex_msg', |
4281 | + dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}}, |
4282 | + expected={'a': {'b': '1920-02-03T04:05:06.000007'}})), |
4283 | + ] |
4284 | + |
4285 | + _context = [ |
4286 | + ('empty_ctxt', dict(ctxt={}, expected_ctxt={})), |
4287 | + ('user_project_ctxt', |
4288 | + dict(ctxt={'_context_user': 'mark', |
4289 | + '_context_project': 'snarkybunch'}, |
4290 | + expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})), |
4291 | + ] |
4292 | + |
4293 | + @classmethod |
4294 | + def generate_scenarios(cls): |
4295 | + cls.scenarios = testscenarios.multiply_scenarios(cls._msg, |
4296 | + cls._context, |
4297 | + cls._target) |
4298 | + |
4299 | + def setUp(self): |
4300 | + super(TestReplyWireFormat, self).setUp() |
4301 | + self.messaging_conf.transport_driver = 'rabbit' |
4302 | + self.messaging_conf.in_memory = True |
4303 | + |
4304 | + def test_reply_wire_format(self): |
4305 | + if hasattr(self, 'skip_msg'): |
4306 | + self.skipTest(self.skip_msg) |
4307 | + |
4308 | + transport = messaging.get_transport(self.conf) |
4309 | + self.addCleanup(transport.cleanup) |
4310 | + |
4311 | + driver = transport._driver |
4312 | + |
4313 | + target = messaging.Target(topic=self.topic, |
4314 | + server=self.server, |
4315 | + fanout=self.fanout) |
4316 | + |
4317 | + listener = driver.listen(target) |
4318 | + |
4319 | + connection, producer = _create_producer(target) |
4320 | + self.addCleanup(connection.release) |
4321 | + |
4322 | + msg = { |
4323 | + 'oslo.version': '2.0', |
4324 | + 'oslo.message': {} |
4325 | + } |
4326 | + |
4327 | + msg['oslo.message'].update(self.msg) |
4328 | + msg['oslo.message'].update(self.ctxt) |
4329 | + |
4330 | + msg['oslo.message'].update({ |
4331 | + '_msg_id': uuid.uuid4().hex, |
4332 | + '_unique_id': uuid.uuid4().hex, |
4333 | + '_reply_q': 'reply_' + uuid.uuid4().hex, |
4334 | + }) |
4335 | + |
4336 | + msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) |
4337 | + |
4338 | + producer.publish(msg) |
4339 | + |
4340 | + received = listener.poll() |
4341 | + self.assertIsNotNone(received) |
4342 | + self.assertEqual(self.expected_ctxt, received.ctxt) |
4343 | + self.assertEqual(self.expected, received.message) |
4344 | + |
4345 | + |
4346 | +TestReplyWireFormat.generate_scenarios() |
4347 | + |
4348 | + |
4349 | +class RpcKombuHATestCase(test_utils.BaseTestCase): |
4350 | + |
4351 | + def setUp(self): |
4352 | + super(RpcKombuHATestCase, self).setUp() |
4353 | + self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] |
4354 | + self.config(rabbit_hosts=self.brokers) |
4355 | + |
4356 | + hostname_sets = set() |
4357 | + self.info = {'attempt': 0, |
4358 | + 'fail': False} |
4359 | + |
4360 | + def _connect(myself, params): |
4361 | + # do just enough work to pass the connection attempt |
4362 | + myself.connection = kombu.connection.BrokerConnection(**params) |
4363 | + myself.connection_errors = myself.connection.connection_errors |
4364 | + |
4365 | + hostname = params['hostname'] |
4366 | + self.assertNotIn(hostname, hostname_sets) |
4367 | + hostname_sets.add(hostname) |
4368 | + |
4369 | + self.info['attempt'] += 1 |
4370 | + if self.info['fail']: |
4371 | + raise IOError('fake fail') |
4372 | + |
4373 | + # just make sure connection instantiation does not fail with an |
4374 | + # exception |
4375 | + self.stubs.Set(rabbit_driver.Connection, '_connect', _connect) |
4376 | + |
4377 | + # starting from the first broker in the list |
4378 | + url = messaging.TransportURL.parse(self.conf, None) |
4379 | + self.connection = rabbit_driver.Connection(self.conf, url) |
4380 | + self.addCleanup(self.connection.close) |
4381 | + |
4382 | + self.info.update({'attempt': 0, |
4383 | + 'fail': True}) |
4384 | + hostname_sets.clear() |
4385 | + |
4386 | + def test_reconnect_order(self): |
4387 | + self.assertRaises(messaging.MessageDeliveryFailure, |
4388 | + self.connection.reconnect, |
4389 | + retry=len(self.brokers) - 1) |
4390 | + self.assertEqual(len(self.brokers), self.info['attempt']) |
4391 | + |
4392 | + def test_ensure_four_retry(self): |
4393 | + mock_callback = mock.Mock(side_effect=IOError) |
4394 | + self.assertRaises(messaging.MessageDeliveryFailure, |
4395 | + self.connection.ensure, None, mock_callback, |
4396 | + retry=4) |
4397 | + self.assertEqual(5, self.info['attempt']) |
4398 | + self.assertEqual(1, mock_callback.call_count) |
4399 | + |
4400 | + def test_ensure_one_retry(self): |
4401 | + mock_callback = mock.Mock(side_effect=IOError) |
4402 | + self.assertRaises(messaging.MessageDeliveryFailure, |
4403 | + self.connection.ensure, None, mock_callback, |
4404 | + retry=1) |
4405 | + self.assertEqual(2, self.info['attempt']) |
4406 | + self.assertEqual(1, mock_callback.call_count) |
4407 | + |
4408 | + def test_ensure_no_retry(self): |
4409 | + mock_callback = mock.Mock(side_effect=IOError) |
4410 | + self.assertRaises(messaging.MessageDeliveryFailure, |
4411 | + self.connection.ensure, None, mock_callback, |
4412 | + retry=0) |
4413 | + self.assertEqual(1, self.info['attempt']) |
4414 | + self.assertEqual(1, mock_callback.call_count) |
4415 | |
4416 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch' |
4417 | === added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/.timestamp' |
4418 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo' |
4419 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging' |
4420 | === added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers' |
4421 | === added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py' |
4422 | --- .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000 |
4423 | +++ .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000 |
4424 | @@ -0,0 +1,839 @@ |
4425 | +# Copyright 2011 OpenStack Foundation |
4426 | +# |
4427 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
4428 | +# not use this file except in compliance with the License. You may obtain |
4429 | +# a copy of the License at |
4430 | +# |
4431 | +# http://www.apache.org/licenses/LICENSE-2.0 |
4432 | +# |
4433 | +# Unless required by applicable law or agreed to in writing, software |
4434 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
4435 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
4436 | +# License for the specific language governing permissions and limitations |
4437 | +# under the License. |
4438 | + |
4439 | +import functools |
4440 | +import itertools |
4441 | +import logging |
4442 | +import random |
4443 | +import socket |
4444 | +import ssl |
4445 | +import time |
4446 | +import uuid |
4447 | + |
4448 | +import kombu |
4449 | +import kombu.connection |
4450 | +import kombu.entity |
4451 | +import kombu.messaging |
4452 | +import six |
4453 | + |
4454 | +from oslo.config import cfg |
4455 | +from oslo.messaging._drivers import amqp as rpc_amqp |
4456 | +from oslo.messaging._drivers import amqpdriver |
4457 | +from oslo.messaging._drivers import common as rpc_common |
4458 | +from oslo.messaging import exceptions |
4459 | +from oslo.messaging.openstack.common.gettextutils import _ |
4460 | +from oslo.utils import netutils |
4461 | + |
4462 | +rabbit_opts = [ |
4463 | + cfg.StrOpt('kombu_ssl_version', |
4464 | + default='', |
4465 | + help='SSL version to use (valid only if SSL enabled). ' |
4466 | + 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' |
4467 | + 'be available on some distributions.' |
4468 | + ), |
4469 | + cfg.StrOpt('kombu_ssl_keyfile', |
4470 | + default='', |
4471 | + help='SSL key file (valid only if SSL enabled).'), |
4472 | + cfg.StrOpt('kombu_ssl_certfile', |
4473 | + default='', |
4474 | + help='SSL cert file (valid only if SSL enabled).'), |
4475 | + cfg.StrOpt('kombu_ssl_ca_certs', |
4476 | + default='', |
4477 | + help='SSL certification authority file ' |
4478 | + '(valid only if SSL enabled).'), |
4479 | + cfg.FloatOpt('kombu_reconnect_delay', |
4480 | + default=1.0, |
4481 | + help='How long to wait before reconnecting in response to an ' |
4482 | + 'AMQP consumer cancel notification.'), |
4483 | + cfg.StrOpt('rabbit_host', |
4484 | + default='localhost', |
4485 | + help='The RabbitMQ broker address where a single node is ' |
4486 | + 'used.'), |
4487 | + cfg.IntOpt('rabbit_port', |
4488 | + default=5672, |
4489 | + help='The RabbitMQ broker port where a single node is used.'), |
4490 | + cfg.ListOpt('rabbit_hosts', |
4491 | + default=['$rabbit_host:$rabbit_port'], |
4492 | + help='RabbitMQ HA cluster host:port pairs.'), |
4493 | + cfg.BoolOpt('rabbit_use_ssl', |
4494 | + default=False, |
4495 | + help='Connect over SSL for RabbitMQ.'), |
4496 | + cfg.StrOpt('rabbit_userid', |
4497 | + default='guest', |
4498 | + help='The RabbitMQ userid.'), |
4499 | + cfg.StrOpt('rabbit_password', |
4500 | + default='guest', |
4501 | + help='The RabbitMQ password.', |
4502 | + secret=True), |
4503 | + cfg.StrOpt('rabbit_login_method', |
4504 | + default='AMQPLAIN', |
4505 | + help='the RabbitMQ login method'), |
4506 | + cfg.StrOpt('rabbit_virtual_host', |
4507 | + default='/', |
4508 | + help='The RabbitMQ virtual host.'), |
4509 | + cfg.IntOpt('rabbit_retry_interval', |
4510 | + default=1, |
4511 | + help='How frequently to retry connecting with RabbitMQ.'), |
4512 | + cfg.IntOpt('rabbit_retry_backoff', |
4513 | + default=2, |
4514 | + help='How long to backoff for between retries when connecting ' |
4515 | + 'to RabbitMQ.'), |
4516 | + cfg.IntOpt('rabbit_max_retries', |
4517 | + default=0, |
4518 | + help='Maximum number of RabbitMQ connection retries. ' |
4519 | + 'Default is 0 (infinite retry count).'), |
4520 | + cfg.BoolOpt('rabbit_ha_queues', |
4521 | + default=False, |
4522 | + help='Use HA queues in RabbitMQ (x-ha-policy: all). ' |
4523 | + 'If you change this option, you must wipe the ' |
4524 | + 'RabbitMQ database.'), |
4525 | + |
4526 | + # FIXME(markmc): this was toplevel in openstack.common.rpc |
4527 | + cfg.BoolOpt('fake_rabbit', |
4528 | + default=False, |
4529 | + help='If passed, use a fake RabbitMQ provider.'), |
4530 | +] |
4531 | + |
4532 | +LOG = logging.getLogger(__name__) |
4533 | + |
4534 | + |
4535 | +def _get_queue_arguments(conf): |
4536 | + """Construct the arguments for declaring a queue. |
4537 | + |
4538 | + If the rabbit_ha_queues option is set, we declare a mirrored queue |
4539 | + as described here: |
4540 | + |
4541 | + http://www.rabbitmq.com/ha.html |
4542 | + |
4543 | + Setting x-ha-policy to all means that the queue will be mirrored |
4544 | + to all nodes in the cluster. |
4545 | + """ |
4546 | + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} |
4547 | + |
4548 | + |
4549 | +class RabbitMessage(dict): |
4550 | + def __init__(self, raw_message): |
4551 | + super(RabbitMessage, self).__init__( |
4552 | + rpc_common.deserialize_msg(raw_message.payload)) |
4553 | + self._raw_message = raw_message |
4554 | + |
4555 | + def acknowledge(self): |
4556 | + self._raw_message.ack() |
4557 | + |
4558 | + def requeue(self): |
4559 | + self._raw_message.requeue() |
4560 | + |
4561 | + |
4562 | +class ConsumerBase(object): |
4563 | + """Consumer base class.""" |
4564 | + |
4565 | + def __init__(self, channel, callback, tag, **kwargs): |
4566 | + """Declare a queue on an amqp channel. |
4567 | + |
4568 | + 'channel' is the amqp channel to use |
4569 | + 'callback' is the callback to call when messages are received |
4570 | + 'tag' is a unique ID for the consumer on the channel |
4571 | + |
4572 | + queue name, exchange name, and other kombu options are |
4573 | + passed in here as a dictionary. |
4574 | + """ |
4575 | + self.callback = callback |
4576 | + self.tag = six.text_type(tag) |
4577 | + self.kwargs = kwargs |
4578 | + self.queue = None |
4579 | + self.reconnect(channel) |
4580 | + |
4581 | + def reconnect(self, channel): |
4582 | + """Re-declare the queue after a rabbit reconnect.""" |
4583 | + self.channel = channel |
4584 | + self.kwargs['channel'] = channel |
4585 | + self.queue = kombu.entity.Queue(**self.kwargs) |
4586 | + self.queue.declare() |
4587 | + |
4588 | + def _callback_handler(self, message, callback): |
4589 | + """Call callback with deserialized message. |
4590 | + |
4591 | + Messages that are processed and ack'ed. |
4592 | + """ |
4593 | + |
4594 | + try: |
4595 | + callback(RabbitMessage(message)) |
4596 | + except Exception: |
4597 | + LOG.exception(_("Failed to process message" |
4598 | + " ... skipping it.")) |
4599 | + message.ack() |
4600 | + |
4601 | + def consume(self, *args, **kwargs): |
4602 | + """Actually declare the consumer on the amqp channel. This will |
4603 | + start the flow of messages from the queue. Using the |
4604 | + Connection.iterconsume() iterator will process the messages, |
4605 | + calling the appropriate callback. |
4606 | + |
4607 | + If a callback is specified in kwargs, use that. Otherwise, |
4608 | + use the callback passed during __init__() |
4609 | + |
4610 | + If kwargs['nowait'] is True, then this call will not wait for |
4611 | + the broker's reply to the consume request. |
4612 | + |
4613 | + """ |
4614 | + |
4615 | + options = {'consumer_tag': self.tag} |
4616 | + options['nowait'] = kwargs.get('nowait', False) |
4617 | + callback = kwargs.get('callback', self.callback) |
4618 | + if not callback: |
4619 | + raise ValueError("No callback defined") |
4620 | + |
4621 | + def _callback(raw_message): |
4622 | + message = self.channel.message_to_python(raw_message) |
4623 | + self._callback_handler(message, callback) |
4624 | + |
4625 | + self.queue.consume(*args, callback=_callback, **options) |
4626 | + |
4627 | + def cancel(self): |
4628 | + """Cancel the consuming from the queue, if it has started.""" |
4629 | + try: |
4630 | + self.queue.cancel(self.tag) |
4631 | + except KeyError as e: |
4632 | + # NOTE(comstud): Kludge to get around an amqplib bug |
4633 | + if six.text_type(e) != "u'%s'" % self.tag: |
4634 | + raise |
4635 | + self.queue = None |
4636 | + |
4637 | + |
4638 | +class DirectConsumer(ConsumerBase): |
4639 | + """Queue/consumer class for 'direct'.""" |
4640 | + |
4641 | + def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): |
4642 | + """Init a 'direct' queue. |
4643 | + |
4644 | + 'channel' is the amqp channel to use |
4645 | + 'msg_id' is the msg_id to listen on |
4646 | + 'callback' is the callback to call when messages are received |
4647 | + 'tag' is a unique ID for the consumer on the channel |
4648 | + |
4649 | + Other kombu options may be passed |
4650 | + """ |
4651 | + # Default options |
4652 | + options = {'durable': False, |
4653 | + 'queue_arguments': _get_queue_arguments(conf), |
4654 | + 'auto_delete': True, |
4655 | + 'exclusive': False} |
4656 | + options.update(kwargs) |
4657 | + exchange = kombu.entity.Exchange(name=msg_id, |
4658 | + type='direct', |
4659 | + durable=options['durable'], |
4660 | + auto_delete=options['auto_delete']) |
4661 | + super(DirectConsumer, self).__init__(channel, |
4662 | + callback, |
4663 | + tag, |
4664 | + name=msg_id, |
4665 | + exchange=exchange, |
4666 | + routing_key=msg_id, |
4667 | + **options) |
4668 | + |
4669 | + |
4670 | +class TopicConsumer(ConsumerBase): |
4671 | + """Consumer class for 'topic'.""" |
4672 | + |
4673 | + def __init__(self, conf, channel, topic, callback, tag, exchange_name, |
4674 | + name=None, **kwargs): |
4675 | + """Init a 'topic' queue. |
4676 | + |
4677 | + :param channel: the amqp channel to use |
4678 | + :param topic: the topic to listen on |
4679 | + :paramtype topic: str |
4680 | + :param callback: the callback to call when messages are received |
4681 | + :param tag: a unique ID for the consumer on the channel |
4682 | + :param exchange_name: the exchange name to use |
4683 | + :param name: optional queue name, defaults to topic |
4684 | + :paramtype name: str |
4685 | + |
4686 | + Other kombu options may be passed as keyword arguments |
4687 | + """ |
4688 | + # Default options |
4689 | + options = {'durable': conf.amqp_durable_queues, |
4690 | + 'queue_arguments': _get_queue_arguments(conf), |
4691 | + 'auto_delete': conf.amqp_auto_delete, |
4692 | + 'exclusive': False} |
4693 | + options.update(kwargs) |
4694 | + exchange = kombu.entity.Exchange(name=exchange_name, |
4695 | + type='topic', |
4696 | + durable=options['durable'], |
4697 | + auto_delete=options['auto_delete']) |
4698 | + super(TopicConsumer, self).__init__(channel, |
4699 | + callback, |
4700 | + tag, |
4701 | + name=name or topic, |
4702 | + exchange=exchange, |
4703 | + routing_key=topic, |
4704 | + **options) |
4705 | + |
4706 | + |
4707 | +class FanoutConsumer(ConsumerBase): |
4708 | + """Consumer class for 'fanout'.""" |
4709 | + |
4710 | + def __init__(self, conf, channel, topic, callback, tag, **kwargs): |
4711 | + """Init a 'fanout' queue. |
4712 | + |
4713 | + 'channel' is the amqp channel to use |
4714 | + 'topic' is the topic to listen on |
4715 | + 'callback' is the callback to call when messages are received |
4716 | + 'tag' is a unique ID for the consumer on the channel |
4717 | + |
4718 | + Other kombu options may be passed |
4719 | + """ |
4720 | + unique = uuid.uuid4().hex |
4721 | + exchange_name = '%s_fanout' % topic |
4722 | + queue_name = '%s_fanout_%s' % (topic, unique) |
4723 | + |
4724 | + # Default options |
4725 | + options = {'durable': False, |
4726 | + 'queue_arguments': _get_queue_arguments(conf), |
4727 | + 'auto_delete': True, |
4728 | + 'exclusive': False} |
4729 | + options.update(kwargs) |
4730 | + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', |
4731 | + durable=options['durable'], |
4732 | + auto_delete=options['auto_delete']) |
4733 | + super(FanoutConsumer, self).__init__(channel, callback, tag, |
4734 | + name=queue_name, |
4735 | + exchange=exchange, |
4736 | + routing_key=topic, |
4737 | + **options) |
4738 | + |
4739 | + |
4740 | +class Publisher(object): |
4741 | + """Base Publisher class.""" |
4742 | + |
4743 | + def __init__(self, channel, exchange_name, routing_key, **kwargs): |
4744 | + """Init the Publisher class with the exchange_name, routing_key, |
4745 | + and other options |
4746 | + """ |
4747 | + self.exchange_name = exchange_name |
4748 | + self.routing_key = routing_key |
4749 | + self.kwargs = kwargs |
4750 | + self.reconnect(channel) |
4751 | + |
4752 | + def reconnect(self, channel): |
4753 | + """Re-establish the Producer after a rabbit reconnection.""" |
4754 | + self.exchange = kombu.entity.Exchange(name=self.exchange_name, |
4755 | + **self.kwargs) |
4756 | + self.producer = kombu.messaging.Producer(exchange=self.exchange, |
4757 | + channel=channel, |
4758 | + routing_key=self.routing_key) |
4759 | + |
4760 | + def send(self, msg, timeout=None): |
4761 | + """Send a message.""" |
4762 | + if timeout: |
4763 | + # |
4764 | + # AMQP TTL is in milliseconds when set in the header. |
4765 | + # |
4766 | + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) |
4767 | + else: |
4768 | + self.producer.publish(msg) |
4769 | + |
4770 | + |
4771 | +class DirectPublisher(Publisher): |
4772 | + """Publisher class for 'direct'.""" |
4773 | + def __init__(self, conf, channel, topic, **kwargs): |
4774 | + """Init a 'direct' publisher. |
4775 | + |
4776 | + Kombu options may be passed as keyword args to override defaults |
4777 | + """ |
4778 | + |
4779 | + options = {'durable': False, |
4780 | + 'auto_delete': True, |
4781 | + 'exclusive': False} |
4782 | + options.update(kwargs) |
4783 | + super(DirectPublisher, self).__init__(channel, topic, topic, |
4784 | + type='direct', **options) |
4785 | + |
4786 | + |
4787 | +class TopicPublisher(Publisher): |
4788 | + """Publisher class for 'topic'.""" |
4789 | + def __init__(self, conf, channel, exchange_name, topic, **kwargs): |
4790 | + """Init a 'topic' publisher. |
4791 | + |
4792 | + Kombu options may be passed as keyword args to override defaults |
4793 | + """ |
4794 | + options = {'durable': conf.amqp_durable_queues, |
4795 | + 'auto_delete': conf.amqp_auto_delete, |
4796 | + 'exclusive': False} |
4797 | + options.update(kwargs) |
4798 | + super(TopicPublisher, self).__init__(channel, |
4799 | + exchange_name, |
4800 | + topic, |
4801 | + type='topic', |
4802 | + **options) |
4803 | + |
4804 | + |
4805 | +class FanoutPublisher(Publisher): |
4806 | + """Publisher class for 'fanout'.""" |
4807 | + def __init__(self, conf, channel, topic, **kwargs): |
4808 | + """Init a 'fanout' publisher. |
4809 | + |
4810 | + Kombu options may be passed as keyword args to override defaults |
4811 | + """ |
4812 | + options = {'durable': False, |
4813 | + 'auto_delete': True, |
4814 | + 'exclusive': False} |
4815 | + options.update(kwargs) |
4816 | + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, |
4817 | + None, type='fanout', **options) |
4818 | + |
4819 | + |
4820 | +class NotifyPublisher(TopicPublisher): |
4821 | + """Publisher class for 'notify'.""" |
4822 | + |
4823 | + def __init__(self, conf, channel, exchange_name, topic, **kwargs): |
4824 | + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) |
4825 | + self.queue_arguments = _get_queue_arguments(conf) |
4826 | + super(NotifyPublisher, self).__init__(conf, channel, exchange_name, |
4827 | + topic, **kwargs) |
4828 | + |
4829 | + def reconnect(self, channel): |
4830 | + super(NotifyPublisher, self).reconnect(channel) |
4831 | + |
4832 | + # NOTE(jerdfelt): Normally the consumer would create the queue, but |
4833 | + # we do this to ensure that messages don't get dropped if the |
4834 | + # consumer is started after we do |
4835 | + queue = kombu.entity.Queue(channel=channel, |
4836 | + exchange=self.exchange, |
4837 | + durable=self.durable, |
4838 | + name=self.routing_key, |
4839 | + routing_key=self.routing_key, |
4840 | + queue_arguments=self.queue_arguments) |
4841 | + queue.declare() |
4842 | + |
4843 | + |
4844 | +class Connection(object): |
4845 | + """Connection object.""" |
4846 | + |
4847 | + pools = {} |
4848 | + |
4849 | + def __init__(self, conf, url): |
4850 | + self.consumers = [] |
4851 | + self.conf = conf |
4852 | + self.max_retries = self.conf.rabbit_max_retries |
4853 | + # Try forever? |
4854 | + if self.max_retries <= 0: |
4855 | + self.max_retries = None |
4856 | + self.interval_start = self.conf.rabbit_retry_interval |
4857 | + self.interval_stepping = self.conf.rabbit_retry_backoff |
4858 | + # max retry-interval = 30 seconds |
4859 | + self.interval_max = 30 |
4860 | + self.memory_transport = False |
4861 | + |
4862 | + ssl_params = self._fetch_ssl_params() |
4863 | + |
4864 | + if url.virtual_host is not None: |
4865 | + virtual_host = url.virtual_host |
4866 | + else: |
4867 | + virtual_host = self.conf.rabbit_virtual_host |
4868 | + |
4869 | + self.brokers_params = [] |
4870 | + if url.hosts: |
4871 | + for host in url.hosts: |
4872 | + params = { |
4873 | + 'hostname': host.hostname, |
4874 | + 'port': host.port or 5672, |
4875 | + 'userid': host.username or '', |
4876 | + 'password': host.password or '', |
4877 | + 'login_method': self.conf.rabbit_login_method, |
4878 | + 'virtual_host': virtual_host |
4879 | + } |
4880 | + if self.conf.fake_rabbit: |
4881 | + params['transport'] = 'memory' |
4882 | + if self.conf.rabbit_use_ssl: |
4883 | + params['ssl'] = ssl_params |
4884 | + |
4885 | + self.brokers_params.append(params) |
4886 | + else: |
4887 | + # Old configuration format |
4888 | + for adr in self.conf.rabbit_hosts: |
4889 | + hostname, port = netutils.parse_host_port( |
4890 | + adr, default_port=self.conf.rabbit_port) |
4891 | + |
4892 | + params = { |
4893 | + 'hostname': hostname, |
4894 | + 'port': port, |
4895 | + 'userid': self.conf.rabbit_userid, |
4896 | + 'password': self.conf.rabbit_password, |
4897 | + 'login_method': self.conf.rabbit_login_method, |
4898 | + 'virtual_host': virtual_host |
4899 | + } |
4900 | + |
4901 | + if self.conf.fake_rabbit: |
4902 | + params['transport'] = 'memory' |
4903 | + if self.conf.rabbit_use_ssl: |
4904 | + params['ssl'] = ssl_params |
4905 | + |
4906 | + self.brokers_params.append(params) |
4907 | + |
4908 | + random.shuffle(self.brokers_params) |
4909 | + self.brokers = itertools.cycle(self.brokers_params) |
4910 | + |
4911 | + self.memory_transport = self.conf.fake_rabbit |
4912 | + |
4913 | + self.connection = None |
4914 | + self.do_consume = None |
4915 | + self.reconnect() |
4916 | + |
4917 | + # FIXME(markmc): use oslo sslutils when it is available as a library |
4918 | + _SSL_PROTOCOLS = { |
4919 | + "tlsv1": ssl.PROTOCOL_TLSv1, |
4920 | + "sslv23": ssl.PROTOCOL_SSLv23, |
4921 | + "sslv3": ssl.PROTOCOL_SSLv3 |
4922 | + } |
4923 | + |
4924 | + try: |
4925 | + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 |
4926 | + except AttributeError: |
4927 | + pass |
4928 | + |
4929 | + @classmethod |
4930 | + def validate_ssl_version(cls, version): |
4931 | + key = version.lower() |
4932 | + try: |
4933 | + return cls._SSL_PROTOCOLS[key] |
4934 | + except KeyError: |
4935 | + raise RuntimeError(_("Invalid SSL version : %s") % version) |
4936 | + |
4937 | + def _fetch_ssl_params(self): |
4938 | + """Handles fetching what ssl params should be used for the connection |
4939 | + (if any). |
4940 | + """ |
4941 | + ssl_params = dict() |
4942 | + |
4943 | + # http://docs.python.org/library/ssl.html - ssl.wrap_socket |
4944 | + if self.conf.kombu_ssl_version: |
4945 | + ssl_params['ssl_version'] = self.validate_ssl_version( |
4946 | + self.conf.kombu_ssl_version) |
4947 | + if self.conf.kombu_ssl_keyfile: |
4948 | + ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile |
4949 | + if self.conf.kombu_ssl_certfile: |
4950 | + ssl_params['certfile'] = self.conf.kombu_ssl_certfile |
4951 | + if self.conf.kombu_ssl_ca_certs: |
4952 | + ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs |
4953 | + # We might want to allow variations in the |
4954 | + # future with this? |
4955 | + ssl_params['cert_reqs'] = ssl.CERT_REQUIRED |
4956 | + |
4957 | + # Return the extended behavior or just have the default behavior |
4958 | + return ssl_params or True |
4959 | + |
4960 | + def _connect(self, broker): |
4961 | + """Connect to rabbit. Re-establish any queues that may have |
4962 | + been declared before if we are reconnecting. Exceptions should |
4963 | + be handled by the caller. |
4964 | + """ |
4965 | + LOG.info(_("Connecting to AMQP server on " |
4966 | + "%(hostname)s:%(port)d"), broker) |
4967 | + self.connection = kombu.connection.BrokerConnection(**broker) |
4968 | + self.connection_errors = self.connection.connection_errors |
4969 | + self.channel_errors = self.connection.channel_errors |
4970 | + if self.memory_transport: |
4971 | + # Kludge to speed up tests. |
4972 | + self.connection.transport.polling_interval = 0.0 |
4973 | + self.do_consume = True |
4974 | + self.consumer_num = itertools.count(1) |
4975 | + self.connection.connect() |
4976 | + self.channel = self.connection.channel() |
4977 | + # work around 'memory' transport bug in 1.1.3 |
4978 | + if self.memory_transport: |
4979 | + self.channel._new_queue('ae.undeliver') |
4980 | + for consumer in self.consumers: |
4981 | + consumer.reconnect(self.channel) |
4982 | + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), |
4983 | + broker) |
4984 | + |
4985 | + def _disconnect(self): |
4986 | + if self.connection: |
4987 | + # XXX(nic): when reconnecting to a RabbitMQ cluster |
4988 | + # with mirrored queues in use, the attempt to release the |
4989 | + # connection can hang "indefinitely" somewhere deep down |
4990 | + # in Kombu. Blocking the thread for a bit prior to |
4991 | + # release seems to kludge around the problem where it is |
4992 | + # otherwise reproducible. |
4993 | + if self.conf.kombu_reconnect_delay > 0: |
4994 | + LOG.info(_("Delaying reconnect for %1.1f seconds...") % |
4995 | + self.conf.kombu_reconnect_delay) |
4996 | + time.sleep(self.conf.kombu_reconnect_delay) |
4997 | + |
4998 | + try: |
4999 | + self.connection.release() |
5000 | + except self.connection_errors: |
The diff has been truncated for viewing.