Merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 into lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno

Proposed by Xiang Hui
Status: Needs review
Proposed branch: lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721
Merge into: lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno
Diff against target: 9657 lines (+9295/-23)
29 files modified
.pc/.quilt_patches (+1/-0)
.pc/.quilt_series (+1/-0)
.pc/.version (+1/-0)
.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py (+444/-0)
.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py (+349/-0)
.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py (+821/-0)
.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py (+374/-0)
.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py (+49/-0)
.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py (+729/-0)
.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py (+832/-0)
.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py (+735/-0)
.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py (+374/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/drivers/test_impl_rabbit.py (+756/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py (+64/-0)
.pc/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py (+865/-0)
.pc/applied-patches (+8/-0)
.pc/zmq-server-routing.patch/oslo/messaging/_drivers/impl_zmq.py (+995/-0)
debian/changelog (+8/-0)
debian/patches/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+24/-0)
debian/patches/series (+1/-0)
oslo/messaging/_drivers/amqpdriver.py (+16/-8)
oslo/messaging/_drivers/common.py (+26/-0)
oslo/messaging/_drivers/impl_qpid.py (+0/-1)
oslo/messaging/_drivers/impl_rabbit.py (+69/-12)
oslo/messaging/_drivers/impl_zmq.py (+2/-0)
tests/drivers/test_impl_rabbit.py (+41/-2)
tests/test_utils.py (+32/-0)
To merge this branch: bzr merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721
Reviewer: Ubuntu Cloud Archive Team
Review status: Pending
Review via email: mp+280822@code.launchpad.net

Description of the change

  * Backport of upstream release (LP: #1318721):
    - d/p/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Redeclare the queue if an exception is caught after self.queue.declare() fails.
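The 24-line patch body itself is not shown in this preview (the .pc/ copies below are quilt's pre-patch snapshots), but going by the changelog entry the fix retries the queue declaration when it races with a RabbitMQ cluster failover. A minimal sketch of that workaround, assuming it lands in ConsumerBase.reconnect() in oslo/messaging/_drivers/impl_rabbit.py; the error matching here is illustrative, not the verbatim patch:

    def reconnect(self, channel):
        """Re-declare the queue after a rabbit reconnect."""
        self.channel = channel
        self.kwargs['channel'] = channel
        self.queue = kombu.entity.Queue(**self.kwargs)
        try:
            self.queue.declare()
        except Exception as e:
            # Illustrative only: after a cluster node failover the queue
            # can vanish between the reconnect and the declare, so the
            # declare fails (typically a 404 channel error). Declaring a
            # second time re-creates the queue and works around the race.
            if getattr(e, 'code', None) == 404:
                self.queue.declare()
            else:
                raise

Without such a retry, a consumer reconnecting during a failover can abort on the failed declare and lose its subscription, which matches the symptom described in LP: #1318721.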


Unmerged revisions

3. By Xiang Hui

  * Backport of upstream release (LP: #1318721):
    - d/p/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Redeclare the queue if an exception is caught after self.queue.declare() fails.

Preview Diff

=== added directory '.pc'
=== added file '.pc/.quilt_patches'
--- .pc/.quilt_patches 1970-01-01 00:00:00 +0000
+++ .pc/.quilt_patches 2015-12-17 11:03:34 +0000
@@ -0,0 +1,1 @@
+debian/patches
=== added file '.pc/.quilt_series'
--- .pc/.quilt_series 1970-01-01 00:00:00 +0000
+++ .pc/.quilt_series 2015-12-17 11:03:34 +0000
@@ -0,0 +1,1 @@
+series
=== added file '.pc/.version'
--- .pc/.version 1970-01-01 00:00:00 +0000
+++ .pc/.version 2015-12-17 11:03:34 +0000
@@ -0,0 +1,1 @@
+2
=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch'
=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/.timestamp'
=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo'
=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging'
=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers'
=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py'
--- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 1970-01-01 00:00:00 +0000
+++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,444 @@
1
2# Copyright 2013 Red Hat, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16__all__ = ['AMQPDriverBase']
17
18import logging
19import threading
20import time
21import uuid
22
23from six import moves
24
25from oslo import messaging
26from oslo.messaging._drivers import amqp as rpc_amqp
27from oslo.messaging._drivers import base
28from oslo.messaging._drivers import common as rpc_common
29
30LOG = logging.getLogger(__name__)
31
32
33class AMQPIncomingMessage(base.IncomingMessage):
34
35 def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
36 super(AMQPIncomingMessage, self).__init__(listener, ctxt,
37 dict(message))
38
39 self.unique_id = unique_id
40 self.msg_id = msg_id
41 self.reply_q = reply_q
42 self.acknowledge_callback = message.acknowledge
43 self.requeue_callback = message.requeue
44
45 def _send_reply(self, conn, reply=None, failure=None,
46 ending=False, log_failure=True):
47 if failure:
48 failure = rpc_common.serialize_remote_exception(failure,
49 log_failure)
50
51 msg = {'result': reply, 'failure': failure}
52 if ending:
53 msg['ending'] = True
54
55 rpc_amqp._add_unique_id(msg)
56
57 # If a reply_q exists, add the msg_id to the reply and pass the
58 # reply_q to direct_send() to use it as the response queue.
59 # Otherwise use the msg_id for backward compatibility.
60 if self.reply_q:
61 msg['_msg_id'] = self.msg_id
62 conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
63 else:
64 conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
65
66 def reply(self, reply=None, failure=None, log_failure=True):
67 if not self.msg_id:
68 # NOTE(Alexei_987) not sending reply, if msg_id is empty
69 # because reply should not be expected by caller side
70 return
71 with self.listener.driver._get_connection() as conn:
72 self._send_reply(conn, reply, failure, log_failure=log_failure)
73 self._send_reply(conn, ending=True)
74
75 def acknowledge(self):
76 self.listener.msg_id_cache.add(self.unique_id)
77 self.acknowledge_callback()
78
79 def requeue(self):
80 # NOTE(sileht): In case of the connection is lost between receiving the
81 # message and requeing it, this requeue call fail
82 # but because the message is not acknowledged and not added to the
83 # msg_id_cache, the message will be reconsumed, the only difference is
84 # the message stay at the beginning of the queue instead of moving to
85 # the end.
86 self.requeue_callback()
87
88
89class AMQPListener(base.Listener):
90
91 def __init__(self, driver, conn):
92 super(AMQPListener, self).__init__(driver)
93 self.conn = conn
94 self.msg_id_cache = rpc_amqp._MsgIdCache()
95 self.incoming = []
96
97 def __call__(self, message):
98 # FIXME(markmc): logging isn't driver specific
99 rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
100
101 unique_id = self.msg_id_cache.check_duplicate_message(message)
102 ctxt = rpc_amqp.unpack_context(self.conf, message)
103
104 self.incoming.append(AMQPIncomingMessage(self,
105 ctxt.to_dict(),
106 message,
107 unique_id,
108 ctxt.msg_id,
109 ctxt.reply_q))
110
111 def poll(self, timeout=None):
112 if timeout is not None:
113 deadline = time.time() + timeout
114 else:
115 deadline = None
116 while True:
117 if self.incoming:
118 return self.incoming.pop(0)
119 if deadline is not None:
120 timeout = deadline - time.time()
121 if timeout < 0:
122 return None
123 try:
124 self.conn.consume(limit=1, timeout=timeout)
125 except rpc_common.Timeout:
126 return None
127 else:
128 self.conn.consume(limit=1)
129
130
131class ReplyWaiters(object):
132
133 WAKE_UP = object()
134
135 def __init__(self):
136 self._queues = {}
137 self._wrn_threshold = 10
138
139 def get(self, msg_id, timeout):
140 try:
141 return self._queues[msg_id].get(block=True, timeout=timeout)
142 except moves.queue.Empty:
143 raise messaging.MessagingTimeout('Timed out waiting for a reply '
144 'to message ID %s' % msg_id)
145
146 def check(self, msg_id):
147 try:
148 return self._queues[msg_id].get(block=False)
149 except moves.queue.Empty:
150 return None
151
152 def put(self, msg_id, message_data):
153 queue = self._queues.get(msg_id)
154 if not queue:
155 LOG.warn('No calling threads waiting for msg_id : %(msg_id)s'
156 ', message : %(data)s', {'msg_id': msg_id,
157 'data': message_data})
158 LOG.warn('_queues: %s', self._queues)
159 else:
160 queue.put(message_data)
161
162 def wake_all(self, except_id):
163 msg_ids = [i for i in self._queues.keys() if i != except_id]
164 for msg_id in msg_ids:
165 self.put(msg_id, self.WAKE_UP)
166
167 def add(self, msg_id, queue):
168 self._queues[msg_id] = queue
169 if len(self._queues) > self._wrn_threshold:
170 LOG.warn('Number of call queues is greater than warning '
171 'threshold: %d. There could be a leak.',
172 self._wrn_threshold)
173 self._wrn_threshold *= 2
174
175 def remove(self, msg_id):
176 del self._queues[msg_id]
177
178
179class ReplyWaiter(object):
180
181 def __init__(self, conf, reply_q, conn, allowed_remote_exmods):
182 self.conf = conf
183 self.conn = conn
184 self.reply_q = reply_q
185 self.allowed_remote_exmods = allowed_remote_exmods
186
187 self.conn_lock = threading.Lock()
188 self.incoming = []
189 self.msg_id_cache = rpc_amqp._MsgIdCache()
190 self.waiters = ReplyWaiters()
191
192 conn.declare_direct_consumer(reply_q, self)
193
194 def __call__(self, message):
195 message.acknowledge()
196 self.incoming.append(message)
197
198 def listen(self, msg_id):
199 queue = moves.queue.Queue()
200 self.waiters.add(msg_id, queue)
201
202 def unlisten(self, msg_id):
203 self.waiters.remove(msg_id)
204
205 def _process_reply(self, data):
206 result = None
207 ending = False
208 self.msg_id_cache.check_duplicate_message(data)
209 if data['failure']:
210 failure = data['failure']
211 result = rpc_common.deserialize_remote_exception(
212 failure, self.allowed_remote_exmods)
213 elif data.get('ending', False):
214 ending = True
215 else:
216 result = data['result']
217 return result, ending
218
219 def _poll_connection(self, msg_id, timeout):
220 while True:
221 while self.incoming:
222 message_data = self.incoming.pop(0)
223
224 incoming_msg_id = message_data.pop('_msg_id', None)
225 if incoming_msg_id == msg_id:
226 return self._process_reply(message_data)
227
228 self.waiters.put(incoming_msg_id, message_data)
229
230 try:
231 self.conn.consume(limit=1, timeout=timeout)
232 except rpc_common.Timeout:
233 raise messaging.MessagingTimeout('Timed out waiting for a '
234 'reply to message ID %s'
235 % msg_id)
236
237 def _poll_queue(self, msg_id, timeout):
238 message = self.waiters.get(msg_id, timeout)
239 if message is self.waiters.WAKE_UP:
240 return None, None, True # lock was released
241
242 reply, ending = self._process_reply(message)
243 return reply, ending, False
244
245 def _check_queue(self, msg_id):
246 while True:
247 message = self.waiters.check(msg_id)
248 if message is self.waiters.WAKE_UP:
249 continue
250 if message is None:
251 return None, None, True # queue is empty
252
253 reply, ending = self._process_reply(message)
254 return reply, ending, False
255
256 def wait(self, msg_id, timeout):
257 #
258 # NOTE(markmc): we're waiting for a reply for msg_id to come in for on
259 # the reply_q, but there may be other threads also waiting for replies
260 # to other msg_ids
261 #
262 # Only one thread can be consuming from the queue using this connection
263 # and we don't want to hold open a connection per thread, so instead we
264 # have the first thread take responsibility for passing replies not
265 # intended for itself to the appropriate thread.
266 #
267 final_reply = None
268 while True:
269 if self.conn_lock.acquire(False):
270 # Ok, we're the thread responsible for polling the connection
271 try:
272 # Check the queue to see if a previous lock-holding thread
273 # queued up a reply already
274 while True:
275 reply, ending, empty = self._check_queue(msg_id)
276 if empty:
277 break
278 if not ending:
279 final_reply = reply
280 else:
281 return final_reply
282
283 # Now actually poll the connection
284 while True:
285 reply, ending = self._poll_connection(msg_id, timeout)
286 if not ending:
287 final_reply = reply
288 else:
289 return final_reply
290 finally:
291 self.conn_lock.release()
292 # We've got our reply, tell the other threads to wake up
293 # so that one of them will take over the responsibility for
294 # polling the connection
295 self.waiters.wake_all(msg_id)
296 else:
297 # We're going to wait for the first thread to pass us our reply
298 reply, ending, trylock = self._poll_queue(msg_id, timeout)
299 if trylock:
300 # The first thread got its reply, let's try and take over
301 # the responsibility for polling
302 continue
303 if not ending:
304 final_reply = reply
305 else:
306 return final_reply
307
308
309class AMQPDriverBase(base.BaseDriver):
310
311 def __init__(self, conf, url, connection_pool,
312 default_exchange=None, allowed_remote_exmods=None):
313 super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
314 allowed_remote_exmods)
315
316 self._default_exchange = default_exchange
317
318 self._connection_pool = connection_pool
319
320 self._reply_q_lock = threading.Lock()
321 self._reply_q = None
322 self._reply_q_conn = None
323 self._waiter = None
324
325 def _get_exchange(self, target):
326 return target.exchange or self._default_exchange
327
328 def _get_connection(self, pooled=True):
329 return rpc_amqp.ConnectionContext(self.conf,
330 self._url,
331 self._connection_pool,
332 pooled=pooled)
333
334 def _get_reply_q(self):
335 with self._reply_q_lock:
336 if self._reply_q is not None:
337 return self._reply_q
338
339 reply_q = 'reply_' + uuid.uuid4().hex
340
341 conn = self._get_connection(pooled=False)
342
343 self._waiter = ReplyWaiter(self.conf, reply_q, conn,
344 self._allowed_remote_exmods)
345
346 self._reply_q = reply_q
347 self._reply_q_conn = conn
348
349 return self._reply_q
350
351 def _send(self, target, ctxt, message,
352 wait_for_reply=None, timeout=None,
353 envelope=True, notify=False, retry=None):
354
355 # FIXME(markmc): remove this temporary hack
356 class Context(object):
357 def __init__(self, d):
358 self.d = d
359
360 def to_dict(self):
361 return self.d
362
363 context = Context(ctxt)
364 msg = message
365
366 if wait_for_reply:
367 msg_id = uuid.uuid4().hex
368 msg.update({'_msg_id': msg_id})
369 LOG.debug('MSG_ID is %s', msg_id)
370 msg.update({'_reply_q': self._get_reply_q()})
371
372 rpc_amqp._add_unique_id(msg)
373 rpc_amqp.pack_context(msg, context)
374
375 if envelope:
376 msg = rpc_common.serialize_msg(msg)
377
378 if wait_for_reply:
379 self._waiter.listen(msg_id)
380
381 try:
382 with self._get_connection() as conn:
383 if notify:
384 conn.notify_send(self._get_exchange(target),
385 target.topic, msg, retry=retry)
386 elif target.fanout:
387 conn.fanout_send(target.topic, msg, retry=retry)
388 else:
389 topic = target.topic
390 if target.server:
391 topic = '%s.%s' % (target.topic, target.server)
392 conn.topic_send(exchange_name=self._get_exchange(target),
393 topic=topic, msg=msg, timeout=timeout,
394 retry=retry)
395
396 if wait_for_reply:
397 result = self._waiter.wait(msg_id, timeout)
398 if isinstance(result, Exception):
399 raise result
400 return result
401 finally:
402 if wait_for_reply:
403 self._waiter.unlisten(msg_id)
404
405 def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
406 retry=None):
407 return self._send(target, ctxt, message, wait_for_reply, timeout,
408 retry=retry)
409
410 def send_notification(self, target, ctxt, message, version, retry=None):
411 return self._send(target, ctxt, message,
412 envelope=(version == 2.0), notify=True, retry=retry)
413
414 def listen(self, target):
415 conn = self._get_connection(pooled=False)
416
417 listener = AMQPListener(self, conn)
418
419 conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
420 topic=target.topic,
421 callback=listener)
422 conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
423 topic='%s.%s' % (target.topic,
424 target.server),
425 callback=listener)
426 conn.declare_fanout_consumer(target.topic, listener)
427
428 return listener
429
430 def listen_for_notifications(self, targets_and_priorities):
431 conn = self._get_connection(pooled=False)
432
433 listener = AMQPListener(self, conn)
434 for target, priority in targets_and_priorities:
435 conn.declare_topic_consumer(
436 exchange_name=self._get_exchange(target),
437 topic='%s.%s' % (target.topic, priority),
438 callback=listener)
439 return listener
440
441 def cleanup(self):
442 if self._connection_pool:
443 self._connection_pool.empty()
444 self._connection_pool = None
=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py'
--- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
+++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,349 @@
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4# Copyright 2011 Red Hat, Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
18import copy
19import logging
20import sys
21import traceback
22
23import six
24
25from oslo import messaging
26from oslo.messaging import _utils as utils
27from oslo.messaging.openstack.common.gettextutils import _
28from oslo.messaging.openstack.common import jsonutils
29
30LOG = logging.getLogger(__name__)
31
32_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
33
34
35'''RPC Envelope Version.
36
37This version number applies to the top level structure of messages sent out.
38It does *not* apply to the message payload, which must be versioned
39independently. For example, when using rpc APIs, a version number is applied
40for changes to the API being exposed over rpc. This version number is handled
41in the rpc proxy and dispatcher modules.
42
43This version number applies to the message envelope that is used in the
44serialization done inside the rpc layer. See serialize_msg() and
45deserialize_msg().
46
47The current message format (version 2.0) is very simple. It is:
48
49 {
50 'oslo.version': <RPC Envelope Version as a String>,
51 'oslo.message': <Application Message Payload, JSON encoded>
52 }
53
54Message format version '1.0' is just considered to be the messages we sent
55without a message envelope.
56
57So, the current message envelope just includes the envelope version. It may
58eventually contain additional information, such as a signature for the message
59payload.
60
61We will JSON encode the application message payload. The message envelope,
62which includes the JSON encoded application message body, will be passed down
63to the messaging libraries as a dict.
64'''
65_RPC_ENVELOPE_VERSION = '2.0'
66
67_VERSION_KEY = 'oslo.version'
68_MESSAGE_KEY = 'oslo.message'
69
70_REMOTE_POSTFIX = '_Remote'
71
72
73class RPCException(Exception):
74 msg_fmt = _("An unknown RPC related exception occurred.")
75
76 def __init__(self, message=None, **kwargs):
77 self.kwargs = kwargs
78
79 if not message:
80 try:
81 message = self.msg_fmt % kwargs
82
83 except Exception:
84 # kwargs doesn't match a variable in the message
85 # log the issue and the kwargs
86 LOG.exception(_('Exception in string format operation'))
87 for name, value in six.iteritems(kwargs):
88 LOG.error("%s: %s", name, value)
89 # at least get the core message out if something happened
90 message = self.msg_fmt
91
92 super(RPCException, self).__init__(message)
93
94
95class Timeout(RPCException):
96 """Signifies that a timeout has occurred.
97
98 This exception is raised if the rpc_response_timeout is reached while
99 waiting for a response from the remote side.
100 """
101 msg_fmt = _('Timeout while waiting on RPC response - '
102 'topic: "%(topic)s", RPC method: "%(method)s" '
103 'info: "%(info)s"')
104
105 def __init__(self, info=None, topic=None, method=None):
106 """Initiates Timeout object.
107
108 :param info: Extra info to convey to the user
109 :param topic: The topic that the rpc call was sent to
110 :param rpc_method_name: The name of the rpc method being
111 called
112 """
113 self.info = info
114 self.topic = topic
115 self.method = method
116 super(Timeout, self).__init__(
117 None,
118 info=info or _('<unknown>'),
119 topic=topic or _('<unknown>'),
120 method=method or _('<unknown>'))
121
122
123class DuplicateMessageError(RPCException):
124 msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
125
126
127class InvalidRPCConnectionReuse(RPCException):
128 msg_fmt = _("Invalid reuse of an RPC connection.")
129
130
131class UnsupportedRpcVersion(RPCException):
132 msg_fmt = _("Specified RPC version, %(version)s, not supported by "
133 "this endpoint.")
134
135
136class UnsupportedRpcEnvelopeVersion(RPCException):
137 msg_fmt = _("Specified RPC envelope version, %(version)s, "
138 "not supported by this endpoint.")
139
140
141class RpcVersionCapError(RPCException):
142 msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
143
144
145class Connection(object):
146 """A connection, returned by rpc.create_connection().
147
148 This class represents a connection to the message bus used for rpc.
149 An instance of this class should never be created by users of the rpc API.
150 Use rpc.create_connection() instead.
151 """
152 def close(self):
153 """Close the connection.
154
155 This method must be called when the connection will no longer be used.
156 It will ensure that any resources associated with the connection, such
157 as a network connection, and cleaned up.
158 """
159 raise NotImplementedError()
160
161
162def _safe_log(log_func, msg, msg_data):
163 """Sanitizes the msg_data field before logging."""
164 SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
165
166 def _fix_passwords(d):
167 """Sanitizes the password fields in the dictionary."""
168 for k in six.iterkeys(d):
169 if k.lower().find('password') != -1:
170 d[k] = '<SANITIZED>'
171 elif k.lower() in SANITIZE:
172 d[k] = '<SANITIZED>'
173 elif isinstance(d[k], dict):
174 _fix_passwords(d[k])
175 return d
176
177 return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
178
179
180def serialize_remote_exception(failure_info, log_failure=True):
181 """Prepares exception data to be sent over rpc.
182
183 Failure_info should be a sys.exc_info() tuple.
184
185 """
186 tb = traceback.format_exception(*failure_info)
187 failure = failure_info[1]
188 if log_failure:
189 LOG.error(_("Returning exception %s to caller"),
190 six.text_type(failure))
191 LOG.error(tb)
192
193 kwargs = {}
194 if hasattr(failure, 'kwargs'):
195 kwargs = failure.kwargs
196
197 # NOTE(matiu): With cells, it's possible to re-raise remote, remote
198 # exceptions. Lets turn it back into the original exception type.
199 cls_name = six.text_type(failure.__class__.__name__)
200 mod_name = six.text_type(failure.__class__.__module__)
201 if (cls_name.endswith(_REMOTE_POSTFIX) and
202 mod_name.endswith(_REMOTE_POSTFIX)):
203 cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
204 mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
205
206 data = {
207 'class': cls_name,
208 'module': mod_name,
209 'message': six.text_type(failure),
210 'tb': tb,
211 'args': failure.args,
212 'kwargs': kwargs
213 }
214
215 json_data = jsonutils.dumps(data)
216
217 return json_data
218
219
220def deserialize_remote_exception(data, allowed_remote_exmods):
221 failure = jsonutils.loads(six.text_type(data))
222
223 trace = failure.get('tb', [])
224 message = failure.get('message', "") + "\n" + "\n".join(trace)
225 name = failure.get('class')
226 module = failure.get('module')
227
228 # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
229 # order to prevent arbitrary code execution.
230 if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
231 return messaging.RemoteError(name, failure.get('message'), trace)
232
233 try:
234 __import__(module)
235 mod = sys.modules[module]
236 klass = getattr(mod, name)
237 if not issubclass(klass, Exception):
238 raise TypeError("Can only deserialize Exceptions")
239
240 failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
241 except (AttributeError, TypeError, ImportError):
242 return messaging.RemoteError(name, failure.get('message'), trace)
243
244 ex_type = type(failure)
245 str_override = lambda self: message
246 new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
247 {'__str__': str_override, '__unicode__': str_override})
248 new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
249 try:
250 # NOTE(ameade): Dynamically create a new exception type and swap it in
251 # as the new type for the exception. This only works on user defined
252 # Exceptions and not core Python exceptions. This is important because
253 # we cannot necessarily change an exception message so we must override
254 # the __str__ method.
255 failure.__class__ = new_ex_type
256 except TypeError:
257 # NOTE(ameade): If a core exception then just add the traceback to the
258 # first exception argument.
259 failure.args = (message,) + failure.args[1:]
260 return failure
261
262
263class CommonRpcContext(object):
264 def __init__(self, **kwargs):
265 self.values = kwargs
266
267 def __getattr__(self, key):
268 try:
269 return self.values[key]
270 except KeyError:
271 raise AttributeError(key)
272
273 def to_dict(self):
274 return copy.deepcopy(self.values)
275
276 @classmethod
277 def from_dict(cls, values):
278 return cls(**values)
279
280 def deepcopy(self):
281 return self.from_dict(self.to_dict())
282
283 def update_store(self):
284 # local.store.context = self
285 pass
286
287
288class ClientException(Exception):
289 """Encapsulates actual exception expected to be hit by a RPC proxy object.
290
291 Merely instantiating it records the current exception information, which
292 will be passed back to the RPC client without exceptional logging.
293 """
294 def __init__(self):
295 self._exc_info = sys.exc_info()
296
297
298def serialize_msg(raw_msg):
299 # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
300 # information about this format.
301 msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
302 _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
303
304 return msg
305
306
307def deserialize_msg(msg):
308 # NOTE(russellb): Hang on to your hats, this road is about to
309 # get a little bumpy.
310 #
311 # Robustness Principle:
312 # "Be strict in what you send, liberal in what you accept."
313 #
314 # At this point we have to do a bit of guessing about what it
315 # is we just received. Here is the set of possibilities:
316 #
317 # 1) We received a dict. This could be 2 things:
318 #
319 # a) Inspect it to see if it looks like a standard message envelope.
320 # If so, great!
321 #
322 # b) If it doesn't look like a standard message envelope, it could either
323 # be a notification, or a message from before we added a message
324 # envelope (referred to as version 1.0).
325 # Just return the message as-is.
326 #
327 # 2) It's any other non-dict type. Just return it and hope for the best.
328 # This case covers return values from rpc.call() from before message
329 # envelopes were used. (messages to call a method were always a dict)
330
331 if not isinstance(msg, dict):
332 # See #2 above.
333 return msg
334
335 base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
336 if not all(map(lambda key: key in msg, base_envelope_keys)):
337 # See #1.b above.
338 return msg
339
340 # At this point we think we have the message envelope
341 # format we were expecting. (#1.a above)
342
343 if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
344 msg[_VERSION_KEY]):
345 raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
346
347 raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
348
349 return raw_msg
=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py'
--- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
+++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,821 @@
1# Copyright 2011 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import functools
16import itertools
17import logging
18import random
19import socket
20import ssl
21import time
22import uuid
23
24import kombu
25import kombu.connection
26import kombu.entity
27import kombu.messaging
28import six
29
30from oslo.config import cfg
31from oslo.messaging._drivers import amqp as rpc_amqp
32from oslo.messaging._drivers import amqpdriver
33from oslo.messaging._drivers import common as rpc_common
34from oslo.messaging import exceptions
35from oslo.messaging.openstack.common.gettextutils import _
36from oslo.utils import netutils
37
38rabbit_opts = [
39 cfg.StrOpt('kombu_ssl_version',
40 default='',
41 help='SSL version to use (valid only if SSL enabled). '
42 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
43 'be available on some distributions.'
44 ),
45 cfg.StrOpt('kombu_ssl_keyfile',
46 default='',
47 help='SSL key file (valid only if SSL enabled).'),
48 cfg.StrOpt('kombu_ssl_certfile',
49 default='',
50 help='SSL cert file (valid only if SSL enabled).'),
51 cfg.StrOpt('kombu_ssl_ca_certs',
52 default='',
53 help='SSL certification authority file '
54 '(valid only if SSL enabled).'),
55 cfg.FloatOpt('kombu_reconnect_delay',
56 default=1.0,
57 help='How long to wait before reconnecting in response to an '
58 'AMQP consumer cancel notification.'),
59 cfg.StrOpt('rabbit_host',
60 default='localhost',
61 help='The RabbitMQ broker address where a single node is '
62 'used.'),
63 cfg.IntOpt('rabbit_port',
64 default=5672,
65 help='The RabbitMQ broker port where a single node is used.'),
66 cfg.ListOpt('rabbit_hosts',
67 default=['$rabbit_host:$rabbit_port'],
68 help='RabbitMQ HA cluster host:port pairs.'),
69 cfg.BoolOpt('rabbit_use_ssl',
70 default=False,
71 help='Connect over SSL for RabbitMQ.'),
72 cfg.StrOpt('rabbit_userid',
73 default='guest',
74 help='The RabbitMQ userid.'),
75 cfg.StrOpt('rabbit_password',
76 default='guest',
77 help='The RabbitMQ password.',
78 secret=True),
79 cfg.StrOpt('rabbit_login_method',
80 default='AMQPLAIN',
81 help='the RabbitMQ login method'),
82 cfg.StrOpt('rabbit_virtual_host',
83 default='/',
84 help='The RabbitMQ virtual host.'),
85 cfg.IntOpt('rabbit_retry_interval',
86 default=1,
87 help='How frequently to retry connecting with RabbitMQ.'),
88 cfg.IntOpt('rabbit_retry_backoff',
89 default=2,
90 help='How long to backoff for between retries when connecting '
91 'to RabbitMQ.'),
92 cfg.IntOpt('rabbit_max_retries',
93 default=0,
94 help='Maximum number of RabbitMQ connection retries. '
95 'Default is 0 (infinite retry count).'),
96 cfg.BoolOpt('rabbit_ha_queues',
97 default=False,
98 help='Use HA queues in RabbitMQ (x-ha-policy: all). '
99 'If you change this option, you must wipe the '
100 'RabbitMQ database.'),
101
102 # FIXME(markmc): this was toplevel in openstack.common.rpc
103 cfg.BoolOpt('fake_rabbit',
104 default=False,
105 help='If passed, use a fake RabbitMQ provider.'),
106]
107
108LOG = logging.getLogger(__name__)
109
110
111def _get_queue_arguments(conf):
112 """Construct the arguments for declaring a queue.
113
114 If the rabbit_ha_queues option is set, we declare a mirrored queue
115 as described here:
116
117 http://www.rabbitmq.com/ha.html
118
119 Setting x-ha-policy to all means that the queue will be mirrored
120 to all nodes in the cluster.
121 """
122 return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
123
124
125class RabbitMessage(dict):
126 def __init__(self, raw_message):
127 super(RabbitMessage, self).__init__(
128 rpc_common.deserialize_msg(raw_message.payload))
129 self._raw_message = raw_message
130
131 def acknowledge(self):
132 self._raw_message.ack()
133
134 def requeue(self):
135 self._raw_message.requeue()
136
137
138class ConsumerBase(object):
139 """Consumer base class."""
140
141 def __init__(self, channel, callback, tag, **kwargs):
142 """Declare a queue on an amqp channel.
143
144 'channel' is the amqp channel to use
145 'callback' is the callback to call when messages are received
146 'tag' is a unique ID for the consumer on the channel
147
148 queue name, exchange name, and other kombu options are
149 passed in here as a dictionary.
150 """
151 self.callback = callback
152 self.tag = six.text_type(tag)
153 self.kwargs = kwargs
154 self.queue = None
155 self.reconnect(channel)
156
157 def reconnect(self, channel):
158 """Re-declare the queue after a rabbit reconnect."""
159 self.channel = channel
160 self.kwargs['channel'] = channel
161 self.queue = kombu.entity.Queue(**self.kwargs)
162 self.queue.declare()
163
164 def _callback_handler(self, message, callback):
165 """Call callback with deserialized message.
166
167 Messages that are processed and ack'ed.
168 """
169
170 try:
171 callback(RabbitMessage(message))
172 except Exception:
173 LOG.exception(_("Failed to process message"
174 " ... skipping it."))
175 message.ack()
176
177 def consume(self, *args, **kwargs):
178 """Actually declare the consumer on the amqp channel. This will
179 start the flow of messages from the queue. Using the
180 Connection.iterconsume() iterator will process the messages,
181 calling the appropriate callback.
182
183 If a callback is specified in kwargs, use that. Otherwise,
184 use the callback passed during __init__()
185
186 If kwargs['nowait'] is True, then this call will block until
187 a message is read.
188
189 """
190
191 options = {'consumer_tag': self.tag}
192 options['nowait'] = kwargs.get('nowait', False)
193 callback = kwargs.get('callback', self.callback)
194 if not callback:
195 raise ValueError("No callback defined")
196
197 def _callback(raw_message):
198 message = self.channel.message_to_python(raw_message)
199 self._callback_handler(message, callback)
200
201 self.queue.consume(*args, callback=_callback, **options)
202
203 def cancel(self):
204 """Cancel the consuming from the queue, if it has started."""
205 try:
206 self.queue.cancel(self.tag)
207 except KeyError as e:
208 # NOTE(comstud): Kludge to get around a amqplib bug
209 if six.text_type(e) != "u'%s'" % self.tag:
210 raise
211 self.queue = None
212
213
214class DirectConsumer(ConsumerBase):
215 """Queue/consumer class for 'direct'."""
216
217 def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
218 """Init a 'direct' queue.
219
220 'channel' is the amqp channel to use
221 'msg_id' is the msg_id to listen on
222 'callback' is the callback to call when messages are received
223 'tag' is a unique ID for the consumer on the channel
224
225 Other kombu options may be passed
226 """
227 # Default options
228 options = {'durable': False,
229 'queue_arguments': _get_queue_arguments(conf),
230 'auto_delete': True,
231 'exclusive': False}
232 options.update(kwargs)
233 exchange = kombu.entity.Exchange(name=msg_id,
234 type='direct',
235 durable=options['durable'],
236 auto_delete=options['auto_delete'])
237 super(DirectConsumer, self).__init__(channel,
238 callback,
239 tag,
240 name=msg_id,
241 exchange=exchange,
242 routing_key=msg_id,
243 **options)
244
245
246class TopicConsumer(ConsumerBase):
247 """Consumer class for 'topic'."""
248
249 def __init__(self, conf, channel, topic, callback, tag, exchange_name,
250 name=None, **kwargs):
251 """Init a 'topic' queue.
252
253 :param channel: the amqp channel to use
254 :param topic: the topic to listen on
255 :paramtype topic: str
256 :param callback: the callback to call when messages are received
257 :param tag: a unique ID for the consumer on the channel
258 :param exchange_name: the exchange name to use
259 :param name: optional queue name, defaults to topic
260 :paramtype name: str
261
262 Other kombu options may be passed as keyword arguments
263 """
264 # Default options
265 options = {'durable': conf.amqp_durable_queues,
266 'queue_arguments': _get_queue_arguments(conf),
267 'auto_delete': conf.amqp_auto_delete,
268 'exclusive': False}
269 options.update(kwargs)
270 exchange = kombu.entity.Exchange(name=exchange_name,
271 type='topic',
272 durable=options['durable'],
273 auto_delete=options['auto_delete'])
274 super(TopicConsumer, self).__init__(channel,
275 callback,
276 tag,
277 name=name or topic,
278 exchange=exchange,
279 routing_key=topic,
280 **options)
281
282
283class FanoutConsumer(ConsumerBase):
284 """Consumer class for 'fanout'."""
285
286 def __init__(self, conf, channel, topic, callback, tag, **kwargs):
287 """Init a 'fanout' queue.
288
289 'channel' is the amqp channel to use
290 'topic' is the topic to listen on
291 'callback' is the callback to call when messages are received
292 'tag' is a unique ID for the consumer on the channel
293
294 Other kombu options may be passed
295 """
296 unique = uuid.uuid4().hex
297 exchange_name = '%s_fanout' % topic
298 queue_name = '%s_fanout_%s' % (topic, unique)
299
300 # Default options
301 options = {'durable': False,
302 'queue_arguments': _get_queue_arguments(conf),
303 'auto_delete': True,
304 'exclusive': False}
305 options.update(kwargs)
306 exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
307 durable=options['durable'],
308 auto_delete=options['auto_delete'])
309 super(FanoutConsumer, self).__init__(channel, callback, tag,
310 name=queue_name,
311 exchange=exchange,
312 routing_key=topic,
313 **options)
314
315
316class Publisher(object):
317 """Base Publisher class."""
318
319 def __init__(self, channel, exchange_name, routing_key, **kwargs):
320 """Init the Publisher class with the exchange_name, routing_key,
321 and other options
322 """
323 self.exchange_name = exchange_name
324 self.routing_key = routing_key
325 self.kwargs = kwargs
326 self.reconnect(channel)
327
328 def reconnect(self, channel):
329 """Re-establish the Producer after a rabbit reconnection."""
330 self.exchange = kombu.entity.Exchange(name=self.exchange_name,
331 **self.kwargs)
332 self.producer = kombu.messaging.Producer(exchange=self.exchange,
333 channel=channel,
334 routing_key=self.routing_key)
335
336 def send(self, msg, timeout=None):
337 """Send a message."""
338 if timeout:
339 #
340 # AMQP TTL is in milliseconds when set in the header.
341 #
342 self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
343 else:
344 self.producer.publish(msg)
345
346
347class DirectPublisher(Publisher):
348 """Publisher class for 'direct'."""
349 def __init__(self, conf, channel, topic, **kwargs):
350 """Init a 'direct' publisher.
351
352 Kombu options may be passed as keyword args to override defaults
353 """
354
355 options = {'durable': False,
356 'auto_delete': True,
357 'exclusive': False}
358 options.update(kwargs)
359 super(DirectPublisher, self).__init__(channel, topic, topic,
360 type='direct', **options)
361
362
363class TopicPublisher(Publisher):
364 """Publisher class for 'topic'."""
365 def __init__(self, conf, channel, exchange_name, topic, **kwargs):
366 """Init a 'topic' publisher.
367
368 Kombu options may be passed as keyword args to override defaults
369 """
370 options = {'durable': conf.amqp_durable_queues,
371 'auto_delete': conf.amqp_auto_delete,
372 'exclusive': False}
373 options.update(kwargs)
374 super(TopicPublisher, self).__init__(channel,
375 exchange_name,
376 topic,
377 type='topic',
378 **options)
379
380
381class FanoutPublisher(Publisher):
382 """Publisher class for 'fanout'."""
383 def __init__(self, conf, channel, topic, **kwargs):
384 """Init a 'fanout' publisher.
385
386 Kombu options may be passed as keyword args to override defaults
387 """
388 options = {'durable': False,
389 'auto_delete': True,
390 'exclusive': False}
391 options.update(kwargs)
392 super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
393 None, type='fanout', **options)
394
395
396class NotifyPublisher(TopicPublisher):
397 """Publisher class for 'notify'."""
398
399 def __init__(self, conf, channel, exchange_name, topic, **kwargs):
400 self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
401 self.queue_arguments = _get_queue_arguments(conf)
402 super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
403 topic, **kwargs)
404
405 def reconnect(self, channel):
406 super(NotifyPublisher, self).reconnect(channel)
407
408 # NOTE(jerdfelt): Normally the consumer would create the queue, but
409 # we do this to ensure that messages don't get dropped if the
410 # consumer is started after we do
411 queue = kombu.entity.Queue(channel=channel,
412 exchange=self.exchange,
413 durable=self.durable,
414 name=self.routing_key,
415 routing_key=self.routing_key,
416 queue_arguments=self.queue_arguments)
417 queue.declare()
418
419
420class Connection(object):
421 """Connection object."""
422
423 pools = {}
424
425 def __init__(self, conf, url):
426 self.consumers = []
427 self.conf = conf
428 self.max_retries = self.conf.rabbit_max_retries
429 # Try forever?
430 if self.max_retries <= 0:
431 self.max_retries = None
432 self.interval_start = self.conf.rabbit_retry_interval
433 self.interval_stepping = self.conf.rabbit_retry_backoff
434 # max retry-interval = 30 seconds
435 self.interval_max = 30
436 self.memory_transport = False
437
438 ssl_params = self._fetch_ssl_params()
439
440 if url.virtual_host is not None:
441 virtual_host = url.virtual_host
442 else:
443 virtual_host = self.conf.rabbit_virtual_host
444
445 self.brokers_params = []
446 if url.hosts:
447 for host in url.hosts:
448 params = {
449 'hostname': host.hostname,
450 'port': host.port or 5672,
451 'userid': host.username or '',
452 'password': host.password or '',
453 'login_method': self.conf.rabbit_login_method,
454 'virtual_host': virtual_host
455 }
456 if self.conf.fake_rabbit:
457 params['transport'] = 'memory'
458 if self.conf.rabbit_use_ssl:
459 params['ssl'] = ssl_params
460
461 self.brokers_params.append(params)
462 else:
463 # Old configuration format
464 for adr in self.conf.rabbit_hosts:
465 hostname, port = netutils.parse_host_port(
466 adr, default_port=self.conf.rabbit_port)
467
468 params = {
469 'hostname': hostname,
470 'port': port,
471 'userid': self.conf.rabbit_userid,
472 'password': self.conf.rabbit_password,
473 'login_method': self.conf.rabbit_login_method,
474 'virtual_host': virtual_host
475 }
476
477 if self.conf.fake_rabbit:
478 params['transport'] = 'memory'
479 if self.conf.rabbit_use_ssl:
480 params['ssl'] = ssl_params
481
482 self.brokers_params.append(params)
483
484 random.shuffle(self.brokers_params)
485 self.brokers = itertools.cycle(self.brokers_params)
486
487 self.memory_transport = self.conf.fake_rabbit
488
489 self.connection = None
490 self.do_consume = None
491 self.reconnect()
492
493 # FIXME(markmc): use oslo sslutils when it is available as a library
494 _SSL_PROTOCOLS = {
495 "tlsv1": ssl.PROTOCOL_TLSv1,
496 "sslv23": ssl.PROTOCOL_SSLv23,
497 "sslv3": ssl.PROTOCOL_SSLv3
498 }
499
500 try:
501 _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
502 except AttributeError:
503 pass
504
505 @classmethod
506 def validate_ssl_version(cls, version):
507 key = version.lower()
508 try:
509 return cls._SSL_PROTOCOLS[key]
510 except KeyError:
511 raise RuntimeError(_("Invalid SSL version : %s") % version)
512
513 def _fetch_ssl_params(self):
514 """Handles fetching what ssl params should be used for the connection
515 (if any).
516 """
517 ssl_params = dict()
518
519 # http://docs.python.org/library/ssl.html - ssl.wrap_socket
520 if self.conf.kombu_ssl_version:
521 ssl_params['ssl_version'] = self.validate_ssl_version(
522 self.conf.kombu_ssl_version)
523 if self.conf.kombu_ssl_keyfile:
524 ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
525 if self.conf.kombu_ssl_certfile:
526 ssl_params['certfile'] = self.conf.kombu_ssl_certfile
527 if self.conf.kombu_ssl_ca_certs:
528 ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
529 # We might want to allow variations in the
530 # future with this?
531 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
532
533 # Return the extended behavior or just have the default behavior
534 return ssl_params or True
535
536 def _connect(self, broker):
537 """Connect to rabbit. Re-establish any queues that may have
538 been declared before if we are reconnecting. Exceptions should
539 be handled by the caller.
540 """
541 LOG.info(_("Connecting to AMQP server on "
542 "%(hostname)s:%(port)d"), broker)
543 self.connection = kombu.connection.BrokerConnection(**broker)
544 self.connection_errors = self.connection.connection_errors
545 self.channel_errors = self.connection.channel_errors
546 if self.memory_transport:
547 # Kludge to speed up tests.
548 self.connection.transport.polling_interval = 0.0
549 self.do_consume = True
550 self.consumer_num = itertools.count(1)
551 self.connection.connect()
552 self.channel = self.connection.channel()
553 # work around 'memory' transport bug in 1.1.3
554 if self.memory_transport:
555 self.channel._new_queue('ae.undeliver')
556 for consumer in self.consumers:
557 consumer.reconnect(self.channel)
558 LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
559 broker)
560
561 def _disconnect(self):
562 if self.connection:
563 # XXX(nic): when reconnecting to a RabbitMQ cluster
564 # with mirrored queues in use, the attempt to release the
565 # connection can hang "indefinitely" somewhere deep down
566 # in Kombu. Blocking the thread for a bit prior to
567 # release seems to kludge around the problem where it is
568 # otherwise reproduceable.
569 if self.conf.kombu_reconnect_delay > 0:
570 LOG.info(_("Delaying reconnect for %1.1f seconds...") %
571 self.conf.kombu_reconnect_delay)
572 time.sleep(self.conf.kombu_reconnect_delay)
573
574 try:
575 self.connection.release()
576 except self.connection_errors:
577 pass
578 self.connection = None
579
580 def reconnect(self, retry=None):
581 """Handles reconnecting and re-establishing queues.
582 Will retry up to retry number of times.
583 retry = None means use the value of rabbit_max_retries
584 retry = -1 means to retry forever
585 retry = 0 means no retry
586 retry = N means N retries
587 Sleep between tries, starting at self.interval_start
588 seconds, backing off self.interval_stepping number of seconds
589 each attempt.
590 """
591
592 attempt = 0
593 loop_forever = False
594 if retry is None:
595 retry = self.max_retries
596 if retry is None or retry < 0:
597 loop_forever = True
598
599 while True:
600 self._disconnect()
601
602 broker = six.next(self.brokers)
603 attempt += 1
604 try:
605 self._connect(broker)
606 return
607 except IOError as ex:
608 e = ex
609 except self.connection_errors as ex:
610 e = ex
611 except Exception as ex:
612 # NOTE(comstud): Unfortunately it's possible for amqplib
613 # to return an error not covered by its transport
614 # connection_errors in the case of a timeout waiting for
615 # a protocol response. (See paste link in LP888621)
616 # So, we check all exceptions for 'timeout' in them
617 # and try to reconnect in this case.
618 if 'timeout' not in six.text_type(e):
619 raise
620 e = ex
621
622 log_info = {}
623 log_info['err_str'] = e
624 log_info['retry'] = retry or 0
625 log_info.update(broker)
626
627 if not loop_forever and attempt > retry:
628 msg = _('Unable to connect to AMQP server on '
629 '%(hostname)s:%(port)d after %(retry)d '
630 'tries: %(err_str)s') % log_info
631 LOG.error(msg)
632 raise exceptions.MessageDeliveryFailure(msg)
633 else:
634 if attempt == 1:
635 sleep_time = self.interval_start or 1
636 elif attempt > 1:
637 sleep_time += self.interval_stepping
638
639 sleep_time = min(sleep_time, self.interval_max)
640
641 log_info['sleep_time'] = sleep_time
642 if 'Socket closed' in six.text_type(e):
643 LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
644 ' the connection. Check login credentials:'
645 ' %(err_str)s'), log_info)
646 else:
647 LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
648 'unreachable: %(err_str)s. Trying again in '
649 '%(sleep_time)d seconds.'), log_info)
650 time.sleep(sleep_time)
651
652 def ensure(self, error_callback, method, retry=None):
653 while True:
654 try:
655 return method()
656 except self.connection_errors as e:
657 if error_callback:
658 error_callback(e)
659 except self.channel_errors as e:
660 if error_callback:
661 error_callback(e)
662 except (socket.timeout, IOError) as e:
663 if error_callback:
664 error_callback(e)
665 except Exception as e:
666 # NOTE(comstud): Unfortunately it's possible for amqplib
667 # to return an error not covered by its transport
668 # connection_errors in the case of a timeout waiting for
669 # a protocol response. (See paste link in LP888621)
670 # So, we check all exceptions for 'timeout' in them
671 # and try to reconnect in this case.
672 if 'timeout' not in six.text_type(e):
673 raise
674 if error_callback:
675 error_callback(e)
676 self.reconnect(retry=retry)
677
678 def get_channel(self):
679 """Convenience call for bin/clear_rabbit_queues."""
680 return self.channel
681
682 def close(self):
683 """Close/release this connection."""
684 if self.connection:
685 self.connection.release()
686 self.connection = None
687
688 def reset(self):
689 """Reset a connection so it can be used again."""
690 self.channel.close()
691 self.channel = self.connection.channel()
692 # work around 'memory' transport bug in 1.1.3
693 if self.memory_transport:
694 self.channel._new_queue('ae.undeliver')
695 self.consumers = []
696
697 def declare_consumer(self, consumer_cls, topic, callback):
698 """Create a Consumer using the class that was passed in and
699 add it to our list of consumers
700 """
701
702 def _connect_error(exc):
703 log_info = {'topic': topic, 'err_str': exc}
704 LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
705 "%(err_str)s"), log_info)
706
707 def _declare_consumer():
708 consumer = consumer_cls(self.conf, self.channel, topic, callback,
709 six.next(self.consumer_num))
710 self.consumers.append(consumer)
711 return consumer
712
713 return self.ensure(_connect_error, _declare_consumer)
714
715 def iterconsume(self, limit=None, timeout=None):
716 """Return an iterator that will consume from all queues/consumers."""
717
718 def _error_callback(exc):
719 if isinstance(exc, socket.timeout):
720 LOG.debug('Timed out waiting for RPC response: %s', exc)
721 raise rpc_common.Timeout()
722 else:
723 LOG.exception(_('Failed to consume message from queue: %s'),
724 exc)
725 self.do_consume = True
726
727 def _consume():
728 if self.do_consume:
729 queues_head = self.consumers[:-1] # not fanout.
730 queues_tail = self.consumers[-1] # fanout
731 for queue in queues_head:
732 queue.consume(nowait=True)
733 queues_tail.consume(nowait=False)
734 self.do_consume = False
735 return self.connection.drain_events(timeout=timeout)
736
737 for iteration in itertools.count(0):
738 if limit and iteration >= limit:
739 raise StopIteration
740 yield self.ensure(_error_callback, _consume)
741
742 def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
743 **kwargs):
744 """Send to a publisher based on the publisher class."""
745
746 def _error_callback(exc):
747 log_info = {'topic': topic, 'err_str': exc}
748 LOG.exception(_("Failed to publish message to topic "
749 "'%(topic)s': %(err_str)s"), log_info)
750
751 def _publish():
752 publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
753 publisher.send(msg, timeout)
754
755 self.ensure(_error_callback, _publish, retry=retry)
756
757 def declare_direct_consumer(self, topic, callback):
758 """Create a 'direct' queue.
759 In nova's use, this is generally a msg_id queue used for
760 responses for call/multicall
761 """
762 self.declare_consumer(DirectConsumer, topic, callback)
763
764 def declare_topic_consumer(self, exchange_name, topic, callback=None,
765 queue_name=None):
766 """Create a 'topic' consumer."""
767 self.declare_consumer(functools.partial(TopicConsumer,
768 name=queue_name,
769 exchange_name=exchange_name,
770 ),
771 topic, callback)
772
773 def declare_fanout_consumer(self, topic, callback):
774 """Create a 'fanout' consumer."""
775 self.declare_consumer(FanoutConsumer, topic, callback)
776
777 def direct_send(self, msg_id, msg):
778 """Send a 'direct' message."""
779 self.publisher_send(DirectPublisher, msg_id, msg)
780
781 def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
782 """Send a 'topic' message."""
783 self.publisher_send(TopicPublisher, topic, msg, timeout,
784 exchange_name=exchange_name, retry=retry)
785
786 def fanout_send(self, topic, msg, retry=None):
787 """Send a 'fanout' message."""
788 self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
789
790 def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
791 """Send a notify message on a topic."""
792 self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
793 exchange_name=exchange_name, retry=retry, **kwargs)
794
795 def consume(self, limit=None, timeout=None):
796 """Consume from all queues/consumers."""
797 it = self.iterconsume(limit=limit, timeout=timeout)
798 while True:
799 try:
800 six.next(it)
801 except StopIteration:
802 return
803
804
805class RabbitDriver(amqpdriver.AMQPDriverBase):
806
807 def __init__(self, conf, url,
808 default_exchange=None,
809 allowed_remote_exmods=None):
810 conf.register_opts(rabbit_opts)
811 conf.register_opts(rpc_amqp.amqp_opts)
812
813 connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
814
815 super(RabbitDriver, self).__init__(conf, url,
816 connection_pool,
817 default_exchange,
818 allowed_remote_exmods)
819
820 def require_features(self, requeue=True):
821 pass
=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch'
=== added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/.timestamp'
=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo'
=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging'
=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers'
=== added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py'
--- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
+++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,374 @@
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4# Copyright 2011 Red Hat, Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
18import copy
19import logging
20import sys
21import time
22import traceback
23
24import six
25
26from oslo import messaging
27from oslo.messaging import _utils as utils
28from oslo.messaging.openstack.common.gettextutils import _
29from oslo.messaging.openstack.common import jsonutils
30
31LOG = logging.getLogger(__name__)
32
33_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
34
35
36'''RPC Envelope Version.
37
38This version number applies to the top level structure of messages sent out.
39It does *not* apply to the message payload, which must be versioned
40independently. For example, when using rpc APIs, a version number is applied
41for changes to the API being exposed over rpc. This version number is handled
42in the rpc proxy and dispatcher modules.
43
44This version number applies to the message envelope that is used in the
45serialization done inside the rpc layer. See serialize_msg() and
46deserialize_msg().
47
48The current message format (version 2.0) is very simple. It is:
49
50 {
51 'oslo.version': <RPC Envelope Version as a String>,
52 'oslo.message': <Application Message Payload, JSON encoded>
53 }
54
55Message format version '1.0' is just considered to be the messages we sent
56without a message envelope.
57
58So, the current message envelope just includes the envelope version. It may
59eventually contain additional information, such as a signature for the message
60payload.
61
62We will JSON encode the application message payload. The message envelope,
63which includes the JSON encoded application message body, will be passed down
64to the messaging libraries as a dict.
65'''
66_RPC_ENVELOPE_VERSION = '2.0'
67
68_VERSION_KEY = 'oslo.version'
69_MESSAGE_KEY = 'oslo.message'
70
71_REMOTE_POSTFIX = '_Remote'
72
73
74class RPCException(Exception):
75 msg_fmt = _("An unknown RPC related exception occurred.")
76
77 def __init__(self, message=None, **kwargs):
78 self.kwargs = kwargs
79
80 if not message:
81 try:
82 message = self.msg_fmt % kwargs
83
84 except Exception:
85 # kwargs doesn't match a variable in the message
86 # log the issue and the kwargs
87 LOG.exception(_('Exception in string format operation'))
88 for name, value in six.iteritems(kwargs):
89 LOG.error("%s: %s", name, value)
90 # at least get the core message out if something happened
91 message = self.msg_fmt
92
93 super(RPCException, self).__init__(message)
94
95
96class Timeout(RPCException):
97 """Signifies that a timeout has occurred.
98
99 This exception is raised if the rpc_response_timeout is reached while
100 waiting for a response from the remote side.
101 """
102 msg_fmt = _('Timeout while waiting on RPC response - '
103 'topic: "%(topic)s", RPC method: "%(method)s" '
104 'info: "%(info)s"')
105
106 def __init__(self, info=None, topic=None, method=None):
107 """Initializes a Timeout object.
108
109 :param info: Extra info to convey to the user
110 :param topic: The topic that the rpc call was sent to
111 :param method: The name of the rpc method being
112 called
113 """
114 self.info = info
115 self.topic = topic
116 self.method = method
117 super(Timeout, self).__init__(
118 None,
119 info=info or _('<unknown>'),
120 topic=topic or _('<unknown>'),
121 method=method or _('<unknown>'))
122
123
124class DuplicateMessageError(RPCException):
125 msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
126
127
128class InvalidRPCConnectionReuse(RPCException):
129 msg_fmt = _("Invalid reuse of an RPC connection.")
130
131
132class UnsupportedRpcVersion(RPCException):
133 msg_fmt = _("Specified RPC version, %(version)s, not supported by "
134 "this endpoint.")
135
136
137class UnsupportedRpcEnvelopeVersion(RPCException):
138 msg_fmt = _("Specified RPC envelope version, %(version)s, "
139 "not supported by this endpoint.")
140
141
142class RpcVersionCapError(RPCException):
143 msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
144
145
146class Connection(object):
147 """A connection, returned by rpc.create_connection().
148
149 This class represents a connection to the message bus used for rpc.
150 An instance of this class should never be created by users of the rpc API.
151 Use rpc.create_connection() instead.
152 """
153 def close(self):
154 """Close the connection.
155
156 This method must be called when the connection will no longer be used.
157 It will ensure that any resources associated with the connection, such
158 as a network connection, are cleaned up.
159 """
160 raise NotImplementedError()
161
162
163def _safe_log(log_func, msg, msg_data):
164 """Sanitizes the msg_data field before logging."""
165 SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
166
167 def _fix_passwords(d):
168 """Sanitizes the password fields in the dictionary."""
169 for k in six.iterkeys(d):
170 if k.lower().find('password') != -1:
171 d[k] = '<SANITIZED>'
172 elif k.lower() in SANITIZE:
173 d[k] = '<SANITIZED>'
174 elif isinstance(d[k], dict):
175 _fix_passwords(d[k])
176 return d
177
178 return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
179
180
181def serialize_remote_exception(failure_info, log_failure=True):
182 """Prepares exception data to be sent over rpc.
183
184 Failure_info should be a sys.exc_info() tuple.
185
186 """
187 tb = traceback.format_exception(*failure_info)
188 failure = failure_info[1]
189 if log_failure:
190 LOG.error(_("Returning exception %s to caller"),
191 six.text_type(failure))
192 LOG.error(tb)
193
194 kwargs = {}
195 if hasattr(failure, 'kwargs'):
196 kwargs = failure.kwargs
197
198 # NOTE(matiu): With cells, it's possible to re-raise remote, remote
199 # exceptions. Let's turn it back into the original exception type.
200 cls_name = six.text_type(failure.__class__.__name__)
201 mod_name = six.text_type(failure.__class__.__module__)
202 if (cls_name.endswith(_REMOTE_POSTFIX) and
203 mod_name.endswith(_REMOTE_POSTFIX)):
204 cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
205 mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
206
207 data = {
208 'class': cls_name,
209 'module': mod_name,
210 'message': six.text_type(failure),
211 'tb': tb,
212 'args': failure.args,
213 'kwargs': kwargs
214 }
215
216 json_data = jsonutils.dumps(data)
217
218 return json_data
219
220
221def deserialize_remote_exception(data, allowed_remote_exmods):
222 failure = jsonutils.loads(six.text_type(data))
223
224 trace = failure.get('tb', [])
225 message = failure.get('message', "") + "\n" + "\n".join(trace)
226 name = failure.get('class')
227 module = failure.get('module')
228
229 # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
230 # order to prevent arbitrary code execution.
231 if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
232 return messaging.RemoteError(name, failure.get('message'), trace)
233
234 try:
235 __import__(module)
236 mod = sys.modules[module]
237 klass = getattr(mod, name)
238 if not issubclass(klass, Exception):
239 raise TypeError("Can only deserialize Exceptions")
240
241 failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
242 except (AttributeError, TypeError, ImportError):
243 return messaging.RemoteError(name, failure.get('message'), trace)
244
245 ex_type = type(failure)
246 str_override = lambda self: message
247 new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
248 {'__str__': str_override, '__unicode__': str_override})
249 new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
250 try:
251 # NOTE(ameade): Dynamically create a new exception type and swap it in
252 # as the new type for the exception. This only works on user defined
253 # Exceptions and not core Python exceptions. This is important because
254 # we cannot necessarily change an exception message so we must override
255 # the __str__ method.
256 failure.__class__ = new_ex_type
257 except TypeError:
258 # NOTE(ameade): If a core exception then just add the traceback to the
259 # first exception argument.
260 failure.args = (message,) + failure.args[1:]
261 return failure
262
263
264class CommonRpcContext(object):
265 def __init__(self, **kwargs):
266 self.values = kwargs
267
268 def __getattr__(self, key):
269 try:
270 return self.values[key]
271 except KeyError:
272 raise AttributeError(key)
273
274 def to_dict(self):
275 return copy.deepcopy(self.values)
276
277 @classmethod
278 def from_dict(cls, values):
279 return cls(**values)
280
281 def deepcopy(self):
282 return self.from_dict(self.to_dict())
283
284 def update_store(self):
285 # local.store.context = self
286 pass
287
288
289class ClientException(Exception):
290 """Encapsulates actual exception expected to be hit by an RPC proxy object.
291
292 Merely instantiating it records the current exception information, which
293 will be passed back to the RPC client without exceptional logging.
294 """
295 def __init__(self):
296 self._exc_info = sys.exc_info()
297
298
299def serialize_msg(raw_msg):
300 # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
301 # information about this format.
302 msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
303 _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
304
305 return msg
306
307
308def deserialize_msg(msg):
309 # NOTE(russellb): Hang on to your hats, this road is about to
310 # get a little bumpy.
311 #
312 # Robustness Principle:
313 # "Be strict in what you send, liberal in what you accept."
314 #
315 # At this point we have to do a bit of guessing about what it
316 # is we just received. Here is the set of possibilities:
317 #
318 # 1) We received a dict. This could be 2 things:
319 #
320 # a) Inspect it to see if it looks like a standard message envelope.
321 # If so, great!
322 #
323 # b) If it doesn't look like a standard message envelope, it could either
324 # be a notification, or a message from before we added a message
325 # envelope (referred to as version 1.0).
326 # Just return the message as-is.
327 #
328 # 2) It's any other non-dict type. Just return it and hope for the best.
329 # This case covers return values from rpc.call() from before message
330 # envelopes were used. (messages to call a method were always a dict)
331
332 if not isinstance(msg, dict):
333 # See #2 above.
334 return msg
335
336 base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
337 if not all(map(lambda key: key in msg, base_envelope_keys)):
338 # See #1.b above.
339 return msg
340
341 # At this point we think we have the message envelope
342 # format we were expecting. (#1.a above)
343
344 if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
345 msg[_VERSION_KEY]):
346 raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
347
348 raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
349
350 return raw_msg
351
352
353class DecayingTimer(object):
354 def __init__(self, duration=None):
355 self._duration = duration
356 self._ends_at = None
357
358 def start(self):
359 if self._duration is not None:
360 self._ends_at = time.time() + max(0, self._duration)
361
362 def check_return(self, timeout_callback, *args, **kwargs):
363 if self._duration is None:
364 return None
365 if self._ends_at is None:
366 raise RuntimeError(_("Cannot check/return a timeout from a timer"
367 " that has not been started."))
368
369 maximum = kwargs.pop('maximum', None)
370 left = self._ends_at - time.time()
371 if left <= 0:
372 timeout_callback(*args, **kwargs)
373
374 return left if maximum is None else min(left, maximum)
0375
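
serialize_msg() and deserialize_msg() above implement the 2.0 envelope documented at the top of the file: a version marker plus a JSON-encoded payload. A round-trip sketch, substituting the stdlib json module for oslo's jsonutils (an assumption made only to keep the example self-contained):

    import json

    _VERSION_KEY = 'oslo.version'
    _MESSAGE_KEY = 'oslo.message'


    def serialize_msg(raw_msg):
        # 2.0 envelope: version marker plus JSON-encoded payload.
        return {_VERSION_KEY: '2.0', _MESSAGE_KEY: json.dumps(raw_msg)}


    def deserialize_msg(msg):
        if not isinstance(msg, dict):
            return msg  # pre-envelope rpc.call() return value
        if _VERSION_KEY not in msg or _MESSAGE_KEY not in msg:
            return msg  # a 1.0 message or a notification
        return json.loads(msg[_MESSAGE_KEY])


    envelope = serialize_msg({'method': 'ping', 'args': {}})
    assert deserialize_msg(envelope) == {'method': 'ping', 'args': {}}
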
=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests'
=== added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py'
--- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000
+++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,49 @@
1
2# Copyright 2013 Red Hat, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16from oslo.messaging import _utils as utils
17from tests import utils as test_utils
18
19
20class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
21 def test_version_is_compatible_same(self):
22 self.assertTrue(utils.version_is_compatible('1.23', '1.23'))
23
24 def test_version_is_compatible_newer_minor(self):
25 self.assertTrue(utils.version_is_compatible('1.24', '1.23'))
26
27 def test_version_is_compatible_older_minor(self):
28 self.assertFalse(utils.version_is_compatible('1.22', '1.23'))
29
30 def test_version_is_compatible_major_difference1(self):
31 self.assertFalse(utils.version_is_compatible('2.23', '1.23'))
32
33 def test_version_is_compatible_major_difference2(self):
34 self.assertFalse(utils.version_is_compatible('1.23', '2.23'))
35
36 def test_version_is_compatible_newer_rev(self):
37 self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))
38
39 def test_version_is_compatible_newer_rev_both(self):
40 self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))
41
42 def test_version_is_compatible_older_rev_both(self):
43 self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))
44
45 def test_version_is_compatible_older_rev(self):
46 self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))
47
48 def test_version_is_compatible_no_rev_is_zero(self):
49 self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
050
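
These tests pin down the contract of utils.version_is_compatible(implemented, requested): the major versions must match, and the implemented version must be at least the requested one, comparing minor and then revision, with a missing revision treated as zero. A hypothetical reimplementation of just that contract (not oslo's actual code) that satisfies the same cases:

    def version_is_compatible(implemented, requested):
        def parse(version):
            parts = [int(p) for p in version.split('.')]
            while len(parts) < 3:
                parts.append(0)  # '1.23' is treated as 1.23.0
            return tuple(parts)

        imp, req = parse(implemented), parse(requested)
        # Same major version, and implemented >= requested (minor, revision).
        return imp[0] == req[0] and imp[1:] >= req[1:]


    assert version_is_compatible('1.24', '1.23')
    assert not version_is_compatible('1.22', '1.23')
    assert not version_is_compatible('2.23', '1.23')
    assert not version_is_compatible('1.23', '1.23.1')
    assert version_is_compatible('1.23.0', '1.23')
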
=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch'
=== added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/.timestamp'
=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo'
=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging'
=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers'
=== added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py'
--- .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 1970-01-01 00:00:00 +0000
+++ .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,729 @@
1# Copyright 2011 OpenStack Foundation
2# Copyright 2011 - 2012, Red Hat, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import functools
17import itertools
18import logging
19import random
20import time
21
22import six
23
24from oslo.config import cfg
25from oslo.messaging._drivers import amqp as rpc_amqp
26from oslo.messaging._drivers import amqpdriver
27from oslo.messaging._drivers import common as rpc_common
28from oslo.messaging import exceptions
29from oslo.messaging.openstack.common.gettextutils import _
30from oslo.messaging.openstack.common import jsonutils
31from oslo.utils import importutils
32from oslo.utils import netutils
33
34qpid_codec = importutils.try_import("qpid.codec010")
35qpid_messaging = importutils.try_import("qpid.messaging")
36qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
37
38LOG = logging.getLogger(__name__)
39
40qpid_opts = [
41 cfg.StrOpt('qpid_hostname',
42 default='localhost',
43 help='Qpid broker hostname.'),
44 cfg.IntOpt('qpid_port',
45 default=5672,
46 help='Qpid broker port.'),
47 cfg.ListOpt('qpid_hosts',
48 default=['$qpid_hostname:$qpid_port'],
49 help='Qpid HA cluster host:port pairs.'),
50 cfg.StrOpt('qpid_username',
51 default='',
52 help='Username for Qpid connection.'),
53 cfg.StrOpt('qpid_password',
54 default='',
55 help='Password for Qpid connection.',
56 secret=True),
57 cfg.StrOpt('qpid_sasl_mechanisms',
58 default='',
59 help='Space separated list of SASL mechanisms to use for '
60 'auth.'),
61 cfg.IntOpt('qpid_heartbeat',
62 default=60,
63 help='Seconds between connection keepalive heartbeats.'),
64 cfg.StrOpt('qpid_protocol',
65 default='tcp',
66 help="Transport to use, either 'tcp' or 'ssl'."),
67 cfg.BoolOpt('qpid_tcp_nodelay',
68 default=True,
69 help='Whether to disable the Nagle algorithm.'),
70 cfg.IntOpt('qpid_receiver_capacity',
71 default=1,
72 help='The number of prefetched messages held by receiver.'),
73 # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
74 # this file could probably use some additional refactoring so that the
75 # differences between each version are split into different classes.
76 cfg.IntOpt('qpid_topology_version',
77 default=1,
78 help="The qpid topology version to use. Version 1 is what "
79 "was originally used by impl_qpid. Version 2 includes "
80 "some backwards-incompatible changes that allow broker "
81 "federation to work. Users should update to version 2 "
82 "when they are able to take everything down, as it "
83 "requires a clean break."),
84]
85
86JSON_CONTENT_TYPE = 'application/json; charset=utf8'
87
88
89def raise_invalid_topology_version(conf):
90 msg = (_("Invalid value for qpid_topology_version: %d") %
91 conf.qpid_topology_version)
92 LOG.error(msg)
93 raise Exception(msg)
94
95
96class QpidMessage(dict):
97 def __init__(self, session, raw_message):
98 super(QpidMessage, self).__init__(
99 rpc_common.deserialize_msg(raw_message.content))
100 self._raw_message = raw_message
101 self._session = session
102
103 def acknowledge(self):
104 self._session.acknowledge(self._raw_message)
105
106 def requeue(self):
107 pass
108
109
110class ConsumerBase(object):
111 """Consumer base class."""
112
113 def __init__(self, conf, session, callback, node_name, node_opts,
114 link_name, link_opts):
115 """Declare a queue on an amqp session.
116
117 'session' is the amqp session to use
118 'callback' is the callback to call when messages are received
119 'node_name' is the first part of the Qpid address string, before ';'
120 'node_opts' will be applied to the "x-declare" section of "node"
121 in the address string.
122 'link_name' goes into the "name" field of the "link" in the address
123 string
124 'link_opts' will be applied to the "x-declare" section of "link"
125 in the address string.
126 """
127 self.callback = callback
128 self.receiver = None
129 self.rcv_capacity = conf.qpid_receiver_capacity
130 self.session = None
131
132 if conf.qpid_topology_version == 1:
133 addr_opts = {
134 "create": "always",
135 "node": {
136 "type": "topic",
137 "x-declare": {
138 "durable": True,
139 "auto-delete": True,
140 },
141 },
142 "link": {
143 "durable": True,
144 "x-declare": {
145 "durable": False,
146 "auto-delete": True,
147 "exclusive": False,
148 },
149 },
150 }
151 addr_opts["node"]["x-declare"].update(node_opts)
152 elif conf.qpid_topology_version == 2:
153 addr_opts = {
154 "link": {
155 "x-declare": {
156 "auto-delete": True,
157 "exclusive": False,
158 },
159 },
160 }
161 else:
162 raise_invalid_topology_version(conf)
163
164 addr_opts["link"]["x-declare"].update(link_opts)
165 if link_name:
166 addr_opts["link"]["name"] = link_name
167
168 self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
169
170 self.connect(session)
171
172 def connect(self, session):
173 """Declare the receiver on connect."""
174 self._declare_receiver(session)
175
176 def reconnect(self, session):
177 """Re-declare the receiver after a Qpid reconnect."""
178 self._declare_receiver(session)
179
180 def _declare_receiver(self, session):
181 self.session = session
182 self.receiver = session.receiver(self.address)
183 self.receiver.capacity = self.rcv_capacity
184
185 def _unpack_json_msg(self, msg):
186 """Load the JSON data in msg if msg.content_type indicates that it
187 is necessary. Put the loaded data back into msg.content and
188 update msg.content_type appropriately.
189
190 A Qpid Message containing a dict will have a content_type of
191 'amqp/map', whereas one containing a string that needs to be converted
192 back from JSON will have a content_type of JSON_CONTENT_TYPE.
193
194 :param msg: a Qpid Message object
195 :returns: None
196 """
197 if msg.content_type == JSON_CONTENT_TYPE:
198 msg.content = jsonutils.loads(msg.content)
199 msg.content_type = 'amqp/map'
200
201 def consume(self):
202 """Fetch the message and pass it to the callback object."""
203 message = self.receiver.fetch()
204 try:
205 self._unpack_json_msg(message)
206 self.callback(QpidMessage(self.session, message))
207 except Exception:
208 LOG.exception(_("Failed to process message... skipping it."))
209 self.session.acknowledge(message)
210
211 def get_receiver(self):
212 return self.receiver
213
214 def get_node_name(self):
215 return self.address.split(';')[0]
216
217
218class DirectConsumer(ConsumerBase):
219 """Queue/consumer class for 'direct'."""
220
221 def __init__(self, conf, session, msg_id, callback):
222 """Init a 'direct' queue.
223
224 'session' is the amqp session to use
225 'msg_id' is the msg_id to listen on
226 'callback' is the callback to call when messages are received
227 """
228
229 link_opts = {
230 "auto-delete": conf.amqp_auto_delete,
231 "exclusive": True,
232 "durable": conf.amqp_durable_queues,
233 }
234
235 if conf.qpid_topology_version == 1:
236 node_name = "%s/%s" % (msg_id, msg_id)
237 node_opts = {"type": "direct"}
238 link_name = msg_id
239 elif conf.qpid_topology_version == 2:
240 node_name = "amq.direct/%s" % msg_id
241 node_opts = {}
242 link_name = msg_id
243 else:
244 raise_invalid_topology_version(conf)
245
246 super(DirectConsumer, self).__init__(conf, session, callback,
247 node_name, node_opts, link_name,
248 link_opts)
249
250
251class TopicConsumer(ConsumerBase):
252 """Consumer class for 'topic'."""
253
254 def __init__(self, conf, session, topic, callback, exchange_name,
255 name=None):
256 """Init a 'topic' queue.
257
258 :param session: the amqp session to use
259 :param topic: is the topic to listen on
260 :paramtype topic: str
261 :param callback: the callback to call when messages are received
262 :param name: optional queue name, defaults to topic
263 """
264
265 link_opts = {
266 "auto-delete": conf.amqp_auto_delete,
267 "durable": conf.amqp_durable_queues,
268 }
269
270 if conf.qpid_topology_version == 1:
271 node_name = "%s/%s" % (exchange_name, topic)
272 elif conf.qpid_topology_version == 2:
273 node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
274 else:
275 raise_invalid_topology_version(conf)
276
277 super(TopicConsumer, self).__init__(conf, session, callback, node_name,
278 {}, name or topic, link_opts)
279
280
281class FanoutConsumer(ConsumerBase):
282 """Consumer class for 'fanout'."""
283
284 def __init__(self, conf, session, topic, callback):
285 """Init a 'fanout' queue.
286
287 'session' is the amqp session to use
288 'topic' is the topic to listen on
289 'callback' is the callback to call when messages are received
290 """
291 self.conf = conf
292
293 link_opts = {"exclusive": True}
294
295 if conf.qpid_topology_version == 1:
296 node_name = "%s_fanout" % topic
297 node_opts = {"durable": False, "type": "fanout"}
298 elif conf.qpid_topology_version == 2:
299 node_name = "amq.topic/fanout/%s" % topic
300 node_opts = {}
301 else:
302 raise_invalid_topology_version(conf)
303
304 super(FanoutConsumer, self).__init__(conf, session, callback,
305 node_name, node_opts, None,
306 link_opts)
307
308
309class Publisher(object):
310 """Base Publisher class."""
311
312 def __init__(self, conf, session, node_name, node_opts=None):
313 """Init the Publisher class with the exchange_name, routing_key,
314 and other options
315 """
316 self.sender = None
317 self.session = session
318
319 if conf.qpid_topology_version == 1:
320 addr_opts = {
321 "create": "always",
322 "node": {
323 "type": "topic",
324 "x-declare": {
325 "durable": False,
326 # auto-delete isn't implemented for exchanges in qpid,
327 # but put in here anyway
328 "auto-delete": True,
329 },
330 },
331 }
332 if node_opts:
333 addr_opts["node"]["x-declare"].update(node_opts)
334
335 self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
336 elif conf.qpid_topology_version == 2:
337 self.address = node_name
338 else:
339 raise_invalid_topology_version(conf)
340
341 self.reconnect(session)
342
343 def reconnect(self, session):
344 """Re-establish the Sender after a reconnection."""
345 self.sender = session.sender(self.address)
346
347 def _pack_json_msg(self, msg):
348 """Qpid cannot serialize dicts containing strings longer than 65535
349 characters. This function dumps the message content to a JSON
350 string, which Qpid is able to handle.
351
352 :param msg: May be either a Qpid Message object or a bare dict.
353 :returns: A Qpid Message with its content field JSON encoded.
354 """
355 try:
356 msg.content = jsonutils.dumps(msg.content)
357 except AttributeError:
358 # Need to have a Qpid message so we can set the content_type.
359 msg = qpid_messaging.Message(jsonutils.dumps(msg))
360 msg.content_type = JSON_CONTENT_TYPE
361 return msg
362
363 def send(self, msg):
364 """Send a message."""
365 try:
366 # Check if Qpid can encode the message
367 check_msg = msg
368 if not hasattr(check_msg, 'content_type'):
369 check_msg = qpid_messaging.Message(msg)
370 content_type = check_msg.content_type
371 enc, dec = qpid_messaging.message.get_codec(content_type)
372 enc(check_msg.content)
373 except qpid_codec.CodecException:
374 # This means the message couldn't be serialized as a dict.
375 msg = self._pack_json_msg(msg)
376 self.sender.send(msg)
377
378
379class DirectPublisher(Publisher):
380 """Publisher class for 'direct'."""
381 def __init__(self, conf, session, topic):
382 """Init a 'direct' publisher."""
383
384 if conf.qpid_topology_version == 1:
385 node_name = "%s/%s" % (topic, topic)
386 node_opts = {"type": "direct"}
387 elif conf.qpid_topology_version == 2:
388 node_name = "amq.direct/%s" % topic
389 node_opts = {}
390 else:
391 raise_invalid_topology_version(conf)
392
393 super(DirectPublisher, self).__init__(conf, session, node_name,
394 node_opts)
395
396
397class TopicPublisher(Publisher):
398 """Publisher class for 'topic'."""
399 def __init__(self, conf, session, exchange_name, topic):
400 """Init a 'topic' publisher.
401 """
402 if conf.qpid_topology_version == 1:
403 node_name = "%s/%s" % (exchange_name, topic)
404 elif conf.qpid_topology_version == 2:
405 node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
406 else:
407 raise_invalid_topology_version(conf)
408
409 super(TopicPublisher, self).__init__(conf, session, node_name)
410
411
412class FanoutPublisher(Publisher):
413 """Publisher class for 'fanout'."""
414 def __init__(self, conf, session, topic):
415 """Init a 'fanout' publisher.
416 """
417
418 if conf.qpid_topology_version == 1:
419 node_name = "%s_fanout" % topic
420 node_opts = {"type": "fanout"}
421 elif conf.qpid_topology_version == 2:
422 node_name = "amq.topic/fanout/%s" % topic
423 node_opts = {}
424 else:
425 raise_invalid_topology_version(conf)
426
427 super(FanoutPublisher, self).__init__(conf, session, node_name,
428 node_opts)
429
430
431class NotifyPublisher(Publisher):
432 """Publisher class for notifications."""
433 def __init__(self, conf, session, exchange_name, topic):
434 """Init a 'topic' publisher.
435 """
436 node_opts = {"durable": True}
437
438 if conf.qpid_topology_version == 1:
439 node_name = "%s/%s" % (exchange_name, topic)
440 elif conf.qpid_topology_version == 2:
441 node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
442 else:
443 raise_invalid_topology_version(conf)
444
445 super(NotifyPublisher, self).__init__(conf, session, node_name,
446 node_opts)
447
448
449class Connection(object):
450 """Connection object."""
451
452 pools = {}
453
454 def __init__(self, conf, url):
455 if not qpid_messaging:
456 raise ImportError("Failed to import qpid.messaging")
457
458 self.connection = None
459 self.session = None
460 self.consumers = {}
461 self.conf = conf
462
463 self.brokers_params = []
464 if url.hosts:
465 for host in url.hosts:
466 params = {
467 'username': host.username or '',
468 'password': host.password or '',
469 }
470 if host.port is not None:
471 params['host'] = '%s:%d' % (host.hostname, host.port)
472 else:
473 params['host'] = host.hostname
474 self.brokers_params.append(params)
475 else:
476 # Old configuration format
477 for adr in self.conf.qpid_hosts:
478 hostname, port = netutils.parse_host_port(
479 adr, default_port=5672)
480
481 params = {
482 'host': '%s:%d' % (hostname, port),
483 'username': self.conf.qpid_username,
484 'password': self.conf.qpid_password,
485 }
486 self.brokers_params.append(params)
487
488 random.shuffle(self.brokers_params)
489 self.brokers = itertools.cycle(self.brokers_params)
490
491 self.reconnect()
492
493 def _connect(self, broker):
494 # Create the connection - this does not open the connection
495 self.connection = qpid_messaging.Connection(broker['host'])
496
497 # Check if flags are set and if so set them for the connection
498 # before we call open
499 self.connection.username = broker['username']
500 self.connection.password = broker['password']
501
502 self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
503 # Reconnection is done by self.reconnect()
504 self.connection.reconnect = False
505 self.connection.heartbeat = self.conf.qpid_heartbeat
506 self.connection.transport = self.conf.qpid_protocol
507 self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
508 self.connection.open()
509
510 def _register_consumer(self, consumer):
511 self.consumers[six.text_type(consumer.get_receiver())] = consumer
512
513 def _lookup_consumer(self, receiver):
514 return self.consumers[six.text_type(receiver)]
515
516 def _disconnect(self):
517 # Close the session if necessary
518 if self.connection is not None and self.connection.opened():
519 try:
520 self.connection.close()
521 except qpid_exceptions.MessagingError:
522 pass
523 self.connection = None
524
525 def reconnect(self, retry=None):
526 """Handles reconnecting and re-establishing sessions and queues.
527 Will retry up to retry number of times.
528 retry = None or -1 means to retry forever
529 retry = 0 means no retry
530 retry = N means N retries
531 """
532 delay = 1
533 attempt = 0
534 loop_forever = False
535 if retry is None or retry < 0:
536 loop_forever = True
537
538 while True:
539 self._disconnect()
540
541 attempt += 1
542 broker = six.next(self.brokers)
543 try:
544 self._connect(broker)
545 except qpid_exceptions.MessagingError as e:
546 msg_dict = dict(e=e,
547 delay=delay,
548 retry=retry,
549 broker=broker)
550 if not loop_forever and attempt > retry:
551 msg = _('Unable to connect to AMQP server on '
552 '%(broker)s after %(retry)d '
553 'tries: %(e)s') % msg_dict
554 LOG.error(msg)
555 raise exceptions.MessageDeliveryFailure(msg)
556 else:
557 msg = _("Unable to connect to AMQP server on %(broker)s: "
558 "%(e)s. Sleeping %(delay)s seconds") % msg_dict
559 LOG.error(msg)
560 time.sleep(delay)
561 delay = min(delay + 1, 5)
562 else:
563 LOG.info(_('Connected to AMQP server on %s'), broker['host'])
564 break
565
566 self.session = self.connection.session()
567
568 if self.consumers:
569 consumers = self.consumers
570 self.consumers = {}
571
572 for consumer in six.itervalues(consumers):
573 consumer.reconnect(self.session)
574 self._register_consumer(consumer)
575
576 LOG.debug("Re-established AMQP queues")
577
578 def ensure(self, error_callback, method, retry=None):
579 while True:
580 try:
581 return method()
582 except (qpid_exceptions.Empty,
583 qpid_exceptions.MessagingError) as e:
584 if error_callback:
585 error_callback(e)
586 self.reconnect(retry=retry)
587
588 def close(self):
589 """Close/release this connection."""
590 try:
591 self.connection.close()
592 except Exception:
593 # NOTE(dripton) Logging exceptions that happen during cleanup just
594 # causes confusion; there's really nothing useful we can do with
595 # them.
596 pass
597 self.connection = None
598
599 def reset(self):
600 """Reset a connection so it can be used again."""
601 self.session.close()
602 self.session = self.connection.session()
603 self.consumers = {}
604
605 def declare_consumer(self, consumer_cls, topic, callback):
606 """Create a Consumer using the class that was passed in and
607 add it to our list of consumers
608 """
609 def _connect_error(exc):
610 log_info = {'topic': topic, 'err_str': exc}
611 LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
612 "%(err_str)s"), log_info)
613
614 def _declare_consumer():
615 consumer = consumer_cls(self.conf, self.session, topic, callback)
616 self._register_consumer(consumer)
617 return consumer
618
619 return self.ensure(_connect_error, _declare_consumer)
620
621 def iterconsume(self, limit=None, timeout=None):
622 """Return an iterator that will consume from all queues/consumers."""
623
624 def _error_callback(exc):
625 if isinstance(exc, qpid_exceptions.Empty):
626 LOG.debug('Timed out waiting for RPC response: %s', exc)
627 raise rpc_common.Timeout()
628 else:
629 LOG.exception(_('Failed to consume message from queue: %s'),
630 exc)
631
632 def _consume():
633 nxt_receiver = self.session.next_receiver(timeout=timeout)
634 try:
635 self._lookup_consumer(nxt_receiver).consume()
636 except Exception:
637 LOG.exception(_("Error processing message. Skipping it."))
638
639 for iteration in itertools.count(0):
640 if limit and iteration >= limit:
641 raise StopIteration
642 yield self.ensure(_error_callback, _consume)
643
644 def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
645 """Send to a publisher based on the publisher class."""
646
647 def _connect_error(exc):
648 log_info = {'topic': topic, 'err_str': exc}
649 LOG.exception(_("Failed to publish message to topic "
650 "'%(topic)s': %(err_str)s"), log_info)
651
652 def _publisher_send():
653 publisher = cls(self.conf, self.session, topic=topic, **kwargs)
654 publisher.send(msg)
655
656 return self.ensure(_connect_error, _publisher_send, retry=retry)
657
658 def declare_direct_consumer(self, topic, callback):
659 """Create a 'direct' queue.
660 In nova's use, this is generally a msg_id queue used for
661 responses to call/multicall.
662 """
663 self.declare_consumer(DirectConsumer, topic, callback)
664
665 def declare_topic_consumer(self, exchange_name, topic, callback=None,
666 queue_name=None):
667 """Create a 'topic' consumer."""
668 self.declare_consumer(functools.partial(TopicConsumer,
669 name=queue_name,
670 exchange_name=exchange_name,
671 ),
672 topic, callback)
673
674 def declare_fanout_consumer(self, topic, callback):
675 """Create a 'fanout' consumer."""
676 self.declare_consumer(FanoutConsumer, topic, callback)
677
678 def direct_send(self, msg_id, msg):
679 """Send a 'direct' message."""
680 self.publisher_send(DirectPublisher, topic=msg_id, msg=msg)
681
682 def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
683 """Send a 'topic' message."""
684 #
685 # We want to create a message with attributes, for example a TTL. We
686 # don't really need to keep 'msg' in its JSON format any longer
687 # so let's create an actual Qpid message here and get some
688 # value-add on the go.
689 #
690 # WARNING: Request timeout happens to be in the same units as
691 # Qpid's TTL (seconds). If this changes in the future, then this
692 # will need to be altered accordingly.
693 #
694 qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
695 self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message,
696 exchange_name=exchange_name, retry=retry)
697
698 def fanout_send(self, topic, msg, retry=None):
699 """Send a 'fanout' message."""
700 self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
701
702 def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
703 """Send a notify message on a topic."""
704 self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
705 exchange_name=exchange_name, retry=retry)
706
707 def consume(self, limit=None, timeout=None):
708 """Consume from all queues/consumers."""
709 it = self.iterconsume(limit=limit, timeout=timeout)
710 while True:
711 try:
712 six.next(it)
713 except StopIteration:
714 return
715
716
717class QpidDriver(amqpdriver.AMQPDriverBase):
718
719 def __init__(self, conf, url,
720 default_exchange=None, allowed_remote_exmods=None):
721 conf.register_opts(qpid_opts)
722 conf.register_opts(rpc_amqp.amqp_opts)
723
724 connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
725
726 super(QpidDriver, self).__init__(conf, url,
727 connection_pool,
728 default_exchange,
729 allowed_remote_exmods)
0730
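
Every consumer and publisher above derives its Qpid address from qpid_topology_version. A short sketch of the node names the topic classes generate under each version, with the format strings taken from the code above (the helper name itself is illustrative):

    def topic_node_name(exchange_name, topic, topology_version):
        # Mirrors the node_name logic in TopicConsumer/TopicPublisher.
        if topology_version == 1:
            return '%s/%s' % (exchange_name, topic)
        elif topology_version == 2:
            return 'amq.topic/topic/%s/%s' % (exchange_name, topic)
        raise ValueError('Invalid qpid_topology_version: %d'
                         % topology_version)


    print(topic_node_name('nova', 'compute', 1))  # nova/compute
    print(topic_node_name('nova', 'compute', 2))  # amq.topic/topic/nova/compute

Version 2 routes everything through the amq.* exchanges, which is what lets broker federation work, at the cost of the clean break noted in the option's help text.
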
=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch'
=== added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/.timestamp'
=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo'
=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging'
=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers'
=== added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py'
--- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
+++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,832 @@
1# Copyright 2011 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import functools
16import itertools
17import logging
18import random
19import socket
20import ssl
21import time
22import uuid
23
24import kombu
25import kombu.connection
26import kombu.entity
27import kombu.messaging
28import six
29
30from oslo.config import cfg
31from oslo.messaging._drivers import amqp as rpc_amqp
32from oslo.messaging._drivers import amqpdriver
33from oslo.messaging._drivers import common as rpc_common
34from oslo.messaging import exceptions
35from oslo.messaging.openstack.common.gettextutils import _
36from oslo.utils import netutils
37
38rabbit_opts = [
39 cfg.StrOpt('kombu_ssl_version',
40 default='',
41 help='SSL version to use (valid only if SSL enabled). '
42 'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
43 'be available on some distributions.'
44 ),
45 cfg.StrOpt('kombu_ssl_keyfile',
46 default='',
47 help='SSL key file (valid only if SSL enabled).'),
48 cfg.StrOpt('kombu_ssl_certfile',
49 default='',
50 help='SSL cert file (valid only if SSL enabled).'),
51 cfg.StrOpt('kombu_ssl_ca_certs',
52 default='',
53 help='SSL certification authority file '
54 '(valid only if SSL enabled).'),
55 cfg.FloatOpt('kombu_reconnect_delay',
56 default=1.0,
57 help='How long to wait before reconnecting in response to an '
58 'AMQP consumer cancel notification.'),
59 cfg.StrOpt('rabbit_host',
60 default='localhost',
61 help='The RabbitMQ broker address where a single node is '
62 'used.'),
63 cfg.IntOpt('rabbit_port',
64 default=5672,
65 help='The RabbitMQ broker port where a single node is used.'),
66 cfg.ListOpt('rabbit_hosts',
67 default=['$rabbit_host:$rabbit_port'],
68 help='RabbitMQ HA cluster host:port pairs.'),
69 cfg.BoolOpt('rabbit_use_ssl',
70 default=False,
71 help='Connect over SSL for RabbitMQ.'),
72 cfg.StrOpt('rabbit_userid',
73 default='guest',
74 help='The RabbitMQ userid.'),
75 cfg.StrOpt('rabbit_password',
76 default='guest',
77 help='The RabbitMQ password.',
78 secret=True),
79 cfg.StrOpt('rabbit_login_method',
80 default='AMQPLAIN',
81 help='The RabbitMQ login method.'),
82 cfg.StrOpt('rabbit_virtual_host',
83 default='/',
84 help='The RabbitMQ virtual host.'),
85 cfg.IntOpt('rabbit_retry_interval',
86 default=1,
87 help='How frequently to retry connecting with RabbitMQ.'),
88 cfg.IntOpt('rabbit_retry_backoff',
89 default=2,
90 help='How long to backoff for between retries when connecting '
91 'to RabbitMQ.'),
92 cfg.IntOpt('rabbit_max_retries',
93 default=0,
94 help='Maximum number of RabbitMQ connection retries. '
95 'Default is 0 (infinite retry count).'),
96 cfg.BoolOpt('rabbit_ha_queues',
97 default=False,
98 help='Use HA queues in RabbitMQ (x-ha-policy: all). '
99 'If you change this option, you must wipe the '
100 'RabbitMQ database.'),
101
102 # FIXME(markmc): this was toplevel in openstack.common.rpc
103 cfg.BoolOpt('fake_rabbit',
104 default=False,
105 help='If passed, use a fake RabbitMQ provider.'),
106]
107
108LOG = logging.getLogger(__name__)
109
110
111def _get_queue_arguments(conf):
112 """Construct the arguments for declaring a queue.
113
114 If the rabbit_ha_queues option is set, we declare a mirrored queue
115 as described here:
116
117 http://www.rabbitmq.com/ha.html
118
119 Setting x-ha-policy to all means that the queue will be mirrored
120 to all nodes in the cluster.
121 """
122 return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
123
124
125class RabbitMessage(dict):
126 def __init__(self, raw_message):
127 super(RabbitMessage, self).__init__(
128 rpc_common.deserialize_msg(raw_message.payload))
129 self._raw_message = raw_message
130
131 def acknowledge(self):
132 self._raw_message.ack()
133
134 def requeue(self):
135 self._raw_message.requeue()
136
137
138class ConsumerBase(object):
139 """Consumer base class."""
140
141 def __init__(self, channel, callback, tag, **kwargs):
142 """Declare a queue on an amqp channel.
143
144 'channel' is the amqp channel to use
145 'callback' is the callback to call when messages are received
146 'tag' is a unique ID for the consumer on the channel
147
148 queue name, exchange name, and other kombu options are
149 passed in here as a dictionary.
150 """
151 self.callback = callback
152 self.tag = six.text_type(tag)
153 self.kwargs = kwargs
154 self.queue = None
155 self.reconnect(channel)
156
157 def reconnect(self, channel):
158 """Re-declare the queue after a rabbit reconnect."""
159 self.channel = channel
160 self.kwargs['channel'] = channel
161 self.queue = kombu.entity.Queue(**self.kwargs)
162 self.queue.declare()
163
164 def _callback_handler(self, message, callback):
165 """Call callback with deserialized message.
166
167 Messages are processed and then ack'ed.
168 """
169
170 try:
171 callback(RabbitMessage(message))
172 except Exception:
173 LOG.exception(_("Failed to process message"
174 " ... skipping it."))
175 message.ack()
176
177 def consume(self, *args, **kwargs):
178 """Actually declare the consumer on the amqp channel. This will
179 start the flow of messages from the queue. Using the
180 Connection.iterconsume() iterator will process the messages,
181 calling the appropriate callback.
182
183 If a callback is specified in kwargs, use that. Otherwise,
184 use the callback passed during __init__()
185
186 If kwargs['nowait'] is True, then this call will block until
187 a message is read.
188
189 """
190
191 options = {'consumer_tag': self.tag}
192 options['nowait'] = kwargs.get('nowait', False)
193 callback = kwargs.get('callback', self.callback)
194 if not callback:
195 raise ValueError("No callback defined")
196
197 def _callback(raw_message):
198 message = self.channel.message_to_python(raw_message)
199 self._callback_handler(message, callback)
200
201 self.queue.consume(*args, callback=_callback, **options)
202
203 def cancel(self):
204 """Cancel the consuming from the queue, if it has started."""
205 try:
206 self.queue.cancel(self.tag)
207 except KeyError as e:
208 # NOTE(comstud): Kludge to get around an amqplib bug
209 if six.text_type(e) != "u'%s'" % self.tag:
210 raise
211 self.queue = None
212
213
214class DirectConsumer(ConsumerBase):
215 """Queue/consumer class for 'direct'."""
216
217 def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
218 """Init a 'direct' queue.
219
220 'channel' is the amqp channel to use
221 'msg_id' is the msg_id to listen on
222 'callback' is the callback to call when messages are received
223 'tag' is a unique ID for the consumer on the channel
224
225 Other kombu options may be passed
226 """
227 # Default options
228 options = {'durable': False,
229 'queue_arguments': _get_queue_arguments(conf),
230 'auto_delete': True,
231 'exclusive': False}
232 options.update(kwargs)
233 exchange = kombu.entity.Exchange(name=msg_id,
234 type='direct',
235 durable=options['durable'],
236 auto_delete=options['auto_delete'])
237 super(DirectConsumer, self).__init__(channel,
238 callback,
239 tag,
240 name=msg_id,
241 exchange=exchange,
242 routing_key=msg_id,
243 **options)
244
245
246class TopicConsumer(ConsumerBase):
247 """Consumer class for 'topic'."""
248
249 def __init__(self, conf, channel, topic, callback, tag, exchange_name,
250 name=None, **kwargs):
251 """Init a 'topic' queue.
252
253 :param channel: the amqp channel to use
254 :param topic: the topic to listen on
255 :paramtype topic: str
256 :param callback: the callback to call when messages are received
257 :param tag: a unique ID for the consumer on the channel
258 :param exchange_name: the exchange name to use
259 :param name: optional queue name, defaults to topic
260 :paramtype name: str
261
262 Other kombu options may be passed as keyword arguments
263 """
264 # Default options
265 options = {'durable': conf.amqp_durable_queues,
266 'queue_arguments': _get_queue_arguments(conf),
267 'auto_delete': conf.amqp_auto_delete,
268 'exclusive': False}
269 options.update(kwargs)
270 exchange = kombu.entity.Exchange(name=exchange_name,
271 type='topic',
272 durable=options['durable'],
273 auto_delete=options['auto_delete'])
274 super(TopicConsumer, self).__init__(channel,
275 callback,
276 tag,
277 name=name or topic,
278 exchange=exchange,
279 routing_key=topic,
280 **options)
281
282
283class FanoutConsumer(ConsumerBase):
284 """Consumer class for 'fanout'."""
285
286 def __init__(self, conf, channel, topic, callback, tag, **kwargs):
287 """Init a 'fanout' queue.
288
289 'channel' is the amqp channel to use
290 'topic' is the topic to listen on
291 'callback' is the callback to call when messages are received
292 'tag' is a unique ID for the consumer on the channel
293
294 Other kombu options may be passed
295 """
296 unique = uuid.uuid4().hex
297 exchange_name = '%s_fanout' % topic
298 queue_name = '%s_fanout_%s' % (topic, unique)
299
300 # Default options
301 options = {'durable': False,
302 'queue_arguments': _get_queue_arguments(conf),
303 'auto_delete': True,
304 'exclusive': False}
305 options.update(kwargs)
306 exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
307 durable=options['durable'],
308 auto_delete=options['auto_delete'])
309 super(FanoutConsumer, self).__init__(channel, callback, tag,
310 name=queue_name,
311 exchange=exchange,
312 routing_key=topic,
313 **options)
314
315
316class Publisher(object):
317 """Base Publisher class."""
318
319 def __init__(self, channel, exchange_name, routing_key, **kwargs):
320 """Init the Publisher class with the exchange_name, routing_key,
321 and other options
322 """
323 self.exchange_name = exchange_name
324 self.routing_key = routing_key
325 self.kwargs = kwargs
326 self.reconnect(channel)
327
328 def reconnect(self, channel):
329 """Re-establish the Producer after a rabbit reconnection."""
330 self.exchange = kombu.entity.Exchange(name=self.exchange_name,
331 **self.kwargs)
332 self.producer = kombu.messaging.Producer(exchange=self.exchange,
333 channel=channel,
334 routing_key=self.routing_key)
335
336 def send(self, msg, timeout=None):
337 """Send a message."""
338 if timeout:
339 #
340 # AMQP TTL is in milliseconds when set in the header.
341 #
342 self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
343 else:
344 self.producer.publish(msg)
345
346
347class DirectPublisher(Publisher):
348 """Publisher class for 'direct'."""
349 def __init__(self, conf, channel, topic, **kwargs):
350 """Init a 'direct' publisher.
351
352 Kombu options may be passed as keyword args to override defaults
353 """
354
355 options = {'durable': False,
356 'auto_delete': True,
357 'exclusive': False}
358 options.update(kwargs)
359 super(DirectPublisher, self).__init__(channel, topic, topic,
360 type='direct', **options)
361
362
363class TopicPublisher(Publisher):
364 """Publisher class for 'topic'."""
365 def __init__(self, conf, channel, exchange_name, topic, **kwargs):
366 """Init a 'topic' publisher.
367
368 Kombu options may be passed as keyword args to override defaults
369 """
370 options = {'durable': conf.amqp_durable_queues,
371 'auto_delete': conf.amqp_auto_delete,
372 'exclusive': False}
373 options.update(kwargs)
374 super(TopicPublisher, self).__init__(channel,
375 exchange_name,
376 topic,
377 type='topic',
378 **options)
379
380
381class FanoutPublisher(Publisher):
382 """Publisher class for 'fanout'."""
383 def __init__(self, conf, channel, topic, **kwargs):
384 """Init a 'fanout' publisher.
385
386 Kombu options may be passed as keyword args to override defaults
387 """
388 options = {'durable': False,
389 'auto_delete': True,
390 'exclusive': False}
391 options.update(kwargs)
392 super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
393 None, type='fanout', **options)
394
395
396class NotifyPublisher(TopicPublisher):
397 """Publisher class for 'notify'."""
398
399 def __init__(self, conf, channel, exchange_name, topic, **kwargs):
400 self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
401 self.queue_arguments = _get_queue_arguments(conf)
402 super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
403 topic, **kwargs)
404
405 def reconnect(self, channel):
406 super(NotifyPublisher, self).reconnect(channel)
407
408 # NOTE(jerdfelt): Normally the consumer would create the queue, but
409 # we do this to ensure that messages don't get dropped if the
410 # consumer is started after we do
411 queue = kombu.entity.Queue(channel=channel,
412 exchange=self.exchange,
413 durable=self.durable,
414 name=self.routing_key,
415 routing_key=self.routing_key,
416 queue_arguments=self.queue_arguments)
417 queue.declare()
418
419
420class Connection(object):
421 """Connection object."""
422
423 pools = {}
424
425 def __init__(self, conf, url):
426 self.consumers = []
427 self.conf = conf
428 self.max_retries = self.conf.rabbit_max_retries
429 # Try forever?
430 if self.max_retries <= 0:
431 self.max_retries = None
432 self.interval_start = self.conf.rabbit_retry_interval
433 self.interval_stepping = self.conf.rabbit_retry_backoff
434 # max retry-interval = 30 seconds
435 self.interval_max = 30
436 self.memory_transport = False
437
438 ssl_params = self._fetch_ssl_params()
439
440 if url.virtual_host is not None:
441 virtual_host = url.virtual_host
442 else:
443 virtual_host = self.conf.rabbit_virtual_host
444
445 self.brokers_params = []
446 if url.hosts:
447 for host in url.hosts:
448 params = {
449 'hostname': host.hostname,
450 'port': host.port or 5672,
451 'userid': host.username or '',
452 'password': host.password or '',
453 'login_method': self.conf.rabbit_login_method,
454 'virtual_host': virtual_host
455 }
456 if self.conf.fake_rabbit:
457 params['transport'] = 'memory'
458 if self.conf.rabbit_use_ssl:
459 params['ssl'] = ssl_params
460
461 self.brokers_params.append(params)
462 else:
463 # Old configuration format
464 for adr in self.conf.rabbit_hosts:
465 hostname, port = netutils.parse_host_port(
466 adr, default_port=self.conf.rabbit_port)
467
468 params = {
469 'hostname': hostname,
470 'port': port,
471 'userid': self.conf.rabbit_userid,
472 'password': self.conf.rabbit_password,
473 'login_method': self.conf.rabbit_login_method,
474 'virtual_host': virtual_host
475 }
476
477 if self.conf.fake_rabbit:
478 params['transport'] = 'memory'
479 if self.conf.rabbit_use_ssl:
480 params['ssl'] = ssl_params
481
482 self.brokers_params.append(params)
483
484 random.shuffle(self.brokers_params)
485 self.brokers = itertools.cycle(self.brokers_params)
486
487 self.memory_transport = self.conf.fake_rabbit
488
489 self.connection = None
490 self.do_consume = None
491 self.reconnect()
492
493 # FIXME(markmc): use oslo sslutils when it is available as a library
494 _SSL_PROTOCOLS = {
495 "tlsv1": ssl.PROTOCOL_TLSv1,
496 "sslv23": ssl.PROTOCOL_SSLv23,
497 "sslv3": ssl.PROTOCOL_SSLv3
498 }
499
500 try:
501 _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
502 except AttributeError:
503 pass
504
505 @classmethod
506 def validate_ssl_version(cls, version):
507 key = version.lower()
508 try:
509 return cls._SSL_PROTOCOLS[key]
510 except KeyError:
511 raise RuntimeError(_("Invalid SSL version : %s") % version)
512
513 def _fetch_ssl_params(self):
514 """Handles fetching what ssl params should be used for the connection
515 (if any).
516 """
517 ssl_params = dict()
518
519 # http://docs.python.org/library/ssl.html - ssl.wrap_socket
520 if self.conf.kombu_ssl_version:
521 ssl_params['ssl_version'] = self.validate_ssl_version(
522 self.conf.kombu_ssl_version)
523 if self.conf.kombu_ssl_keyfile:
524 ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
525 if self.conf.kombu_ssl_certfile:
526 ssl_params['certfile'] = self.conf.kombu_ssl_certfile
527 if self.conf.kombu_ssl_ca_certs:
528 ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
529 # We might want to allow variations in the
530 # future with this?
531 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
532
533 # Return the extended behavior or just have the default behavior
534 return ssl_params or True
535
536 def _connect(self, broker):
537 """Connect to rabbit. Re-establish any queues that may have
538 been declared before if we are reconnecting. Exceptions should
539 be handled by the caller.
540 """
541 LOG.info(_("Connecting to AMQP server on "
542 "%(hostname)s:%(port)d"), broker)
543 self.connection = kombu.connection.BrokerConnection(**broker)
544 self.connection_errors = self.connection.connection_errors
545 self.channel_errors = self.connection.channel_errors
546 if self.memory_transport:
547 # Kludge to speed up tests.
548 self.connection.transport.polling_interval = 0.0
549 self.do_consume = True
550 self.consumer_num = itertools.count(1)
551 self.connection.connect()
552 self.channel = self.connection.channel()
553 # work around 'memory' transport bug in 1.1.3
554 if self.memory_transport:
555 self.channel._new_queue('ae.undeliver')
556 for consumer in self.consumers:
557 consumer.reconnect(self.channel)
558 LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
559 broker)
560
561 def _disconnect(self):
562 if self.connection:
563 # XXX(nic): when reconnecting to a RabbitMQ cluster
564 # with mirrored queues in use, the attempt to release the
565 # connection can hang "indefinitely" somewhere deep down
566 # in Kombu. Blocking the thread for a bit prior to
567 # release seems to kludge around the problem where it is
568 # otherwise reproducible.
569 if self.conf.kombu_reconnect_delay > 0:
570 LOG.info(_("Delaying reconnect for %1.1f seconds...") %
571 self.conf.kombu_reconnect_delay)
572 time.sleep(self.conf.kombu_reconnect_delay)
573
574 try:
575 self.connection.release()
576 except self.connection_errors:
577 pass
578 self.connection = None
579
580 def reconnect(self, retry=None):
581        """Handle reconnecting and re-establishing queues.
582        Will retry up to `retry` times:
583        retry = None means use the value of rabbit_max_retries
584        retry = -1 means retry forever
585        retry = 0 means no retries
586        retry = N means N retries
587        Sleeps between attempts, starting at self.interval_start
588        seconds and backing off by self.interval_stepping seconds on
589        each subsequent attempt, capped at self.interval_max.
590 """
591
592 attempt = 0
593 loop_forever = False
594 if retry is None:
595 retry = self.max_retries
596 if retry is None or retry < 0:
597 loop_forever = True
598
599 while True:
600 self._disconnect()
601
602 broker = six.next(self.brokers)
603 attempt += 1
604 try:
605 self._connect(broker)
606 return
607 except IOError as ex:
608 e = ex
609 except self.connection_errors as ex:
610 e = ex
611 except Exception as ex:
612 # NOTE(comstud): Unfortunately it's possible for amqplib
613 # to return an error not covered by its transport
614 # connection_errors in the case of a timeout waiting for
615 # a protocol response. (See paste link in LP888621)
616 # So, we check all exceptions for 'timeout' in them
617 # and try to reconnect in this case.
618                if 'timeout' not in six.text_type(ex):
619 raise
620 e = ex
621
622 log_info = {}
623 log_info['err_str'] = e
624 log_info['retry'] = retry or 0
625 log_info.update(broker)
626
627 if not loop_forever and attempt > retry:
628 msg = _('Unable to connect to AMQP server on '
629 '%(hostname)s:%(port)d after %(retry)d '
630 'tries: %(err_str)s') % log_info
631 LOG.error(msg)
632 raise exceptions.MessageDeliveryFailure(msg)
633 else:
634 if attempt == 1:
635 sleep_time = self.interval_start or 1
636 elif attempt > 1:
637 sleep_time += self.interval_stepping
638
639 sleep_time = min(sleep_time, self.interval_max)
640
641 log_info['sleep_time'] = sleep_time
642 if 'Socket closed' in six.text_type(e):
643 LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
644 ' the connection. Check login credentials:'
645 ' %(err_str)s'), log_info)
646 else:
647 LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
648 'unreachable: %(err_str)s. Trying again in '
649 '%(sleep_time)d seconds.'), log_info)
650 time.sleep(sleep_time)
651
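
The retry loop above implements a linear backoff with a cap: the first sleep is interval_start (rabbit_retry_interval), each later sleep grows by interval_stepping (rabbit_retry_backoff), and nothing exceeds interval_max (hard-coded to 30 in __init__). A short sketch of just that arithmetic, using the option defaults:

    def backoff_schedule(interval_start, interval_stepping, interval_max, attempts):
        # Yield the sleep used after each failed attempt, mirroring the
        # attempt == 1 / attempt > 1 branches in reconnect() above.
        sleep_time = 0
        for attempt in range(1, attempts + 1):
            if attempt == 1:
                sleep_time = interval_start or 1
            else:
                sleep_time += interval_stepping
            yield min(sleep_time, interval_max)

    # Defaults: rabbit_retry_interval=1, rabbit_retry_backoff=2, cap 30.
    print(list(backoff_schedule(1, 2, 30, 6)))  # [1, 3, 5, 7, 9, 11]
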
652 def ensure(self, error_callback, method, retry=None):
653 while True:
654 try:
655 return method()
656 except self.connection_errors as e:
657 if error_callback:
658 error_callback(e)
659 except self.channel_errors as e:
660 if error_callback:
661 error_callback(e)
662 except (socket.timeout, IOError) as e:
663 if error_callback:
664 error_callback(e)
665 except Exception as e:
666 # NOTE(comstud): Unfortunately it's possible for amqplib
667 # to return an error not covered by its transport
668 # connection_errors in the case of a timeout waiting for
669 # a protocol response. (See paste link in LP888621)
670 # So, we check all exceptions for 'timeout' in them
671 # and try to reconnect in this case.
672 if 'timeout' not in six.text_type(e):
673 raise
674 if error_callback:
675 error_callback(e)
676 self.reconnect(retry=retry)
677
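
ensure() is the driver's generic retry wrapper: run a callable, funnel every recoverable failure family to an optional callback, reconnect, and try again; the loop only ends when the callable succeeds or reconnect() exhausts its retry budget and raises. A stripped-down sketch of the pattern, with IOError standing in for the kombu connection and channel error families:

    def ensure(method, reconnect, error_callback=None, retry=None):
        # reconnect() is expected to raise once its retry budget is spent
        # (MessageDeliveryFailure in the real driver); that is what
        # ultimately breaks this loop.
        while True:
            try:
                return method()
            except IOError as exc:
                if error_callback:
                    error_callback(exc)
                reconnect(retry=retry)

    calls = {'n': 0}

    def flaky():
        calls['n'] += 1
        if calls['n'] < 3:
            raise IOError('broker gone')
        return 'ok'

    assert ensure(flaky, reconnect=lambda retry: None) == 'ok'
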
678 def get_channel(self):
679 """Convenience call for bin/clear_rabbit_queues."""
680 return self.channel
681
682 def close(self):
683 """Close/release this connection."""
684 if self.connection:
685 self.connection.release()
686 self.connection = None
687
688 def reset(self):
689 """Reset a connection so it can be used again."""
690 self.channel.close()
691 self.channel = self.connection.channel()
692 # work around 'memory' transport bug in 1.1.3
693 if self.memory_transport:
694 self.channel._new_queue('ae.undeliver')
695 self.consumers = []
696
697 def declare_consumer(self, consumer_cls, topic, callback):
698 """Create a Consumer using the class that was passed in and
699 add it to our list of consumers
700 """
701
702 def _connect_error(exc):
703 log_info = {'topic': topic, 'err_str': exc}
704 LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
705 "%(err_str)s"), log_info)
706
707 def _declare_consumer():
708 consumer = consumer_cls(self.conf, self.channel, topic, callback,
709 six.next(self.consumer_num))
710 self.consumers.append(consumer)
711 return consumer
712
713 return self.ensure(_connect_error, _declare_consumer)
714
715 def iterconsume(self, limit=None, timeout=None):
716 """Return an iterator that will consume from all queues/consumers."""
717
718 timer = rpc_common.DecayingTimer(duration=timeout)
719 timer.start()
720
721 def _raise_timeout(exc):
722 LOG.debug('Timed out waiting for RPC response: %s', exc)
723 raise rpc_common.Timeout()
724
725 def _error_callback(exc):
726 timer.check_return(_raise_timeout, exc)
727 LOG.exception(_('Failed to consume message from queue: %s'),
728 exc)
729 self.do_consume = True
730
731 def _consume():
732 if self.do_consume:
733 queues_head = self.consumers[:-1] # not fanout.
734 queues_tail = self.consumers[-1] # fanout
735 for queue in queues_head:
736 queue.consume(nowait=True)
737 queues_tail.consume(nowait=False)
738 self.do_consume = False
739
740 poll_timeout = 1 if timeout is None else min(timeout, 1)
741 while True:
742 try:
743 return self.connection.drain_events(timeout=poll_timeout)
744 except socket.timeout as exc:
745 poll_timeout = timer.check_return(_raise_timeout, exc,
746 maximum=1)
747
748 for iteration in itertools.count(0):
749 if limit and iteration >= limit:
750 raise StopIteration
751 yield self.ensure(_error_callback, _consume)
752
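
The poll loop in _consume() above never blocks for more than a second at a time: drain_events() is called with a short timeout, and every socket.timeout is converted into a fresh check against the caller's overall deadline via the DecayingTimer. A self-contained paraphrase of that decaying-deadline loop; CountdownTimer is illustrative, not the oslo.messaging DecayingTimer class:

    import time

    class CountdownTimer(object):
        def __init__(self, duration=None):
            self._deadline = None if duration is None else time.time() + duration

        def remaining(self, maximum=None):
            # No overall deadline: just poll at `maximum`. Otherwise return
            # what is left, capped, and raise once the deadline has passed.
            if self._deadline is None:
                return maximum
            left = self._deadline - time.time()
            if left <= 0:
                raise TimeoutError('RPC deadline passed')
            return left if maximum is None else min(left, maximum)

    timer = CountdownTimer(duration=0.25)
    polls = 0
    try:
        while True:
            poll_timeout = timer.remaining(maximum=0.1)
            time.sleep(poll_timeout)  # stands in for drain_events(timeout=...)
            polls += 1
    except TimeoutError:
        print('timed out after', polls, 'short polls')
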
753 def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
754 **kwargs):
755 """Send to a publisher based on the publisher class."""
756
757 def _error_callback(exc):
758 log_info = {'topic': topic, 'err_str': exc}
759 LOG.exception(_("Failed to publish message to topic "
760 "'%(topic)s': %(err_str)s"), log_info)
761
762 def _publish():
763 publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
764 publisher.send(msg, timeout)
765
766 self.ensure(_error_callback, _publish, retry=retry)
767
768 def declare_direct_consumer(self, topic, callback):
769 """Create a 'direct' queue.
770 In nova's use, this is generally a msg_id queue used for
771 responses for call/multicall
772 """
773 self.declare_consumer(DirectConsumer, topic, callback)
774
775 def declare_topic_consumer(self, exchange_name, topic, callback=None,
776 queue_name=None):
777 """Create a 'topic' consumer."""
778 self.declare_consumer(functools.partial(TopicConsumer,
779 name=queue_name,
780 exchange_name=exchange_name,
781 ),
782 topic, callback)
783
784 def declare_fanout_consumer(self, topic, callback):
785 """Create a 'fanout' consumer."""
786 self.declare_consumer(FanoutConsumer, topic, callback)
787
788 def direct_send(self, msg_id, msg):
789 """Send a 'direct' message."""
790 self.publisher_send(DirectPublisher, msg_id, msg)
791
792 def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
793 """Send a 'topic' message."""
794 self.publisher_send(TopicPublisher, topic, msg, timeout,
795 exchange_name=exchange_name, retry=retry)
796
797 def fanout_send(self, topic, msg, retry=None):
798 """Send a 'fanout' message."""
799 self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
800
801 def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
802 """Send a notify message on a topic."""
803 self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
804 exchange_name=exchange_name, retry=retry, **kwargs)
805
806 def consume(self, limit=None, timeout=None):
807 """Consume from all queues/consumers."""
808 it = self.iterconsume(limit=limit, timeout=timeout)
809 while True:
810 try:
811 six.next(it)
812 except StopIteration:
813 return
814
815
816class RabbitDriver(amqpdriver.AMQPDriverBase):
817
818 def __init__(self, conf, url,
819 default_exchange=None,
820 allowed_remote_exmods=None):
821 conf.register_opts(rabbit_opts)
822 conf.register_opts(rpc_amqp.amqp_opts)
823
824 connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
825
826 super(RabbitDriver, self).__init__(conf, url,
827 connection_pool,
828 default_exchange,
829 allowed_remote_exmods)
830
831 def require_features(self, requeue=True):
832 pass
0833
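
One behavior of the Connection class above that the HA tests rely on: the broker list is shuffled once at construction time and then cycled forever, so consecutive reconnect() attempts walk through every configured host before revisiting any of them. A minimal sketch of that failover order (hostnames invented):

    import itertools
    import random

    brokers_params = [{'hostname': h, 'port': 5672}
                      for h in ('host1', 'host2', 'host3')]
    random.shuffle(brokers_params)      # randomize the starting point per client
    brokers = itertools.cycle(brokers_params)

    # Each reconnect attempt takes the next broker and wraps around, which is
    # why RpcKombuHATestCase can assert every host is tried exactly once
    # before any host is retried.
    for _ in range(5):
        print(next(brokers)['hostname'])
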
=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests'
=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers'
=== added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py'
--- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 1970-01-01 00:00:00 +0000
+++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,735 @@
1# Copyright 2013 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import datetime
16import operator
17import sys
18import threading
19import uuid
20
21import fixtures
22import kombu
23import mock
24import testscenarios
25
26from oslo import messaging
27from oslo.messaging._drivers import amqpdriver
28from oslo.messaging._drivers import common as driver_common
29from oslo.messaging._drivers import impl_rabbit as rabbit_driver
30from oslo.messaging.openstack.common import jsonutils
31from tests import utils as test_utils
32
33load_tests = testscenarios.load_tests_apply_scenarios
34
35
36class TestRabbitDriverLoad(test_utils.BaseTestCase):
37
38 def setUp(self):
39 super(TestRabbitDriverLoad, self).setUp()
40 self.messaging_conf.transport_driver = 'rabbit'
41 self.messaging_conf.in_memory = True
42
43 def test_driver_load(self):
44 transport = messaging.get_transport(self.conf)
45 self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
46
47
48class TestRabbitTransportURL(test_utils.BaseTestCase):
49
50 scenarios = [
51 ('none', dict(url=None,
52 expected=[dict(hostname='localhost',
53 port=5672,
54 userid='guest',
55 password='guest',
56 virtual_host='/')])),
57 ('empty',
58 dict(url='rabbit:///',
59 expected=[dict(hostname='localhost',
60 port=5672,
61 userid='guest',
62 password='guest',
63 virtual_host='')])),
64 ('localhost',
65 dict(url='rabbit://localhost/',
66 expected=[dict(hostname='localhost',
67 port=5672,
68 userid='',
69 password='',
70 virtual_host='')])),
71 ('virtual_host',
72 dict(url='rabbit:///vhost',
73 expected=[dict(hostname='localhost',
74 port=5672,
75 userid='guest',
76 password='guest',
77 virtual_host='vhost')])),
78 ('no_creds',
79 dict(url='rabbit://host/virtual_host',
80 expected=[dict(hostname='host',
81 port=5672,
82 userid='',
83 password='',
84 virtual_host='virtual_host')])),
85 ('no_port',
86 dict(url='rabbit://user:password@host/virtual_host',
87 expected=[dict(hostname='host',
88 port=5672,
89 userid='user',
90 password='password',
91 virtual_host='virtual_host')])),
92 ('full_url',
93 dict(url='rabbit://user:password@host:10/virtual_host',
94 expected=[dict(hostname='host',
95 port=10,
96 userid='user',
97 password='password',
98 virtual_host='virtual_host')])),
99 ('full_two_url',
100 dict(url='rabbit://user:password@host:10,'
101 'user2:password2@host2:12/virtual_host',
102 expected=[dict(hostname='host',
103 port=10,
104 userid='user',
105 password='password',
106 virtual_host='virtual_host'),
107 dict(hostname='host2',
108 port=12,
109 userid='user2',
110 password='password2',
111 virtual_host='virtual_host')
112 ]
113 )),
114
115 ]
116
117 def test_transport_url(self):
118 self.messaging_conf.in_memory = True
119
120 transport = messaging.get_transport(self.conf, self.url)
121 self.addCleanup(transport.cleanup)
122 driver = transport._driver
123
124 brokers_params = driver._get_connection().brokers_params[:]
125 brokers_params = [dict((k, v) for k, v in broker.items()
126 if k not in ['transport', 'login_method'])
127 for broker in brokers_params]
128
129 self.assertEqual(sorted(self.expected,
130 key=operator.itemgetter('hostname')),
131 sorted(brokers_params,
132 key=operator.itemgetter('hostname')))
133
134
135class TestSendReceive(test_utils.BaseTestCase):
136
137 _n_senders = [
138 ('single_sender', dict(n_senders=1)),
139 ('multiple_senders', dict(n_senders=10)),
140 ]
141
142 _context = [
143 ('empty_context', dict(ctxt={})),
144 ('with_context', dict(ctxt={'user': 'mark'})),
145 ]
146
147 _reply = [
148 ('rx_id', dict(rx_id=True, reply=None)),
149 ('none', dict(rx_id=False, reply=None)),
150 ('empty_list', dict(rx_id=False, reply=[])),
151 ('empty_dict', dict(rx_id=False, reply={})),
152 ('false', dict(rx_id=False, reply=False)),
153 ('zero', dict(rx_id=False, reply=0)),
154 ]
155
156 _failure = [
157 ('success', dict(failure=False)),
158 ('failure', dict(failure=True, expected=False)),
159 ('expected_failure', dict(failure=True, expected=True)),
160 ]
161
162 _timeout = [
163 ('no_timeout', dict(timeout=None)),
164 ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
165 ]
166
167 @classmethod
168 def generate_scenarios(cls):
169 cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
170 cls._context,
171 cls._reply,
172 cls._failure,
173 cls._timeout)
174
175 def setUp(self):
176 super(TestSendReceive, self).setUp()
177 self.messaging_conf.transport_driver = 'rabbit'
178 self.messaging_conf.in_memory = True
179
180 def test_send_receive(self):
181 transport = messaging.get_transport(self.conf)
182 self.addCleanup(transport.cleanup)
183
184 driver = transport._driver
185
186 target = messaging.Target(topic='testtopic')
187
188 listener = driver.listen(target)
189
190 senders = []
191 replies = []
192 msgs = []
193 errors = []
194
195 def stub_error(msg, *a, **kw):
196 if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
197 a = a[0]
198 errors.append(str(msg) % a)
199
200 self.stubs.Set(driver_common.LOG, 'error', stub_error)
201
202 def send_and_wait_for_reply(i):
203 try:
204 replies.append(driver.send(target,
205 self.ctxt,
206 {'tx_id': i},
207 wait_for_reply=True,
208 timeout=self.timeout))
209 self.assertFalse(self.failure)
210 self.assertIsNone(self.timeout)
211 except (ZeroDivisionError, messaging.MessagingTimeout) as e:
212 replies.append(e)
213 self.assertTrue(self.failure or self.timeout is not None)
214
215 while len(senders) < self.n_senders:
216 senders.append(threading.Thread(target=send_and_wait_for_reply,
217 args=(len(senders), )))
218
219 for i in range(len(senders)):
220 senders[i].start()
221
222 received = listener.poll()
223 self.assertIsNotNone(received)
224 self.assertEqual(self.ctxt, received.ctxt)
225 self.assertEqual({'tx_id': i}, received.message)
226 msgs.append(received)
227
228        # reply in reverse order, except answer the first sender second from last
229 order = list(range(len(senders) - 1, -1, -1))
230 if len(order) > 1:
231 order[-1], order[-2] = order[-2], order[-1]
232
233 for i in order:
234 if self.timeout is None:
235 if self.failure:
236 try:
237 raise ZeroDivisionError
238 except Exception:
239 failure = sys.exc_info()
240 msgs[i].reply(failure=failure,
241 log_failure=not self.expected)
242 elif self.rx_id:
243 msgs[i].reply({'rx_id': i})
244 else:
245 msgs[i].reply(self.reply)
246 senders[i].join()
247
248 self.assertEqual(len(senders), len(replies))
249 for i, reply in enumerate(replies):
250 if self.timeout is not None:
251 self.assertIsInstance(reply, messaging.MessagingTimeout)
252 elif self.failure:
253 self.assertIsInstance(reply, ZeroDivisionError)
254 elif self.rx_id:
255 self.assertEqual({'rx_id': order[i]}, reply)
256 else:
257 self.assertEqual(self.reply, reply)
258
259 if not self.timeout and self.failure and not self.expected:
260 self.assertTrue(len(errors) > 0, errors)
261 else:
262 self.assertEqual(0, len(errors), errors)
263
264
265TestSendReceive.generate_scenarios()
266
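
generate_scenarios() above leans on the testscenarios package: multiply_scenarios() builds the cross product of the five axes (2 sender counts x 2 contexts x 6 replies x 3 failure modes x 2 timeouts = 144 variants of test_send_receive). A tiny sketch of the same mechanism, assuming testscenarios and testtools are installed; the class and attributes are invented for illustration:

    import testscenarios
    import testtools

    load_tests = testscenarios.load_tests_apply_scenarios

    class TestCommutativity(testtools.TestCase):
        scenarios = testscenarios.multiply_scenarios(
            [('small', dict(a=1)), ('large', dict(a=100))],
            [('pos', dict(b=2)), ('neg', dict(b=-2))],
        )  # four scenarios: (small, pos), (small, neg), (large, pos), (large, neg)

        def test_add(self):
            self.assertEqual(self.a + self.b, self.b + self.a)
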
267
268class TestPollAsync(test_utils.BaseTestCase):
269
270 def setUp(self):
271 super(TestPollAsync, self).setUp()
272 self.messaging_conf.transport_driver = 'rabbit'
273 self.messaging_conf.in_memory = True
274
275 def test_poll_timeout(self):
276 transport = messaging.get_transport(self.conf)
277 self.addCleanup(transport.cleanup)
278 driver = transport._driver
279 target = messaging.Target(topic='testtopic')
280 listener = driver.listen(target)
281 received = listener.poll(timeout=0.050)
282 self.assertIsNone(received)
283
284
285class TestRacyWaitForReply(test_utils.BaseTestCase):
286
287 def setUp(self):
288 super(TestRacyWaitForReply, self).setUp()
289 self.messaging_conf.transport_driver = 'rabbit'
290 self.messaging_conf.in_memory = True
291
292 def test_send_receive(self):
293 transport = messaging.get_transport(self.conf)
294 self.addCleanup(transport.cleanup)
295
296 driver = transport._driver
297
298 target = messaging.Target(topic='testtopic')
299
300 listener = driver.listen(target)
301
302 senders = []
303 replies = []
304 msgs = []
305
306 wait_conditions = []
307 orig_reply_waiter = amqpdriver.ReplyWaiter.wait
308
309 def reply_waiter(self, msg_id, timeout):
310 if wait_conditions:
311 cond = wait_conditions.pop()
312 with cond:
313 cond.notify()
314 with cond:
315 cond.wait()
316 return orig_reply_waiter(self, msg_id, timeout)
317
318 self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
319
320 def send_and_wait_for_reply(i, wait_for_reply):
321 replies.append(driver.send(target,
322 {},
323 {'tx_id': i},
324 wait_for_reply=wait_for_reply,
325 timeout=None))
326
327 while len(senders) < 2:
328 t = threading.Thread(target=send_and_wait_for_reply,
329 args=(len(senders), True))
330 t.daemon = True
331 senders.append(t)
332
333        # test the case when msg_id is not set
334 t = threading.Thread(target=send_and_wait_for_reply,
335 args=(len(senders), False))
336 t.daemon = True
337 senders.append(t)
338
339 # Start the first guy, receive his message, but delay his polling
340 notify_condition = threading.Condition()
341 wait_conditions.append(notify_condition)
342 with notify_condition:
343 senders[0].start()
344 notify_condition.wait()
345
346 msgs.append(listener.poll())
347 self.assertEqual({'tx_id': 0}, msgs[-1].message)
348
349 # Start the second guy, receive his message
350 senders[1].start()
351
352 msgs.append(listener.poll())
353 self.assertEqual({'tx_id': 1}, msgs[-1].message)
354
355 # Reply to both in order, making the second thread queue
356 # the reply meant for the first thread
357 msgs[0].reply({'rx_id': 0})
358 msgs[1].reply({'rx_id': 1})
359
360 # Wait for the second thread to finish
361 senders[1].join()
362
363 # Start the 3rd guy, receive his message
364 senders[2].start()
365
366 msgs.append(listener.poll())
367 self.assertEqual({'tx_id': 2}, msgs[-1].message)
368
369 # Verify the _send_reply was not invoked by driver:
370 with mock.patch.object(msgs[2], '_send_reply') as method:
371 msgs[2].reply({'rx_id': 2})
372 self.assertEqual(method.call_count, 0)
373
374 # Wait for the 3rd thread to finish
375 senders[2].join()
376
377 # Let the first thread continue
378 with notify_condition:
379 notify_condition.notify()
380
381 # Wait for the first thread to finish
382 senders[0].join()
383
384 # Verify replies were received out of order
385 self.assertEqual(len(senders), len(replies))
386 self.assertEqual({'rx_id': 1}, replies[0])
387 self.assertIsNone(replies[1])
388 self.assertEqual({'rx_id': 0}, replies[2])
389
390
391def _declare_queue(target):
392 connection = kombu.connection.BrokerConnection(transport='memory')
393
394 # Kludge to speed up tests.
395 connection.transport.polling_interval = 0.0
396
397 connection.connect()
398 channel = connection.channel()
399
400 # work around 'memory' transport bug in 1.1.3
401 channel._new_queue('ae.undeliver')
402
403 if target.fanout:
404 exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
405 type='fanout',
406 durable=False,
407 auto_delete=True)
408 queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
409 channel=channel,
410 exchange=exchange,
411 routing_key=target.topic)
412    elif target.server:
413 exchange = kombu.entity.Exchange(name='openstack',
414 type='topic',
415 durable=False,
416 auto_delete=False)
417 topic = '%s.%s' % (target.topic, target.server)
418 queue = kombu.entity.Queue(name=topic,
419 channel=channel,
420 exchange=exchange,
421 routing_key=topic)
422 else:
423 exchange = kombu.entity.Exchange(name='openstack',
424 type='topic',
425 durable=False,
426 auto_delete=False)
427 queue = kombu.entity.Queue(name=target.topic,
428 channel=channel,
429 exchange=exchange,
430 routing_key=target.topic)
431
432 queue.declare()
433
434 return connection, channel, queue
435
436
437class TestRequestWireFormat(test_utils.BaseTestCase):
438
439 _target = [
440 ('topic_target',
441 dict(topic='testtopic', server=None, fanout=False)),
442 ('server_target',
443 dict(topic='testtopic', server='testserver', fanout=False)),
444 # NOTE(markmc): https://github.com/celery/kombu/issues/195
445 ('fanout_target',
446 dict(topic='testtopic', server=None, fanout=True,
447 skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
448 ]
449
450 _msg = [
451 ('empty_msg',
452 dict(msg={}, expected={})),
453 ('primitive_msg',
454 dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
455 ('complex_msg',
456 dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
457 expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
458 ]
459
460 _context = [
461 ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
462 ('user_project_ctxt',
463 dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
464 expected_ctxt={'_context_user': 'mark',
465 '_context_project': 'snarkybunch'})),
466 ]
467
468 @classmethod
469 def generate_scenarios(cls):
470 cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
471 cls._context,
472 cls._target)
473
474 def setUp(self):
475 super(TestRequestWireFormat, self).setUp()
476 self.messaging_conf.transport_driver = 'rabbit'
477 self.messaging_conf.in_memory = True
478
479 self.uuids = []
480 self.orig_uuid4 = uuid.uuid4
481 self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
482
483 def mock_uuid4(self):
484 self.uuids.append(self.orig_uuid4())
485 return self.uuids[-1]
486
487 def test_request_wire_format(self):
488 if hasattr(self, 'skip_msg'):
489 self.skipTest(self.skip_msg)
490
491 transport = messaging.get_transport(self.conf)
492 self.addCleanup(transport.cleanup)
493
494 driver = transport._driver
495
496 target = messaging.Target(topic=self.topic,
497 server=self.server,
498 fanout=self.fanout)
499
500 connection, channel, queue = _declare_queue(target)
501 self.addCleanup(connection.release)
502
503 driver.send(target, self.ctxt, self.msg)
504
505 msgs = []
506
507 def callback(msg):
508 msg = channel.message_to_python(msg)
509 msg.ack()
510 msgs.append(msg.payload)
511
512 queue.consume(callback=callback,
513 consumer_tag='1',
514 nowait=False)
515
516 connection.drain_events()
517
518 self.assertEqual(1, len(msgs))
519 self.assertIn('oslo.message', msgs[0])
520
521 received = msgs[0]
522 received['oslo.message'] = jsonutils.loads(received['oslo.message'])
523
524 # FIXME(markmc): add _msg_id and _reply_q check
525 expected_msg = {
526 '_unique_id': self.uuids[0].hex,
527 }
528 expected_msg.update(self.expected)
529 expected_msg.update(self.expected_ctxt)
530
531 expected = {
532 'oslo.version': '2.0',
533 'oslo.message': expected_msg,
534 }
535
536 self.assertEqual(expected, received)
537
538
539TestRequestWireFormat.generate_scenarios()
540
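
What the wire-format test pins down: context keys, message keys, and a _unique_id are flattened into one payload dict, serialized to JSON, and wrapped in a versioned envelope whose oslo.message value is a string. A sketch of building that envelope by hand, using the stdlib json module in place of the jsonutils wrapper; the ids are made up:

    import json
    import uuid

    ctxt = {'_context_user': 'mark', '_context_project': 'snarkybunch'}
    msg = {'foo': 'bar'}

    payload = {'_unique_id': uuid.uuid4().hex}
    payload.update(msg)
    payload.update(ctxt)

    envelope = {
        'oslo.version': '2.0',
        'oslo.message': json.dumps(payload),  # the inner message is a JSON string
    }
    print(envelope)
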
541
542def _create_producer(target):
543 connection = kombu.connection.BrokerConnection(transport='memory')
544
545 # Kludge to speed up tests.
546 connection.transport.polling_interval = 0.0
547
548 connection.connect()
549 channel = connection.channel()
550
551 # work around 'memory' transport bug in 1.1.3
552 channel._new_queue('ae.undeliver')
553
554 if target.fanout:
555 exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
556 type='fanout',
557 durable=False,
558 auto_delete=True)
559 producer = kombu.messaging.Producer(exchange=exchange,
560 channel=channel,
561 routing_key=target.topic)
562 elif target.server:
563 exchange = kombu.entity.Exchange(name='openstack',
564 type='topic',
565 durable=False,
566 auto_delete=False)
567 topic = '%s.%s' % (target.topic, target.server)
568 producer = kombu.messaging.Producer(exchange=exchange,
569 channel=channel,
570 routing_key=topic)
571 else:
572 exchange = kombu.entity.Exchange(name='openstack',
573 type='topic',
574 durable=False,
575 auto_delete=False)
576 producer = kombu.messaging.Producer(exchange=exchange,
577 channel=channel,
578 routing_key=target.topic)
579
580 return connection, producer
581
582
583class TestReplyWireFormat(test_utils.BaseTestCase):
584
585 _target = [
586 ('topic_target',
587 dict(topic='testtopic', server=None, fanout=False)),
588 ('server_target',
589 dict(topic='testtopic', server='testserver', fanout=False)),
590 # NOTE(markmc): https://github.com/celery/kombu/issues/195
591 ('fanout_target',
592 dict(topic='testtopic', server=None, fanout=True,
593 skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
594 ]
595
596 _msg = [
597 ('empty_msg',
598 dict(msg={}, expected={})),
599 ('primitive_msg',
600 dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
601 ('complex_msg',
602 dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
603 expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
604 ]
605
606 _context = [
607 ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
608 ('user_project_ctxt',
609 dict(ctxt={'_context_user': 'mark',
610 '_context_project': 'snarkybunch'},
611 expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
612 ]
613
614 @classmethod
615 def generate_scenarios(cls):
616 cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
617 cls._context,
618 cls._target)
619
620 def setUp(self):
621 super(TestReplyWireFormat, self).setUp()
622 self.messaging_conf.transport_driver = 'rabbit'
623 self.messaging_conf.in_memory = True
624
625 def test_reply_wire_format(self):
626 if hasattr(self, 'skip_msg'):
627 self.skipTest(self.skip_msg)
628
629 transport = messaging.get_transport(self.conf)
630 self.addCleanup(transport.cleanup)
631
632 driver = transport._driver
633
634 target = messaging.Target(topic=self.topic,
635 server=self.server,
636 fanout=self.fanout)
637
638 listener = driver.listen(target)
639
640 connection, producer = _create_producer(target)
641 self.addCleanup(connection.release)
642
643 msg = {
644 'oslo.version': '2.0',
645 'oslo.message': {}
646 }
647
648 msg['oslo.message'].update(self.msg)
649 msg['oslo.message'].update(self.ctxt)
650
651 msg['oslo.message'].update({
652 '_msg_id': uuid.uuid4().hex,
653 '_unique_id': uuid.uuid4().hex,
654 '_reply_q': 'reply_' + uuid.uuid4().hex,
655 })
656
657 msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
658
659 producer.publish(msg)
660
661 received = listener.poll()
662 self.assertIsNotNone(received)
663 self.assertEqual(self.expected_ctxt, received.ctxt)
664 self.assertEqual(self.expected, received.message)
665
666
667TestReplyWireFormat.generate_scenarios()
668
669
670class RpcKombuHATestCase(test_utils.BaseTestCase):
671
672 def setUp(self):
673 super(RpcKombuHATestCase, self).setUp()
674 self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
675 self.config(rabbit_hosts=self.brokers)
676
677 hostname_sets = set()
678 self.info = {'attempt': 0,
679 'fail': False}
680
681 def _connect(myself, params):
682            # do just enough work for the connection attempt to succeed
683 myself.connection = kombu.connection.BrokerConnection(**params)
684 myself.connection_errors = myself.connection.connection_errors
685
686 hostname = params['hostname']
687 self.assertNotIn(hostname, hostname_sets)
688 hostname_sets.add(hostname)
689
690 self.info['attempt'] += 1
691 if self.info['fail']:
692 raise IOError('fake fail')
693
694 # just make sure connection instantiation does not fail with an
695 # exception
696 self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
697
698 # starting from the first broker in the list
699 url = messaging.TransportURL.parse(self.conf, None)
700 self.connection = rabbit_driver.Connection(self.conf, url)
701 self.addCleanup(self.connection.close)
702
703 self.info.update({'attempt': 0,
704 'fail': True})
705 hostname_sets.clear()
706
707 def test_reconnect_order(self):
708 self.assertRaises(messaging.MessageDeliveryFailure,
709 self.connection.reconnect,
710 retry=len(self.brokers) - 1)
711 self.assertEqual(len(self.brokers), self.info['attempt'])
712
713 def test_ensure_four_retry(self):
714 mock_callback = mock.Mock(side_effect=IOError)
715 self.assertRaises(messaging.MessageDeliveryFailure,
716 self.connection.ensure, None, mock_callback,
717 retry=4)
718 self.assertEqual(5, self.info['attempt'])
719 self.assertEqual(1, mock_callback.call_count)
720
721 def test_ensure_one_retry(self):
722 mock_callback = mock.Mock(side_effect=IOError)
723 self.assertRaises(messaging.MessageDeliveryFailure,
724 self.connection.ensure, None, mock_callback,
725 retry=1)
726 self.assertEqual(2, self.info['attempt'])
727 self.assertEqual(1, mock_callback.call_count)
728
729 def test_ensure_no_retry(self):
730 mock_callback = mock.Mock(side_effect=IOError)
731 self.assertRaises(messaging.MessageDeliveryFailure,
732 self.connection.ensure, None, mock_callback,
733 retry=0)
734 self.assertEqual(1, self.info['attempt'])
735 self.assertEqual(1, mock_callback.call_count)
0736
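
A convention the three ensure tests above pin down: retry counts retries, not attempts, so the initial connection is always made and retry=0 still touches the broker once. Expressed as a trivially runnable check (attempts_for is an invented helper mirroring the `attempt > retry` guard in Connection.reconnect):

    def attempts_for(retry):
        # reconnect() raises once attempt > retry, so total attempts are
        # the initial try plus `retry` retries.
        return retry + 1

    assert attempts_for(4) == 5   # test_ensure_four_retry
    assert attempts_for(1) == 2   # test_ensure_one_retry
    assert attempts_for(0) == 1   # test_ensure_no_retry
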
=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch'
=== added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/.timestamp'
=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo'
=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging'
=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers'
=== added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py'
--- .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
+++ .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000
@@ -0,0 +1,839 @@
1# Copyright 2011 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import functools
16import itertools
17import logging
18import random
19import socket
20import ssl
21import time
22import uuid
23
24import kombu
25import kombu.connection
26import kombu.entity
27import kombu.messaging
28import six
29
30from oslo.config import cfg
31from oslo.messaging._drivers import amqp as rpc_amqp
32from oslo.messaging._drivers import amqpdriver
33from oslo.messaging._drivers import common as rpc_common
34from oslo.messaging import exceptions
35from oslo.messaging.openstack.common.gettextutils import _
36from oslo.utils import netutils
37
38rabbit_opts = [
39 cfg.StrOpt('kombu_ssl_version',
40 default='',
41 help='SSL version to use (valid only if SSL enabled). '
42                    'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
43 'be available on some distributions.'
44 ),
45 cfg.StrOpt('kombu_ssl_keyfile',
46 default='',
47 help='SSL key file (valid only if SSL enabled).'),
48 cfg.StrOpt('kombu_ssl_certfile',
49 default='',
50 help='SSL cert file (valid only if SSL enabled).'),
51 cfg.StrOpt('kombu_ssl_ca_certs',
52 default='',
53 help='SSL certification authority file '
54 '(valid only if SSL enabled).'),
55 cfg.FloatOpt('kombu_reconnect_delay',
56 default=1.0,
57 help='How long to wait before reconnecting in response to an '
58 'AMQP consumer cancel notification.'),
59 cfg.StrOpt('rabbit_host',
60 default='localhost',
61 help='The RabbitMQ broker address where a single node is '
62 'used.'),
63 cfg.IntOpt('rabbit_port',
64 default=5672,
65 help='The RabbitMQ broker port where a single node is used.'),
66 cfg.ListOpt('rabbit_hosts',
67 default=['$rabbit_host:$rabbit_port'],
68 help='RabbitMQ HA cluster host:port pairs.'),
69 cfg.BoolOpt('rabbit_use_ssl',
70 default=False,
71 help='Connect over SSL for RabbitMQ.'),
72 cfg.StrOpt('rabbit_userid',
73 default='guest',
74 help='The RabbitMQ userid.'),
75 cfg.StrOpt('rabbit_password',
76 default='guest',
77 help='The RabbitMQ password.',
78 secret=True),
79 cfg.StrOpt('rabbit_login_method',
80 default='AMQPLAIN',
81               help='The RabbitMQ login method.'),
82 cfg.StrOpt('rabbit_virtual_host',
83 default='/',
84 help='The RabbitMQ virtual host.'),
85 cfg.IntOpt('rabbit_retry_interval',
86 default=1,
87 help='How frequently to retry connecting with RabbitMQ.'),
88 cfg.IntOpt('rabbit_retry_backoff',
89 default=2,
90 help='How long to backoff for between retries when connecting '
91 'to RabbitMQ.'),
92 cfg.IntOpt('rabbit_max_retries',
93 default=0,
94 help='Maximum number of RabbitMQ connection retries. '
95 'Default is 0 (infinite retry count).'),
96 cfg.BoolOpt('rabbit_ha_queues',
97 default=False,
98 help='Use HA queues in RabbitMQ (x-ha-policy: all). '
99 'If you change this option, you must wipe the '
100 'RabbitMQ database.'),
101
102 # FIXME(markmc): this was toplevel in openstack.common.rpc
103 cfg.BoolOpt('fake_rabbit',
104 default=False,
105 help='If passed, use a fake RabbitMQ provider.'),
106]
107
108LOG = logging.getLogger(__name__)
109
110
111def _get_queue_arguments(conf):
112 """Construct the arguments for declaring a queue.
113
114 If the rabbit_ha_queues option is set, we declare a mirrored queue
115 as described here:
116
117 http://www.rabbitmq.com/ha.html
118
119 Setting x-ha-policy to all means that the queue will be mirrored
120 to all nodes in the cluster.
121 """
122 return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
123
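
To make the x-ha-policy argument concrete: when rabbit_ha_queues is set, every queue this driver declares carries the argument below, which asked pre-3.0 RabbitMQ brokers to mirror the queue to all nodes (RabbitMQ 3.x configures mirroring through server-side policies instead, so the argument has no effect there). A hypothetical declaration showing where the argument lands:

    import kombu.entity

    ha_args = {'x-ha-policy': 'all'}
    queue = kombu.entity.Queue(name='mytopic',
                               queue_arguments=ha_args,
                               durable=False,
                               auto_delete=True)
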
124
125class RabbitMessage(dict):
126 def __init__(self, raw_message):
127 super(RabbitMessage, self).__init__(
128 rpc_common.deserialize_msg(raw_message.payload))
129 self._raw_message = raw_message
130
131 def acknowledge(self):
132 self._raw_message.ack()
133
134 def requeue(self):
135 self._raw_message.requeue()
136
137
138class ConsumerBase(object):
139 """Consumer base class."""
140
141 def __init__(self, channel, callback, tag, **kwargs):
142 """Declare a queue on an amqp channel.
143
144 'channel' is the amqp channel to use
145 'callback' is the callback to call when messages are received
146 'tag' is a unique ID for the consumer on the channel
147
148 queue name, exchange name, and other kombu options are
149 passed in here as a dictionary.
150 """
151 self.callback = callback
152 self.tag = six.text_type(tag)
153 self.kwargs = kwargs
154 self.queue = None
155 self.reconnect(channel)
156
157 def reconnect(self, channel):
158 """Re-declare the queue after a rabbit reconnect."""
159 self.channel = channel
160 self.kwargs['channel'] = channel
161 self.queue = kombu.entity.Queue(**self.kwargs)
162 self.queue.declare()
163
164 def _callback_handler(self, message, callback):
165 """Call callback with deserialized message.
166
167        The message is processed by the callback and then ack'ed.
168 """
169
170 try:
171 callback(RabbitMessage(message))
172 except Exception:
173 LOG.exception(_("Failed to process message"
174 " ... skipping it."))
175 message.ack()
176
177 def consume(self, *args, **kwargs):
178 """Actually declare the consumer on the amqp channel. This will
179 start the flow of messages from the queue. Using the
180 Connection.iterconsume() iterator will process the messages,
181 calling the appropriate callback.
182
183 If a callback is specified in kwargs, use that. Otherwise,
184 use the callback passed during __init__()
185
186        If kwargs['nowait'] is False, this call will block until the
187        broker has confirmed the consumer.
188
189 """
190
191 options = {'consumer_tag': self.tag}
192 options['nowait'] = kwargs.get('nowait', False)
193 callback = kwargs.get('callback', self.callback)
194 if not callback:
195 raise ValueError("No callback defined")
196
197 def _callback(raw_message):
198 message = self.channel.message_to_python(raw_message)
199 self._callback_handler(message, callback)
200
201 self.queue.consume(*args, callback=_callback, **options)
202
203 def cancel(self):
204 """Cancel the consuming from the queue, if it has started."""
205 try:
206 self.queue.cancel(self.tag)
207 except KeyError as e:
208            # NOTE(comstud): Kludge to get around an amqplib bug
209 if six.text_type(e) != "u'%s'" % self.tag:
210 raise
211 self.queue = None
212
213
214class DirectConsumer(ConsumerBase):
215 """Queue/consumer class for 'direct'."""
216
217 def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
218 """Init a 'direct' queue.
219
220 'channel' is the amqp channel to use
221 'msg_id' is the msg_id to listen on
222 'callback' is the callback to call when messages are received
223 'tag' is a unique ID for the consumer on the channel
224
225 Other kombu options may be passed
226 """
227 # Default options
228 options = {'durable': False,
229 'queue_arguments': _get_queue_arguments(conf),
230 'auto_delete': True,
231 'exclusive': False}
232 options.update(kwargs)
233 exchange = kombu.entity.Exchange(name=msg_id,
234 type='direct',
235 durable=options['durable'],
236 auto_delete=options['auto_delete'])
237 super(DirectConsumer, self).__init__(channel,
238 callback,
239 tag,
240 name=msg_id,
241 exchange=exchange,
242 routing_key=msg_id,
243 **options)
244
245
246class TopicConsumer(ConsumerBase):
247 """Consumer class for 'topic'."""
248
249 def __init__(self, conf, channel, topic, callback, tag, exchange_name,
250 name=None, **kwargs):
251 """Init a 'topic' queue.
252
253 :param channel: the amqp channel to use
254 :param topic: the topic to listen on
255 :paramtype topic: str
256 :param callback: the callback to call when messages are received
257 :param tag: a unique ID for the consumer on the channel
258 :param exchange_name: the exchange name to use
259 :param name: optional queue name, defaults to topic
260 :paramtype name: str
261
262 Other kombu options may be passed as keyword arguments
263 """
264 # Default options
265 options = {'durable': conf.amqp_durable_queues,
266 'queue_arguments': _get_queue_arguments(conf),
267 'auto_delete': conf.amqp_auto_delete,
268 'exclusive': False}
269 options.update(kwargs)
270 exchange = kombu.entity.Exchange(name=exchange_name,
271 type='topic',
272 durable=options['durable'],
273 auto_delete=options['auto_delete'])
274 super(TopicConsumer, self).__init__(channel,
275 callback,
276 tag,
277 name=name or topic,
278 exchange=exchange,
279 routing_key=topic,
280 **options)
281
282
283class FanoutConsumer(ConsumerBase):
284 """Consumer class for 'fanout'."""
285
286 def __init__(self, conf, channel, topic, callback, tag, **kwargs):
287 """Init a 'fanout' queue.
288
289 'channel' is the amqp channel to use
290 'topic' is the topic to listen on
291 'callback' is the callback to call when messages are received
292 'tag' is a unique ID for the consumer on the channel
293
294 Other kombu options may be passed
295 """
296 unique = uuid.uuid4().hex
297 exchange_name = '%s_fanout' % topic
298 queue_name = '%s_fanout_%s' % (topic, unique)
299
300 # Default options
301 options = {'durable': False,
302 'queue_arguments': _get_queue_arguments(conf),
303 'auto_delete': True,
304 'exclusive': False}
305 options.update(kwargs)
306 exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
307 durable=options['durable'],
308 auto_delete=options['auto_delete'])
309 super(FanoutConsumer, self).__init__(channel, callback, tag,
310 name=queue_name,
311 exchange=exchange,
312 routing_key=topic,
313 **options)
314
315
316class Publisher(object):
317 """Base Publisher class."""
318
319 def __init__(self, channel, exchange_name, routing_key, **kwargs):
320 """Init the Publisher class with the exchange_name, routing_key,
321 and other options
322 """
323 self.exchange_name = exchange_name
324 self.routing_key = routing_key
325 self.kwargs = kwargs
326 self.reconnect(channel)
327
328 def reconnect(self, channel):
329 """Re-establish the Producer after a rabbit reconnection."""
330 self.exchange = kombu.entity.Exchange(name=self.exchange_name,
331 **self.kwargs)
332 self.producer = kombu.messaging.Producer(exchange=self.exchange,
333 channel=channel,
334 routing_key=self.routing_key)
335
336 def send(self, msg, timeout=None):
337 """Send a message."""
338 if timeout:
339 #
340 # AMQP TTL is in milliseconds when set in the header.
341 #
342 self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
343 else:
344 self.producer.publish(msg)
345
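
Note the unit conversion in send() above: the driver-level timeout is in seconds, but the per-message 'ttl' header that AMQP understands is in milliseconds. A one-liner sketch of that mapping (ttl_headers is an invented name):

    def ttl_headers(timeout_seconds=None):
        # AMQP per-message TTL is expressed in milliseconds in the header.
        return {'ttl': int(timeout_seconds * 1000)} if timeout_seconds else {}

    assert ttl_headers(30) == {'ttl': 30000}
    assert ttl_headers() == {}
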
346
347class DirectPublisher(Publisher):
348 """Publisher class for 'direct'."""
349 def __init__(self, conf, channel, topic, **kwargs):
350 """Init a 'direct' publisher.
351
352 Kombu options may be passed as keyword args to override defaults
353 """
354
355 options = {'durable': False,
356 'auto_delete': True,
357 'exclusive': False}
358 options.update(kwargs)
359 super(DirectPublisher, self).__init__(channel, topic, topic,
360 type='direct', **options)
361
362
363class TopicPublisher(Publisher):
364 """Publisher class for 'topic'."""
365 def __init__(self, conf, channel, exchange_name, topic, **kwargs):
366 """Init a 'topic' publisher.
367
368 Kombu options may be passed as keyword args to override defaults
369 """
370 options = {'durable': conf.amqp_durable_queues,
371 'auto_delete': conf.amqp_auto_delete,
372 'exclusive': False}
373 options.update(kwargs)
374 super(TopicPublisher, self).__init__(channel,
375 exchange_name,
376 topic,
377 type='topic',
378 **options)
379
380
381class FanoutPublisher(Publisher):
382 """Publisher class for 'fanout'."""
383 def __init__(self, conf, channel, topic, **kwargs):
384 """Init a 'fanout' publisher.
385
386 Kombu options may be passed as keyword args to override defaults
387 """
388 options = {'durable': False,
389 'auto_delete': True,
390 'exclusive': False}
391 options.update(kwargs)
392 super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
393 None, type='fanout', **options)
394
395
396class NotifyPublisher(TopicPublisher):
397 """Publisher class for 'notify'."""
398
399 def __init__(self, conf, channel, exchange_name, topic, **kwargs):
400 self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
401 self.queue_arguments = _get_queue_arguments(conf)
402 super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
403 topic, **kwargs)
404
405 def reconnect(self, channel):
406 super(NotifyPublisher, self).reconnect(channel)
407
408 # NOTE(jerdfelt): Normally the consumer would create the queue, but
409 # we do this to ensure that messages don't get dropped if the
410 # consumer is started after we do
411 queue = kombu.entity.Queue(channel=channel,
412 exchange=self.exchange,
413 durable=self.durable,
414 name=self.routing_key,
415 routing_key=self.routing_key,
416 queue_arguments=self.queue_arguments)
417 queue.declare()
418
419
420class Connection(object):
421 """Connection object."""
422
423 pools = {}
424
425 def __init__(self, conf, url):
426 self.consumers = []
427 self.conf = conf
428 self.max_retries = self.conf.rabbit_max_retries
429 # Try forever?
430 if self.max_retries <= 0:
431 self.max_retries = None
432 self.interval_start = self.conf.rabbit_retry_interval
433 self.interval_stepping = self.conf.rabbit_retry_backoff
434 # max retry-interval = 30 seconds
435 self.interval_max = 30
436 self.memory_transport = False
437
438 ssl_params = self._fetch_ssl_params()
439
440 if url.virtual_host is not None:
441 virtual_host = url.virtual_host
442 else:
443 virtual_host = self.conf.rabbit_virtual_host
444
445 self.brokers_params = []
446 if url.hosts:
447 for host in url.hosts:
448 params = {
449 'hostname': host.hostname,
450 'port': host.port or 5672,
451 'userid': host.username or '',
452 'password': host.password or '',
453 'login_method': self.conf.rabbit_login_method,
454 'virtual_host': virtual_host
455 }
456 if self.conf.fake_rabbit:
457 params['transport'] = 'memory'
458 if self.conf.rabbit_use_ssl:
459 params['ssl'] = ssl_params
460
461 self.brokers_params.append(params)
462 else:
463 # Old configuration format
464 for adr in self.conf.rabbit_hosts:
465 hostname, port = netutils.parse_host_port(
466 adr, default_port=self.conf.rabbit_port)
467
468 params = {
469 'hostname': hostname,
470 'port': port,
471 'userid': self.conf.rabbit_userid,
472 'password': self.conf.rabbit_password,
473 'login_method': self.conf.rabbit_login_method,
474 'virtual_host': virtual_host
475 }
476
477 if self.conf.fake_rabbit:
478 params['transport'] = 'memory'
479 if self.conf.rabbit_use_ssl:
480 params['ssl'] = ssl_params
481
482 self.brokers_params.append(params)
483
484 random.shuffle(self.brokers_params)
485 self.brokers = itertools.cycle(self.brokers_params)
486
487 self.memory_transport = self.conf.fake_rabbit
488
489 self.connection = None
490 self.do_consume = None
491 self.reconnect()
492
493 # FIXME(markmc): use oslo sslutils when it is available as a library
494 _SSL_PROTOCOLS = {
495 "tlsv1": ssl.PROTOCOL_TLSv1,
496 "sslv23": ssl.PROTOCOL_SSLv23,
497 "sslv3": ssl.PROTOCOL_SSLv3
498 }
499
500 try:
501 _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
502 except AttributeError:
503 pass
504
505 @classmethod
506 def validate_ssl_version(cls, version):
507 key = version.lower()
508 try:
509 return cls._SSL_PROTOCOLS[key]
510 except KeyError:
511            raise RuntimeError(_("Invalid SSL version: %s") % version)
512
513 def _fetch_ssl_params(self):
514        """Fetch the SSL parameters (if any) that should be used for the
515        connection.
516 """
517 ssl_params = dict()
518
519 # http://docs.python.org/library/ssl.html - ssl.wrap_socket
520 if self.conf.kombu_ssl_version:
521 ssl_params['ssl_version'] = self.validate_ssl_version(
522 self.conf.kombu_ssl_version)
523 if self.conf.kombu_ssl_keyfile:
524 ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
525 if self.conf.kombu_ssl_certfile:
526 ssl_params['certfile'] = self.conf.kombu_ssl_certfile
527 if self.conf.kombu_ssl_ca_certs:
528 ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
529 # We might want to allow variations in the
530 # future with this?
531 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
532
533            # Return the extended options, or True to accept kombu's SSL defaults
534 return ssl_params or True
535
536 def _connect(self, broker):
537 """Connect to rabbit. Re-establish any queues that may have
538 been declared before if we are reconnecting. Exceptions should
539 be handled by the caller.
540 """
541 LOG.info(_("Connecting to AMQP server on "
542 "%(hostname)s:%(port)d"), broker)
543 self.connection = kombu.connection.BrokerConnection(**broker)
544 self.connection_errors = self.connection.connection_errors
545 self.channel_errors = self.connection.channel_errors
546 if self.memory_transport:
547 # Kludge to speed up tests.
548 self.connection.transport.polling_interval = 0.0
549 self.do_consume = True
550 self.consumer_num = itertools.count(1)
551 self.connection.connect()
552 self.channel = self.connection.channel()
553 # work around 'memory' transport bug in 1.1.3
554 if self.memory_transport:
555 self.channel._new_queue('ae.undeliver')
556 for consumer in self.consumers:
557 consumer.reconnect(self.channel)
558 LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
559 broker)
560
561 def _disconnect(self):
562 if self.connection:
563 # XXX(nic): when reconnecting to a RabbitMQ cluster
564 # with mirrored queues in use, the attempt to release the
565 # connection can hang "indefinitely" somewhere deep down
566 # in Kombu. Blocking the thread for a bit prior to
567 # release seems to kludge around the problem where it is
568            # otherwise reproducible.
569 if self.conf.kombu_reconnect_delay > 0:
570 LOG.info(_("Delaying reconnect for %1.1f seconds...") %
571 self.conf.kombu_reconnect_delay)
572 time.sleep(self.conf.kombu_reconnect_delay)
573
574 try:
575 self.connection.release()
576 except self.connection_errors:
The diff has been truncated for viewing.
