Merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721 into lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno

Proposed by Xiang Hui
Status: Needs review
Proposed branch: lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721
Merge into: lp:~ubuntu-cloud-archive/ubuntu/trusty/oslo.messaging/juno
Diff against target: 9657 lines (+9295/-23)
29 files modified
.pc/.quilt_patches (+1/-0)
.pc/.quilt_series (+1/-0)
.pc/.version (+1/-0)
.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py (+444/-0)
.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py (+349/-0)
.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py (+821/-0)
.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py (+374/-0)
.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py (+49/-0)
.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py (+729/-0)
.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py (+832/-0)
.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py (+735/-0)
.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py (+374/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py (+839/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/drivers/test_impl_rabbit.py (+756/-0)
.pc/0006-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py (+64/-0)
.pc/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py (+865/-0)
.pc/applied-patches (+8/-0)
.pc/zmq-server-routing.patch/oslo/messaging/_drivers/impl_zmq.py (+995/-0)
debian/changelog (+8/-0)
debian/patches/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+24/-0)
debian/patches/series (+1/-0)
oslo/messaging/_drivers/amqpdriver.py (+16/-8)
oslo/messaging/_drivers/common.py (+26/-0)
oslo/messaging/_drivers/impl_qpid.py (+0/-1)
oslo/messaging/_drivers/impl_rabbit.py (+69/-12)
oslo/messaging/_drivers/impl_zmq.py (+2/-0)
tests/drivers/test_impl_rabbit.py (+41/-2)
tests/test_utils.py (+32/-0)
To merge this branch: bzr merge lp:~xianghui/ubuntu/trusty/oslo.messaging/juno-lp1318721
Reviewer: Ubuntu Cloud Archive Team (status: Pending)
Review via email: mp+280822@code.launchpad.net

Description of the change

  * Backport of upstream fix (LP: #1318721):
    - d/p/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Re-declare the queue if an exception is caught after
      self.queue.declare() fails.
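
For context, the pre-patch ConsumerBase.reconnect() (visible in the preview diff
below) declares the queue exactly once, so a transient declare failure during
cluster failover is fatal. The following is a minimal sketch of the
redeclare-on-failure workaround, paraphrasing the patch rather than quoting it;
the class and method names follow impl_rabbit.py, and the unconditional single
retry is a simplification:

    import logging

    import kombu.entity

    LOG = logging.getLogger(__name__)

    class ConsumerBase(object):
        # __init__ stores self.kwargs and self.callback as in impl_rabbit.py.

        def reconnect(self, channel):
            """Re-declare the queue after a rabbit reconnect."""
            self.channel = channel
            self.kwargs['channel'] = channel
            self.queue = kombu.entity.Queue(**self.kwargs)
            try:
                self.queue.declare()
            except Exception as e:
                # With mirrored queues on a RabbitMQ cluster, the declare can
                # race with queue failover; a single retry is normally enough
                # to recover (LP: #1318721).
                LOG.info("Queue declare failed (%s); re-declaring", e)
                self.queue.declare()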


Unmerged revisions

3. By Xiang Hui

  * Backport of upstream fix (LP: #1318721):
    - d/p/0007-fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Re-declare the queue if an exception is caught after
      self.queue.declare() fails.

Preview Diff

1=== added directory '.pc'
2=== added file '.pc/.quilt_patches'
3--- .pc/.quilt_patches 1970-01-01 00:00:00 +0000
4+++ .pc/.quilt_patches 2015-12-17 11:03:34 +0000
5@@ -0,0 +1,1 @@
6+debian/patches
7
8=== added file '.pc/.quilt_series'
9--- .pc/.quilt_series 1970-01-01 00:00:00 +0000
10+++ .pc/.quilt_series 2015-12-17 11:03:34 +0000
11@@ -0,0 +1,1 @@
12+series
13
14=== added file '.pc/.version'
15--- .pc/.version 1970-01-01 00:00:00 +0000
16+++ .pc/.version 2015-12-17 11:03:34 +0000
17@@ -0,0 +1,1 @@
18+2
19
20=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch'
21=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/.timestamp'
22=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo'
23=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging'
24=== added directory '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers'
25=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py'
26--- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 1970-01-01 00:00:00 +0000
27+++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/amqpdriver.py 2015-12-17 11:03:34 +0000
28@@ -0,0 +1,444 @@
29+
30+# Copyright 2013 Red Hat, Inc.
31+#
32+# Licensed under the Apache License, Version 2.0 (the "License"); you may
33+# not use this file except in compliance with the License. You may obtain
34+# a copy of the License at
35+#
36+# http://www.apache.org/licenses/LICENSE-2.0
37+#
38+# Unless required by applicable law or agreed to in writing, software
39+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
40+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
41+# License for the specific language governing permissions and limitations
42+# under the License.
43+
44+__all__ = ['AMQPDriverBase']
45+
46+import logging
47+import threading
48+import time
49+import uuid
50+
51+from six import moves
52+
53+from oslo import messaging
54+from oslo.messaging._drivers import amqp as rpc_amqp
55+from oslo.messaging._drivers import base
56+from oslo.messaging._drivers import common as rpc_common
57+
58+LOG = logging.getLogger(__name__)
59+
60+
61+class AMQPIncomingMessage(base.IncomingMessage):
62+
63+ def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
64+ super(AMQPIncomingMessage, self).__init__(listener, ctxt,
65+ dict(message))
66+
67+ self.unique_id = unique_id
68+ self.msg_id = msg_id
69+ self.reply_q = reply_q
70+ self.acknowledge_callback = message.acknowledge
71+ self.requeue_callback = message.requeue
72+
73+ def _send_reply(self, conn, reply=None, failure=None,
74+ ending=False, log_failure=True):
75+ if failure:
76+ failure = rpc_common.serialize_remote_exception(failure,
77+ log_failure)
78+
79+ msg = {'result': reply, 'failure': failure}
80+ if ending:
81+ msg['ending'] = True
82+
83+ rpc_amqp._add_unique_id(msg)
84+
85+ # If a reply_q exists, add the msg_id to the reply and pass the
86+ # reply_q to direct_send() to use it as the response queue.
87+ # Otherwise use the msg_id for backward compatibility.
88+ if self.reply_q:
89+ msg['_msg_id'] = self.msg_id
90+ conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
91+ else:
92+ conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
93+
94+ def reply(self, reply=None, failure=None, log_failure=True):
95+ if not self.msg_id:
96+ # NOTE(Alexei_987) not sending reply, if msg_id is empty
97+ # because reply should not be expected by caller side
98+ return
99+ with self.listener.driver._get_connection() as conn:
100+ self._send_reply(conn, reply, failure, log_failure=log_failure)
101+ self._send_reply(conn, ending=True)
102+
103+ def acknowledge(self):
104+ self.listener.msg_id_cache.add(self.unique_id)
105+ self.acknowledge_callback()
106+
107+ def requeue(self):
108+ # NOTE(sileht): In case of the connection is lost between receiving the
109+ # message and requeing it, this requeue call fail
110+ # but because the message is not acknowledged and not added to the
111+ # msg_id_cache, the message will be reconsumed, the only difference is
112+ # the message stay at the beginning of the queue instead of moving to
113+ # the end.
114+ self.requeue_callback()
115+
116+
117+class AMQPListener(base.Listener):
118+
119+ def __init__(self, driver, conn):
120+ super(AMQPListener, self).__init__(driver)
121+ self.conn = conn
122+ self.msg_id_cache = rpc_amqp._MsgIdCache()
123+ self.incoming = []
124+
125+ def __call__(self, message):
126+ # FIXME(markmc): logging isn't driver specific
127+ rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
128+
129+ unique_id = self.msg_id_cache.check_duplicate_message(message)
130+ ctxt = rpc_amqp.unpack_context(self.conf, message)
131+
132+ self.incoming.append(AMQPIncomingMessage(self,
133+ ctxt.to_dict(),
134+ message,
135+ unique_id,
136+ ctxt.msg_id,
137+ ctxt.reply_q))
138+
139+ def poll(self, timeout=None):
140+ if timeout is not None:
141+ deadline = time.time() + timeout
142+ else:
143+ deadline = None
144+ while True:
145+ if self.incoming:
146+ return self.incoming.pop(0)
147+ if deadline is not None:
148+ timeout = deadline - time.time()
149+ if timeout < 0:
150+ return None
151+ try:
152+ self.conn.consume(limit=1, timeout=timeout)
153+ except rpc_common.Timeout:
154+ return None
155+ else:
156+ self.conn.consume(limit=1)
157+
158+
159+class ReplyWaiters(object):
160+
161+ WAKE_UP = object()
162+
163+ def __init__(self):
164+ self._queues = {}
165+ self._wrn_threshold = 10
166+
167+ def get(self, msg_id, timeout):
168+ try:
169+ return self._queues[msg_id].get(block=True, timeout=timeout)
170+ except moves.queue.Empty:
171+ raise messaging.MessagingTimeout('Timed out waiting for a reply '
172+ 'to message ID %s' % msg_id)
173+
174+ def check(self, msg_id):
175+ try:
176+ return self._queues[msg_id].get(block=False)
177+ except moves.queue.Empty:
178+ return None
179+
180+ def put(self, msg_id, message_data):
181+ queue = self._queues.get(msg_id)
182+ if not queue:
183+ LOG.warn('No calling threads waiting for msg_id : %(msg_id)s'
184+ ', message : %(data)s', {'msg_id': msg_id,
185+ 'data': message_data})
186+ LOG.warn('_queues: %s', self._queues)
187+ else:
188+ queue.put(message_data)
189+
190+ def wake_all(self, except_id):
191+ msg_ids = [i for i in self._queues.keys() if i != except_id]
192+ for msg_id in msg_ids:
193+ self.put(msg_id, self.WAKE_UP)
194+
195+ def add(self, msg_id, queue):
196+ self._queues[msg_id] = queue
197+ if len(self._queues) > self._wrn_threshold:
198+ LOG.warn('Number of call queues is greater than warning '
199+ 'threshold: %d. There could be a leak.',
200+ self._wrn_threshold)
201+ self._wrn_threshold *= 2
202+
203+ def remove(self, msg_id):
204+ del self._queues[msg_id]
205+
206+
207+class ReplyWaiter(object):
208+
209+ def __init__(self, conf, reply_q, conn, allowed_remote_exmods):
210+ self.conf = conf
211+ self.conn = conn
212+ self.reply_q = reply_q
213+ self.allowed_remote_exmods = allowed_remote_exmods
214+
215+ self.conn_lock = threading.Lock()
216+ self.incoming = []
217+ self.msg_id_cache = rpc_amqp._MsgIdCache()
218+ self.waiters = ReplyWaiters()
219+
220+ conn.declare_direct_consumer(reply_q, self)
221+
222+ def __call__(self, message):
223+ message.acknowledge()
224+ self.incoming.append(message)
225+
226+ def listen(self, msg_id):
227+ queue = moves.queue.Queue()
228+ self.waiters.add(msg_id, queue)
229+
230+ def unlisten(self, msg_id):
231+ self.waiters.remove(msg_id)
232+
233+ def _process_reply(self, data):
234+ result = None
235+ ending = False
236+ self.msg_id_cache.check_duplicate_message(data)
237+ if data['failure']:
238+ failure = data['failure']
239+ result = rpc_common.deserialize_remote_exception(
240+ failure, self.allowed_remote_exmods)
241+ elif data.get('ending', False):
242+ ending = True
243+ else:
244+ result = data['result']
245+ return result, ending
246+
247+ def _poll_connection(self, msg_id, timeout):
248+ while True:
249+ while self.incoming:
250+ message_data = self.incoming.pop(0)
251+
252+ incoming_msg_id = message_data.pop('_msg_id', None)
253+ if incoming_msg_id == msg_id:
254+ return self._process_reply(message_data)
255+
256+ self.waiters.put(incoming_msg_id, message_data)
257+
258+ try:
259+ self.conn.consume(limit=1, timeout=timeout)
260+ except rpc_common.Timeout:
261+ raise messaging.MessagingTimeout('Timed out waiting for a '
262+ 'reply to message ID %s'
263+ % msg_id)
264+
265+ def _poll_queue(self, msg_id, timeout):
266+ message = self.waiters.get(msg_id, timeout)
267+ if message is self.waiters.WAKE_UP:
268+ return None, None, True # lock was released
269+
270+ reply, ending = self._process_reply(message)
271+ return reply, ending, False
272+
273+ def _check_queue(self, msg_id):
274+ while True:
275+ message = self.waiters.check(msg_id)
276+ if message is self.waiters.WAKE_UP:
277+ continue
278+ if message is None:
279+ return None, None, True # queue is empty
280+
281+ reply, ending = self._process_reply(message)
282+ return reply, ending, False
283+
284+ def wait(self, msg_id, timeout):
285+ #
286+ # NOTE(markmc): we're waiting for a reply for msg_id to come in for on
287+ # the reply_q, but there may be other threads also waiting for replies
288+ # to other msg_ids
289+ #
290+ # Only one thread can be consuming from the queue using this connection
291+ # and we don't want to hold open a connection per thread, so instead we
292+ # have the first thread take responsibility for passing replies not
293+ # intended for itself to the appropriate thread.
294+ #
295+ final_reply = None
296+ while True:
297+ if self.conn_lock.acquire(False):
298+ # Ok, we're the thread responsible for polling the connection
299+ try:
300+ # Check the queue to see if a previous lock-holding thread
301+ # queued up a reply already
302+ while True:
303+ reply, ending, empty = self._check_queue(msg_id)
304+ if empty:
305+ break
306+ if not ending:
307+ final_reply = reply
308+ else:
309+ return final_reply
310+
311+ # Now actually poll the connection
312+ while True:
313+ reply, ending = self._poll_connection(msg_id, timeout)
314+ if not ending:
315+ final_reply = reply
316+ else:
317+ return final_reply
318+ finally:
319+ self.conn_lock.release()
320+ # We've got our reply, tell the other threads to wake up
321+ # so that one of them will take over the responsibility for
322+ # polling the connection
323+ self.waiters.wake_all(msg_id)
324+ else:
325+ # We're going to wait for the first thread to pass us our reply
326+ reply, ending, trylock = self._poll_queue(msg_id, timeout)
327+ if trylock:
328+ # The first thread got its reply, let's try and take over
329+ # the responsibility for polling
330+ continue
331+ if not ending:
332+ final_reply = reply
333+ else:
334+ return final_reply
335+
336+
337+class AMQPDriverBase(base.BaseDriver):
338+
339+ def __init__(self, conf, url, connection_pool,
340+ default_exchange=None, allowed_remote_exmods=None):
341+ super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
342+ allowed_remote_exmods)
343+
344+ self._default_exchange = default_exchange
345+
346+ self._connection_pool = connection_pool
347+
348+ self._reply_q_lock = threading.Lock()
349+ self._reply_q = None
350+ self._reply_q_conn = None
351+ self._waiter = None
352+
353+ def _get_exchange(self, target):
354+ return target.exchange or self._default_exchange
355+
356+ def _get_connection(self, pooled=True):
357+ return rpc_amqp.ConnectionContext(self.conf,
358+ self._url,
359+ self._connection_pool,
360+ pooled=pooled)
361+
362+ def _get_reply_q(self):
363+ with self._reply_q_lock:
364+ if self._reply_q is not None:
365+ return self._reply_q
366+
367+ reply_q = 'reply_' + uuid.uuid4().hex
368+
369+ conn = self._get_connection(pooled=False)
370+
371+ self._waiter = ReplyWaiter(self.conf, reply_q, conn,
372+ self._allowed_remote_exmods)
373+
374+ self._reply_q = reply_q
375+ self._reply_q_conn = conn
376+
377+ return self._reply_q
378+
379+ def _send(self, target, ctxt, message,
380+ wait_for_reply=None, timeout=None,
381+ envelope=True, notify=False, retry=None):
382+
383+ # FIXME(markmc): remove this temporary hack
384+ class Context(object):
385+ def __init__(self, d):
386+ self.d = d
387+
388+ def to_dict(self):
389+ return self.d
390+
391+ context = Context(ctxt)
392+ msg = message
393+
394+ if wait_for_reply:
395+ msg_id = uuid.uuid4().hex
396+ msg.update({'_msg_id': msg_id})
397+ LOG.debug('MSG_ID is %s', msg_id)
398+ msg.update({'_reply_q': self._get_reply_q()})
399+
400+ rpc_amqp._add_unique_id(msg)
401+ rpc_amqp.pack_context(msg, context)
402+
403+ if envelope:
404+ msg = rpc_common.serialize_msg(msg)
405+
406+ if wait_for_reply:
407+ self._waiter.listen(msg_id)
408+
409+ try:
410+ with self._get_connection() as conn:
411+ if notify:
412+ conn.notify_send(self._get_exchange(target),
413+ target.topic, msg, retry=retry)
414+ elif target.fanout:
415+ conn.fanout_send(target.topic, msg, retry=retry)
416+ else:
417+ topic = target.topic
418+ if target.server:
419+ topic = '%s.%s' % (target.topic, target.server)
420+ conn.topic_send(exchange_name=self._get_exchange(target),
421+ topic=topic, msg=msg, timeout=timeout,
422+ retry=retry)
423+
424+ if wait_for_reply:
425+ result = self._waiter.wait(msg_id, timeout)
426+ if isinstance(result, Exception):
427+ raise result
428+ return result
429+ finally:
430+ if wait_for_reply:
431+ self._waiter.unlisten(msg_id)
432+
433+ def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
434+ retry=None):
435+ return self._send(target, ctxt, message, wait_for_reply, timeout,
436+ retry=retry)
437+
438+ def send_notification(self, target, ctxt, message, version, retry=None):
439+ return self._send(target, ctxt, message,
440+ envelope=(version == 2.0), notify=True, retry=retry)
441+
442+ def listen(self, target):
443+ conn = self._get_connection(pooled=False)
444+
445+ listener = AMQPListener(self, conn)
446+
447+ conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
448+ topic=target.topic,
449+ callback=listener)
450+ conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
451+ topic='%s.%s' % (target.topic,
452+ target.server),
453+ callback=listener)
454+ conn.declare_fanout_consumer(target.topic, listener)
455+
456+ return listener
457+
458+ def listen_for_notifications(self, targets_and_priorities):
459+ conn = self._get_connection(pooled=False)
460+
461+ listener = AMQPListener(self, conn)
462+ for target, priority in targets_and_priorities:
463+ conn.declare_topic_consumer(
464+ exchange_name=self._get_exchange(target),
465+ topic='%s.%s' % (target.topic, priority),
466+ callback=listener)
467+ return listener
468+
469+ def cleanup(self):
470+ if self._connection_pool:
471+ self._connection_pool.empty()
472+ self._connection_pool = None
473
474=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py'
475--- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
476+++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000
477@@ -0,0 +1,349 @@
478+# Copyright 2010 United States Government as represented by the
479+# Administrator of the National Aeronautics and Space Administration.
480+# All Rights Reserved.
481+# Copyright 2011 Red Hat, Inc.
482+#
483+# Licensed under the Apache License, Version 2.0 (the "License"); you may
484+# not use this file except in compliance with the License. You may obtain
485+# a copy of the License at
486+#
487+# http://www.apache.org/licenses/LICENSE-2.0
488+#
489+# Unless required by applicable law or agreed to in writing, software
490+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
491+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
492+# License for the specific language governing permissions and limitations
493+# under the License.
494+
495+import copy
496+import logging
497+import sys
498+import traceback
499+
500+import six
501+
502+from oslo import messaging
503+from oslo.messaging import _utils as utils
504+from oslo.messaging.openstack.common.gettextutils import _
505+from oslo.messaging.openstack.common import jsonutils
506+
507+LOG = logging.getLogger(__name__)
508+
509+_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
510+
511+
512+'''RPC Envelope Version.
513+
514+This version number applies to the top level structure of messages sent out.
515+It does *not* apply to the message payload, which must be versioned
516+independently. For example, when using rpc APIs, a version number is applied
517+for changes to the API being exposed over rpc. This version number is handled
518+in the rpc proxy and dispatcher modules.
519+
520+This version number applies to the message envelope that is used in the
521+serialization done inside the rpc layer. See serialize_msg() and
522+deserialize_msg().
523+
524+The current message format (version 2.0) is very simple. It is:
525+
526+ {
527+ 'oslo.version': <RPC Envelope Version as a String>,
528+ 'oslo.message': <Application Message Payload, JSON encoded>
529+ }
530+
531+Message format version '1.0' is just considered to be the messages we sent
532+without a message envelope.
533+
534+So, the current message envelope just includes the envelope version. It may
535+eventually contain additional information, such as a signature for the message
536+payload.
537+
538+We will JSON encode the application message payload. The message envelope,
539+which includes the JSON encoded application message body, will be passed down
540+to the messaging libraries as a dict.
541+'''
542+_RPC_ENVELOPE_VERSION = '2.0'
543+
544+_VERSION_KEY = 'oslo.version'
545+_MESSAGE_KEY = 'oslo.message'
546+
547+_REMOTE_POSTFIX = '_Remote'
548+
549+
550+class RPCException(Exception):
551+ msg_fmt = _("An unknown RPC related exception occurred.")
552+
553+ def __init__(self, message=None, **kwargs):
554+ self.kwargs = kwargs
555+
556+ if not message:
557+ try:
558+ message = self.msg_fmt % kwargs
559+
560+ except Exception:
561+ # kwargs doesn't match a variable in the message
562+ # log the issue and the kwargs
563+ LOG.exception(_('Exception in string format operation'))
564+ for name, value in six.iteritems(kwargs):
565+ LOG.error("%s: %s", name, value)
566+ # at least get the core message out if something happened
567+ message = self.msg_fmt
568+
569+ super(RPCException, self).__init__(message)
570+
571+
572+class Timeout(RPCException):
573+ """Signifies that a timeout has occurred.
574+
575+ This exception is raised if the rpc_response_timeout is reached while
576+ waiting for a response from the remote side.
577+ """
578+ msg_fmt = _('Timeout while waiting on RPC response - '
579+ 'topic: "%(topic)s", RPC method: "%(method)s" '
580+ 'info: "%(info)s"')
581+
582+ def __init__(self, info=None, topic=None, method=None):
583+ """Initiates Timeout object.
584+
585+ :param info: Extra info to convey to the user
586+ :param topic: The topic that the rpc call was sent to
587+ :param rpc_method_name: The name of the rpc method being
588+ called
589+ """
590+ self.info = info
591+ self.topic = topic
592+ self.method = method
593+ super(Timeout, self).__init__(
594+ None,
595+ info=info or _('<unknown>'),
596+ topic=topic or _('<unknown>'),
597+ method=method or _('<unknown>'))
598+
599+
600+class DuplicateMessageError(RPCException):
601+ msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
602+
603+
604+class InvalidRPCConnectionReuse(RPCException):
605+ msg_fmt = _("Invalid reuse of an RPC connection.")
606+
607+
608+class UnsupportedRpcVersion(RPCException):
609+ msg_fmt = _("Specified RPC version, %(version)s, not supported by "
610+ "this endpoint.")
611+
612+
613+class UnsupportedRpcEnvelopeVersion(RPCException):
614+ msg_fmt = _("Specified RPC envelope version, %(version)s, "
615+ "not supported by this endpoint.")
616+
617+
618+class RpcVersionCapError(RPCException):
619+ msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
620+
621+
622+class Connection(object):
623+ """A connection, returned by rpc.create_connection().
624+
625+ This class represents a connection to the message bus used for rpc.
626+ An instance of this class should never be created by users of the rpc API.
627+ Use rpc.create_connection() instead.
628+ """
629+ def close(self):
630+ """Close the connection.
631+
632+ This method must be called when the connection will no longer be used.
633+ It will ensure that any resources associated with the connection, such
634+ as a network connection, and cleaned up.
635+ """
636+ raise NotImplementedError()
637+
638+
639+def _safe_log(log_func, msg, msg_data):
640+ """Sanitizes the msg_data field before logging."""
641+ SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
642+
643+ def _fix_passwords(d):
644+ """Sanitizes the password fields in the dictionary."""
645+ for k in six.iterkeys(d):
646+ if k.lower().find('password') != -1:
647+ d[k] = '<SANITIZED>'
648+ elif k.lower() in SANITIZE:
649+ d[k] = '<SANITIZED>'
650+ elif isinstance(d[k], dict):
651+ _fix_passwords(d[k])
652+ return d
653+
654+ return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
655+
656+
657+def serialize_remote_exception(failure_info, log_failure=True):
658+ """Prepares exception data to be sent over rpc.
659+
660+ Failure_info should be a sys.exc_info() tuple.
661+
662+ """
663+ tb = traceback.format_exception(*failure_info)
664+ failure = failure_info[1]
665+ if log_failure:
666+ LOG.error(_("Returning exception %s to caller"),
667+ six.text_type(failure))
668+ LOG.error(tb)
669+
670+ kwargs = {}
671+ if hasattr(failure, 'kwargs'):
672+ kwargs = failure.kwargs
673+
674+ # NOTE(matiu): With cells, it's possible to re-raise remote, remote
675+ # exceptions. Lets turn it back into the original exception type.
676+ cls_name = six.text_type(failure.__class__.__name__)
677+ mod_name = six.text_type(failure.__class__.__module__)
678+ if (cls_name.endswith(_REMOTE_POSTFIX) and
679+ mod_name.endswith(_REMOTE_POSTFIX)):
680+ cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
681+ mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
682+
683+ data = {
684+ 'class': cls_name,
685+ 'module': mod_name,
686+ 'message': six.text_type(failure),
687+ 'tb': tb,
688+ 'args': failure.args,
689+ 'kwargs': kwargs
690+ }
691+
692+ json_data = jsonutils.dumps(data)
693+
694+ return json_data
695+
696+
697+def deserialize_remote_exception(data, allowed_remote_exmods):
698+ failure = jsonutils.loads(six.text_type(data))
699+
700+ trace = failure.get('tb', [])
701+ message = failure.get('message', "") + "\n" + "\n".join(trace)
702+ name = failure.get('class')
703+ module = failure.get('module')
704+
705+ # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
706+ # order to prevent arbitrary code execution.
707+ if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
708+ return messaging.RemoteError(name, failure.get('message'), trace)
709+
710+ try:
711+ __import__(module)
712+ mod = sys.modules[module]
713+ klass = getattr(mod, name)
714+ if not issubclass(klass, Exception):
715+ raise TypeError("Can only deserialize Exceptions")
716+
717+ failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
718+ except (AttributeError, TypeError, ImportError):
719+ return messaging.RemoteError(name, failure.get('message'), trace)
720+
721+ ex_type = type(failure)
722+ str_override = lambda self: message
723+ new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
724+ {'__str__': str_override, '__unicode__': str_override})
725+ new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
726+ try:
727+ # NOTE(ameade): Dynamically create a new exception type and swap it in
728+ # as the new type for the exception. This only works on user defined
729+ # Exceptions and not core Python exceptions. This is important because
730+ # we cannot necessarily change an exception message so we must override
731+ # the __str__ method.
732+ failure.__class__ = new_ex_type
733+ except TypeError:
734+ # NOTE(ameade): If a core exception then just add the traceback to the
735+ # first exception argument.
736+ failure.args = (message,) + failure.args[1:]
737+ return failure
738+
739+
740+class CommonRpcContext(object):
741+ def __init__(self, **kwargs):
742+ self.values = kwargs
743+
744+ def __getattr__(self, key):
745+ try:
746+ return self.values[key]
747+ except KeyError:
748+ raise AttributeError(key)
749+
750+ def to_dict(self):
751+ return copy.deepcopy(self.values)
752+
753+ @classmethod
754+ def from_dict(cls, values):
755+ return cls(**values)
756+
757+ def deepcopy(self):
758+ return self.from_dict(self.to_dict())
759+
760+ def update_store(self):
761+ # local.store.context = self
762+ pass
763+
764+
765+class ClientException(Exception):
766+ """Encapsulates actual exception expected to be hit by a RPC proxy object.
767+
768+ Merely instantiating it records the current exception information, which
769+ will be passed back to the RPC client without exceptional logging.
770+ """
771+ def __init__(self):
772+ self._exc_info = sys.exc_info()
773+
774+
775+def serialize_msg(raw_msg):
776+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
777+ # information about this format.
778+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
779+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
780+
781+ return msg
782+
783+
784+def deserialize_msg(msg):
785+ # NOTE(russellb): Hang on to your hats, this road is about to
786+ # get a little bumpy.
787+ #
788+ # Robustness Principle:
789+ # "Be strict in what you send, liberal in what you accept."
790+ #
791+ # At this point we have to do a bit of guessing about what it
792+ # is we just received. Here is the set of possibilities:
793+ #
794+ # 1) We received a dict. This could be 2 things:
795+ #
796+ # a) Inspect it to see if it looks like a standard message envelope.
797+ # If so, great!
798+ #
799+ # b) If it doesn't look like a standard message envelope, it could either
800+ # be a notification, or a message from before we added a message
801+ # envelope (referred to as version 1.0).
802+ # Just return the message as-is.
803+ #
804+ # 2) It's any other non-dict type. Just return it and hope for the best.
805+ # This case covers return values from rpc.call() from before message
806+ # envelopes were used. (messages to call a method were always a dict)
807+
808+ if not isinstance(msg, dict):
809+ # See #2 above.
810+ return msg
811+
812+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
813+ if not all(map(lambda key: key in msg, base_envelope_keys)):
814+ # See #1.b above.
815+ return msg
816+
817+ # At this point we think we have the message envelope
818+ # format we were expecting. (#1.a above)
819+
820+ if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
821+ msg[_VERSION_KEY]):
822+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
823+
824+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
825+
826+ return raw_msg
827
828=== added file '.pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py'
829--- .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
830+++ .pc/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000
831@@ -0,0 +1,821 @@
832+# Copyright 2011 OpenStack Foundation
833+#
834+# Licensed under the Apache License, Version 2.0 (the "License"); you may
835+# not use this file except in compliance with the License. You may obtain
836+# a copy of the License at
837+#
838+# http://www.apache.org/licenses/LICENSE-2.0
839+#
840+# Unless required by applicable law or agreed to in writing, software
841+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
842+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
843+# License for the specific language governing permissions and limitations
844+# under the License.
845+
846+import functools
847+import itertools
848+import logging
849+import random
850+import socket
851+import ssl
852+import time
853+import uuid
854+
855+import kombu
856+import kombu.connection
857+import kombu.entity
858+import kombu.messaging
859+import six
860+
861+from oslo.config import cfg
862+from oslo.messaging._drivers import amqp as rpc_amqp
863+from oslo.messaging._drivers import amqpdriver
864+from oslo.messaging._drivers import common as rpc_common
865+from oslo.messaging import exceptions
866+from oslo.messaging.openstack.common.gettextutils import _
867+from oslo.utils import netutils
868+
869+rabbit_opts = [
870+ cfg.StrOpt('kombu_ssl_version',
871+ default='',
872+ help='SSL version to use (valid only if SSL enabled). '
873+ 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
874+ 'be available on some distributions.'
875+ ),
876+ cfg.StrOpt('kombu_ssl_keyfile',
877+ default='',
878+ help='SSL key file (valid only if SSL enabled).'),
879+ cfg.StrOpt('kombu_ssl_certfile',
880+ default='',
881+ help='SSL cert file (valid only if SSL enabled).'),
882+ cfg.StrOpt('kombu_ssl_ca_certs',
883+ default='',
884+ help='SSL certification authority file '
885+ '(valid only if SSL enabled).'),
886+ cfg.FloatOpt('kombu_reconnect_delay',
887+ default=1.0,
888+ help='How long to wait before reconnecting in response to an '
889+ 'AMQP consumer cancel notification.'),
890+ cfg.StrOpt('rabbit_host',
891+ default='localhost',
892+ help='The RabbitMQ broker address where a single node is '
893+ 'used.'),
894+ cfg.IntOpt('rabbit_port',
895+ default=5672,
896+ help='The RabbitMQ broker port where a single node is used.'),
897+ cfg.ListOpt('rabbit_hosts',
898+ default=['$rabbit_host:$rabbit_port'],
899+ help='RabbitMQ HA cluster host:port pairs.'),
900+ cfg.BoolOpt('rabbit_use_ssl',
901+ default=False,
902+ help='Connect over SSL for RabbitMQ.'),
903+ cfg.StrOpt('rabbit_userid',
904+ default='guest',
905+ help='The RabbitMQ userid.'),
906+ cfg.StrOpt('rabbit_password',
907+ default='guest',
908+ help='The RabbitMQ password.',
909+ secret=True),
910+ cfg.StrOpt('rabbit_login_method',
911+ default='AMQPLAIN',
912+ help='the RabbitMQ login method'),
913+ cfg.StrOpt('rabbit_virtual_host',
914+ default='/',
915+ help='The RabbitMQ virtual host.'),
916+ cfg.IntOpt('rabbit_retry_interval',
917+ default=1,
918+ help='How frequently to retry connecting with RabbitMQ.'),
919+ cfg.IntOpt('rabbit_retry_backoff',
920+ default=2,
921+ help='How long to backoff for between retries when connecting '
922+ 'to RabbitMQ.'),
923+ cfg.IntOpt('rabbit_max_retries',
924+ default=0,
925+ help='Maximum number of RabbitMQ connection retries. '
926+ 'Default is 0 (infinite retry count).'),
927+ cfg.BoolOpt('rabbit_ha_queues',
928+ default=False,
929+ help='Use HA queues in RabbitMQ (x-ha-policy: all). '
930+ 'If you change this option, you must wipe the '
931+ 'RabbitMQ database.'),
932+
933+ # FIXME(markmc): this was toplevel in openstack.common.rpc
934+ cfg.BoolOpt('fake_rabbit',
935+ default=False,
936+ help='If passed, use a fake RabbitMQ provider.'),
937+]
938+
939+LOG = logging.getLogger(__name__)
940+
941+
942+def _get_queue_arguments(conf):
943+ """Construct the arguments for declaring a queue.
944+
945+ If the rabbit_ha_queues option is set, we declare a mirrored queue
946+ as described here:
947+
948+ http://www.rabbitmq.com/ha.html
949+
950+ Setting x-ha-policy to all means that the queue will be mirrored
951+ to all nodes in the cluster.
952+ """
953+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
954+
955+
956+class RabbitMessage(dict):
957+ def __init__(self, raw_message):
958+ super(RabbitMessage, self).__init__(
959+ rpc_common.deserialize_msg(raw_message.payload))
960+ self._raw_message = raw_message
961+
962+ def acknowledge(self):
963+ self._raw_message.ack()
964+
965+ def requeue(self):
966+ self._raw_message.requeue()
967+
968+
969+class ConsumerBase(object):
970+ """Consumer base class."""
971+
972+ def __init__(self, channel, callback, tag, **kwargs):
973+ """Declare a queue on an amqp channel.
974+
975+ 'channel' is the amqp channel to use
976+ 'callback' is the callback to call when messages are received
977+ 'tag' is a unique ID for the consumer on the channel
978+
979+ queue name, exchange name, and other kombu options are
980+ passed in here as a dictionary.
981+ """
982+ self.callback = callback
983+ self.tag = six.text_type(tag)
984+ self.kwargs = kwargs
985+ self.queue = None
986+ self.reconnect(channel)
987+
988+ def reconnect(self, channel):
989+ """Re-declare the queue after a rabbit reconnect."""
990+ self.channel = channel
991+ self.kwargs['channel'] = channel
992+ self.queue = kombu.entity.Queue(**self.kwargs)
993+ self.queue.declare()
994+
995+ def _callback_handler(self, message, callback):
996+ """Call callback with deserialized message.
997+
998+ Messages that are processed and ack'ed.
999+ """
1000+
1001+ try:
1002+ callback(RabbitMessage(message))
1003+ except Exception:
1004+ LOG.exception(_("Failed to process message"
1005+ " ... skipping it."))
1006+ message.ack()
1007+
1008+ def consume(self, *args, **kwargs):
1009+ """Actually declare the consumer on the amqp channel. This will
1010+ start the flow of messages from the queue. Using the
1011+ Connection.iterconsume() iterator will process the messages,
1012+ calling the appropriate callback.
1013+
1014+ If a callback is specified in kwargs, use that. Otherwise,
1015+ use the callback passed during __init__()
1016+
1017+ If kwargs['nowait'] is True, then this call will block until
1018+ a message is read.
1019+
1020+ """
1021+
1022+ options = {'consumer_tag': self.tag}
1023+ options['nowait'] = kwargs.get('nowait', False)
1024+ callback = kwargs.get('callback', self.callback)
1025+ if not callback:
1026+ raise ValueError("No callback defined")
1027+
1028+ def _callback(raw_message):
1029+ message = self.channel.message_to_python(raw_message)
1030+ self._callback_handler(message, callback)
1031+
1032+ self.queue.consume(*args, callback=_callback, **options)
1033+
1034+ def cancel(self):
1035+ """Cancel the consuming from the queue, if it has started."""
1036+ try:
1037+ self.queue.cancel(self.tag)
1038+ except KeyError as e:
1039+ # NOTE(comstud): Kludge to get around a amqplib bug
1040+ if six.text_type(e) != "u'%s'" % self.tag:
1041+ raise
1042+ self.queue = None
1043+
1044+
1045+class DirectConsumer(ConsumerBase):
1046+ """Queue/consumer class for 'direct'."""
1047+
1048+ def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
1049+ """Init a 'direct' queue.
1050+
1051+ 'channel' is the amqp channel to use
1052+ 'msg_id' is the msg_id to listen on
1053+ 'callback' is the callback to call when messages are received
1054+ 'tag' is a unique ID for the consumer on the channel
1055+
1056+ Other kombu options may be passed
1057+ """
1058+ # Default options
1059+ options = {'durable': False,
1060+ 'queue_arguments': _get_queue_arguments(conf),
1061+ 'auto_delete': True,
1062+ 'exclusive': False}
1063+ options.update(kwargs)
1064+ exchange = kombu.entity.Exchange(name=msg_id,
1065+ type='direct',
1066+ durable=options['durable'],
1067+ auto_delete=options['auto_delete'])
1068+ super(DirectConsumer, self).__init__(channel,
1069+ callback,
1070+ tag,
1071+ name=msg_id,
1072+ exchange=exchange,
1073+ routing_key=msg_id,
1074+ **options)
1075+
1076+
1077+class TopicConsumer(ConsumerBase):
1078+ """Consumer class for 'topic'."""
1079+
1080+ def __init__(self, conf, channel, topic, callback, tag, exchange_name,
1081+ name=None, **kwargs):
1082+ """Init a 'topic' queue.
1083+
1084+ :param channel: the amqp channel to use
1085+ :param topic: the topic to listen on
1086+ :paramtype topic: str
1087+ :param callback: the callback to call when messages are received
1088+ :param tag: a unique ID for the consumer on the channel
1089+ :param exchange_name: the exchange name to use
1090+ :param name: optional queue name, defaults to topic
1091+ :paramtype name: str
1092+
1093+ Other kombu options may be passed as keyword arguments
1094+ """
1095+ # Default options
1096+ options = {'durable': conf.amqp_durable_queues,
1097+ 'queue_arguments': _get_queue_arguments(conf),
1098+ 'auto_delete': conf.amqp_auto_delete,
1099+ 'exclusive': False}
1100+ options.update(kwargs)
1101+ exchange = kombu.entity.Exchange(name=exchange_name,
1102+ type='topic',
1103+ durable=options['durable'],
1104+ auto_delete=options['auto_delete'])
1105+ super(TopicConsumer, self).__init__(channel,
1106+ callback,
1107+ tag,
1108+ name=name or topic,
1109+ exchange=exchange,
1110+ routing_key=topic,
1111+ **options)
1112+
1113+
1114+class FanoutConsumer(ConsumerBase):
1115+ """Consumer class for 'fanout'."""
1116+
1117+ def __init__(self, conf, channel, topic, callback, tag, **kwargs):
1118+ """Init a 'fanout' queue.
1119+
1120+ 'channel' is the amqp channel to use
1121+ 'topic' is the topic to listen on
1122+ 'callback' is the callback to call when messages are received
1123+ 'tag' is a unique ID for the consumer on the channel
1124+
1125+ Other kombu options may be passed
1126+ """
1127+ unique = uuid.uuid4().hex
1128+ exchange_name = '%s_fanout' % topic
1129+ queue_name = '%s_fanout_%s' % (topic, unique)
1130+
1131+ # Default options
1132+ options = {'durable': False,
1133+ 'queue_arguments': _get_queue_arguments(conf),
1134+ 'auto_delete': True,
1135+ 'exclusive': False}
1136+ options.update(kwargs)
1137+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
1138+ durable=options['durable'],
1139+ auto_delete=options['auto_delete'])
1140+ super(FanoutConsumer, self).__init__(channel, callback, tag,
1141+ name=queue_name,
1142+ exchange=exchange,
1143+ routing_key=topic,
1144+ **options)
1145+
1146+
1147+class Publisher(object):
1148+ """Base Publisher class."""
1149+
1150+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
1151+ """Init the Publisher class with the exchange_name, routing_key,
1152+ and other options
1153+ """
1154+ self.exchange_name = exchange_name
1155+ self.routing_key = routing_key
1156+ self.kwargs = kwargs
1157+ self.reconnect(channel)
1158+
1159+ def reconnect(self, channel):
1160+ """Re-establish the Producer after a rabbit reconnection."""
1161+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
1162+ **self.kwargs)
1163+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
1164+ channel=channel,
1165+ routing_key=self.routing_key)
1166+
1167+ def send(self, msg, timeout=None):
1168+ """Send a message."""
1169+ if timeout:
1170+ #
1171+ # AMQP TTL is in milliseconds when set in the header.
1172+ #
1173+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
1174+ else:
1175+ self.producer.publish(msg)
1176+
1177+
1178+class DirectPublisher(Publisher):
1179+ """Publisher class for 'direct'."""
1180+ def __init__(self, conf, channel, topic, **kwargs):
1181+ """Init a 'direct' publisher.
1182+
1183+ Kombu options may be passed as keyword args to override defaults
1184+ """
1185+
1186+ options = {'durable': False,
1187+ 'auto_delete': True,
1188+ 'exclusive': False}
1189+ options.update(kwargs)
1190+ super(DirectPublisher, self).__init__(channel, topic, topic,
1191+ type='direct', **options)
1192+
1193+
1194+class TopicPublisher(Publisher):
1195+ """Publisher class for 'topic'."""
1196+ def __init__(self, conf, channel, exchange_name, topic, **kwargs):
1197+ """Init a 'topic' publisher.
1198+
1199+ Kombu options may be passed as keyword args to override defaults
1200+ """
1201+ options = {'durable': conf.amqp_durable_queues,
1202+ 'auto_delete': conf.amqp_auto_delete,
1203+ 'exclusive': False}
1204+ options.update(kwargs)
1205+ super(TopicPublisher, self).__init__(channel,
1206+ exchange_name,
1207+ topic,
1208+ type='topic',
1209+ **options)
1210+
1211+
1212+class FanoutPublisher(Publisher):
1213+ """Publisher class for 'fanout'."""
1214+ def __init__(self, conf, channel, topic, **kwargs):
1215+ """Init a 'fanout' publisher.
1216+
1217+ Kombu options may be passed as keyword args to override defaults
1218+ """
1219+ options = {'durable': False,
1220+ 'auto_delete': True,
1221+ 'exclusive': False}
1222+ options.update(kwargs)
1223+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
1224+ None, type='fanout', **options)
1225+
1226+
1227+class NotifyPublisher(TopicPublisher):
1228+ """Publisher class for 'notify'."""
1229+
1230+ def __init__(self, conf, channel, exchange_name, topic, **kwargs):
1231+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
1232+ self.queue_arguments = _get_queue_arguments(conf)
1233+ super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
1234+ topic, **kwargs)
1235+
1236+ def reconnect(self, channel):
1237+ super(NotifyPublisher, self).reconnect(channel)
1238+
1239+ # NOTE(jerdfelt): Normally the consumer would create the queue, but
1240+ # we do this to ensure that messages don't get dropped if the
1241+ # consumer is started after we do
1242+ queue = kombu.entity.Queue(channel=channel,
1243+ exchange=self.exchange,
1244+ durable=self.durable,
1245+ name=self.routing_key,
1246+ routing_key=self.routing_key,
1247+ queue_arguments=self.queue_arguments)
1248+ queue.declare()
1249+
1250+
1251+class Connection(object):
1252+ """Connection object."""
1253+
1254+ pools = {}
1255+
1256+ def __init__(self, conf, url):
1257+ self.consumers = []
1258+ self.conf = conf
1259+ self.max_retries = self.conf.rabbit_max_retries
1260+ # Try forever?
1261+ if self.max_retries <= 0:
1262+ self.max_retries = None
1263+ self.interval_start = self.conf.rabbit_retry_interval
1264+ self.interval_stepping = self.conf.rabbit_retry_backoff
1265+ # max retry-interval = 30 seconds
1266+ self.interval_max = 30
1267+ self.memory_transport = False
1268+
1269+ ssl_params = self._fetch_ssl_params()
1270+
1271+ if url.virtual_host is not None:
1272+ virtual_host = url.virtual_host
1273+ else:
1274+ virtual_host = self.conf.rabbit_virtual_host
1275+
1276+ self.brokers_params = []
1277+ if url.hosts:
1278+ for host in url.hosts:
1279+ params = {
1280+ 'hostname': host.hostname,
1281+ 'port': host.port or 5672,
1282+ 'userid': host.username or '',
1283+ 'password': host.password or '',
1284+ 'login_method': self.conf.rabbit_login_method,
1285+ 'virtual_host': virtual_host
1286+ }
1287+ if self.conf.fake_rabbit:
1288+ params['transport'] = 'memory'
1289+ if self.conf.rabbit_use_ssl:
1290+ params['ssl'] = ssl_params
1291+
1292+ self.brokers_params.append(params)
1293+ else:
1294+ # Old configuration format
1295+ for adr in self.conf.rabbit_hosts:
1296+ hostname, port = netutils.parse_host_port(
1297+ adr, default_port=self.conf.rabbit_port)
1298+
1299+ params = {
1300+ 'hostname': hostname,
1301+ 'port': port,
1302+ 'userid': self.conf.rabbit_userid,
1303+ 'password': self.conf.rabbit_password,
1304+ 'login_method': self.conf.rabbit_login_method,
1305+ 'virtual_host': virtual_host
1306+ }
1307+
1308+ if self.conf.fake_rabbit:
1309+ params['transport'] = 'memory'
1310+ if self.conf.rabbit_use_ssl:
1311+ params['ssl'] = ssl_params
1312+
1313+ self.brokers_params.append(params)
1314+
1315+ random.shuffle(self.brokers_params)
1316+ self.brokers = itertools.cycle(self.brokers_params)
1317+
1318+ self.memory_transport = self.conf.fake_rabbit
1319+
1320+ self.connection = None
1321+ self.do_consume = None
1322+ self.reconnect()
1323+
1324+ # FIXME(markmc): use oslo sslutils when it is available as a library
1325+ _SSL_PROTOCOLS = {
1326+ "tlsv1": ssl.PROTOCOL_TLSv1,
1327+ "sslv23": ssl.PROTOCOL_SSLv23,
1328+ "sslv3": ssl.PROTOCOL_SSLv3
1329+ }
1330+
1331+ try:
1332+ _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
1333+ except AttributeError:
1334+ pass
1335+
1336+ @classmethod
1337+ def validate_ssl_version(cls, version):
1338+ key = version.lower()
1339+ try:
1340+ return cls._SSL_PROTOCOLS[key]
1341+ except KeyError:
1342+ raise RuntimeError(_("Invalid SSL version : %s") % version)
1343+
1344+ def _fetch_ssl_params(self):
1345+ """Handles fetching what ssl params should be used for the connection
1346+ (if any).
1347+ """
1348+ ssl_params = dict()
1349+
1350+ # http://docs.python.org/library/ssl.html - ssl.wrap_socket
1351+ if self.conf.kombu_ssl_version:
1352+ ssl_params['ssl_version'] = self.validate_ssl_version(
1353+ self.conf.kombu_ssl_version)
1354+ if self.conf.kombu_ssl_keyfile:
1355+ ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
1356+ if self.conf.kombu_ssl_certfile:
1357+ ssl_params['certfile'] = self.conf.kombu_ssl_certfile
1358+ if self.conf.kombu_ssl_ca_certs:
1359+ ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
1360+ # We might want to allow variations in the
1361+ # future with this?
1362+ ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
1363+
1364+ # Return the extended behavior or just have the default behavior
1365+ return ssl_params or True
1366+
1367+ def _connect(self, broker):
1368+ """Connect to rabbit. Re-establish any queues that may have
1369+ been declared before if we are reconnecting. Exceptions should
1370+ be handled by the caller.
1371+ """
1372+ LOG.info(_("Connecting to AMQP server on "
1373+ "%(hostname)s:%(port)d"), broker)
1374+ self.connection = kombu.connection.BrokerConnection(**broker)
1375+ self.connection_errors = self.connection.connection_errors
1376+ self.channel_errors = self.connection.channel_errors
1377+ if self.memory_transport:
1378+ # Kludge to speed up tests.
1379+ self.connection.transport.polling_interval = 0.0
1380+ self.do_consume = True
1381+ self.consumer_num = itertools.count(1)
1382+ self.connection.connect()
1383+ self.channel = self.connection.channel()
1384+ # work around 'memory' transport bug in 1.1.3
1385+ if self.memory_transport:
1386+ self.channel._new_queue('ae.undeliver')
1387+ for consumer in self.consumers:
1388+ consumer.reconnect(self.channel)
1389+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
1390+ broker)
1391+
1392+ def _disconnect(self):
1393+ if self.connection:
1394+ # XXX(nic): when reconnecting to a RabbitMQ cluster
1395+ # with mirrored queues in use, the attempt to release the
1396+ # connection can hang "indefinitely" somewhere deep down
1397+ # in Kombu. Blocking the thread for a bit prior to
1398+ # release seems to kludge around the problem where it is
1399+ # otherwise reproduceable.
1400+ if self.conf.kombu_reconnect_delay > 0:
1401+ LOG.info(_("Delaying reconnect for %1.1f seconds...") %
1402+ self.conf.kombu_reconnect_delay)
1403+ time.sleep(self.conf.kombu_reconnect_delay)
1404+
1405+ try:
1406+ self.connection.release()
1407+ except self.connection_errors:
1408+ pass
1409+ self.connection = None
1410+
1411+ def reconnect(self, retry=None):
1412+ """Handles reconnecting and re-establishing queues.
1413+ Will retry up to retry number of times.
1414+ retry = None means use the value of rabbit_max_retries
1415+ retry = -1 means to retry forever
1416+ retry = 0 means no retry
1417+ retry = N means N retries
1418+ Sleep between tries, starting at self.interval_start
1419+ seconds, backing off self.interval_stepping number of seconds
1420+ each attempt.
1421+ """
1422+
1423+ attempt = 0
1424+ loop_forever = False
1425+ if retry is None:
1426+ retry = self.max_retries
1427+ if retry is None or retry < 0:
1428+ loop_forever = True
1429+
1430+ while True:
1431+ self._disconnect()
1432+
1433+ broker = six.next(self.brokers)
1434+ attempt += 1
1435+ try:
1436+ self._connect(broker)
1437+ return
1438+ except IOError as ex:
1439+ e = ex
1440+ except self.connection_errors as ex:
1441+ e = ex
1442+ except Exception as ex:
1443+ # NOTE(comstud): Unfortunately it's possible for amqplib
1444+ # to return an error not covered by its transport
1445+ # connection_errors in the case of a timeout waiting for
1446+ # a protocol response. (See paste link in LP888621)
1447+ # So, we check all exceptions for 'timeout' in them
1448+ # and try to reconnect in this case.
1449+ if 'timeout' not in six.text_type(e):
1450+ raise
1451+ e = ex
1452+
1453+ log_info = {}
1454+ log_info['err_str'] = e
1455+ log_info['retry'] = retry or 0
1456+ log_info.update(broker)
1457+
1458+ if not loop_forever and attempt > retry:
1459+ msg = _('Unable to connect to AMQP server on '
1460+ '%(hostname)s:%(port)d after %(retry)d '
1461+ 'tries: %(err_str)s') % log_info
1462+ LOG.error(msg)
1463+ raise exceptions.MessageDeliveryFailure(msg)
1464+ else:
1465+ if attempt == 1:
1466+ sleep_time = self.interval_start or 1
1467+ elif attempt > 1:
1468+ sleep_time += self.interval_stepping
1469+
1470+ sleep_time = min(sleep_time, self.interval_max)
1471+
1472+ log_info['sleep_time'] = sleep_time
1473+ if 'Socket closed' in six.text_type(e):
1474+ LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
1475+ ' the connection. Check login credentials:'
1476+ ' %(err_str)s'), log_info)
1477+ else:
1478+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
1479+ 'unreachable: %(err_str)s. Trying again in '
1480+ '%(sleep_time)d seconds.'), log_info)
1481+ time.sleep(sleep_time)
1482+
1483+ def ensure(self, error_callback, method, retry=None):
1484+ while True:
1485+ try:
1486+ return method()
1487+ except self.connection_errors as e:
1488+ if error_callback:
1489+ error_callback(e)
1490+ except self.channel_errors as e:
1491+ if error_callback:
1492+ error_callback(e)
1493+ except (socket.timeout, IOError) as e:
1494+ if error_callback:
1495+ error_callback(e)
1496+ except Exception as e:
1497+ # NOTE(comstud): Unfortunately it's possible for amqplib
1498+ # to return an error not covered by its transport
1499+ # connection_errors in the case of a timeout waiting for
1500+ # a protocol response. (See paste link in LP888621)
1501+ # So, we check all exceptions for 'timeout' in them
1502+ # and try to reconnect in this case.
1503+ if 'timeout' not in six.text_type(e):
1504+ raise
1505+ if error_callback:
1506+ error_callback(e)
1507+ self.reconnect(retry=retry)
1508+
1509+ def get_channel(self):
1510+ """Convenience call for bin/clear_rabbit_queues."""
1511+ return self.channel
1512+
1513+ def close(self):
1514+ """Close/release this connection."""
1515+ if self.connection:
1516+ self.connection.release()
1517+ self.connection = None
1518+
1519+ def reset(self):
1520+ """Reset a connection so it can be used again."""
1521+ self.channel.close()
1522+ self.channel = self.connection.channel()
1523+ # work around 'memory' transport bug in 1.1.3
1524+ if self.memory_transport:
1525+ self.channel._new_queue('ae.undeliver')
1526+ self.consumers = []
1527+
1528+ def declare_consumer(self, consumer_cls, topic, callback):
1529+ """Create a Consumer using the class that was passed in and
1530+ add it to our list of consumers
1531+ """
1532+
1533+ def _connect_error(exc):
1534+ log_info = {'topic': topic, 'err_str': exc}
1535+ LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
1536+ "%(err_str)s"), log_info)
1537+
1538+ def _declare_consumer():
1539+ consumer = consumer_cls(self.conf, self.channel, topic, callback,
1540+ six.next(self.consumer_num))
1541+ self.consumers.append(consumer)
1542+ return consumer
1543+
1544+ return self.ensure(_connect_error, _declare_consumer)
1545+
1546+ def iterconsume(self, limit=None, timeout=None):
1547+ """Return an iterator that will consume from all queues/consumers."""
1548+
1549+ def _error_callback(exc):
1550+ if isinstance(exc, socket.timeout):
1551+ LOG.debug('Timed out waiting for RPC response: %s', exc)
1552+ raise rpc_common.Timeout()
1553+ else:
1554+ LOG.exception(_('Failed to consume message from queue: %s'),
1555+ exc)
1556+ self.do_consume = True
1557+
1558+ def _consume():
1559+ if self.do_consume:
1560+ queues_head = self.consumers[:-1] # not fanout.
1561+ queues_tail = self.consumers[-1] # fanout
1562+ for queue in queues_head:
1563+ queue.consume(nowait=True)
1564+ queues_tail.consume(nowait=False)
1565+ self.do_consume = False
1566+ return self.connection.drain_events(timeout=timeout)
1567+
1568+ for iteration in itertools.count(0):
1569+ if limit and iteration >= limit:
1570+ raise StopIteration
1571+ yield self.ensure(_error_callback, _consume)
1572+
1573+ def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
1574+ **kwargs):
1575+ """Send to a publisher based on the publisher class."""
1576+
1577+ def _error_callback(exc):
1578+ log_info = {'topic': topic, 'err_str': exc}
1579+ LOG.exception(_("Failed to publish message to topic "
1580+ "'%(topic)s': %(err_str)s"), log_info)
1581+
1582+ def _publish():
1583+ publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
1584+ publisher.send(msg, timeout)
1585+
1586+ self.ensure(_error_callback, _publish, retry=retry)
1587+
1588+ def declare_direct_consumer(self, topic, callback):
1589+ """Create a 'direct' queue.
1590+ In nova's use, this is generally a msg_id queue used for
1591+ responses for call/multicall
1592+ """
1593+ self.declare_consumer(DirectConsumer, topic, callback)
1594+
1595+ def declare_topic_consumer(self, exchange_name, topic, callback=None,
1596+ queue_name=None):
1597+ """Create a 'topic' consumer."""
1598+ self.declare_consumer(functools.partial(TopicConsumer,
1599+ name=queue_name,
1600+ exchange_name=exchange_name,
1601+ ),
1602+ topic, callback)
1603+
1604+ def declare_fanout_consumer(self, topic, callback):
1605+ """Create a 'fanout' consumer."""
1606+ self.declare_consumer(FanoutConsumer, topic, callback)
1607+
1608+ def direct_send(self, msg_id, msg):
1609+ """Send a 'direct' message."""
1610+ self.publisher_send(DirectPublisher, msg_id, msg)
1611+
1612+ def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
1613+ """Send a 'topic' message."""
1614+ self.publisher_send(TopicPublisher, topic, msg, timeout,
1615+ exchange_name=exchange_name, retry=retry)
1616+
1617+ def fanout_send(self, topic, msg, retry=None):
1618+ """Send a 'fanout' message."""
1619+ self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
1620+
1621+ def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
1622+ """Send a notify message on a topic."""
1623+ self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
1624+ exchange_name=exchange_name, retry=retry, **kwargs)
1625+
1626+ def consume(self, limit=None, timeout=None):
1627+ """Consume from all queues/consumers."""
1628+ it = self.iterconsume(limit=limit, timeout=timeout)
1629+ while True:
1630+ try:
1631+ six.next(it)
1632+ except StopIteration:
1633+ return
1634+
1635+
1636+class RabbitDriver(amqpdriver.AMQPDriverBase):
1637+
1638+ def __init__(self, conf, url,
1639+ default_exchange=None,
1640+ allowed_remote_exmods=None):
1641+ conf.register_opts(rabbit_opts)
1642+ conf.register_opts(rpc_amqp.amqp_opts)
1643+
1644+ connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
1645+
1646+ super(RabbitDriver, self).__init__(conf, url,
1647+ connection_pool,
1648+ default_exchange,
1649+ allowed_remote_exmods)
1650+
1651+ def require_features(self, requeue=True):
1652+ pass
1653
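The ensure()/reconnect() pair above is the heart of the retry handling this backport adjusts: every operation (declaring a consumer, publishing, draining events) is wrapped in a loop that reports recoverable failures to the caller and re-establishes the connection before trying again. A minimal standalone sketch of that control flow, in which MiniConnection, RecoverableError and flaky_publish are illustrative stand-ins rather than oslo.messaging APIs:

    import itertools


    class RecoverableError(Exception):
        """Stand-in for the driver's connection/channel errors."""


    class MiniConnection(object):
        def __init__(self, brokers):
            # Cycle through broker candidates, as Connection.reconnect() does.
            self._brokers = itertools.cycle(brokers)
            self.broker = next(self._brokers)

        def reconnect(self, retry=None):
            # Move on to the next broker; the real driver also sleeps with
            # a capped backoff and honours the retry count here.
            self.broker = next(self._brokers)

        def ensure(self, error_callback, method, retry=None):
            # Keep calling method(); on a recoverable failure, report it
            # to the caller and reconnect before retrying.
            while True:
                try:
                    return method()
                except RecoverableError as exc:
                    if error_callback:
                        error_callback(exc)
                    self.reconnect(retry=retry)


    conn = MiniConnection(['rabbit1:5672', 'rabbit2:5672'])
    attempts = []


    def log_error(exc):
        print('publish failed, reconnecting: %s' % exc)


    def flaky_publish():
        attempts.append(conn.broker)
        if len(attempts) < 2:
            raise RecoverableError('connection reset')
        return 'sent via %s' % conn.broker


    print(conn.ensure(log_error, flaky_publish))  # sent via rabbit2:5672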
1654=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch'
1655=== added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/.timestamp'
1656=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo'
1657=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging'
1658=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers'
1659=== added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py'
1660--- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
1661+++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/oslo/messaging/_drivers/common.py 2015-12-17 11:03:34 +0000
1662@@ -0,0 +1,374 @@
1663+# Copyright 2010 United States Government as represented by the
1664+# Administrator of the National Aeronautics and Space Administration.
1665+# All Rights Reserved.
1666+# Copyright 2011 Red Hat, Inc.
1667+#
1668+# Licensed under the Apache License, Version 2.0 (the "License"); you may
1669+# not use this file except in compliance with the License. You may obtain
1670+# a copy of the License at
1671+#
1672+# http://www.apache.org/licenses/LICENSE-2.0
1673+#
1674+# Unless required by applicable law or agreed to in writing, software
1675+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1676+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1677+# License for the specific language governing permissions and limitations
1678+# under the License.
1679+
1680+import copy
1681+import logging
1682+import sys
1683+import time
1684+import traceback
1685+
1686+import six
1687+
1688+from oslo import messaging
1689+from oslo.messaging import _utils as utils
1690+from oslo.messaging.openstack.common.gettextutils import _
1691+from oslo.messaging.openstack.common import jsonutils
1692+
1693+LOG = logging.getLogger(__name__)
1694+
1695+_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
1696+
1697+
1698+'''RPC Envelope Version.
1699+
1700+This version number applies to the top level structure of messages sent out.
1701+It does *not* apply to the message payload, which must be versioned
1702+independently. For example, when using rpc APIs, a version number is applied
1703+for changes to the API being exposed over rpc. This version number is handled
1704+in the rpc proxy and dispatcher modules.
1705+
1706+This version number applies to the message envelope that is used in the
1707+serialization done inside the rpc layer. See serialize_msg() and
1708+deserialize_msg().
1709+
1710+The current message format (version 2.0) is very simple. It is:
1711+
1712+ {
1713+ 'oslo.version': <RPC Envelope Version as a String>,
1714+ 'oslo.message': <Application Message Payload, JSON encoded>
1715+ }
1716+
1717+Message format version '1.0' is just considered to be the messages we sent
1718+without a message envelope.
1719+
1720+So, the current message envelope just includes the envelope version. It may
1721+eventually contain additional information, such as a signature for the message
1722+payload.
1723+
1724+We will JSON encode the application message payload. The message envelope,
1725+which includes the JSON encoded application message body, will be passed down
1726+to the messaging libraries as a dict.
1727+'''
1728+_RPC_ENVELOPE_VERSION = '2.0'
1729+
1730+_VERSION_KEY = 'oslo.version'
1731+_MESSAGE_KEY = 'oslo.message'
1732+
1733+_REMOTE_POSTFIX = '_Remote'
1734+
1735+
1736+class RPCException(Exception):
1737+ msg_fmt = _("An unknown RPC related exception occurred.")
1738+
1739+ def __init__(self, message=None, **kwargs):
1740+ self.kwargs = kwargs
1741+
1742+ if not message:
1743+ try:
1744+ message = self.msg_fmt % kwargs
1745+
1746+ except Exception:
1747+ # kwargs doesn't match a variable in the message
1748+ # log the issue and the kwargs
1749+ LOG.exception(_('Exception in string format operation'))
1750+ for name, value in six.iteritems(kwargs):
1751+ LOG.error("%s: %s", name, value)
1752+ # at least get the core message out if something happened
1753+ message = self.msg_fmt
1754+
1755+ super(RPCException, self).__init__(message)
1756+
1757+
1758+class Timeout(RPCException):
1759+ """Signifies that a timeout has occurred.
1760+
1761+ This exception is raised if the rpc_response_timeout is reached while
1762+ waiting for a response from the remote side.
1763+ """
1764+ msg_fmt = _('Timeout while waiting on RPC response - '
1765+ 'topic: "%(topic)s", RPC method: "%(method)s" '
1766+ 'info: "%(info)s"')
1767+
1768+ def __init__(self, info=None, topic=None, method=None):
1769+ """Initiates Timeout object.
1770+
1771+ :param info: Extra info to convey to the user
1772+ :param topic: The topic that the rpc call was sent to
1773+ :param rpc_method_name: The name of the rpc method being
1774+ called
1775+ """
1776+ self.info = info
1777+ self.topic = topic
1778+ self.method = method
1779+ super(Timeout, self).__init__(
1780+ None,
1781+ info=info or _('<unknown>'),
1782+ topic=topic or _('<unknown>'),
1783+ method=method or _('<unknown>'))
1784+
1785+
1786+class DuplicateMessageError(RPCException):
1787+ msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
1788+
1789+
1790+class InvalidRPCConnectionReuse(RPCException):
1791+ msg_fmt = _("Invalid reuse of an RPC connection.")
1792+
1793+
1794+class UnsupportedRpcVersion(RPCException):
1795+ msg_fmt = _("Specified RPC version, %(version)s, not supported by "
1796+ "this endpoint.")
1797+
1798+
1799+class UnsupportedRpcEnvelopeVersion(RPCException):
1800+ msg_fmt = _("Specified RPC envelope version, %(version)s, "
1801+ "not supported by this endpoint.")
1802+
1803+
1804+class RpcVersionCapError(RPCException):
1805+ msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
1806+
1807+
1808+class Connection(object):
1809+ """A connection, returned by rpc.create_connection().
1810+
1811+ This class represents a connection to the message bus used for rpc.
1812+ An instance of this class should never be created by users of the rpc API.
1813+ Use rpc.create_connection() instead.
1814+ """
1815+ def close(self):
1816+ """Close the connection.
1817+
1818+ This method must be called when the connection will no longer be used.
1819+ It will ensure that any resources associated with the connection, such
1820+ as a network connection, and cleaned up.
1821+ """
1822+ raise NotImplementedError()
1823+
1824+
1825+def _safe_log(log_func, msg, msg_data):
1826+ """Sanitizes the msg_data field before logging."""
1827+ SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
1828+
1829+ def _fix_passwords(d):
1830+ """Sanitizes the password fields in the dictionary."""
1831+ for k in six.iterkeys(d):
1832+ if k.lower().find('password') != -1:
1833+ d[k] = '<SANITIZED>'
1834+ elif k.lower() in SANITIZE:
1835+ d[k] = '<SANITIZED>'
1836+ elif isinstance(d[k], dict):
1837+ _fix_passwords(d[k])
1838+ return d
1839+
1840+ return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
1841+
1842+
1843+def serialize_remote_exception(failure_info, log_failure=True):
1844+ """Prepares exception data to be sent over rpc.
1845+
1846+ Failure_info should be a sys.exc_info() tuple.
1847+
1848+ """
1849+ tb = traceback.format_exception(*failure_info)
1850+ failure = failure_info[1]
1851+ if log_failure:
1852+ LOG.error(_("Returning exception %s to caller"),
1853+ six.text_type(failure))
1854+ LOG.error(tb)
1855+
1856+ kwargs = {}
1857+ if hasattr(failure, 'kwargs'):
1858+ kwargs = failure.kwargs
1859+
1860+ # NOTE(matiu): With cells, it's possible to re-raise remote, remote
1861+ # exceptions. Let's turn them back into the original exception type.
1862+ cls_name = six.text_type(failure.__class__.__name__)
1863+ mod_name = six.text_type(failure.__class__.__module__)
1864+ if (cls_name.endswith(_REMOTE_POSTFIX) and
1865+ mod_name.endswith(_REMOTE_POSTFIX)):
1866+ cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
1867+ mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
1868+
1869+ data = {
1870+ 'class': cls_name,
1871+ 'module': mod_name,
1872+ 'message': six.text_type(failure),
1873+ 'tb': tb,
1874+ 'args': failure.args,
1875+ 'kwargs': kwargs
1876+ }
1877+
1878+ json_data = jsonutils.dumps(data)
1879+
1880+ return json_data
1881+
1882+
1883+def deserialize_remote_exception(data, allowed_remote_exmods):
1884+ failure = jsonutils.loads(six.text_type(data))
1885+
1886+ trace = failure.get('tb', [])
1887+ message = failure.get('message', "") + "\n" + "\n".join(trace)
1888+ name = failure.get('class')
1889+ module = failure.get('module')
1890+
1891+ # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
1892+ # order to prevent arbitrary code execution.
1893+ if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
1894+ return messaging.RemoteError(name, failure.get('message'), trace)
1895+
1896+ try:
1897+ __import__(module)
1898+ mod = sys.modules[module]
1899+ klass = getattr(mod, name)
1900+ if not issubclass(klass, Exception):
1901+ raise TypeError("Can only deserialize Exceptions")
1902+
1903+ failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
1904+ except (AttributeError, TypeError, ImportError):
1905+ return messaging.RemoteError(name, failure.get('message'), trace)
1906+
1907+ ex_type = type(failure)
1908+ str_override = lambda self: message
1909+ new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
1910+ {'__str__': str_override, '__unicode__': str_override})
1911+ new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
1912+ try:
1913+ # NOTE(ameade): Dynamically create a new exception type and swap it in
1914+ # as the new type for the exception. This only works on user defined
1915+ # Exceptions and not core Python exceptions. This is important because
1916+ # we cannot necessarily change an exception message so we must override
1917+ # the __str__ method.
1918+ failure.__class__ = new_ex_type
1919+ except TypeError:
1920+ # NOTE(ameade): If a core exception then just add the traceback to the
1921+ # first exception argument.
1922+ failure.args = (message,) + failure.args[1:]
1923+ return failure
1924+
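+# For illustration (not part of the original module): if a peer raises
+# MyError from a module 'myexc' that the caller has whitelisted, the
+# round trip looks like this:
+#
+#     data = serialize_remote_exception(sys.exc_info())
+#     exc = deserialize_remote_exception(
+#         data, allowed_remote_exmods=['myexc'])
+#     isinstance(exc, myexc.MyError)  # True
+#     type(exc).__name__              # 'MyError_Remote'
+#     str(exc)                        # message plus the remote traceback
+#
+# Core Python exceptions do not allow __class__ reassignment, so for
+# those the traceback is folded into args and the type stays unchanged.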
1925+
1926+class CommonRpcContext(object):
1927+ def __init__(self, **kwargs):
1928+ self.values = kwargs
1929+
1930+ def __getattr__(self, key):
1931+ try:
1932+ return self.values[key]
1933+ except KeyError:
1934+ raise AttributeError(key)
1935+
1936+ def to_dict(self):
1937+ return copy.deepcopy(self.values)
1938+
1939+ @classmethod
1940+ def from_dict(cls, values):
1941+ return cls(**values)
1942+
1943+ def deepcopy(self):
1944+ return self.from_dict(self.to_dict())
1945+
1946+ def update_store(self):
1947+ # local.store.context = self
1948+ pass
1949+
1950+
1951+class ClientException(Exception):
1952+ """Encapsulates actual exception expected to be hit by a RPC proxy object.
1953+
1954+ Merely instantiating it records the current exception information, which
1955+ will be passed back to the RPC client without exceptional logging.
1956+ """
1957+ def __init__(self):
1958+ self._exc_info = sys.exc_info()
1959+
1960+
1961+def serialize_msg(raw_msg):
1962+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
1963+ # information about this format.
1964+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
1965+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
1966+
1967+ return msg
1968+
1969+
1970+def deserialize_msg(msg):
1971+ # NOTE(russellb): Hang on to your hats, this road is about to
1972+ # get a little bumpy.
1973+ #
1974+ # Robustness Principle:
1975+ # "Be strict in what you send, liberal in what you accept."
1976+ #
1977+ # At this point we have to do a bit of guessing about what it
1978+ # is we just received. Here is the set of possibilities:
1979+ #
1980+ # 1) We received a dict. This could be 2 things:
1981+ #
1982+ # a) Inspect it to see if it looks like a standard message envelope.
1983+ # If so, great!
1984+ #
1985+ # b) If it doesn't look like a standard message envelope, it could either
1986+ # be a notification, or a message from before we added a message
1987+ # envelope (referred to as version 1.0).
1988+ # Just return the message as-is.
1989+ #
1990+ # 2) It's any other non-dict type. Just return it and hope for the best.
1991+ # This case covers return values from rpc.call() from before message
1992+ # envelopes were used. (messages to call a method were always a dict)
1993+
1994+ if not isinstance(msg, dict):
1995+ # See #2 above.
1996+ return msg
1997+
1998+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
1999+ if not all(map(lambda key: key in msg, base_envelope_keys)):
2000+ # See #1.b above.
2001+ return msg
2002+
2003+ # At this point we think we have the message envelope
2004+ # format we were expecting. (#1.a above)
2005+
2006+ if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
2007+ msg[_VERSION_KEY]):
2008+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
2009+
2010+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
2011+
2012+ return raw_msg
2013+
2014+
2015+class DecayingTimer(object):
2016+ def __init__(self, duration=None):
2017+ self._duration = duration
2018+ self._ends_at = None
2019+
2020+ def start(self):
2021+ if self._duration is not None:
2022+ self._ends_at = time.time() + max(0, self._duration)
2023+
2024+ def check_return(self, timeout_callback, *args, **kwargs):
2025+ if self._duration is None:
2026+ return None
2027+ if self._ends_at is None:
2028+ raise RuntimeError(_("Can not check/return a timeout from a timer"
2029+ " that has not been started."))
2030+
2031+ maximum = kwargs.pop('maximum', None)
2032+ left = self._ends_at - time.time()
2033+ if left <= 0:
2034+ timeout_callback(*args, **kwargs)
2035+
2036+ return left if maximum is None else min(left, maximum)
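+# Usage sketch (illustrative, not part of this module): callers start
+# the timer once, then use it to bound each blocking wait; here
+# _raise_timeout stands for whatever callback should fire on expiry:
+#
+#     timer = DecayingTimer(duration=30)
+#     timer.start()
+#     while True:
+#         left = timer.check_return(_raise_timeout, maximum=1)
+#         connection.drain_events(timeout=left)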
2037
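The envelope format documented at the top of common.py is compact enough to demonstrate end to end. A self-contained sketch, substituting the stdlib json module for jsonutils (which wraps it); note that the real deserialize_msg() additionally enforces envelope version compatibility:

    import json

    _RPC_ENVELOPE_VERSION = '2.0'
    _VERSION_KEY = 'oslo.version'
    _MESSAGE_KEY = 'oslo.message'


    def serialize_msg(raw_msg):
        # Same shape as common.serialize_msg() above.
        return {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
                _MESSAGE_KEY: json.dumps(raw_msg)}


    def deserialize_msg(msg):
        # Non-dicts and un-enveloped (version 1.0) dicts pass through
        # unchanged, per the robustness notes above.
        if not isinstance(msg, dict):
            return msg
        if _VERSION_KEY not in msg or _MESSAGE_KEY not in msg:
            return msg
        return json.loads(msg[_MESSAGE_KEY])


    envelope = serialize_msg({'method': 'ping', 'args': {}})
    print(envelope)  # {'oslo.version': '2.0', 'oslo.message': '{"method": ...'}
    assert deserialize_msg(envelope) == {'method': 'ping', 'args': {}}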
2038=== added directory '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests'
2039=== added file '.pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py'
2040--- .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000
2041+++ .pc/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch/tests/test_utils.py 2015-12-17 11:03:34 +0000
2042@@ -0,0 +1,49 @@
2043+
2044+# Copyright 2013 Red Hat, Inc.
2045+#
2046+# Licensed under the Apache License, Version 2.0 (the "License"); you may
2047+# not use this file except in compliance with the License. You may obtain
2048+# a copy of the License at
2049+#
2050+# http://www.apache.org/licenses/LICENSE-2.0
2051+#
2052+# Unless required by applicable law or agreed to in writing, software
2053+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
2054+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
2055+# License for the specific language governing permissions and limitations
2056+# under the License.
2057+
2058+from oslo.messaging import _utils as utils
2059+from tests import utils as test_utils
2060+
2061+
2062+class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
2063+ def test_version_is_compatible_same(self):
2064+ self.assertTrue(utils.version_is_compatible('1.23', '1.23'))
2065+
2066+ def test_version_is_compatible_newer_minor(self):
2067+ self.assertTrue(utils.version_is_compatible('1.24', '1.23'))
2068+
2069+ def test_version_is_compatible_older_minor(self):
2070+ self.assertFalse(utils.version_is_compatible('1.22', '1.23'))
2071+
2072+ def test_version_is_compatible_major_difference1(self):
2073+ self.assertFalse(utils.version_is_compatible('2.23', '1.23'))
2074+
2075+ def test_version_is_compatible_major_difference2(self):
2076+ self.assertFalse(utils.version_is_compatible('1.23', '2.23'))
2077+
2078+ def test_version_is_compatible_newer_rev(self):
2079+ self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))
2080+
2081+ def test_version_is_compatible_newer_rev_both(self):
2082+ self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))
2083+
2084+ def test_version_is_compatible_older_rev_both(self):
2085+ self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))
2086+
2087+ def test_version_is_compatible_older_rev(self):
2088+ self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))
2089+
2090+ def test_version_is_compatible_no_rev_is_zero(self):
2091+ self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
2092
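Taken together, these tests pin down the comparison rules: major versions must match exactly, the supported minor/revision must be at least the requested one, and a missing revision counts as zero. One implementation consistent with every case above (a sketch only; the real helper lives in oslo.messaging._utils):

    def version_is_compatible(imp_version, version):
        """Return True if imp_version can satisfy a request for version."""
        imp = [int(p) for p in imp_version.split('.')]
        req = [int(p) for p in version.split('.')]
        # Pad a missing revision with zero, so '1.23' == '1.23.0'.
        imp += [0] * (3 - len(imp))
        req += [0] * (3 - len(req))
        if imp[0] != req[0]:  # major versions must match exactly
            return False
        return imp[1:] >= req[1:]  # lexicographic minor.revision check


    assert version_is_compatible('1.24', '1.23')
    assert not version_is_compatible('1.23', '1.23.1')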
2093=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch'
2094=== added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/.timestamp'
2095=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo'
2096=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging'
2097=== added directory '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers'
2098=== added file '.pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py'
2099--- .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 1970-01-01 00:00:00 +0000
2100+++ .pc/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch/oslo/messaging/_drivers/impl_qpid.py 2015-12-17 11:03:34 +0000
2101@@ -0,0 +1,729 @@
2102+# Copyright 2011 OpenStack Foundation
2103+# Copyright 2011 - 2012, Red Hat, Inc.
2104+#
2105+# Licensed under the Apache License, Version 2.0 (the "License"); you may
2106+# not use this file except in compliance with the License. You may obtain
2107+# a copy of the License at
2108+#
2109+# http://www.apache.org/licenses/LICENSE-2.0
2110+#
2111+# Unless required by applicable law or agreed to in writing, software
2112+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
2113+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
2114+# License for the specific language governing permissions and limitations
2115+# under the License.
2116+
2117+import functools
2118+import itertools
2119+import logging
2120+import random
2121+import time
2122+
2123+import six
2124+
2125+from oslo.config import cfg
2126+from oslo.messaging._drivers import amqp as rpc_amqp
2127+from oslo.messaging._drivers import amqpdriver
2128+from oslo.messaging._drivers import common as rpc_common
2129+from oslo.messaging import exceptions
2130+from oslo.messaging.openstack.common.gettextutils import _
2131+from oslo.messaging.openstack.common import jsonutils
2132+from oslo.utils import importutils
2133+from oslo.utils import netutils
2134+
2135+qpid_codec = importutils.try_import("qpid.codec010")
2136+qpid_messaging = importutils.try_import("qpid.messaging")
2137+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
2138+
2139+LOG = logging.getLogger(__name__)
2140+
2141+qpid_opts = [
2142+ cfg.StrOpt('qpid_hostname',
2143+ default='localhost',
2144+ help='Qpid broker hostname.'),
2145+ cfg.IntOpt('qpid_port',
2146+ default=5672,
2147+ help='Qpid broker port.'),
2148+ cfg.ListOpt('qpid_hosts',
2149+ default=['$qpid_hostname:$qpid_port'],
2150+ help='Qpid HA cluster host:port pairs.'),
2151+ cfg.StrOpt('qpid_username',
2152+ default='',
2153+ help='Username for Qpid connection.'),
2154+ cfg.StrOpt('qpid_password',
2155+ default='',
2156+ help='Password for Qpid connection.',
2157+ secret=True),
2158+ cfg.StrOpt('qpid_sasl_mechanisms',
2159+ default='',
2160+ help='Space separated list of SASL mechanisms to use for '
2161+ 'auth.'),
2162+ cfg.IntOpt('qpid_heartbeat',
2163+ default=60,
2164+ help='Seconds between connection keepalive heartbeats.'),
2165+ cfg.StrOpt('qpid_protocol',
2166+ default='tcp',
2167+ help="Transport to use, either 'tcp' or 'ssl'."),
2168+ cfg.BoolOpt('qpid_tcp_nodelay',
2169+ default=True,
2170+ help='Whether to disable the Nagle algorithm.'),
2171+ cfg.IntOpt('qpid_receiver_capacity',
2172+ default=1,
2173+ help='The number of prefetched messages held by receiver.'),
2174+ # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
2175+ # this file could probably use some additional refactoring so that the
2176+ # differences between each version are split into different classes.
2177+ cfg.IntOpt('qpid_topology_version',
2178+ default=1,
2179+ help="The qpid topology version to use. Version 1 is what "
2180+ "was originally used by impl_qpid. Version 2 includes "
2181+ "some backwards-incompatible changes that allow broker "
2182+ "federation to work. Users should update to version 2 "
2183+ "when they are able to take everything down, as it "
2184+ "requires a clean break."),
2185+]
2186+
2187+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
2188+
2189+
2190+def raise_invalid_topology_version(conf):
2191+ msg = (_("Invalid value for qpid_topology_version: %d") %
2192+ conf.qpid_topology_version)
2193+ LOG.error(msg)
2194+ raise Exception(msg)
2195+
2196+
2197+class QpidMessage(dict):
2198+ def __init__(self, session, raw_message):
2199+ super(QpidMessage, self).__init__(
2200+ rpc_common.deserialize_msg(raw_message.content))
2201+ self._raw_message = raw_message
2202+ self._session = session
2203+
2204+ def acknowledge(self):
2205+ self._session.acknowledge(self._raw_message)
2206+
2207+ def requeue(self):
2208+ pass
2209+
2210+
2211+class ConsumerBase(object):
2212+ """Consumer base class."""
2213+
2214+ def __init__(self, conf, session, callback, node_name, node_opts,
2215+ link_name, link_opts):
2216+ """Declare a queue on an amqp session.
2217+
2218+ 'session' is the amqp session to use
2219+ 'callback' is the callback to call when messages are received
2220+ 'node_name' is the first part of the Qpid address string, before ';'
2221+ 'node_opts' will be applied to the "x-declare" section of "node"
2222+ in the address string.
2223+ 'link_name' goes into the "name" field of the "link" in the address
2224+ string
2225+ 'link_opts' will be applied to the "x-declare" section of "link"
2226+ in the address string.
2227+ """
2228+ self.callback = callback
2229+ self.receiver = None
2230+ self.rcv_capacity = conf.qpid_receiver_capacity
2231+ self.session = None
2232+
2233+ if conf.qpid_topology_version == 1:
2234+ addr_opts = {
2235+ "create": "always",
2236+ "node": {
2237+ "type": "topic",
2238+ "x-declare": {
2239+ "durable": True,
2240+ "auto-delete": True,
2241+ },
2242+ },
2243+ "link": {
2244+ "durable": True,
2245+ "x-declare": {
2246+ "durable": False,
2247+ "auto-delete": True,
2248+ "exclusive": False,
2249+ },
2250+ },
2251+ }
2252+ addr_opts["node"]["x-declare"].update(node_opts)
2253+ elif conf.qpid_topology_version == 2:
2254+ addr_opts = {
2255+ "link": {
2256+ "x-declare": {
2257+ "auto-delete": True,
2258+ "exclusive": False,
2259+ },
2260+ },
2261+ }
2262+ else:
2263+ raise_invalid_topology_version(conf)
2264+
2265+ addr_opts["link"]["x-declare"].update(link_opts)
2266+ if link_name:
2267+ addr_opts["link"]["name"] = link_name
2268+
2269+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
2270+
2271+ self.connect(session)
2272+
2273+ def connect(self, session):
2274+ """Declare the receiver on connect."""
2275+ self._declare_receiver(session)
2276+
2277+ def reconnect(self, session):
2278+ """Re-declare the receiver after a Qpid reconnect."""
2279+ self._declare_receiver(session)
2280+
2281+ def _declare_receiver(self, session):
2282+ self.session = session
2283+ self.receiver = session.receiver(self.address)
2284+ self.receiver.capacity = self.rcv_capacity
2285+
2286+ def _unpack_json_msg(self, msg):
2287+ """Load the JSON data in msg if msg.content_type indicates that it
2288+ is necessary. Put the loaded data back into msg.content and
2289+ update msg.content_type appropriately.
2290+
2291+ A Qpid Message containing a dict will have a content_type of
2292+ 'amqp/map', whereas one containing a string that needs to be converted
2293+ back from JSON will have a content_type of JSON_CONTENT_TYPE.
2294+
2295+ :param msg: a Qpid Message object
2296+ :returns: None
2297+ """
2298+ if msg.content_type == JSON_CONTENT_TYPE:
2299+ msg.content = jsonutils.loads(msg.content)
2300+ msg.content_type = 'amqp/map'
2301+
2302+ def consume(self):
2303+ """Fetch the message and pass it to the callback object."""
2304+ message = self.receiver.fetch()
2305+ try:
2306+ self._unpack_json_msg(message)
2307+ self.callback(QpidMessage(self.session, message))
2308+ except Exception:
2309+ LOG.exception(_("Failed to process message... skipping it."))
2310+ self.session.acknowledge(message)
2311+
2312+ def get_receiver(self):
2313+ return self.receiver
2314+
2315+ def get_node_name(self):
2316+ return self.address.split(';')[0]
2317+
2318+
2319+class DirectConsumer(ConsumerBase):
2320+ """Queue/consumer class for 'direct'."""
2321+
2322+ def __init__(self, conf, session, msg_id, callback):
2323+ """Init a 'direct' queue.
2324+
2325+ 'session' is the amqp session to use
2326+ 'msg_id' is the msg_id to listen on
2327+ 'callback' is the callback to call when messages are received
2328+ """
2329+
2330+ link_opts = {
2331+ "auto-delete": conf.amqp_auto_delete,
2332+ "exclusive": True,
2333+ "durable": conf.amqp_durable_queues,
2334+ }
2335+
2336+ if conf.qpid_topology_version == 1:
2337+ node_name = "%s/%s" % (msg_id, msg_id)
2338+ node_opts = {"type": "direct"}
2339+ link_name = msg_id
2340+ elif conf.qpid_topology_version == 2:
2341+ node_name = "amq.direct/%s" % msg_id
2342+ node_opts = {}
2343+ link_name = msg_id
2344+ else:
2345+ raise_invalid_topology_version(conf)
2346+
2347+ super(DirectConsumer, self).__init__(conf, session, callback,
2348+ node_name, node_opts, link_name,
2349+ link_opts)
2350+
2351+
2352+class TopicConsumer(ConsumerBase):
2353+ """Consumer class for 'topic'."""
2354+
2355+ def __init__(self, conf, session, topic, callback, exchange_name,
2356+ name=None):
2357+ """Init a 'topic' queue.
2358+
2359+ :param session: the amqp session to use
2360+ :param topic: the topic to listen on
2361+ :paramtype topic: str
2362+ :param callback: the callback to call when messages are received
2363+ :param name: optional queue name, defaults to topic
2364+ """
2365+
2366+ link_opts = {
2367+ "auto-delete": conf.amqp_auto_delete,
2368+ "durable": conf.amqp_durable_queues,
2369+ }
2370+
2371+ if conf.qpid_topology_version == 1:
2372+ node_name = "%s/%s" % (exchange_name, topic)
2373+ elif conf.qpid_topology_version == 2:
2374+ node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
2375+ else:
2376+ raise_invalid_topology_version(conf)
2377+
2378+ super(TopicConsumer, self).__init__(conf, session, callback, node_name,
2379+ {}, name or topic, link_opts)
2380+
2381+
2382+class FanoutConsumer(ConsumerBase):
2383+ """Consumer class for 'fanout'."""
2384+
2385+ def __init__(self, conf, session, topic, callback):
2386+ """Init a 'fanout' queue.
2387+
2388+ 'session' is the amqp session to use
2389+ 'topic' is the topic to listen on
2390+ 'callback' is the callback to call when messages are received
2391+ """
2392+ self.conf = conf
2393+
2394+ link_opts = {"exclusive": True}
2395+
2396+ if conf.qpid_topology_version == 1:
2397+ node_name = "%s_fanout" % topic
2398+ node_opts = {"durable": False, "type": "fanout"}
2399+ elif conf.qpid_topology_version == 2:
2400+ node_name = "amq.topic/fanout/%s" % topic
2401+ node_opts = {}
2402+ else:
2403+ raise_invalid_topology_version(conf)
2404+
2405+ super(FanoutConsumer, self).__init__(conf, session, callback,
2406+ node_name, node_opts, None,
2407+ link_opts)
2408+
2409+
2410+class Publisher(object):
2411+ """Base Publisher class."""
2412+
2413+ def __init__(self, conf, session, node_name, node_opts=None):
2414+ """Init the Publisher class with the exchange_name, routing_key,
2415+ and other options
2416+ """
2417+ self.sender = None
2418+ self.session = session
2419+
2420+ if conf.qpid_topology_version == 1:
2421+ addr_opts = {
2422+ "create": "always",
2423+ "node": {
2424+ "type": "topic",
2425+ "x-declare": {
2426+ "durable": False,
2427+ # auto-delete isn't implemented for exchanges in qpid,
2428+ # but included here anyway
2429+ "auto-delete": True,
2430+ },
2431+ },
2432+ }
2433+ if node_opts:
2434+ addr_opts["node"]["x-declare"].update(node_opts)
2435+
2436+ self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
2437+ elif conf.qpid_topology_version == 2:
2438+ self.address = node_name
2439+ else:
2440+ raise_invalid_topology_version(conf)
2441+
2442+ self.reconnect(session)
2443+
2444+ def reconnect(self, session):
2445+ """Re-establish the Sender after a reconnection."""
2446+ self.sender = session.sender(self.address)
2447+
2448+ def _pack_json_msg(self, msg):
2449+ """Qpid cannot serialize dicts containing strings longer than 65535
2450+ characters. This function dumps the message content to a JSON
2451+ string, which Qpid is able to handle.
2452+
2453+ :param msg: May be either a Qpid Message object or a bare dict.
2454+ :returns: A Qpid Message with its content field JSON encoded.
2455+ """
2456+ try:
2457+ msg.content = jsonutils.dumps(msg.content)
2458+ except AttributeError:
2459+ # Need to have a Qpid message so we can set the content_type.
2460+ msg = qpid_messaging.Message(jsonutils.dumps(msg))
2461+ msg.content_type = JSON_CONTENT_TYPE
2462+ return msg
2463+
2464+ def send(self, msg):
2465+ """Send a message."""
2466+ try:
2467+ # Check if Qpid can encode the message
2468+ check_msg = msg
2469+ if not hasattr(check_msg, 'content_type'):
2470+ check_msg = qpid_messaging.Message(msg)
2471+ content_type = check_msg.content_type
2472+ enc, dec = qpid_messaging.message.get_codec(content_type)
2473+ enc(check_msg.content)
2474+ except qpid_codec.CodecException:
2475+ # This means the message couldn't be serialized as a dict.
2476+ msg = self._pack_json_msg(msg)
2477+ self.sender.send(msg)
2478+
2479+
2480+class DirectPublisher(Publisher):
2481+ """Publisher class for 'direct'."""
2482+ def __init__(self, conf, session, topic):
2483+ """Init a 'direct' publisher."""
2484+
2485+ if conf.qpid_topology_version == 1:
2486+ node_name = "%s/%s" % (topic, topic)
2487+ node_opts = {"type": "direct"}
2488+ elif conf.qpid_topology_version == 2:
2489+ node_name = "amq.direct/%s" % topic
2490+ node_opts = {}
2491+ else:
2492+ raise_invalid_topology_version(conf)
2493+
2494+ super(DirectPublisher, self).__init__(conf, session, node_name,
2495+ node_opts)
2496+
2497+
2498+class TopicPublisher(Publisher):
2499+ """Publisher class for 'topic'."""
2500+ def __init__(self, conf, session, exchange_name, topic):
2501+ """Init a 'topic' publisher.
2502+ """
2503+ if conf.qpid_topology_version == 1:
2504+ node_name = "%s/%s" % (exchange_name, topic)
2505+ elif conf.qpid_topology_version == 2:
2506+ node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
2507+ else:
2508+ raise_invalid_topology_version(conf)
2509+
2510+ super(TopicPublisher, self).__init__(conf, session, node_name)
2511+
2512+
2513+class FanoutPublisher(Publisher):
2514+ """Publisher class for 'fanout'."""
2515+ def __init__(self, conf, session, topic):
2516+ """Init a 'fanout' publisher.
2517+ """
2518+
2519+ if conf.qpid_topology_version == 1:
2520+ node_name = "%s_fanout" % topic
2521+ node_opts = {"type": "fanout"}
2522+ elif conf.qpid_topology_version == 2:
2523+ node_name = "amq.topic/fanout/%s" % topic
2524+ node_opts = {}
2525+ else:
2526+ raise_invalid_topology_version(conf)
2527+
2528+ super(FanoutPublisher, self).__init__(conf, session, node_name,
2529+ node_opts)
2530+
2531+
2532+class NotifyPublisher(Publisher):
2533+ """Publisher class for notifications."""
2534+ def __init__(self, conf, session, exchange_name, topic):
2535+ """Init a 'topic' publisher.
2536+ """
2537+ node_opts = {"durable": True}
2538+
2539+ if conf.qpid_topology_version == 1:
2540+ node_name = "%s/%s" % (exchange_name, topic)
2541+ elif conf.qpid_topology_version == 2:
2542+ node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
2543+ else:
2544+ raise_invalid_topology_version(conf)
2545+
2546+ super(NotifyPublisher, self).__init__(conf, session, node_name,
2547+ node_opts)
2548+
2549+
2550+class Connection(object):
2551+ """Connection object."""
2552+
2553+ pools = {}
2554+
2555+ def __init__(self, conf, url):
2556+ if not qpid_messaging:
2557+ raise ImportError("Failed to import qpid.messaging")
2558+
2559+ self.connection = None
2560+ self.session = None
2561+ self.consumers = {}
2562+ self.conf = conf
2563+
2564+ self.brokers_params = []
2565+ if url.hosts:
2566+ for host in url.hosts:
2567+ params = {
2568+ 'username': host.username or '',
2569+ 'password': host.password or '',
2570+ }
2571+ if host.port is not None:
2572+ params['host'] = '%s:%d' % (host.hostname, host.port)
2573+ else:
2574+ params['host'] = host.hostname
2575+ self.brokers_params.append(params)
2576+ else:
2577+ # Old configuration format
2578+ for adr in self.conf.qpid_hosts:
2579+ hostname, port = netutils.parse_host_port(
2580+ adr, default_port=5672)
2581+
2582+ params = {
2583+ 'host': '%s:%d' % (hostname, port),
2584+ 'username': self.conf.qpid_username,
2585+ 'password': self.conf.qpid_password,
2586+ }
2587+ self.brokers_params.append(params)
2588+
2589+ random.shuffle(self.brokers_params)
2590+ self.brokers = itertools.cycle(self.brokers_params)
2591+
2592+ self.reconnect()
2593+
2594+ def _connect(self, broker):
2595+ # Create the connection - this does not open the connection
2596+ self.connection = qpid_messaging.Connection(broker['host'])
2597+
2598+ # Check if flags are set and if so set them for the connection
2599+ # before we call open
2600+ self.connection.username = broker['username']
2601+ self.connection.password = broker['password']
2602+
2603+ self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
2604+ # Reconnection is done by self.reconnect()
2605+ self.connection.reconnect = False
2606+ self.connection.heartbeat = self.conf.qpid_heartbeat
2607+ self.connection.transport = self.conf.qpid_protocol
2608+ self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
2609+ self.connection.open()
2610+
2611+ def _register_consumer(self, consumer):
2612+ self.consumers[six.text_type(consumer.get_receiver())] = consumer
2613+
2614+ def _lookup_consumer(self, receiver):
2615+ return self.consumers[six.text_type(receiver)]
2616+
2617+ def _disconnect(self):
2618+ # Close the session if necessary
2619+ if self.connection is not None and self.connection.opened():
2620+ try:
2621+ self.connection.close()
2622+ except qpid_exceptions.MessagingError:
2623+ pass
2624+ self.connection = None
2625+
2626+ def reconnect(self, retry=None):
2627+ """Handles reconnecting and re-establishing sessions and queues.
2628+ Will retry up to 'retry' times:
2629+ retry = None or -1 means to retry forever
2630+ retry = 0 means no retry
2631+ retry = N means N retries
2632+ """
2633+ delay = 1
2634+ attempt = 0
2635+ loop_forever = False
2636+ if retry is None or retry < 0:
2637+ loop_forever = True
2638+
2639+ while True:
2640+ self._disconnect()
2641+
2642+ attempt += 1
2643+ broker = six.next(self.brokers)
2644+ try:
2645+ self._connect(broker)
2646+ except qpid_exceptions.MessagingError as e:
2647+ msg_dict = dict(e=e,
2648+ delay=delay,
2649+ retry=retry,
2650+ broker=broker)
2651+ if not loop_forever and attempt > retry:
2652+ msg = _('Unable to connect to AMQP server on '
2653+ '%(broker)s after %(retry)d '
2654+ 'tries: %(e)s') % msg_dict
2655+ LOG.error(msg)
2656+ raise exceptions.MessageDeliveryFailure(msg)
2657+ else:
2658+ msg = _("Unable to connect to AMQP server on %(broker)s: "
2659+ "%(e)s. Sleeping %(delay)s seconds") % msg_dict
2660+ LOG.error(msg)
2661+ time.sleep(delay)
2662+ delay = min(delay + 1, 5)
2663+ else:
2664+ LOG.info(_('Connected to AMQP server on %s'), broker['host'])
2665+ break
2666+
2667+ self.session = self.connection.session()
2668+
2669+ if self.consumers:
2670+ consumers = self.consumers
2671+ self.consumers = {}
2672+
2673+ for consumer in six.itervalues(consumers):
2674+ consumer.reconnect(self.session)
2675+ self._register_consumer(consumer)
2676+
2677+ LOG.debug("Re-established AMQP queues")
2678+
2679+ def ensure(self, error_callback, method, retry=None):
2680+ while True:
2681+ try:
2682+ return method()
2683+ except (qpid_exceptions.Empty,
2684+ qpid_exceptions.MessagingError) as e:
2685+ if error_callback:
2686+ error_callback(e)
2687+ self.reconnect(retry=retry)
2688+
2689+ def close(self):
2690+ """Close/release this connection."""
2691+ try:
2692+ self.connection.close()
2693+ except Exception:
2694+ # NOTE(dripton) Logging exceptions that happen during cleanup just
2695+ # causes confusion; there's really nothing useful we can do with
2696+ # them.
2697+ pass
2698+ self.connection = None
2699+
2700+ def reset(self):
2701+ """Reset a connection so it can be used again."""
2702+ self.session.close()
2703+ self.session = self.connection.session()
2704+ self.consumers = {}
2705+
2706+ def declare_consumer(self, consumer_cls, topic, callback):
2707+ """Create a Consumer using the class that was passed in and
2708+ add it to our list of consumers
2709+ """
2710+ def _connect_error(exc):
2711+ log_info = {'topic': topic, 'err_str': exc}
2712+ LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
2713+ "%(err_str)s"), log_info)
2714+
2715+ def _declare_consumer():
2716+ consumer = consumer_cls(self.conf, self.session, topic, callback)
2717+ self._register_consumer(consumer)
2718+ return consumer
2719+
2720+ return self.ensure(_connect_error, _declare_consumer)
2721+
2722+ def iterconsume(self, limit=None, timeout=None):
2723+ """Return an iterator that will consume from all queues/consumers."""
2724+
2725+ def _error_callback(exc):
2726+ if isinstance(exc, qpid_exceptions.Empty):
2727+ LOG.debug('Timed out waiting for RPC response: %s', exc)
2728+ raise rpc_common.Timeout()
2729+ else:
2730+ LOG.exception(_('Failed to consume message from queue: %s'),
2731+ exc)
2732+
2733+ def _consume():
2734+ nxt_receiver = self.session.next_receiver(timeout=timeout)
2735+ try:
2736+ self._lookup_consumer(nxt_receiver).consume()
2737+ except Exception:
2738+ LOG.exception(_("Error processing message. Skipping it."))
2739+
2740+ for iteration in itertools.count(0):
2741+ if limit and iteration >= limit:
2742+ raise StopIteration
2743+ yield self.ensure(_error_callback, _consume)
2744+
2745+ def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
2746+ """Send to a publisher based on the publisher class."""
2747+
2748+ def _connect_error(exc):
2749+ log_info = {'topic': topic, 'err_str': exc}
2750+ LOG.exception(_("Failed to publish message to topic "
2751+ "'%(topic)s': %(err_str)s"), log_info)
2752+
2753+ def _publisher_send():
2754+ publisher = cls(self.conf, self.session, topic=topic, **kwargs)
2755+ publisher.send(msg)
2756+
2757+ return self.ensure(_connect_error, _publisher_send, retry=retry)
2758+
2759+ def declare_direct_consumer(self, topic, callback):
2760+ """Create a 'direct' queue.
2761+ In nova's use, this is generally a msg_id queue used for
2762+ responses to call/multicall.
2763+ """
2764+ self.declare_consumer(DirectConsumer, topic, callback)
2765+
2766+ def declare_topic_consumer(self, exchange_name, topic, callback=None,
2767+ queue_name=None):
2768+ """Create a 'topic' consumer."""
2769+ self.declare_consumer(functools.partial(TopicConsumer,
2770+ name=queue_name,
2771+ exchange_name=exchange_name,
2772+ ),
2773+ topic, callback)
2774+
2775+ def declare_fanout_consumer(self, topic, callback):
2776+ """Create a 'fanout' consumer."""
2777+ self.declare_consumer(FanoutConsumer, topic, callback)
2778+
2779+ def direct_send(self, msg_id, msg):
2780+ """Send a 'direct' message."""
2781+ self.publisher_send(DirectPublisher, topic=msg_id, msg=msg)
2782+
2783+ def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
2784+ """Send a 'topic' message."""
2785+ #
2786+ # We want to create a message with attributes, for example a TTL. We
2787+ # don't really need to keep 'msg' in its JSON format any longer,
2788+ # so let's create an actual Qpid message here and get that
2789+ # value-add along the way.
2790+ #
2791+ # WARNING: Request timeout happens to be in the same units as
2792+ # Qpid's TTL (seconds). If this changes in the future, then this
2793+ # will need to be altered accordingly.
2794+ #
2795+ qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
2796+ self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message,
2797+ exchange_name=exchange_name, retry=retry)
2798+
2799+ def fanout_send(self, topic, msg, retry=None):
2800+ """Send a 'fanout' message."""
2801+ self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
2802+
2803+ def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
2804+ """Send a notify message on a topic."""
2805+ self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
2806+ exchange_name=exchange_name, retry=retry)
2807+
2808+ def consume(self, limit=None, timeout=None):
2809+ """Consume from all queues/consumers."""
2810+ it = self.iterconsume(limit=limit, timeout=timeout)
2811+ while True:
2812+ try:
2813+ six.next(it)
2814+ except StopIteration:
2815+ return
2816+
2817+
2818+class QpidDriver(amqpdriver.AMQPDriverBase):
2819+
2820+ def __init__(self, conf, url,
2821+ default_exchange=None, allowed_remote_exmods=None):
2822+ conf.register_opts(qpid_opts)
2823+ conf.register_opts(rpc_amqp.amqp_opts)
2824+
2825+ connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
2826+
2827+ super(QpidDriver, self).__init__(conf, url,
2828+ connection_pool,
2829+ default_exchange,
2830+ allowed_remote_exmods)
2831
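Every consumer and publisher class above ultimately reduces to building a Qpid address string: topology version 1 spells out the node (and link) declarations as inline JSON, while version 2 reuses the broker's pre-declared amq.* exchanges, which is what makes broker federation possible. A sketch of the 'direct' case, with values taken from DirectConsumer.__init__ and the link section trimmed for brevity:

    import json

    msg_id = 'reply_abc123'  # illustrative reply-queue id

    # Topology version 1: a per-msg_id exchange/queue pair is declared.
    node_name = '%s/%s' % (msg_id, msg_id)
    addr_opts = {'create': 'always',
                 'node': {'type': 'topic',
                          'x-declare': {'durable': True,
                                        'auto-delete': True,
                                        'type': 'direct'}}}
    print('%s ; %s' % (node_name, json.dumps(addr_opts)))

    # Topology version 2: the pre-declared amq.direct exchange is
    # reused, so only the consumer's link/queue needs declaring.
    print('amq.direct/%s' % msg_id)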
2832=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch'
2833=== added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/.timestamp'
2834=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo'
2835=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging'
2836=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers'
2837=== added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py'
2838--- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
2839+++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000
2840@@ -0,0 +1,832 @@
2841+# Copyright 2011 OpenStack Foundation
2842+#
2843+# Licensed under the Apache License, Version 2.0 (the "License"); you may
2844+# not use this file except in compliance with the License. You may obtain
2845+# a copy of the License at
2846+#
2847+# http://www.apache.org/licenses/LICENSE-2.0
2848+#
2849+# Unless required by applicable law or agreed to in writing, software
2850+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
2851+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
2852+# License for the specific language governing permissions and limitations
2853+# under the License.
2854+
2855+import functools
2856+import itertools
2857+import logging
2858+import random
2859+import socket
2860+import ssl
2861+import time
2862+import uuid
2863+
2864+import kombu
2865+import kombu.connection
2866+import kombu.entity
2867+import kombu.messaging
2868+import six
2869+
2870+from oslo.config import cfg
2871+from oslo.messaging._drivers import amqp as rpc_amqp
2872+from oslo.messaging._drivers import amqpdriver
2873+from oslo.messaging._drivers import common as rpc_common
2874+from oslo.messaging import exceptions
2875+from oslo.messaging.openstack.common.gettextutils import _
2876+from oslo.utils import netutils
2877+
2878+rabbit_opts = [
2879+ cfg.StrOpt('kombu_ssl_version',
2880+ default='',
2881+ help='SSL version to use (valid only if SSL enabled). '
2882+ 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
2883+ 'be available on some distributions.'
2884+ ),
2885+ cfg.StrOpt('kombu_ssl_keyfile',
2886+ default='',
2887+ help='SSL key file (valid only if SSL enabled).'),
2888+ cfg.StrOpt('kombu_ssl_certfile',
2889+ default='',
2890+ help='SSL cert file (valid only if SSL enabled).'),
2891+ cfg.StrOpt('kombu_ssl_ca_certs',
2892+ default='',
2893+ help='SSL certification authority file '
2894+ '(valid only if SSL enabled).'),
2895+ cfg.FloatOpt('kombu_reconnect_delay',
2896+ default=1.0,
2897+ help='How long to wait before reconnecting in response to an '
2898+ 'AMQP consumer cancel notification.'),
2899+ cfg.StrOpt('rabbit_host',
2900+ default='localhost',
2901+ help='The RabbitMQ broker address where a single node is '
2902+ 'used.'),
2903+ cfg.IntOpt('rabbit_port',
2904+ default=5672,
2905+ help='The RabbitMQ broker port where a single node is used.'),
2906+ cfg.ListOpt('rabbit_hosts',
2907+ default=['$rabbit_host:$rabbit_port'],
2908+ help='RabbitMQ HA cluster host:port pairs.'),
2909+ cfg.BoolOpt('rabbit_use_ssl',
2910+ default=False,
2911+ help='Connect over SSL for RabbitMQ.'),
2912+ cfg.StrOpt('rabbit_userid',
2913+ default='guest',
2914+ help='The RabbitMQ userid.'),
2915+ cfg.StrOpt('rabbit_password',
2916+ default='guest',
2917+ help='The RabbitMQ password.',
2918+ secret=True),
2919+ cfg.StrOpt('rabbit_login_method',
2920+ default='AMQPLAIN',
2921+ help='The RabbitMQ login method.'),
2922+ cfg.StrOpt('rabbit_virtual_host',
2923+ default='/',
2924+ help='The RabbitMQ virtual host.'),
2925+ cfg.IntOpt('rabbit_retry_interval',
2926+ default=1,
2927+ help='How frequently to retry connecting with RabbitMQ.'),
2928+ cfg.IntOpt('rabbit_retry_backoff',
2929+ default=2,
2930+ help='How long to backoff for between retries when connecting '
2931+ 'to RabbitMQ.'),
2932+ cfg.IntOpt('rabbit_max_retries',
2933+ default=0,
2934+ help='Maximum number of RabbitMQ connection retries. '
2935+ 'Default is 0 (infinite retry count).'),
2936+ cfg.BoolOpt('rabbit_ha_queues',
2937+ default=False,
2938+ help='Use HA queues in RabbitMQ (x-ha-policy: all). '
2939+ 'If you change this option, you must wipe the '
2940+ 'RabbitMQ database.'),
2941+
2942+ # FIXME(markmc): this was toplevel in openstack.common.rpc
2943+ cfg.BoolOpt('fake_rabbit',
2944+ default=False,
2945+ help='If passed, use a fake RabbitMQ provider.'),
2946+]
2947+
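+# For example (illustrative values), a service pointing at a two-node
+# RabbitMQ cluster with mirrored queues would carry something like the
+# following in its configuration:
+#
+#     rabbit_hosts = rabbit1:5672,rabbit2:5672
+#     rabbit_ha_queues = True
+#     rabbit_retry_interval = 1
+#     rabbit_retry_backoff = 2
+#     rabbit_max_retries = 0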
2948+LOG = logging.getLogger(__name__)
2949+
2950+
2951+def _get_queue_arguments(conf):
2952+ """Construct the arguments for declaring a queue.
2953+
2954+ If the rabbit_ha_queues option is set, we declare a mirrored queue
2955+ as described here:
2956+
2957+ http://www.rabbitmq.com/ha.html
2958+
2959+ Setting x-ha-policy to all means that the queue will be mirrored
2960+ to all nodes in the cluster.
2961+ """
2962+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
2963+
2964+
2965+class RabbitMessage(dict):
2966+ def __init__(self, raw_message):
2967+ super(RabbitMessage, self).__init__(
2968+ rpc_common.deserialize_msg(raw_message.payload))
2969+ self._raw_message = raw_message
2970+
2971+ def acknowledge(self):
2972+ self._raw_message.ack()
2973+
2974+ def requeue(self):
2975+ self._raw_message.requeue()
2976+
2977+
2978+class ConsumerBase(object):
2979+ """Consumer base class."""
2980+
2981+ def __init__(self, channel, callback, tag, **kwargs):
2982+ """Declare a queue on an amqp channel.
2983+
2984+ 'channel' is the amqp channel to use
2985+ 'callback' is the callback to call when messages are received
2986+ 'tag' is a unique ID for the consumer on the channel
2987+
2988+ queue name, exchange name, and other kombu options are
2989+ passed in here as a dictionary.
2990+ """
2991+ self.callback = callback
2992+ self.tag = six.text_type(tag)
2993+ self.kwargs = kwargs
2994+ self.queue = None
2995+ self.reconnect(channel)
2996+
2997+ def reconnect(self, channel):
2998+ """Re-declare the queue after a rabbit reconnect."""
2999+ self.channel = channel
3000+ self.kwargs['channel'] = channel
3001+ self.queue = kombu.entity.Queue(**self.kwargs)
3002+ self.queue.declare()
3003+
3004+ def _callback_handler(self, message, callback):
3005+ """Call callback with deserialized message.
3006+
3007+ Messages are processed by the callback and then ack'ed.
3008+ """
3009+
3010+ try:
3011+ callback(RabbitMessage(message))
3012+ except Exception:
3013+ LOG.exception(_("Failed to process message"
3014+ " ... skipping it."))
3015+ message.ack()
3016+
3017+ def consume(self, *args, **kwargs):
3018+ """Actually declare the consumer on the amqp channel. This will
3019+ start the flow of messages from the queue. Using the
3020+ Connection.iterconsume() iterator will process the messages,
3021+ calling the appropriate callback.
3022+
3023+ If a callback is specified in kwargs, use that. Otherwise,
3024+ use the callback passed during __init__()
3025+
3026+ If kwargs['nowait'] is False, this call will wait for the broker
3027+ to confirm the consumer registration before returning.
3028+
3029+ """
3030+
3031+ options = {'consumer_tag': self.tag}
3032+ options['nowait'] = kwargs.get('nowait', False)
3033+ callback = kwargs.get('callback', self.callback)
3034+ if not callback:
3035+ raise ValueError("No callback defined")
3036+
3037+ def _callback(raw_message):
3038+ message = self.channel.message_to_python(raw_message)
3039+ self._callback_handler(message, callback)
3040+
3041+ self.queue.consume(*args, callback=_callback, **options)
3042+
3043+ def cancel(self):
3044+ """Cancel the consuming from the queue, if it has started."""
3045+ try:
3046+ self.queue.cancel(self.tag)
3047+ except KeyError as e:
3048+ # NOTE(comstud): Kludge to get around an amqplib bug
3049+ if six.text_type(e) != "u'%s'" % self.tag:
3050+ raise
3051+ self.queue = None
3052+
3053+
3054+class DirectConsumer(ConsumerBase):
3055+ """Queue/consumer class for 'direct'."""
3056+
3057+ def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
3058+ """Init a 'direct' queue.
3059+
3060+ 'channel' is the amqp channel to use
3061+ 'msg_id' is the msg_id to listen on
3062+ 'callback' is the callback to call when messages are received
3063+ 'tag' is a unique ID for the consumer on the channel
3064+
3065+ Other kombu options may be passed
3066+ """
3067+ # Default options
3068+ options = {'durable': False,
3069+ 'queue_arguments': _get_queue_arguments(conf),
3070+ 'auto_delete': True,
3071+ 'exclusive': False}
3072+ options.update(kwargs)
3073+ exchange = kombu.entity.Exchange(name=msg_id,
3074+ type='direct',
3075+ durable=options['durable'],
3076+ auto_delete=options['auto_delete'])
3077+ super(DirectConsumer, self).__init__(channel,
3078+ callback,
3079+ tag,
3080+ name=msg_id,
3081+ exchange=exchange,
3082+ routing_key=msg_id,
3083+ **options)
3084+
3085+
3086+class TopicConsumer(ConsumerBase):
3087+ """Consumer class for 'topic'."""
3088+
3089+ def __init__(self, conf, channel, topic, callback, tag, exchange_name,
3090+ name=None, **kwargs):
3091+ """Init a 'topic' queue.
3092+
3093+ :param channel: the amqp channel to use
3094+ :param topic: the topic to listen on
3095+ :paramtype topic: str
3096+ :param callback: the callback to call when messages are received
3097+ :param tag: a unique ID for the consumer on the channel
3098+ :param exchange_name: the exchange name to use
3099+ :param name: optional queue name, defaults to topic
3100+ :paramtype name: str
3101+
3102+ Other kombu options may be passed as keyword arguments
3103+ """
3104+ # Default options
3105+ options = {'durable': conf.amqp_durable_queues,
3106+ 'queue_arguments': _get_queue_arguments(conf),
3107+ 'auto_delete': conf.amqp_auto_delete,
3108+ 'exclusive': False}
3109+ options.update(kwargs)
3110+ exchange = kombu.entity.Exchange(name=exchange_name,
3111+ type='topic',
3112+ durable=options['durable'],
3113+ auto_delete=options['auto_delete'])
3114+ super(TopicConsumer, self).__init__(channel,
3115+ callback,
3116+ tag,
3117+ name=name or topic,
3118+ exchange=exchange,
3119+ routing_key=topic,
3120+ **options)
3121+
3122+
3123+class FanoutConsumer(ConsumerBase):
3124+ """Consumer class for 'fanout'."""
3125+
3126+ def __init__(self, conf, channel, topic, callback, tag, **kwargs):
3127+ """Init a 'fanout' queue.
3128+
3129+ 'channel' is the amqp channel to use
3130+ 'topic' is the topic to listen on
3131+ 'callback' is the callback to call when messages are received
3132+ 'tag' is a unique ID for the consumer on the channel
3133+
3134+ Other kombu options may be passed
3135+ """
3136+ unique = uuid.uuid4().hex
3137+ exchange_name = '%s_fanout' % topic
3138+ queue_name = '%s_fanout_%s' % (topic, unique)
3139+
3140+ # Default options
3141+ options = {'durable': False,
3142+ 'queue_arguments': _get_queue_arguments(conf),
3143+ 'auto_delete': True,
3144+ 'exclusive': False}
3145+ options.update(kwargs)
3146+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
3147+ durable=options['durable'],
3148+ auto_delete=options['auto_delete'])
3149+ super(FanoutConsumer, self).__init__(channel, callback, tag,
3150+ name=queue_name,
3151+ exchange=exchange,
3152+ routing_key=topic,
3153+ **options)
3154+
3155+
3156+class Publisher(object):
3157+ """Base Publisher class."""
3158+
3159+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
3160+ """Init the Publisher class with the exchange_name, routing_key,
3161+ and other options
3162+ """
3163+ self.exchange_name = exchange_name
3164+ self.routing_key = routing_key
3165+ self.kwargs = kwargs
3166+ self.reconnect(channel)
3167+
3168+ def reconnect(self, channel):
3169+ """Re-establish the Producer after a rabbit reconnection."""
3170+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
3171+ **self.kwargs)
3172+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
3173+ channel=channel,
3174+ routing_key=self.routing_key)
3175+
3176+ def send(self, msg, timeout=None):
3177+ """Send a message."""
3178+ if timeout:
3179+ #
3180+ # AMQP TTL is in milliseconds when set in the header.
3181+ #
3182+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
3183+ else:
3184+ self.producer.publish(msg)
3185+
3186+
3187+class DirectPublisher(Publisher):
3188+ """Publisher class for 'direct'."""
3189+ def __init__(self, conf, channel, topic, **kwargs):
3190+ """Init a 'direct' publisher.
3191+
3192+ Kombu options may be passed as keyword args to override defaults
3193+ """
3194+
3195+ options = {'durable': False,
3196+ 'auto_delete': True,
3197+ 'exclusive': False}
3198+ options.update(kwargs)
3199+ super(DirectPublisher, self).__init__(channel, topic, topic,
3200+ type='direct', **options)
3201+
3202+
3203+class TopicPublisher(Publisher):
3204+ """Publisher class for 'topic'."""
3205+ def __init__(self, conf, channel, exchange_name, topic, **kwargs):
3206+ """Init a 'topic' publisher.
3207+
3208+ Kombu options may be passed as keyword args to override defaults
3209+ """
3210+ options = {'durable': conf.amqp_durable_queues,
3211+ 'auto_delete': conf.amqp_auto_delete,
3212+ 'exclusive': False}
3213+ options.update(kwargs)
3214+ super(TopicPublisher, self).__init__(channel,
3215+ exchange_name,
3216+ topic,
3217+ type='topic',
3218+ **options)
3219+
3220+
3221+class FanoutPublisher(Publisher):
3222+ """Publisher class for 'fanout'."""
3223+ def __init__(self, conf, channel, topic, **kwargs):
3224+ """Init a 'fanout' publisher.
3225+
3226+ Kombu options may be passed as keyword args to override defaults
3227+ """
3228+ options = {'durable': False,
3229+ 'auto_delete': True,
3230+ 'exclusive': False}
3231+ options.update(kwargs)
3232+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
3233+ None, type='fanout', **options)
3234+
3235+
3236+class NotifyPublisher(TopicPublisher):
3237+ """Publisher class for 'notify'."""
3238+
3239+ def __init__(self, conf, channel, exchange_name, topic, **kwargs):
3240+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
3241+ self.queue_arguments = _get_queue_arguments(conf)
3242+ super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
3243+ topic, **kwargs)
3244+
3245+ def reconnect(self, channel):
3246+ super(NotifyPublisher, self).reconnect(channel)
3247+
3248+ # NOTE(jerdfelt): Normally the consumer would create the queue, but
3249+ # we do this to ensure that messages don't get dropped if the
3250+ # consumer is started after we do
3251+ queue = kombu.entity.Queue(channel=channel,
3252+ exchange=self.exchange,
3253+ durable=self.durable,
3254+ name=self.routing_key,
3255+ routing_key=self.routing_key,
3256+ queue_arguments=self.queue_arguments)
3257+ queue.declare()
3258+
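Declaring the queue from the publisher side inverts the usual AMQP pattern: without it, the exchange would silently drop notifications published before any consumer had bound a queue. A sketch of the effect, with conf and channel assumed to exist and the topic name made up:

    # sketch: the constructor (via reconnect) declares the
    # 'notifications.info' queue, so this message is stored even if
    # the notification consumer has not started yet
    pub = NotifyPublisher(conf, channel, 'nova', 'notifications.info')
    pub.send({'event_type': 'compute.instance.create.end'})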
3259+
3260+class Connection(object):
3261+ """Connection object."""
3262+
3263+ pools = {}
3264+
3265+ def __init__(self, conf, url):
3266+ self.consumers = []
3267+ self.conf = conf
3268+ self.max_retries = self.conf.rabbit_max_retries
3269+ # Try forever?
3270+ if self.max_retries <= 0:
3271+ self.max_retries = None
3272+ self.interval_start = self.conf.rabbit_retry_interval
3273+ self.interval_stepping = self.conf.rabbit_retry_backoff
3274+ # max retry-interval = 30 seconds
3275+ self.interval_max = 30
3276+ self.memory_transport = False
3277+
3278+ ssl_params = self._fetch_ssl_params()
3279+
3280+ if url.virtual_host is not None:
3281+ virtual_host = url.virtual_host
3282+ else:
3283+ virtual_host = self.conf.rabbit_virtual_host
3284+
3285+ self.brokers_params = []
3286+ if url.hosts:
3287+ for host in url.hosts:
3288+ params = {
3289+ 'hostname': host.hostname,
3290+ 'port': host.port or 5672,
3291+ 'userid': host.username or '',
3292+ 'password': host.password or '',
3293+ 'login_method': self.conf.rabbit_login_method,
3294+ 'virtual_host': virtual_host
3295+ }
3296+ if self.conf.fake_rabbit:
3297+ params['transport'] = 'memory'
3298+ if self.conf.rabbit_use_ssl:
3299+ params['ssl'] = ssl_params
3300+
3301+ self.brokers_params.append(params)
3302+ else:
3303+ # Old configuration format
3304+ for adr in self.conf.rabbit_hosts:
3305+ hostname, port = netutils.parse_host_port(
3306+ adr, default_port=self.conf.rabbit_port)
3307+
3308+ params = {
3309+ 'hostname': hostname,
3310+ 'port': port,
3311+ 'userid': self.conf.rabbit_userid,
3312+ 'password': self.conf.rabbit_password,
3313+ 'login_method': self.conf.rabbit_login_method,
3314+ 'virtual_host': virtual_host
3315+ }
3316+
3317+ if self.conf.fake_rabbit:
3318+ params['transport'] = 'memory'
3319+ if self.conf.rabbit_use_ssl:
3320+ params['ssl'] = ssl_params
3321+
3322+ self.brokers_params.append(params)
3323+
3324+ random.shuffle(self.brokers_params)
3325+ self.brokers = itertools.cycle(self.brokers_params)
3326+
3327+ self.memory_transport = self.conf.fake_rabbit
3328+
3329+ self.connection = None
3330+ self.do_consume = None
3331+ self.reconnect()
3332+
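The net effect of the two branches above is a normalized list of per-broker kwargs for kombu: shuffling spreads initial connections across a cluster, and itertools.cycle lets reconnect() walk the broker list forever. A self-contained sketch of just that mechanism:

    import itertools
    import random

    brokers_params = [{'hostname': 'rabbit1', 'port': 5672},
                      {'hostname': 'rabbit2', 'port': 5672}]
    random.shuffle(brokers_params)        # randomize the starting broker
    brokers = itertools.cycle(brokers_params)
    print(next(brokers)['hostname'])      # first candidate
    print(next(brokers)['hostname'])      # next candidate; wraps forever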
3333+ # FIXME(markmc): use oslo sslutils when it is available as a library
3334+ _SSL_PROTOCOLS = {
3335+ "tlsv1": ssl.PROTOCOL_TLSv1,
3336+ "sslv23": ssl.PROTOCOL_SSLv23,
3337+ "sslv3": ssl.PROTOCOL_SSLv3
3338+ }
3339+
3340+ try:
3341+ _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
3342+ except AttributeError:
3343+ pass
3344+
3345+ @classmethod
3346+ def validate_ssl_version(cls, version):
3347+ key = version.lower()
3348+ try:
3349+ return cls._SSL_PROTOCOLS[key]
3350+ except KeyError:
3351+            raise RuntimeError(_("Invalid SSL version: %s") % version)
3352+
3353+ def _fetch_ssl_params(self):
3354+        """Fetch the SSL parameters (if any) that should be used for
3355+        the connection.
3356+        """
3357+ ssl_params = dict()
3358+
3359+ # http://docs.python.org/library/ssl.html - ssl.wrap_socket
3360+ if self.conf.kombu_ssl_version:
3361+ ssl_params['ssl_version'] = self.validate_ssl_version(
3362+ self.conf.kombu_ssl_version)
3363+ if self.conf.kombu_ssl_keyfile:
3364+ ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
3365+ if self.conf.kombu_ssl_certfile:
3366+ ssl_params['certfile'] = self.conf.kombu_ssl_certfile
3367+ if self.conf.kombu_ssl_ca_certs:
3368+ ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
3369+ # We might want to allow variations in the
3370+ # future with this?
3371+ ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
3372+
3373+ # Return the extended behavior or just have the default behavior
3374+ return ssl_params or True
3375+
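For reference, with kombu_ssl_version='TLSv1' and a CA file configured, _fetch_ssl_params() would return a dict along these lines (the path is a made-up example); with nothing configured it returns True, meaning "use SSL with library defaults":

    {'ssl_version': ssl.PROTOCOL_TLSv1,
     'ca_certs': '/etc/ssl/certs/rabbit-ca.pem',  # hypothetical path
     'cert_reqs': ssl.CERT_REQUIRED}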
3376+ def _connect(self, broker):
3377+ """Connect to rabbit. Re-establish any queues that may have
3378+ been declared before if we are reconnecting. Exceptions should
3379+ be handled by the caller.
3380+ """
3381+ LOG.info(_("Connecting to AMQP server on "
3382+ "%(hostname)s:%(port)d"), broker)
3383+ self.connection = kombu.connection.BrokerConnection(**broker)
3384+ self.connection_errors = self.connection.connection_errors
3385+ self.channel_errors = self.connection.channel_errors
3386+ if self.memory_transport:
3387+ # Kludge to speed up tests.
3388+ self.connection.transport.polling_interval = 0.0
3389+ self.do_consume = True
3390+ self.consumer_num = itertools.count(1)
3391+ self.connection.connect()
3392+ self.channel = self.connection.channel()
3393+ # work around 'memory' transport bug in 1.1.3
3394+ if self.memory_transport:
3395+ self.channel._new_queue('ae.undeliver')
3396+ for consumer in self.consumers:
3397+ consumer.reconnect(self.channel)
3398+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
3399+ broker)
3400+
3401+ def _disconnect(self):
3402+ if self.connection:
3403+ # XXX(nic): when reconnecting to a RabbitMQ cluster
3404+ # with mirrored queues in use, the attempt to release the
3405+ # connection can hang "indefinitely" somewhere deep down
3406+ # in Kombu. Blocking the thread for a bit prior to
3407+ # release seems to kludge around the problem where it is
3408+            # otherwise reproducible.
3409+ if self.conf.kombu_reconnect_delay > 0:
3410+ LOG.info(_("Delaying reconnect for %1.1f seconds...") %
3411+ self.conf.kombu_reconnect_delay)
3412+ time.sleep(self.conf.kombu_reconnect_delay)
3413+
3414+ try:
3415+ self.connection.release()
3416+ except self.connection_errors:
3417+ pass
3418+ self.connection = None
3419+
3420+ def reconnect(self, retry=None):
3421+        """Handle reconnecting and re-establishing queues.
3422+        Will retry up to 'retry' times:
3423+        retry = None means use the value of rabbit_max_retries
3424+        retry = -1 means retry forever
3425+        retry = 0 means no retry
3426+        retry = N means N retries
3427+        Sleep between tries, starting at self.interval_start
3428+        seconds and backing off by self.interval_stepping seconds
3429+        on each attempt.
3430+        """
3431+
3432+ attempt = 0
3433+ loop_forever = False
3434+ if retry is None:
3435+ retry = self.max_retries
3436+ if retry is None or retry < 0:
3437+ loop_forever = True
3438+
3439+ while True:
3440+ self._disconnect()
3441+
3442+ broker = six.next(self.brokers)
3443+ attempt += 1
3444+ try:
3445+ self._connect(broker)
3446+ return
3447+ except IOError as ex:
3448+ e = ex
3449+ except self.connection_errors as ex:
3450+ e = ex
3451+ except Exception as ex:
3452+ # NOTE(comstud): Unfortunately it's possible for amqplib
3453+ # to return an error not covered by its transport
3454+ # connection_errors in the case of a timeout waiting for
3455+ # a protocol response. (See paste link in LP888621)
3456+ # So, we check all exceptions for 'timeout' in them
3457+ # and try to reconnect in this case.
3458+ if 'timeout' not in six.text_type(e):
3459+ raise
3460+ e = ex
3461+
3462+ log_info = {}
3463+ log_info['err_str'] = e
3464+ log_info['retry'] = retry or 0
3465+ log_info.update(broker)
3466+
3467+ if not loop_forever and attempt > retry:
3468+ msg = _('Unable to connect to AMQP server on '
3469+ '%(hostname)s:%(port)d after %(retry)d '
3470+ 'tries: %(err_str)s') % log_info
3471+ LOG.error(msg)
3472+ raise exceptions.MessageDeliveryFailure(msg)
3473+ else:
3474+ if attempt == 1:
3475+ sleep_time = self.interval_start or 1
3476+ elif attempt > 1:
3477+ sleep_time += self.interval_stepping
3478+
3479+ sleep_time = min(sleep_time, self.interval_max)
3480+
3481+ log_info['sleep_time'] = sleep_time
3482+ if 'Socket closed' in six.text_type(e):
3483+ LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
3484+ ' the connection. Check login credentials:'
3485+ ' %(err_str)s'), log_info)
3486+ else:
3487+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
3488+ 'unreachable: %(err_str)s. Trying again in '
3489+ '%(sleep_time)d seconds.'), log_info)
3490+ time.sleep(sleep_time)
3491+
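With the default rabbit_retry_interval=1 and rabbit_retry_backoff=2, the sleep schedule above grows linearly with a 30-second ceiling: 1s, 3s, 5s, 7s, and so on, capped at 30s. A tiny sketch reproducing just the arithmetic:

    # sleep schedule: attempt 1 -> interval_start, then +interval_stepping
    # per attempt, capped at interval_max
    sleep_time = 0
    for attempt in range(1, 6):
        sleep_time = 1 if attempt == 1 else sleep_time + 2
        sleep_time = min(sleep_time, 30)
        print(attempt, sleep_time)   # 1 1 / 2 3 / 3 5 / 4 7 / 5 9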
3492+ def ensure(self, error_callback, method, retry=None):
3493+ while True:
3494+ try:
3495+ return method()
3496+ except self.connection_errors as e:
3497+ if error_callback:
3498+ error_callback(e)
3499+ except self.channel_errors as e:
3500+ if error_callback:
3501+ error_callback(e)
3502+ except (socket.timeout, IOError) as e:
3503+ if error_callback:
3504+ error_callback(e)
3505+ except Exception as e:
3506+ # NOTE(comstud): Unfortunately it's possible for amqplib
3507+ # to return an error not covered by its transport
3508+ # connection_errors in the case of a timeout waiting for
3509+ # a protocol response. (See paste link in LP888621)
3510+ # So, we check all exceptions for 'timeout' in them
3511+ # and try to reconnect in this case.
3512+ if 'timeout' not in six.text_type(e):
3513+ raise
3514+ if error_callback:
3515+ error_callback(e)
3516+ self.reconnect(retry=retry)
3517+
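ensure() is the driver's generic retry wrapper: run a callable, and on any connection, channel, or timeout error invoke the optional error callback, reconnect (possibly to the next broker in the cycle), and try again. A hypothetical usage sketch (conn, conf, and the topic name are assumptions):

    def _on_error(exc):
        print('publish failed, will reconnect and retry: %s' % exc)

    def _do_publish():
        publisher = DirectPublisher(conf, conn.channel, topic='reply_abc123')
        publisher.send({'result': 42})

    # reconnect() raises MessageDeliveryFailure once the retry
    # budget of 3 reconnect attempts is exhausted
    conn.ensure(_on_error, _do_publish, retry=3)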
3518+ def get_channel(self):
3519+ """Convenience call for bin/clear_rabbit_queues."""
3520+ return self.channel
3521+
3522+ def close(self):
3523+ """Close/release this connection."""
3524+ if self.connection:
3525+ self.connection.release()
3526+ self.connection = None
3527+
3528+ def reset(self):
3529+ """Reset a connection so it can be used again."""
3530+ self.channel.close()
3531+ self.channel = self.connection.channel()
3532+ # work around 'memory' transport bug in 1.1.3
3533+ if self.memory_transport:
3534+ self.channel._new_queue('ae.undeliver')
3535+ self.consumers = []
3536+
3537+ def declare_consumer(self, consumer_cls, topic, callback):
3538+ """Create a Consumer using the class that was passed in and
3539+ add it to our list of consumers
3540+ """
3541+
3542+ def _connect_error(exc):
3543+ log_info = {'topic': topic, 'err_str': exc}
3544+ LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
3545+ "%(err_str)s"), log_info)
3546+
3547+ def _declare_consumer():
3548+ consumer = consumer_cls(self.conf, self.channel, topic, callback,
3549+ six.next(self.consumer_num))
3550+ self.consumers.append(consumer)
3551+ return consumer
3552+
3553+ return self.ensure(_connect_error, _declare_consumer)
3554+
3555+ def iterconsume(self, limit=None, timeout=None):
3556+ """Return an iterator that will consume from all queues/consumers."""
3557+
3558+ timer = rpc_common.DecayingTimer(duration=timeout)
3559+ timer.start()
3560+
3561+ def _raise_timeout(exc):
3562+ LOG.debug('Timed out waiting for RPC response: %s', exc)
3563+ raise rpc_common.Timeout()
3564+
3565+ def _error_callback(exc):
3566+ timer.check_return(_raise_timeout, exc)
3567+ LOG.exception(_('Failed to consume message from queue: %s'),
3568+ exc)
3569+ self.do_consume = True
3570+
3571+ def _consume():
3572+ if self.do_consume:
3573+ queues_head = self.consumers[:-1] # not fanout.
3574+ queues_tail = self.consumers[-1] # fanout
3575+ for queue in queues_head:
3576+ queue.consume(nowait=True)
3577+ queues_tail.consume(nowait=False)
3578+ self.do_consume = False
3579+
3580+ poll_timeout = 1 if timeout is None else min(timeout, 1)
3581+ while True:
3582+ try:
3583+ return self.connection.drain_events(timeout=poll_timeout)
3584+ except socket.timeout as exc:
3585+ poll_timeout = timer.check_return(_raise_timeout, exc,
3586+ maximum=1)
3587+
3588+ for iteration in itertools.count(0):
3589+ if limit and iteration >= limit:
3590+ raise StopIteration
3591+ yield self.ensure(_error_callback, _consume)
3592+
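iterconsume() re-registers consumers lazily (via do_consume), then drains events in slices of at most one second so the DecayingTimer can enforce the overall deadline. A minimal usage sketch, with conn and previously declared consumers assumed:

    # drain at most 10 messages, raising rpc_common.Timeout after
    # 5 seconds of total waiting; callbacks registered through
    # declare_*_consumer() fire inside each next() call
    for _ in conn.iterconsume(limit=10, timeout=5):
        pass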
3593+ def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
3594+ **kwargs):
3595+ """Send to a publisher based on the publisher class."""
3596+
3597+ def _error_callback(exc):
3598+ log_info = {'topic': topic, 'err_str': exc}
3599+ LOG.exception(_("Failed to publish message to topic "
3600+ "'%(topic)s': %(err_str)s"), log_info)
3601+
3602+ def _publish():
3603+ publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
3604+ publisher.send(msg, timeout)
3605+
3606+ self.ensure(_error_callback, _publish, retry=retry)
3607+
3608+ def declare_direct_consumer(self, topic, callback):
3609+ """Create a 'direct' queue.
3610+ In nova's use, this is generally a msg_id queue used for
3611+ responses for call/multicall
3612+ """
3613+ self.declare_consumer(DirectConsumer, topic, callback)
3614+
3615+ def declare_topic_consumer(self, exchange_name, topic, callback=None,
3616+ queue_name=None):
3617+ """Create a 'topic' consumer."""
3618+ self.declare_consumer(functools.partial(TopicConsumer,
3619+ name=queue_name,
3620+ exchange_name=exchange_name,
3621+ ),
3622+ topic, callback)
3623+
3624+ def declare_fanout_consumer(self, topic, callback):
3625+ """Create a 'fanout' consumer."""
3626+ self.declare_consumer(FanoutConsumer, topic, callback)
3627+
3628+ def direct_send(self, msg_id, msg):
3629+ """Send a 'direct' message."""
3630+ self.publisher_send(DirectPublisher, msg_id, msg)
3631+
3632+ def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
3633+ """Send a 'topic' message."""
3634+ self.publisher_send(TopicPublisher, topic, msg, timeout,
3635+ exchange_name=exchange_name, retry=retry)
3636+
3637+ def fanout_send(self, topic, msg, retry=None):
3638+ """Send a 'fanout' message."""
3639+ self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
3640+
3641+ def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
3642+ """Send a notify message on a topic."""
3643+ self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
3644+ exchange_name=exchange_name, retry=retry, **kwargs)
3645+
3646+ def consume(self, limit=None, timeout=None):
3647+ """Consume from all queues/consumers."""
3648+ it = self.iterconsume(limit=limit, timeout=timeout)
3649+ while True:
3650+ try:
3651+ six.next(it)
3652+ except StopIteration:
3653+ return
3654+
3655+
3656+class RabbitDriver(amqpdriver.AMQPDriverBase):
3657+
3658+ def __init__(self, conf, url,
3659+ default_exchange=None,
3660+ allowed_remote_exmods=None):
3661+ conf.register_opts(rabbit_opts)
3662+ conf.register_opts(rpc_amqp.amqp_opts)
3663+
3664+ connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
3665+
3666+ super(RabbitDriver, self).__init__(conf, url,
3667+ connection_pool,
3668+ default_exchange,
3669+ allowed_remote_exmods)
3670+
3671+ def require_features(self, requeue=True):
3672+ pass
3673
3674=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests'
3675=== added directory '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers'
3676=== added file '.pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py'
3677--- .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 1970-01-01 00:00:00 +0000
3678+++ .pc/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch/tests/drivers/test_impl_rabbit.py 2015-12-17 11:03:34 +0000
3679@@ -0,0 +1,735 @@
3680+# Copyright 2013 Red Hat, Inc.
3681+#
3682+# Licensed under the Apache License, Version 2.0 (the "License"); you may
3683+# not use this file except in compliance with the License. You may obtain
3684+# a copy of the License at
3685+#
3686+# http://www.apache.org/licenses/LICENSE-2.0
3687+#
3688+# Unless required by applicable law or agreed to in writing, software
3689+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
3690+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
3691+# License for the specific language governing permissions and limitations
3692+# under the License.
3693+
3694+import datetime
3695+import operator
3696+import sys
3697+import threading
3698+import uuid
3699+
3700+import fixtures
3701+import kombu
3702+import mock
3703+import testscenarios
3704+
3705+from oslo import messaging
3706+from oslo.messaging._drivers import amqpdriver
3707+from oslo.messaging._drivers import common as driver_common
3708+from oslo.messaging._drivers import impl_rabbit as rabbit_driver
3709+from oslo.messaging.openstack.common import jsonutils
3710+from tests import utils as test_utils
3711+
3712+load_tests = testscenarios.load_tests_apply_scenarios
3713+
3714+
3715+class TestRabbitDriverLoad(test_utils.BaseTestCase):
3716+
3717+ def setUp(self):
3718+ super(TestRabbitDriverLoad, self).setUp()
3719+ self.messaging_conf.transport_driver = 'rabbit'
3720+ self.messaging_conf.in_memory = True
3721+
3722+ def test_driver_load(self):
3723+ transport = messaging.get_transport(self.conf)
3724+ self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
3725+
3726+
3727+class TestRabbitTransportURL(test_utils.BaseTestCase):
3728+
3729+ scenarios = [
3730+ ('none', dict(url=None,
3731+ expected=[dict(hostname='localhost',
3732+ port=5672,
3733+ userid='guest',
3734+ password='guest',
3735+ virtual_host='/')])),
3736+ ('empty',
3737+ dict(url='rabbit:///',
3738+ expected=[dict(hostname='localhost',
3739+ port=5672,
3740+ userid='guest',
3741+ password='guest',
3742+ virtual_host='')])),
3743+ ('localhost',
3744+ dict(url='rabbit://localhost/',
3745+ expected=[dict(hostname='localhost',
3746+ port=5672,
3747+ userid='',
3748+ password='',
3749+ virtual_host='')])),
3750+ ('virtual_host',
3751+ dict(url='rabbit:///vhost',
3752+ expected=[dict(hostname='localhost',
3753+ port=5672,
3754+ userid='guest',
3755+ password='guest',
3756+ virtual_host='vhost')])),
3757+ ('no_creds',
3758+ dict(url='rabbit://host/virtual_host',
3759+ expected=[dict(hostname='host',
3760+ port=5672,
3761+ userid='',
3762+ password='',
3763+ virtual_host='virtual_host')])),
3764+ ('no_port',
3765+ dict(url='rabbit://user:password@host/virtual_host',
3766+ expected=[dict(hostname='host',
3767+ port=5672,
3768+ userid='user',
3769+ password='password',
3770+ virtual_host='virtual_host')])),
3771+ ('full_url',
3772+ dict(url='rabbit://user:password@host:10/virtual_host',
3773+ expected=[dict(hostname='host',
3774+ port=10,
3775+ userid='user',
3776+ password='password',
3777+ virtual_host='virtual_host')])),
3778+ ('full_two_url',
3779+ dict(url='rabbit://user:password@host:10,'
3780+ 'user2:password2@host2:12/virtual_host',
3781+ expected=[dict(hostname='host',
3782+ port=10,
3783+ userid='user',
3784+ password='password',
3785+ virtual_host='virtual_host'),
3786+ dict(hostname='host2',
3787+ port=12,
3788+ userid='user2',
3789+ password='password2',
3790+ virtual_host='virtual_host')
3791+ ]
3792+ )),
3793+
3794+ ]
3795+
3796+ def test_transport_url(self):
3797+ self.messaging_conf.in_memory = True
3798+
3799+ transport = messaging.get_transport(self.conf, self.url)
3800+ self.addCleanup(transport.cleanup)
3801+ driver = transport._driver
3802+
3803+ brokers_params = driver._get_connection().brokers_params[:]
3804+ brokers_params = [dict((k, v) for k, v in broker.items()
3805+ if k not in ['transport', 'login_method'])
3806+ for broker in brokers_params]
3807+
3808+ self.assertEqual(sorted(self.expected,
3809+ key=operator.itemgetter('hostname')),
3810+ sorted(brokers_params,
3811+ key=operator.itemgetter('hostname')))
3812+
3813+
3814+class TestSendReceive(test_utils.BaseTestCase):
3815+
3816+ _n_senders = [
3817+ ('single_sender', dict(n_senders=1)),
3818+ ('multiple_senders', dict(n_senders=10)),
3819+ ]
3820+
3821+ _context = [
3822+ ('empty_context', dict(ctxt={})),
3823+ ('with_context', dict(ctxt={'user': 'mark'})),
3824+ ]
3825+
3826+ _reply = [
3827+ ('rx_id', dict(rx_id=True, reply=None)),
3828+ ('none', dict(rx_id=False, reply=None)),
3829+ ('empty_list', dict(rx_id=False, reply=[])),
3830+ ('empty_dict', dict(rx_id=False, reply={})),
3831+ ('false', dict(rx_id=False, reply=False)),
3832+ ('zero', dict(rx_id=False, reply=0)),
3833+ ]
3834+
3835+ _failure = [
3836+ ('success', dict(failure=False)),
3837+ ('failure', dict(failure=True, expected=False)),
3838+ ('expected_failure', dict(failure=True, expected=True)),
3839+ ]
3840+
3841+ _timeout = [
3842+ ('no_timeout', dict(timeout=None)),
3843+ ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
3844+ ]
3845+
3846+ @classmethod
3847+ def generate_scenarios(cls):
3848+ cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
3849+ cls._context,
3850+ cls._reply,
3851+ cls._failure,
3852+ cls._timeout)
3853+
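multiply_scenarios() takes the cross-product of the axes above, so this single test method fans out into 2 senders x 2 contexts x 6 replies x 3 failure modes x 2 timeouts = 144 generated test cases. A toy illustration of the combinator:

    import testscenarios
    combined = testscenarios.multiply_scenarios(
        [('a', dict(x=1))], [('b', dict(y=2))])
    # -> [('a,b', {'x': 1, 'y': 2})]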
3854+ def setUp(self):
3855+ super(TestSendReceive, self).setUp()
3856+ self.messaging_conf.transport_driver = 'rabbit'
3857+ self.messaging_conf.in_memory = True
3858+
3859+ def test_send_receive(self):
3860+ transport = messaging.get_transport(self.conf)
3861+ self.addCleanup(transport.cleanup)
3862+
3863+ driver = transport._driver
3864+
3865+ target = messaging.Target(topic='testtopic')
3866+
3867+ listener = driver.listen(target)
3868+
3869+ senders = []
3870+ replies = []
3871+ msgs = []
3872+ errors = []
3873+
3874+ def stub_error(msg, *a, **kw):
3875+ if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
3876+ a = a[0]
3877+ errors.append(str(msg) % a)
3878+
3879+ self.stubs.Set(driver_common.LOG, 'error', stub_error)
3880+
3881+ def send_and_wait_for_reply(i):
3882+ try:
3883+ replies.append(driver.send(target,
3884+ self.ctxt,
3885+ {'tx_id': i},
3886+ wait_for_reply=True,
3887+ timeout=self.timeout))
3888+ self.assertFalse(self.failure)
3889+ self.assertIsNone(self.timeout)
3890+ except (ZeroDivisionError, messaging.MessagingTimeout) as e:
3891+ replies.append(e)
3892+ self.assertTrue(self.failure or self.timeout is not None)
3893+
3894+ while len(senders) < self.n_senders:
3895+ senders.append(threading.Thread(target=send_and_wait_for_reply,
3896+ args=(len(senders), )))
3897+
3898+ for i in range(len(senders)):
3899+ senders[i].start()
3900+
3901+ received = listener.poll()
3902+ self.assertIsNotNone(received)
3903+ self.assertEqual(self.ctxt, received.ctxt)
3904+ self.assertEqual({'tx_id': i}, received.message)
3905+ msgs.append(received)
3906+
3907+        # reply in reverse order, except the first sender's reply
3908+        # is sent second from last
3908+ order = list(range(len(senders) - 1, -1, -1))
3909+ if len(order) > 1:
3910+ order[-1], order[-2] = order[-2], order[-1]
3911+
3912+ for i in order:
3913+ if self.timeout is None:
3914+ if self.failure:
3915+ try:
3916+ raise ZeroDivisionError
3917+ except Exception:
3918+ failure = sys.exc_info()
3919+ msgs[i].reply(failure=failure,
3920+ log_failure=not self.expected)
3921+ elif self.rx_id:
3922+ msgs[i].reply({'rx_id': i})
3923+ else:
3924+ msgs[i].reply(self.reply)
3925+ senders[i].join()
3926+
3927+ self.assertEqual(len(senders), len(replies))
3928+ for i, reply in enumerate(replies):
3929+ if self.timeout is not None:
3930+ self.assertIsInstance(reply, messaging.MessagingTimeout)
3931+ elif self.failure:
3932+ self.assertIsInstance(reply, ZeroDivisionError)
3933+ elif self.rx_id:
3934+ self.assertEqual({'rx_id': order[i]}, reply)
3935+ else:
3936+ self.assertEqual(self.reply, reply)
3937+
3938+ if not self.timeout and self.failure and not self.expected:
3939+ self.assertTrue(len(errors) > 0, errors)
3940+ else:
3941+ self.assertEqual(0, len(errors), errors)
3942+
3943+
3944+TestSendReceive.generate_scenarios()
3945+
3946+
3947+class TestPollAsync(test_utils.BaseTestCase):
3948+
3949+ def setUp(self):
3950+ super(TestPollAsync, self).setUp()
3951+ self.messaging_conf.transport_driver = 'rabbit'
3952+ self.messaging_conf.in_memory = True
3953+
3954+ def test_poll_timeout(self):
3955+ transport = messaging.get_transport(self.conf)
3956+ self.addCleanup(transport.cleanup)
3957+ driver = transport._driver
3958+ target = messaging.Target(topic='testtopic')
3959+ listener = driver.listen(target)
3960+ received = listener.poll(timeout=0.050)
3961+ self.assertIsNone(received)
3962+
3963+
3964+class TestRacyWaitForReply(test_utils.BaseTestCase):
3965+
3966+ def setUp(self):
3967+ super(TestRacyWaitForReply, self).setUp()
3968+ self.messaging_conf.transport_driver = 'rabbit'
3969+ self.messaging_conf.in_memory = True
3970+
3971+ def test_send_receive(self):
3972+ transport = messaging.get_transport(self.conf)
3973+ self.addCleanup(transport.cleanup)
3974+
3975+ driver = transport._driver
3976+
3977+ target = messaging.Target(topic='testtopic')
3978+
3979+ listener = driver.listen(target)
3980+
3981+ senders = []
3982+ replies = []
3983+ msgs = []
3984+
3985+ wait_conditions = []
3986+ orig_reply_waiter = amqpdriver.ReplyWaiter.wait
3987+
3988+ def reply_waiter(self, msg_id, timeout):
3989+ if wait_conditions:
3990+ cond = wait_conditions.pop()
3991+ with cond:
3992+ cond.notify()
3993+ with cond:
3994+ cond.wait()
3995+ return orig_reply_waiter(self, msg_id, timeout)
3996+
3997+ self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
3998+
3999+ def send_and_wait_for_reply(i, wait_for_reply):
4000+ replies.append(driver.send(target,
4001+ {},
4002+ {'tx_id': i},
4003+ wait_for_reply=wait_for_reply,
4004+ timeout=None))
4005+
4006+ while len(senders) < 2:
4007+ t = threading.Thread(target=send_and_wait_for_reply,
4008+ args=(len(senders), True))
4009+ t.daemon = True
4010+ senders.append(t)
4011+
4012+        # test the case when msg_id is not set
4013+ t = threading.Thread(target=send_and_wait_for_reply,
4014+ args=(len(senders), False))
4015+ t.daemon = True
4016+ senders.append(t)
4017+
4018+ # Start the first guy, receive his message, but delay his polling
4019+ notify_condition = threading.Condition()
4020+ wait_conditions.append(notify_condition)
4021+ with notify_condition:
4022+ senders[0].start()
4023+ notify_condition.wait()
4024+
4025+ msgs.append(listener.poll())
4026+ self.assertEqual({'tx_id': 0}, msgs[-1].message)
4027+
4028+ # Start the second guy, receive his message
4029+ senders[1].start()
4030+
4031+ msgs.append(listener.poll())
4032+ self.assertEqual({'tx_id': 1}, msgs[-1].message)
4033+
4034+ # Reply to both in order, making the second thread queue
4035+ # the reply meant for the first thread
4036+ msgs[0].reply({'rx_id': 0})
4037+ msgs[1].reply({'rx_id': 1})
4038+
4039+ # Wait for the second thread to finish
4040+ senders[1].join()
4041+
4042+ # Start the 3rd guy, receive his message
4043+ senders[2].start()
4044+
4045+ msgs.append(listener.poll())
4046+ self.assertEqual({'tx_id': 2}, msgs[-1].message)
4047+
4048+        # Verify that _send_reply was not invoked by the driver:
4049+ with mock.patch.object(msgs[2], '_send_reply') as method:
4050+ msgs[2].reply({'rx_id': 2})
4051+ self.assertEqual(method.call_count, 0)
4052+
4053+ # Wait for the 3rd thread to finish
4054+ senders[2].join()
4055+
4056+ # Let the first thread continue
4057+ with notify_condition:
4058+ notify_condition.notify()
4059+
4060+ # Wait for the first thread to finish
4061+ senders[0].join()
4062+
4063+ # Verify replies were received out of order
4064+ self.assertEqual(len(senders), len(replies))
4065+ self.assertEqual({'rx_id': 1}, replies[0])
4066+ self.assertIsNone(replies[1])
4067+ self.assertEqual({'rx_id': 0}, replies[2])
4068+
4069+
4070+def _declare_queue(target):
4071+ connection = kombu.connection.BrokerConnection(transport='memory')
4072+
4073+ # Kludge to speed up tests.
4074+ connection.transport.polling_interval = 0.0
4075+
4076+ connection.connect()
4077+ channel = connection.channel()
4078+
4079+ # work around 'memory' transport bug in 1.1.3
4080+ channel._new_queue('ae.undeliver')
4081+
4082+ if target.fanout:
4083+ exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
4084+ type='fanout',
4085+ durable=False,
4086+ auto_delete=True)
4087+ queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
4088+ channel=channel,
4089+ exchange=exchange,
4090+ routing_key=target.topic)
4091+ if target.server:
4092+ exchange = kombu.entity.Exchange(name='openstack',
4093+ type='topic',
4094+ durable=False,
4095+ auto_delete=False)
4096+ topic = '%s.%s' % (target.topic, target.server)
4097+ queue = kombu.entity.Queue(name=topic,
4098+ channel=channel,
4099+ exchange=exchange,
4100+ routing_key=topic)
4101+ else:
4102+ exchange = kombu.entity.Exchange(name='openstack',
4103+ type='topic',
4104+ durable=False,
4105+ auto_delete=False)
4106+ queue = kombu.entity.Queue(name=target.topic,
4107+ channel=channel,
4108+ exchange=exchange,
4109+ routing_key=target.topic)
4110+
4111+ queue.declare()
4112+
4113+ return connection, channel, queue
4114+
4115+
4116+class TestRequestWireFormat(test_utils.BaseTestCase):
4117+
4118+ _target = [
4119+ ('topic_target',
4120+ dict(topic='testtopic', server=None, fanout=False)),
4121+ ('server_target',
4122+ dict(topic='testtopic', server='testserver', fanout=False)),
4123+ # NOTE(markmc): https://github.com/celery/kombu/issues/195
4124+ ('fanout_target',
4125+ dict(topic='testtopic', server=None, fanout=True,
4126+ skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
4127+ ]
4128+
4129+ _msg = [
4130+ ('empty_msg',
4131+ dict(msg={}, expected={})),
4132+ ('primitive_msg',
4133+ dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
4134+ ('complex_msg',
4135+ dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
4136+ expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
4137+ ]
4138+
4139+ _context = [
4140+ ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
4141+ ('user_project_ctxt',
4142+ dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
4143+ expected_ctxt={'_context_user': 'mark',
4144+ '_context_project': 'snarkybunch'})),
4145+ ]
4146+
4147+ @classmethod
4148+ def generate_scenarios(cls):
4149+ cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
4150+ cls._context,
4151+ cls._target)
4152+
4153+ def setUp(self):
4154+ super(TestRequestWireFormat, self).setUp()
4155+ self.messaging_conf.transport_driver = 'rabbit'
4156+ self.messaging_conf.in_memory = True
4157+
4158+ self.uuids = []
4159+ self.orig_uuid4 = uuid.uuid4
4160+ self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
4161+
4162+ def mock_uuid4(self):
4163+ self.uuids.append(self.orig_uuid4())
4164+ return self.uuids[-1]
4165+
4166+ def test_request_wire_format(self):
4167+ if hasattr(self, 'skip_msg'):
4168+ self.skipTest(self.skip_msg)
4169+
4170+ transport = messaging.get_transport(self.conf)
4171+ self.addCleanup(transport.cleanup)
4172+
4173+ driver = transport._driver
4174+
4175+ target = messaging.Target(topic=self.topic,
4176+ server=self.server,
4177+ fanout=self.fanout)
4178+
4179+ connection, channel, queue = _declare_queue(target)
4180+ self.addCleanup(connection.release)
4181+
4182+ driver.send(target, self.ctxt, self.msg)
4183+
4184+ msgs = []
4185+
4186+ def callback(msg):
4187+ msg = channel.message_to_python(msg)
4188+ msg.ack()
4189+ msgs.append(msg.payload)
4190+
4191+ queue.consume(callback=callback,
4192+ consumer_tag='1',
4193+ nowait=False)
4194+
4195+ connection.drain_events()
4196+
4197+ self.assertEqual(1, len(msgs))
4198+ self.assertIn('oslo.message', msgs[0])
4199+
4200+ received = msgs[0]
4201+ received['oslo.message'] = jsonutils.loads(received['oslo.message'])
4202+
4203+ # FIXME(markmc): add _msg_id and _reply_q check
4204+ expected_msg = {
4205+ '_unique_id': self.uuids[0].hex,
4206+ }
4207+ expected_msg.update(self.expected)
4208+ expected_msg.update(self.expected_ctxt)
4209+
4210+ expected = {
4211+ 'oslo.version': '2.0',
4212+ 'oslo.message': expected_msg,
4213+ }
4214+
4215+ self.assertEqual(expected, received)
4216+
4217+
4218+TestRequestWireFormat.generate_scenarios()
4219+
4220+
4221+def _create_producer(target):
4222+ connection = kombu.connection.BrokerConnection(transport='memory')
4223+
4224+ # Kludge to speed up tests.
4225+ connection.transport.polling_interval = 0.0
4226+
4227+ connection.connect()
4228+ channel = connection.channel()
4229+
4230+ # work around 'memory' transport bug in 1.1.3
4231+ channel._new_queue('ae.undeliver')
4232+
4233+ if target.fanout:
4234+ exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
4235+ type='fanout',
4236+ durable=False,
4237+ auto_delete=True)
4238+ producer = kombu.messaging.Producer(exchange=exchange,
4239+ channel=channel,
4240+ routing_key=target.topic)
4241+ elif target.server:
4242+ exchange = kombu.entity.Exchange(name='openstack',
4243+ type='topic',
4244+ durable=False,
4245+ auto_delete=False)
4246+ topic = '%s.%s' % (target.topic, target.server)
4247+ producer = kombu.messaging.Producer(exchange=exchange,
4248+ channel=channel,
4249+ routing_key=topic)
4250+ else:
4251+ exchange = kombu.entity.Exchange(name='openstack',
4252+ type='topic',
4253+ durable=False,
4254+ auto_delete=False)
4255+ producer = kombu.messaging.Producer(exchange=exchange,
4256+ channel=channel,
4257+ routing_key=target.topic)
4258+
4259+ return connection, producer
4260+
4261+
4262+class TestReplyWireFormat(test_utils.BaseTestCase):
4263+
4264+ _target = [
4265+ ('topic_target',
4266+ dict(topic='testtopic', server=None, fanout=False)),
4267+ ('server_target',
4268+ dict(topic='testtopic', server='testserver', fanout=False)),
4269+ # NOTE(markmc): https://github.com/celery/kombu/issues/195
4270+ ('fanout_target',
4271+ dict(topic='testtopic', server=None, fanout=True,
4272+ skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
4273+ ]
4274+
4275+ _msg = [
4276+ ('empty_msg',
4277+ dict(msg={}, expected={})),
4278+ ('primitive_msg',
4279+ dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
4280+ ('complex_msg',
4281+ dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
4282+ expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
4283+ ]
4284+
4285+ _context = [
4286+ ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
4287+ ('user_project_ctxt',
4288+ dict(ctxt={'_context_user': 'mark',
4289+ '_context_project': 'snarkybunch'},
4290+ expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
4291+ ]
4292+
4293+ @classmethod
4294+ def generate_scenarios(cls):
4295+ cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
4296+ cls._context,
4297+ cls._target)
4298+
4299+ def setUp(self):
4300+ super(TestReplyWireFormat, self).setUp()
4301+ self.messaging_conf.transport_driver = 'rabbit'
4302+ self.messaging_conf.in_memory = True
4303+
4304+ def test_reply_wire_format(self):
4305+ if hasattr(self, 'skip_msg'):
4306+ self.skipTest(self.skip_msg)
4307+
4308+ transport = messaging.get_transport(self.conf)
4309+ self.addCleanup(transport.cleanup)
4310+
4311+ driver = transport._driver
4312+
4313+ target = messaging.Target(topic=self.topic,
4314+ server=self.server,
4315+ fanout=self.fanout)
4316+
4317+ listener = driver.listen(target)
4318+
4319+ connection, producer = _create_producer(target)
4320+ self.addCleanup(connection.release)
4321+
4322+ msg = {
4323+ 'oslo.version': '2.0',
4324+ 'oslo.message': {}
4325+ }
4326+
4327+ msg['oslo.message'].update(self.msg)
4328+ msg['oslo.message'].update(self.ctxt)
4329+
4330+ msg['oslo.message'].update({
4331+ '_msg_id': uuid.uuid4().hex,
4332+ '_unique_id': uuid.uuid4().hex,
4333+ '_reply_q': 'reply_' + uuid.uuid4().hex,
4334+ })
4335+
4336+ msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
4337+
4338+ producer.publish(msg)
4339+
4340+ received = listener.poll()
4341+ self.assertIsNotNone(received)
4342+ self.assertEqual(self.expected_ctxt, received.ctxt)
4343+ self.assertEqual(self.expected, received.message)
4344+
4345+
4346+TestReplyWireFormat.generate_scenarios()
4347+
4348+
4349+class RpcKombuHATestCase(test_utils.BaseTestCase):
4350+
4351+ def setUp(self):
4352+ super(RpcKombuHATestCase, self).setUp()
4353+ self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
4354+ self.config(rabbit_hosts=self.brokers)
4355+
4356+ hostname_sets = set()
4357+ self.info = {'attempt': 0,
4358+ 'fail': False}
4359+
4360+ def _connect(myself, params):
4361+            # do just enough work for the connection attempt to succeed
4362+ myself.connection = kombu.connection.BrokerConnection(**params)
4363+ myself.connection_errors = myself.connection.connection_errors
4364+
4365+ hostname = params['hostname']
4366+ self.assertNotIn(hostname, hostname_sets)
4367+ hostname_sets.add(hostname)
4368+
4369+ self.info['attempt'] += 1
4370+ if self.info['fail']:
4371+ raise IOError('fake fail')
4372+
4373+ # just make sure connection instantiation does not fail with an
4374+ # exception
4375+ self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
4376+
4377+ # starting from the first broker in the list
4378+ url = messaging.TransportURL.parse(self.conf, None)
4379+ self.connection = rabbit_driver.Connection(self.conf, url)
4380+ self.addCleanup(self.connection.close)
4381+
4382+ self.info.update({'attempt': 0,
4383+ 'fail': True})
4384+ hostname_sets.clear()
4385+
4386+ def test_reconnect_order(self):
4387+ self.assertRaises(messaging.MessageDeliveryFailure,
4388+ self.connection.reconnect,
4389+ retry=len(self.brokers) - 1)
4390+ self.assertEqual(len(self.brokers), self.info['attempt'])
4391+
4392+ def test_ensure_four_retry(self):
4393+ mock_callback = mock.Mock(side_effect=IOError)
4394+ self.assertRaises(messaging.MessageDeliveryFailure,
4395+ self.connection.ensure, None, mock_callback,
4396+ retry=4)
4397+ self.assertEqual(5, self.info['attempt'])
4398+ self.assertEqual(1, mock_callback.call_count)
4399+
4400+ def test_ensure_one_retry(self):
4401+ mock_callback = mock.Mock(side_effect=IOError)
4402+ self.assertRaises(messaging.MessageDeliveryFailure,
4403+ self.connection.ensure, None, mock_callback,
4404+ retry=1)
4405+ self.assertEqual(2, self.info['attempt'])
4406+ self.assertEqual(1, mock_callback.call_count)
4407+
4408+ def test_ensure_no_retry(self):
4409+ mock_callback = mock.Mock(side_effect=IOError)
4410+ self.assertRaises(messaging.MessageDeliveryFailure,
4411+ self.connection.ensure, None, mock_callback,
4412+ retry=0)
4413+ self.assertEqual(1, self.info['attempt'])
4414+ self.assertEqual(1, mock_callback.call_count)
4415
4416=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch'
4417=== added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/.timestamp'
4418=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo'
4419=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging'
4420=== added directory '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers'
4421=== added file '.pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py'
4422--- .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
4423+++ .pc/0005-Fix-possible-usage-of-undefined-variable.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-12-17 11:03:34 +0000
4424@@ -0,0 +1,839 @@
4425+# Copyright 2011 OpenStack Foundation
4426+#
4427+# Licensed under the Apache License, Version 2.0 (the "License"); you may
4428+# not use this file except in compliance with the License. You may obtain
4429+# a copy of the License at
4430+#
4431+# http://www.apache.org/licenses/LICENSE-2.0
4432+#
4433+# Unless required by applicable law or agreed to in writing, software
4434+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
4435+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
4436+# License for the specific language governing permissions and limitations
4437+# under the License.
4438+
4439+import functools
4440+import itertools
4441+import logging
4442+import random
4443+import socket
4444+import ssl
4445+import time
4446+import uuid
4447+
4448+import kombu
4449+import kombu.connection
4450+import kombu.entity
4451+import kombu.messaging
4452+import six
4453+
4454+from oslo.config import cfg
4455+from oslo.messaging._drivers import amqp as rpc_amqp
4456+from oslo.messaging._drivers import amqpdriver
4457+from oslo.messaging._drivers import common as rpc_common
4458+from oslo.messaging import exceptions
4459+from oslo.messaging.openstack.common.gettextutils import _
4460+from oslo.utils import netutils
4461+
4462+rabbit_opts = [
4463+ cfg.StrOpt('kombu_ssl_version',
4464+ default='',
4465+ help='SSL version to use (valid only if SSL enabled). '
4466+                     'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
4467+ 'be available on some distributions.'
4468+ ),
4469+ cfg.StrOpt('kombu_ssl_keyfile',
4470+ default='',
4471+ help='SSL key file (valid only if SSL enabled).'),
4472+ cfg.StrOpt('kombu_ssl_certfile',
4473+ default='',
4474+ help='SSL cert file (valid only if SSL enabled).'),
4475+ cfg.StrOpt('kombu_ssl_ca_certs',
4476+ default='',
4477+ help='SSL certification authority file '
4478+ '(valid only if SSL enabled).'),
4479+ cfg.FloatOpt('kombu_reconnect_delay',
4480+ default=1.0,
4481+ help='How long to wait before reconnecting in response to an '
4482+ 'AMQP consumer cancel notification.'),
4483+ cfg.StrOpt('rabbit_host',
4484+ default='localhost',
4485+ help='The RabbitMQ broker address where a single node is '
4486+ 'used.'),
4487+ cfg.IntOpt('rabbit_port',
4488+ default=5672,
4489+ help='The RabbitMQ broker port where a single node is used.'),
4490+ cfg.ListOpt('rabbit_hosts',
4491+ default=['$rabbit_host:$rabbit_port'],
4492+ help='RabbitMQ HA cluster host:port pairs.'),
4493+ cfg.BoolOpt('rabbit_use_ssl',
4494+ default=False,
4495+ help='Connect over SSL for RabbitMQ.'),
4496+ cfg.StrOpt('rabbit_userid',
4497+ default='guest',
4498+ help='The RabbitMQ userid.'),
4499+ cfg.StrOpt('rabbit_password',
4500+ default='guest',
4501+ help='The RabbitMQ password.',
4502+ secret=True),
4503+ cfg.StrOpt('rabbit_login_method',
4504+ default='AMQPLAIN',
4505+               help='The RabbitMQ login method.'),
4506+ cfg.StrOpt('rabbit_virtual_host',
4507+ default='/',
4508+ help='The RabbitMQ virtual host.'),
4509+ cfg.IntOpt('rabbit_retry_interval',
4510+ default=1,
4511+ help='How frequently to retry connecting with RabbitMQ.'),
4512+ cfg.IntOpt('rabbit_retry_backoff',
4513+ default=2,
4514+ help='How long to backoff for between retries when connecting '
4515+ 'to RabbitMQ.'),
4516+ cfg.IntOpt('rabbit_max_retries',
4517+ default=0,
4518+ help='Maximum number of RabbitMQ connection retries. '
4519+ 'Default is 0 (infinite retry count).'),
4520+ cfg.BoolOpt('rabbit_ha_queues',
4521+ default=False,
4522+ help='Use HA queues in RabbitMQ (x-ha-policy: all). '
4523+ 'If you change this option, you must wipe the '
4524+ 'RabbitMQ database.'),
4525+
4526+ # FIXME(markmc): this was toplevel in openstack.common.rpc
4527+ cfg.BoolOpt('fake_rabbit',
4528+ default=False,
4529+ help='If passed, use a fake RabbitMQ provider.'),
4530+]
4531+
4532+LOG = logging.getLogger(__name__)
4533+
4534+
4535+def _get_queue_arguments(conf):
4536+ """Construct the arguments for declaring a queue.
4537+
4538+ If the rabbit_ha_queues option is set, we declare a mirrored queue
4539+ as described here:
4540+
4541+ http://www.rabbitmq.com/ha.html
4542+
4543+ Setting x-ha-policy to all means that the queue will be mirrored
4544+ to all nodes in the cluster.
4545+ """
4546+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
4547+
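In practice this means a single config flag flips every queue declared by the driver to cluster-wide mirroring. For example (a sketch, with conf assumed to be an oslo.config ConfigOpts instance):

    conf.set_override('rabbit_ha_queues', True)
    _get_queue_arguments(conf)   # -> {'x-ha-policy': 'all'}
    conf.set_override('rabbit_ha_queues', False)
    _get_queue_arguments(conf)   # -> {}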
4548+
4549+class RabbitMessage(dict):
4550+ def __init__(self, raw_message):
4551+ super(RabbitMessage, self).__init__(
4552+ rpc_common.deserialize_msg(raw_message.payload))
4553+ self._raw_message = raw_message
4554+
4555+ def acknowledge(self):
4556+ self._raw_message.ack()
4557+
4558+ def requeue(self):
4559+ self._raw_message.requeue()
4560+
4561+
4562+class ConsumerBase(object):
4563+ """Consumer base class."""
4564+
4565+ def __init__(self, channel, callback, tag, **kwargs):
4566+ """Declare a queue on an amqp channel.
4567+
4568+ 'channel' is the amqp channel to use
4569+ 'callback' is the callback to call when messages are received
4570+ 'tag' is a unique ID for the consumer on the channel
4571+
4572+ queue name, exchange name, and other kombu options are
4573+ passed in here as a dictionary.
4574+ """
4575+ self.callback = callback
4576+ self.tag = six.text_type(tag)
4577+ self.kwargs = kwargs
4578+ self.queue = None
4579+ self.reconnect(channel)
4580+
4581+ def reconnect(self, channel):
4582+ """Re-declare the queue after a rabbit reconnect."""
4583+ self.channel = channel
4584+ self.kwargs['channel'] = channel
4585+ self.queue = kombu.entity.Queue(**self.kwargs)
4586+ self.queue.declare()
4587+
4588+ def _callback_handler(self, message, callback):
4589+ """Call callback with deserialized message.
4590+
4591+        The message is ack'ed even if the callback fails to process it.
4592+ """
4593+
4594+ try:
4595+ callback(RabbitMessage(message))
4596+ except Exception:
4597+ LOG.exception(_("Failed to process message"
4598+ " ... skipping it."))
4599+ message.ack()
4600+
4601+ def consume(self, *args, **kwargs):
4602+ """Actually declare the consumer on the amqp channel. This will
4603+ start the flow of messages from the queue. Using the
4604+ Connection.iterconsume() iterator will process the messages,
4605+ calling the appropriate callback.
4606+
4607+ If a callback is specified in kwargs, use that. Otherwise,
4608+        use the callback passed during __init__().
4609+
4610+        If kwargs['nowait'] is False, this call will block until the
4611+        broker has confirmed the consumer registration.
4612+
4613+ """
4614+
4615+ options = {'consumer_tag': self.tag}
4616+ options['nowait'] = kwargs.get('nowait', False)
4617+ callback = kwargs.get('callback', self.callback)
4618+ if not callback:
4619+ raise ValueError("No callback defined")
4620+
4621+ def _callback(raw_message):
4622+ message = self.channel.message_to_python(raw_message)
4623+ self._callback_handler(message, callback)
4624+
4625+ self.queue.consume(*args, callback=_callback, **options)
4626+
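Note that consume() only registers the kombu consumer; actual deliveries happen later when the connection drains events. A brief sketch (consumer and conn are assumed to exist):

    consumer.consume(nowait=True)     # register without waiting for
                                      # the broker's consume-ok
    conn.connection.drain_events()    # callbacks fire here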
4627+ def cancel(self):
4628+ """Cancel the consuming from the queue, if it has started."""
4629+ try:
4630+ self.queue.cancel(self.tag)
4631+ except KeyError as e:
4632+            # NOTE(comstud): Kludge to get around an amqplib bug
4633+ if six.text_type(e) != "u'%s'" % self.tag:
4634+ raise
4635+ self.queue = None
4636+
4637+
4638+class DirectConsumer(ConsumerBase):
4639+ """Queue/consumer class for 'direct'."""
4640+
4641+ def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
4642+ """Init a 'direct' queue.
4643+
4644+ 'channel' is the amqp channel to use
4645+ 'msg_id' is the msg_id to listen on
4646+ 'callback' is the callback to call when messages are received
4647+ 'tag' is a unique ID for the consumer on the channel
4648+
4649+ Other kombu options may be passed
4650+ """
4651+ # Default options
4652+ options = {'durable': False,
4653+ 'queue_arguments': _get_queue_arguments(conf),
4654+ 'auto_delete': True,
4655+ 'exclusive': False}
4656+ options.update(kwargs)
4657+ exchange = kombu.entity.Exchange(name=msg_id,
4658+ type='direct',
4659+ durable=options['durable'],
4660+ auto_delete=options['auto_delete'])
4661+ super(DirectConsumer, self).__init__(channel,
4662+ callback,
4663+ tag,
4664+ name=msg_id,
4665+ exchange=exchange,
4666+ routing_key=msg_id,
4667+ **options)
4668+
4669+
4670+class TopicConsumer(ConsumerBase):
4671+ """Consumer class for 'topic'."""
4672+
4673+ def __init__(self, conf, channel, topic, callback, tag, exchange_name,
4674+ name=None, **kwargs):
4675+ """Init a 'topic' queue.
4676+
4677+ :param channel: the amqp channel to use
4678+ :param topic: the topic to listen on
4679+ :paramtype topic: str
4680+ :param callback: the callback to call when messages are received
4681+ :param tag: a unique ID for the consumer on the channel
4682+ :param exchange_name: the exchange name to use
4683+ :param name: optional queue name, defaults to topic
4684+ :paramtype name: str
4685+
4686+ Other kombu options may be passed as keyword arguments
4687+ """
4688+ # Default options
4689+ options = {'durable': conf.amqp_durable_queues,
4690+ 'queue_arguments': _get_queue_arguments(conf),
4691+ 'auto_delete': conf.amqp_auto_delete,
4692+ 'exclusive': False}
4693+ options.update(kwargs)
4694+ exchange = kombu.entity.Exchange(name=exchange_name,
4695+ type='topic',
4696+ durable=options['durable'],
4697+ auto_delete=options['auto_delete'])
4698+ super(TopicConsumer, self).__init__(channel,
4699+ callback,
4700+ tag,
4701+ name=name or topic,
4702+ exchange=exchange,
4703+ routing_key=topic,
4704+ **options)
4705+
4706+
4707+class FanoutConsumer(ConsumerBase):
4708+ """Consumer class for 'fanout'."""
4709+
4710+ def __init__(self, conf, channel, topic, callback, tag, **kwargs):
4711+ """Init a 'fanout' queue.
4712+
4713+ 'channel' is the amqp channel to use
4714+ 'topic' is the topic to listen on
4715+ 'callback' is the callback to call when messages are received
4716+ 'tag' is a unique ID for the consumer on the channel
4717+
4718+ Other kombu options may be passed
4719+ """
4720+ unique = uuid.uuid4().hex
4721+ exchange_name = '%s_fanout' % topic
4722+ queue_name = '%s_fanout_%s' % (topic, unique)
4723+
4724+ # Default options
4725+ options = {'durable': False,
4726+ 'queue_arguments': _get_queue_arguments(conf),
4727+ 'auto_delete': True,
4728+ 'exclusive': False}
4729+ options.update(kwargs)
4730+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
4731+ durable=options['durable'],
4732+ auto_delete=options['auto_delete'])
4733+ super(FanoutConsumer, self).__init__(channel, callback, tag,
4734+ name=queue_name,
4735+ exchange=exchange,
4736+ routing_key=topic,
4737+ **options)
4738+
4739+
4740+class Publisher(object):
4741+ """Base Publisher class."""
4742+
4743+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
4744+ """Init the Publisher class with the exchange_name, routing_key,
4745+ and other options
4746+ """
4747+ self.exchange_name = exchange_name
4748+ self.routing_key = routing_key
4749+ self.kwargs = kwargs
4750+ self.reconnect(channel)
4751+
4752+ def reconnect(self, channel):
4753+ """Re-establish the Producer after a rabbit reconnection."""
4754+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
4755+ **self.kwargs)
4756+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
4757+ channel=channel,
4758+ routing_key=self.routing_key)
4759+
4760+ def send(self, msg, timeout=None):
4761+ """Send a message."""
4762+ if timeout:
4763+ #
4764+ # AMQP TTL is in milliseconds when set in the header.
4765+ #
4766+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
4767+ else:
4768+ self.producer.publish(msg)
4769+
4770+
4771+class DirectPublisher(Publisher):
4772+ """Publisher class for 'direct'."""
4773+ def __init__(self, conf, channel, topic, **kwargs):
4774+ """Init a 'direct' publisher.
4775+
4776+ Kombu options may be passed as keyword args to override defaults
4777+ """
4778+
4779+ options = {'durable': False,
4780+ 'auto_delete': True,
4781+ 'exclusive': False}
4782+ options.update(kwargs)
4783+ super(DirectPublisher, self).__init__(channel, topic, topic,
4784+ type='direct', **options)
4785+
4786+
4787+class TopicPublisher(Publisher):
4788+ """Publisher class for 'topic'."""
4789+ def __init__(self, conf, channel, exchange_name, topic, **kwargs):
4790+ """Init a 'topic' publisher.
4791+
4792+ Kombu options may be passed as keyword args to override defaults
4793+ """
4794+ options = {'durable': conf.amqp_durable_queues,
4795+ 'auto_delete': conf.amqp_auto_delete,
4796+ 'exclusive': False}
4797+ options.update(kwargs)
4798+ super(TopicPublisher, self).__init__(channel,
4799+ exchange_name,
4800+ topic,
4801+ type='topic',
4802+ **options)
4803+
4804+
4805+class FanoutPublisher(Publisher):
4806+ """Publisher class for 'fanout'."""
4807+ def __init__(self, conf, channel, topic, **kwargs):
4808+ """Init a 'fanout' publisher.
4809+
4810+ Kombu options may be passed as keyword args to override defaults
4811+ """
4812+ options = {'durable': False,
4813+ 'auto_delete': True,
4814+ 'exclusive': False}
4815+ options.update(kwargs)
4816+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
4817+ None, type='fanout', **options)
4818+
4819+
4820+class NotifyPublisher(TopicPublisher):
4821+ """Publisher class for 'notify'."""
4822+
4823+ def __init__(self, conf, channel, exchange_name, topic, **kwargs):
4824+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
4825+ self.queue_arguments = _get_queue_arguments(conf)
4826+ super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
4827+ topic, **kwargs)
4828+
4829+ def reconnect(self, channel):
4830+ super(NotifyPublisher, self).reconnect(channel)
4831+
4832+ # NOTE(jerdfelt): Normally the consumer would create the queue, but
4833+ # we do this to ensure that messages don't get dropped if the
4834+ # consumer is started after we do
4835+ queue = kombu.entity.Queue(channel=channel,
4836+ exchange=self.exchange,
4837+ durable=self.durable,
4838+ name=self.routing_key,
4839+ routing_key=self.routing_key,
4840+ queue_arguments=self.queue_arguments)
4841+ queue.declare()
4842+
4843+
4844+class Connection(object):
4845+ """Connection object."""
4846+
4847+ pools = {}
4848+
4849+ def __init__(self, conf, url):
4850+ self.consumers = []
4851+ self.conf = conf
4852+ self.max_retries = self.conf.rabbit_max_retries
4853+ # Try forever?
4854+ if self.max_retries <= 0:
4855+ self.max_retries = None
4856+ self.interval_start = self.conf.rabbit_retry_interval
4857+ self.interval_stepping = self.conf.rabbit_retry_backoff
4858+ # max retry-interval = 30 seconds
4859+ self.interval_max = 30
4860+ self.memory_transport = False
4861+
4862+ ssl_params = self._fetch_ssl_params()
4863+
4864+ if url.virtual_host is not None:
4865+ virtual_host = url.virtual_host
4866+ else:
4867+ virtual_host = self.conf.rabbit_virtual_host
4868+
4869+ self.brokers_params = []
4870+ if url.hosts:
4871+ for host in url.hosts:
4872+ params = {
4873+ 'hostname': host.hostname,
4874+ 'port': host.port or 5672,
4875+ 'userid': host.username or '',
4876+ 'password': host.password or '',
4877+ 'login_method': self.conf.rabbit_login_method,
4878+ 'virtual_host': virtual_host
4879+ }
4880+ if self.conf.fake_rabbit:
4881+ params['transport'] = 'memory'
4882+ if self.conf.rabbit_use_ssl:
4883+ params['ssl'] = ssl_params
4884+
4885+ self.brokers_params.append(params)
4886+ else:
4887+ # Old configuration format
4888+ for adr in self.conf.rabbit_hosts:
4889+ hostname, port = netutils.parse_host_port(
4890+ adr, default_port=self.conf.rabbit_port)
4891+
4892+ params = {
4893+ 'hostname': hostname,
4894+ 'port': port,
4895+ 'userid': self.conf.rabbit_userid,
4896+ 'password': self.conf.rabbit_password,
4897+ 'login_method': self.conf.rabbit_login_method,
4898+ 'virtual_host': virtual_host
4899+ }
4900+
4901+ if self.conf.fake_rabbit:
4902+ params['transport'] = 'memory'
4903+ if self.conf.rabbit_use_ssl:
4904+ params['ssl'] = ssl_params
4905+
4906+ self.brokers_params.append(params)
4907+
4908+ random.shuffle(self.brokers_params)
4909+ self.brokers = itertools.cycle(self.brokers_params)
4910+
4911+ self.memory_transport = self.conf.fake_rabbit
4912+
4913+ self.connection = None
4914+ self.do_consume = None
4915+ self.reconnect()
4916+
4917+ # FIXME(markmc): use oslo sslutils when it is available as a library
4918+ _SSL_PROTOCOLS = {
4919+ "tlsv1": ssl.PROTOCOL_TLSv1,
4920+ "sslv23": ssl.PROTOCOL_SSLv23,
4921+ "sslv3": ssl.PROTOCOL_SSLv3
4922+ }
4923+
4924+ try:
4925+ _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
4926+ except AttributeError:
4927+ pass
4928+
4929+ @classmethod
4930+ def validate_ssl_version(cls, version):
4931+ key = version.lower()
4932+ try:
4933+ return cls._SSL_PROTOCOLS[key]
4934+ except KeyError:
4935+            raise RuntimeError(_("Invalid SSL version: %s") % version)
4936+
4937+ def _fetch_ssl_params(self):
4938+        """Fetch the SSL parameters (if any) that should be used for
4939+        the connection.
4940+        """
4941+ ssl_params = dict()
4942+
4943+ # http://docs.python.org/library/ssl.html - ssl.wrap_socket
4944+ if self.conf.kombu_ssl_version:
4945+ ssl_params['ssl_version'] = self.validate_ssl_version(
4946+ self.conf.kombu_ssl_version)
4947+ if self.conf.kombu_ssl_keyfile:
4948+ ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
4949+ if self.conf.kombu_ssl_certfile:
4950+ ssl_params['certfile'] = self.conf.kombu_ssl_certfile
4951+ if self.conf.kombu_ssl_ca_certs:
4952+ ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
4953+ # We might want to allow variations in the
4954+ # future with this?
4955+ ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
4956+
4957+ # Return the extended behavior or just have the default behavior
4958+ return ssl_params or True
4959+
4960+ def _connect(self, broker):
4961+ """Connect to rabbit. Re-establish any queues that may have
4962+ been declared before if we are reconnecting. Exceptions should
4963+ be handled by the caller.
4964+ """
4965+ LOG.info(_("Connecting to AMQP server on "
4966+ "%(hostname)s:%(port)d"), broker)
4967+ self.connection = kombu.connection.BrokerConnection(**broker)
4968+ self.connection_errors = self.connection.connection_errors
4969+ self.channel_errors = self.connection.channel_errors
4970+ if self.memory_transport:
4971+ # Kludge to speed up tests.
4972+ self.connection.transport.polling_interval = 0.0
4973+ self.do_consume = True
4974+ self.consumer_num = itertools.count(1)
4975+ self.connection.connect()
4976+ self.channel = self.connection.channel()
4977+ # work around 'memory' transport bug in 1.1.3
4978+ if self.memory_transport:
4979+ self.channel._new_queue('ae.undeliver')
4980+ for consumer in self.consumers:
4981+ consumer.reconnect(self.channel)
4982+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
4983+ broker)
4984+
4985+ def _disconnect(self):
4986+ if self.connection:
4987+ # XXX(nic): when reconnecting to a RabbitMQ cluster
4988+ # with mirrored queues in use, the attempt to release the
4989+ # connection can hang "indefinitely" somewhere deep down
4990+ # in Kombu. Blocking the thread for a bit prior to
4991+ # release seems to kludge around the problem where it is
4992+            # otherwise reproducible.
4993+ if self.conf.kombu_reconnect_delay > 0:
4994+ LOG.info(_("Delaying reconnect for %1.1f seconds...") %
4995+ self.conf.kombu_reconnect_delay)
4996+ time.sleep(self.conf.kombu_reconnect_delay)
4997+
4998+ try:
4999+ self.connection.release()
5000+ except self.connection_errors:
The diff has been truncated for viewing.
