Merge lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032 into lp:ubuntu/trusty-updates/oslo.messaging

Proposed by Ubuntu Package Importer
Status: Needs review
Proposed branch: lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032
Merge into: lp:ubuntu/trusty-updates/oslo.messaging
Diff against target: 6223 lines (+22/-5952) (has conflicts)
20 files modified
.pc/.quilt_patches (+0/-1)
.pc/.quilt_series (+0/-1)
.pc/.version (+0/-1)
.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py (+0/-443)
.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py (+0/-509)
.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py (+0/-784)
.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py (+0/-535)
.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py (+0/-49)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py (+0/-535)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py (+0/-793)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py (+0/-646)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py (+0/-64)
.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py (+0/-819)
.pc/skip-qpid-tests.patch/tests/test_qpid.py (+0/-615)
oslo/messaging/_drivers/amqpdriver.py (+8/-15)
oslo/messaging/_drivers/common.py (+0/-26)
oslo/messaging/_drivers/impl_rabbit.py (+11/-53)
tests/test_qpid.py (+1/-9)
tests/test_rabbit.py (+2/-21)
tests/test_utils.py (+0/-33)
Conflict: can't delete .pc because it is not empty.  Not deleting.
Conflict because .pc is not versioned, but has versioned children.  Versioned directory.
Contents conflict in .pc/applied-patches
To merge this branch: bzr merge lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032
Reviewer: Ubuntu Development Team
Review status: Pending
Review via email: mp+286852@code.launchpad.net

Description of the change

The package importer has detected a possible inconsistency between the package history in the archive and the history in bzr. As the archive is authoritative, the importer has made lp:ubuntu/trusty-updates/oslo.messaging reflect what is in the archive, and the old bzr branch has been pushed to lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032. This merge proposal was created so that an Ubuntu developer can review the situation and perform a merge/upload if necessary. There are three typical cases where this can happen.
  1. Someone pushes a change to bzr and someone else uploads the package without that change. This is the case this importer check exists to catch. If this appears to be what happened, a merge/upload should be done if the changes that were in bzr are still desirable.
  2. The importer incorrectly detected the above situation when someone made a change in bzr and then uploaded it.
  3. The importer incorrectly detected the above situation when someone just uploaded a package and didn't touch bzr.

If this doesn't appear to be the first situation, set the status of the merge proposal to "Rejected" and help avoid the problem in the future by filing a bug at https://bugs.launchpad.net/udd that links to this merge proposal.

(This is an automatically generated message.)


Unmerged revisions

9. By Chuck Short

* Backport upstream fix (LP: #1318721):
  - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
    Redeclare the queue if an exception is caught after self.queue.declare() failed.
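
As context for reviewers, the pattern the backported patch applies looks roughly like the minimal sketch below. It assumes kombu's Queue API as used elsewhere in this diff; the function name, logging, and single-retry detail are illustrative assumptions, not the actual patch contents.

    import logging

    import kombu.entity

    LOG = logging.getLogger(__name__)


    def declare_queue(channel, **queue_kwargs):
        """Declare a queue, re-declaring once if the first attempt fails.

        After a RabbitMQ cluster failover, the first declare() can race
        with the broker tearing down the old mirrored queue, so a single
        failure is treated as transient rather than fatal.
        """
        queue = kombu.entity.Queue(channel=channel, **queue_kwargs)
        try:
            queue.declare()
        except Exception as exc:  # illustrative; the real patch is narrower
            LOG.warning("Queue declare failed (%s); re-declaring once", exc)
            queue.declare()
        return queue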

Preview Diff

1=== removed file '.pc/.quilt_patches'
2--- .pc/.quilt_patches 2013-08-14 14:11:08 +0000
3+++ .pc/.quilt_patches 1970-01-01 00:00:00 +0000
4@@ -1,1 +0,0 @@
5-debian/patches
6
7=== removed file '.pc/.quilt_series'
8--- .pc/.quilt_series 2013-08-14 14:11:08 +0000
9+++ .pc/.quilt_series 1970-01-01 00:00:00 +0000
10@@ -1,1 +0,0 @@
11-series
12
13=== removed file '.pc/.version'
14--- .pc/.version 2013-08-14 14:11:08 +0000
15+++ .pc/.version 1970-01-01 00:00:00 +0000
16@@ -1,1 +0,0 @@
17-2
18
19=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch'
20=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo'
21=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging'
22=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers'
23=== removed file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py'
24--- .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py 2015-04-23 15:56:08 +0000
25+++ .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py 1970-01-01 00:00:00 +0000
26@@ -1,443 +0,0 @@
27-
28-# Copyright 2013 Red Hat, Inc.
29-#
30-# Licensed under the Apache License, Version 2.0 (the "License"); you may
31-# not use this file except in compliance with the License. You may obtain
32-# a copy of the License at
33-#
34-# http://www.apache.org/licenses/LICENSE-2.0
35-#
36-# Unless required by applicable law or agreed to in writing, software
37-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
38-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
39-# License for the specific language governing permissions and limitations
40-# under the License.
41-
42-__all__ = ['AMQPDriverBase']
43-
44-import logging
45-import threading
46-import uuid
47-
48-from six import moves
49-
50-from oslo import messaging
51-from oslo.messaging._drivers import amqp as rpc_amqp
52-from oslo.messaging._drivers import base
53-from oslo.messaging._drivers import common as rpc_common
54-
55-LOG = logging.getLogger(__name__)
56-
57-
58-class AMQPIncomingMessage(base.IncomingMessage):
59-
60- def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
61- super(AMQPIncomingMessage, self).__init__(listener, ctxt,
62- dict(message))
63-
64- self.unique_id = unique_id
65- self.msg_id = msg_id
66- self.reply_q = reply_q
67- self.acknowledge_callback = message.acknowledge
68- self.requeue_callback = message.requeue
69-
70- def _send_reply(self, conn, reply=None, failure=None,
71- ending=False, log_failure=True):
72- if failure:
73- failure = rpc_common.serialize_remote_exception(failure,
74- log_failure)
75-
76- msg = {'result': reply, 'failure': failure}
77- if ending:
78- msg['ending'] = True
79-
80- rpc_amqp._add_unique_id(msg)
81-
82- # If a reply_q exists, add the msg_id to the reply and pass the
83- # reply_q to direct_send() to use it as the response queue.
84- # Otherwise use the msg_id for backward compatibility.
85- if self.reply_q:
86- msg['_msg_id'] = self.msg_id
87- conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
88- else:
89- conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
90-
91- def reply(self, reply=None, failure=None, log_failure=True):
92- with self.listener.driver._get_connection() as conn:
93- self._send_reply(conn, reply, failure, log_failure=log_failure)
94- self._send_reply(conn, ending=True)
95-
96- def acknowledge(self):
97- self.listener.msg_id_cache.add(self.unique_id)
98- self.acknowledge_callback()
99-
100- def requeue(self):
101- # NOTE(sileht): If the connection is lost between receiving the
102- # message and requeueing it, this requeue call fails,
103- # but because the message is not acknowledged and not added to the
104- # msg_id_cache, the message will be reconsumed; the only difference is
105- # that the message stays at the beginning of the queue instead of
106- # moving to the end.
107- self.requeue_callback()
108-
109-
110-class AMQPListener(base.Listener):
111-
112- def __init__(self, driver, conn):
113- super(AMQPListener, self).__init__(driver)
114- self.conn = conn
115- self.msg_id_cache = rpc_amqp._MsgIdCache()
116- self.incoming = []
117-
118- def __call__(self, message):
119- # FIXME(markmc): logging isn't driver specific
120- rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
121-
122- unique_id = self.msg_id_cache.check_duplicate_message(message)
123- ctxt = rpc_amqp.unpack_context(self.conf, message)
124-
125- self.incoming.append(AMQPIncomingMessage(self,
126- ctxt.to_dict(),
127- message,
128- unique_id,
129- ctxt.msg_id,
130- ctxt.reply_q))
131-
132- def poll(self):
133- while True:
134- if self.incoming:
135- return self.incoming.pop(0)
136- self.conn.consume(limit=1)
137-
138-
139-class ReplyWaiters(object):
140-
141- WAKE_UP = object()
142-
143- def __init__(self):
144- self._queues = {}
145- self._wrn_threshold = 10
146-
147- def get(self, msg_id, timeout):
148- try:
149- return self._queues[msg_id].get(block=True, timeout=timeout)
150- except moves.queue.Empty:
151- raise messaging.MessagingTimeout('Timed out waiting for a reply '
152- 'to message ID %s' % msg_id)
153-
154- def check(self, msg_id):
155- try:
156- return self._queues[msg_id].get(block=False)
157- except moves.queue.Empty:
158- return None
159-
160- def put(self, msg_id, message_data):
161- queue = self._queues.get(msg_id)
162- if not queue:
163- LOG.warn('No calling threads waiting for msg_id : %(msg_id)s'
164- ', message : %(data)s', {'msg_id': msg_id,
165- 'data': message_data})
166- LOG.warn('_queues: %s' % str(self._queues))
167- else:
168- queue.put(message_data)
169-
170- def wake_all(self, except_id):
171- msg_ids = [i for i in self._queues.keys() if i != except_id]
172- for msg_id in msg_ids:
173- self.put(msg_id, self.WAKE_UP)
174-
175- def add(self, msg_id, queue):
176- self._queues[msg_id] = queue
177- if len(self._queues) > self._wrn_threshold:
178- LOG.warn('Number of call queues is greater than warning '
179- 'threshold: %d. There could be a leak.' %
180- self._wrn_threshold)
181- self._wrn_threshold *= 2
182-
183- def remove(self, msg_id):
184- del self._queues[msg_id]
185-
186-
187-class ReplyWaiter(object):
188-
189- def __init__(self, conf, reply_q, conn, allowed_remote_exmods):
190- self.conf = conf
191- self.conn = conn
192- self.reply_q = reply_q
193- self.allowed_remote_exmods = allowed_remote_exmods
194-
195- self.conn_lock = threading.Lock()
196- self.incoming = []
197- self.msg_id_cache = rpc_amqp._MsgIdCache()
198- self.waiters = ReplyWaiters()
199-
200- conn.declare_direct_consumer(reply_q, self)
201-
202- def __call__(self, message):
203- message.acknowledge()
204- self.incoming.append(message)
205-
206- def listen(self, msg_id):
207- queue = moves.queue.Queue()
208- self.waiters.add(msg_id, queue)
209-
210- def unlisten(self, msg_id):
211- self.waiters.remove(msg_id)
212-
213- def _process_reply(self, data):
214- result = None
215- ending = False
216- self.msg_id_cache.check_duplicate_message(data)
217- if data['failure']:
218- failure = data['failure']
219- result = rpc_common.deserialize_remote_exception(
220- failure, self.allowed_remote_exmods)
221- elif data.get('ending', False):
222- ending = True
223- else:
224- result = data['result']
225- return result, ending
226-
227- def _poll_connection(self, msg_id, timeout):
228- while True:
229- while self.incoming:
230- message_data = self.incoming.pop(0)
231-
232- incoming_msg_id = message_data.pop('_msg_id', None)
233- if incoming_msg_id == msg_id:
234- return self._process_reply(message_data)
235-
236- self.waiters.put(incoming_msg_id, message_data)
237-
238- try:
239- self.conn.consume(limit=1, timeout=timeout)
240- except rpc_common.Timeout:
241- raise messaging.MessagingTimeout('Timed out waiting for a '
242- 'reply to message ID %s'
243- % msg_id)
244-
245- def _poll_queue(self, msg_id, timeout):
246- message = self.waiters.get(msg_id, timeout)
247- if message is self.waiters.WAKE_UP:
248- return None, None, True # lock was released
249-
250- reply, ending = self._process_reply(message)
251- return reply, ending, False
252-
253- def _check_queue(self, msg_id):
254- while True:
255- message = self.waiters.check(msg_id)
256- if message is self.waiters.WAKE_UP:
257- continue
258- if message is None:
259- return None, None, True # queue is empty
260-
261- reply, ending = self._process_reply(message)
262- return reply, ending, False
263-
264- def wait(self, msg_id, timeout):
265- #
266- # NOTE(markmc): we're waiting for a reply for msg_id to come in for on
267- # the reply_q, but there may be other threads also waiting for replies
268- # to other msg_ids
269- #
270- # Only one thread can be consuming from the queue using this connection
271- # and we don't want to hold open a connection per thread, so instead we
272- # have the first thread take responsibility for passing replies not
273- # intended for itself to the appropriate thread.
274- #
275- final_reply = None
276- while True:
277- if self.conn_lock.acquire(False):
278- # Ok, we're the thread responsible for polling the connection
279- try:
280- # Check the queue to see if a previous lock-holding thread
281- # queued up a reply already
282- while True:
283- reply, ending, empty = self._check_queue(msg_id)
284- if empty:
285- break
286- if not ending:
287- final_reply = reply
288- else:
289- return final_reply
290-
291- # Now actually poll the connection
292- while True:
293- reply, ending = self._poll_connection(msg_id, timeout)
294- if not ending:
295- final_reply = reply
296- else:
297- return final_reply
298- finally:
299- self.conn_lock.release()
300- # We've got our reply, tell the other threads to wake up
301- # so that one of them will take over the responsibility for
302- # polling the connection
303- self.waiters.wake_all(msg_id)
304- else:
305- # We're going to wait for the first thread to pass us our reply
306- reply, ending, trylock = self._poll_queue(msg_id, timeout)
307- if trylock:
308- # The first thread got its reply, let's try and take over
309- # the responsibility for polling
310- continue
311- if not ending:
312- final_reply = reply
313- else:
314- return final_reply
315-
316-
317-class AMQPDriverBase(base.BaseDriver):
318-
319- def __init__(self, conf, url, connection_pool,
320- default_exchange=None, allowed_remote_exmods=[]):
321- super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
322- allowed_remote_exmods)
323-
324- self._server_params = self._server_params_from_url(self._url)
325-
326- self._default_exchange = default_exchange
327-
328- # FIXME(markmc): temp hack
329- if self._default_exchange:
330- self.conf.set_override('control_exchange', self._default_exchange)
331-
332- self._connection_pool = connection_pool
333-
334- self._reply_q_lock = threading.Lock()
335- self._reply_q = None
336- self._reply_q_conn = None
337- self._waiter = None
338-
339- def _server_params_from_url(self, url):
340- sp = {}
341-
342- if url.virtual_host is not None:
343- sp['virtual_host'] = url.virtual_host
344-
345- if url.hosts:
346- # FIXME(markmc): support multiple hosts
347- host = url.hosts[0]
348-
349- sp['hostname'] = host.hostname
350- if host.port is not None:
351- sp['port'] = host.port
352- sp['username'] = host.username or ''
353- sp['password'] = host.password or ''
354-
355- return sp
356-
357- def _get_connection(self, pooled=True):
358- # FIXME(markmc): we don't yet have a connection pool for each
359- # Transport instance, so we'll only use the pool with the
360- # transport configuration from the config file
361- server_params = self._server_params or None
362- if server_params:
363- pooled = False
364- return rpc_amqp.ConnectionContext(self.conf,
365- self._connection_pool,
366- pooled=pooled,
367- server_params=server_params)
368-
369- def _get_reply_q(self):
370- with self._reply_q_lock:
371- if self._reply_q is not None:
372- return self._reply_q
373-
374- reply_q = 'reply_' + uuid.uuid4().hex
375-
376- conn = self._get_connection(pooled=False)
377-
378- self._waiter = ReplyWaiter(self.conf, reply_q, conn,
379- self._allowed_remote_exmods)
380-
381- self._reply_q = reply_q
382- self._reply_q_conn = conn
383-
384- return self._reply_q
385-
386- def _send(self, target, ctxt, message,
387- wait_for_reply=None, timeout=None,
388- envelope=True, notify=False):
389-
390- # FIXME(markmc): remove this temporary hack
391- class Context(object):
392- def __init__(self, d):
393- self.d = d
394-
395- def to_dict(self):
396- return self.d
397-
398- context = Context(ctxt)
399- msg = message
400-
401- if wait_for_reply:
402- msg_id = uuid.uuid4().hex
403- msg.update({'_msg_id': msg_id})
404- LOG.debug('MSG_ID is %s' % (msg_id))
405- msg.update({'_reply_q': self._get_reply_q()})
406-
407- rpc_amqp._add_unique_id(msg)
408- rpc_amqp.pack_context(msg, context)
409-
410- if envelope:
411- msg = rpc_common.serialize_msg(msg)
412-
413- if wait_for_reply:
414- self._waiter.listen(msg_id)
415-
416- try:
417- with self._get_connection() as conn:
418- if notify:
419- conn.notify_send(target.topic, msg)
420- elif target.fanout:
421- conn.fanout_send(target.topic, msg)
422- else:
423- topic = target.topic
424- if target.server:
425- topic = '%s.%s' % (target.topic, target.server)
426- conn.topic_send(topic, msg, timeout=timeout)
427-
428- if wait_for_reply:
429- result = self._waiter.wait(msg_id, timeout)
430- if isinstance(result, Exception):
431- raise result
432- return result
433- finally:
434- if wait_for_reply:
435- self._waiter.unlisten(msg_id)
436-
437- def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
438- return self._send(target, ctxt, message, wait_for_reply, timeout)
439-
440- def send_notification(self, target, ctxt, message, version):
441- return self._send(target, ctxt, message,
442- envelope=(version == 2.0), notify=True)
443-
444- def listen(self, target):
445- conn = self._get_connection(pooled=False)
446-
447- listener = AMQPListener(self, conn)
448-
449- conn.declare_topic_consumer(target.topic, listener)
450- conn.declare_topic_consumer('%s.%s' % (target.topic, target.server),
451- listener)
452- conn.declare_fanout_consumer(target.topic, listener)
453-
454- return listener
455-
456- def listen_for_notifications(self, targets_and_priorities):
457- conn = self._get_connection(pooled=False)
458-
459- listener = AMQPListener(self, conn)
460- for target, priority in targets_and_priorities:
461- conn.declare_topic_consumer('%s.%s' % (target.topic, priority),
462- callback=listener,
463- exchange_name=target.exchange)
464- return listener
465-
466- def cleanup(self):
467- if self._connection_pool:
468- self._connection_pool.empty()
469- self._connection_pool = None
470
471=== removed file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py'
472--- .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py 2015-04-23 15:56:08 +0000
473+++ .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
474@@ -1,509 +0,0 @@
475-# Copyright 2010 United States Government as represented by the
476-# Administrator of the National Aeronautics and Space Administration.
477-# All Rights Reserved.
478-# Copyright 2011 Red Hat, Inc.
479-#
480-# Licensed under the Apache License, Version 2.0 (the "License"); you may
481-# not use this file except in compliance with the License. You may obtain
482-# a copy of the License at
483-#
484-# http://www.apache.org/licenses/LICENSE-2.0
485-#
486-# Unless required by applicable law or agreed to in writing, software
487-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
488-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
489-# License for the specific language governing permissions and limitations
490-# under the License.
491-
492-import copy
493-import logging
494-import sys
495-import traceback
496-
497-from oslo.config import cfg
498-from oslo import messaging
499-import six
500-
501-from oslo.messaging import _utils as utils
502-from oslo.messaging.openstack.common import importutils
503-from oslo.messaging.openstack.common import jsonutils
504-
505-# FIXME(markmc): remove this
506-_ = lambda s: s
507-
508-LOG = logging.getLogger(__name__)
509-
510-_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
511-
512-
513-'''RPC Envelope Version.
514-
515-This version number applies to the top level structure of messages sent out.
516-It does *not* apply to the message payload, which must be versioned
517-independently. For example, when using rpc APIs, a version number is applied
518-for changes to the API being exposed over rpc. This version number is handled
519-in the rpc proxy and dispatcher modules.
520-
521-This version number applies to the message envelope that is used in the
522-serialization done inside the rpc layer. See serialize_msg() and
523-deserialize_msg().
524-
525-The current message format (version 2.0) is very simple. It is:
526-
527- {
528- 'oslo.version': <RPC Envelope Version as a String>,
529- 'oslo.message': <Application Message Payload, JSON encoded>
530- }
531-
532-Message format version '1.0' is just considered to be the messages we sent
533-without a message envelope.
534-
535-So, the current message envelope just includes the envelope version. It may
536-eventually contain additional information, such as a signature for the message
537-payload.
538-
539-We will JSON encode the application message payload. The message envelope,
540-which includes the JSON encoded application message body, will be passed down
541-to the messaging libraries as a dict.
542-'''
543-_RPC_ENVELOPE_VERSION = '2.0'
544-
545-_VERSION_KEY = 'oslo.version'
546-_MESSAGE_KEY = 'oslo.message'
547-
548-_REMOTE_POSTFIX = '_Remote'
549-
550-_exception_opts = [
551- cfg.ListOpt('allowed_rpc_exception_modules',
552- default=['oslo.messaging.exceptions',
553- 'nova.exception',
554- 'cinder.exception',
555- _EXCEPTIONS_MODULE,
556- ],
557- help='Modules of exceptions that are permitted to be '
558- 'recreated upon receiving exception data from an rpc '
559- 'call.'),
560-]
561-
562-
563-class RPCException(Exception):
564- msg_fmt = _("An unknown RPC related exception occurred.")
565-
566- def __init__(self, message=None, **kwargs):
567- self.kwargs = kwargs
568-
569- if not message:
570- try:
571- message = self.msg_fmt % kwargs
572-
573- except Exception:
574- # kwargs doesn't match a variable in the message
575- # log the issue and the kwargs
576- LOG.exception(_('Exception in string format operation'))
577- for name, value in six.iteritems(kwargs):
578- LOG.error("%s: %s" % (name, value))
579- # at least get the core message out if something happened
580- message = self.msg_fmt
581-
582- super(RPCException, self).__init__(message)
583-
584-
585-class RemoteError(RPCException):
586- """Signifies that a remote class has raised an exception.
587-
588- Contains a string representation of the type of the original exception,
589- the value of the original exception, and the traceback. These are
590- sent to the parent as a joined string so printing the exception
591- contains all of the relevant info.
592-
593- """
594- msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
595-
596- def __init__(self, exc_type=None, value=None, traceback=None):
597- self.exc_type = exc_type
598- self.value = value
599- self.traceback = traceback
600- super(RemoteError, self).__init__(exc_type=exc_type,
601- value=value,
602- traceback=traceback)
603-
604-
605-class Timeout(RPCException):
606- """Signifies that a timeout has occurred.
607-
608- This exception is raised if the rpc_response_timeout is reached while
609- waiting for a response from the remote side.
610- """
611- msg_fmt = _('Timeout while waiting on RPC response - '
612- 'topic: "%(topic)s", RPC method: "%(method)s" '
613- 'info: "%(info)s"')
614-
615- def __init__(self, info=None, topic=None, method=None):
616- """Initiates Timeout object.
617-
618- :param info: Extra info to convey to the user
619- :param topic: The topic that the rpc call was sent to
620- :param rpc_method_name: The name of the rpc method being
621- called
622- """
623- self.info = info
624- self.topic = topic
625- self.method = method
626- super(Timeout, self).__init__(
627- None,
628- info=info or _('<unknown>'),
629- topic=topic or _('<unknown>'),
630- method=method or _('<unknown>'))
631-
632-
633-class DuplicateMessageError(RPCException):
634- msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
635-
636-
637-class InvalidRPCConnectionReuse(RPCException):
638- msg_fmt = _("Invalid reuse of an RPC connection.")
639-
640-
641-class UnsupportedRpcVersion(RPCException):
642- msg_fmt = _("Specified RPC version, %(version)s, not supported by "
643- "this endpoint.")
644-
645-
646-class UnsupportedRpcEnvelopeVersion(RPCException):
647- msg_fmt = _("Specified RPC envelope version, %(version)s, "
648- "not supported by this endpoint.")
649-
650-
651-class RpcVersionCapError(RPCException):
652- msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
653-
654-
655-class Connection(object):
656- """A connection, returned by rpc.create_connection().
657-
658- This class represents a connection to the message bus used for rpc.
659- An instance of this class should never be created by users of the rpc API.
660- Use rpc.create_connection() instead.
661- """
662- def close(self):
663- """Close the connection.
664-
665- This method must be called when the connection will no longer be used.
666- It will ensure that any resources associated with the connection, such
667- as a network connection, and cleaned up.
668- """
669- raise NotImplementedError()
670-
671- def create_consumer(self, topic, proxy, fanout=False):
672- """Create a consumer on this connection.
673-
674- A consumer is associated with a message queue on the backend message
675- bus. The consumer will read messages from the queue, unpack them, and
676- dispatch them to the proxy object. The contents of the message pulled
677- off of the queue will determine which method gets called on the proxy
678- object.
679-
680- :param topic: This is a name associated with what to consume from.
681- Multiple instances of a service may consume from the same
682- topic. For example, all instances of nova-compute consume
683- from a queue called "compute". In that case, the
684- messages will get distributed amongst the consumers in a
685- round-robin fashion if fanout=False. If fanout=True,
686- every consumer associated with this topic will get a
687- copy of every message.
688- :param proxy: The object that will handle all incoming messages.
689- :param fanout: Whether or not this is a fanout topic. See the
690- documentation for the topic parameter for some
691- additional comments on this.
692- """
693- raise NotImplementedError()
694-
695- def create_worker(self, topic, proxy, pool_name):
696- """Create a worker on this connection.
697-
698- A worker is like a regular consumer of messages directed to a
699- topic, except that it is part of a set of such consumers (the
700- "pool") which may run in parallel. Every pool of workers will
701- receive a given message, but only one worker in the pool will
702- be asked to process it. Load is distributed across the members
703- of the pool in round-robin fashion.
704-
705- :param topic: This is a name associated with what to consume from.
706- Multiple instances of a service may consume from the same
707- topic.
708- :param proxy: The object that will handle all incoming messages.
709- :param pool_name: String containing the name of the pool of workers
710- """
711- raise NotImplementedError()
712-
713- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
714- """Register as a member of a group of consumers.
715-
716- Uses given topic from the specified exchange.
717- Exactly one member of a given pool will receive each message.
718-
719- A message will be delivered to multiple pools, if more than
720- one is created.
721-
722- :param callback: Callable to be invoked for each message.
723- :type callback: callable accepting one argument
724- :param pool_name: The name of the consumer pool.
725- :type pool_name: str
726- :param topic: The routing topic for desired messages.
727- :type topic: str
728- :param exchange_name: The name of the message exchange where
729- the client should attach. Defaults to
730- the configured exchange.
731- :type exchange_name: str
732- """
733- raise NotImplementedError()
734-
735- def consume_in_thread(self):
736- """Spawn a thread to handle incoming messages.
737-
738- Spawn a thread that will be responsible for handling all incoming
739- messages for consumers that were set up on this connection.
740-
741- Message dispatching inside of this is expected to be implemented in a
742- non-blocking manner. An example implementation would be having this
743- thread pull messages in for all of the consumers, but utilize a thread
744- pool for dispatching the messages to the proxy objects.
745- """
746- raise NotImplementedError()
747-
748-
749-def _safe_log(log_func, msg, msg_data):
750- """Sanitizes the msg_data field before logging."""
751- SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
752-
753- def _fix_passwords(d):
754- """Sanitizes the password fields in the dictionary."""
755- for k in six.iterkeys(d):
756- if k.lower().find('password') != -1:
757- d[k] = '<SANITIZED>'
758- elif k.lower() in SANITIZE:
759- d[k] = '<SANITIZED>'
760- elif isinstance(d[k], dict):
761- _fix_passwords(d[k])
762- return d
763-
764- return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
765-
766-
767-def serialize_remote_exception(failure_info, log_failure=True):
768- """Prepares exception data to be sent over rpc.
769-
770- Failure_info should be a sys.exc_info() tuple.
771-
772- """
773- tb = traceback.format_exception(*failure_info)
774- failure = failure_info[1]
775- if log_failure:
776- LOG.error(_("Returning exception %s to caller"),
777- six.text_type(failure))
778- LOG.error(tb)
779-
780- kwargs = {}
781- if hasattr(failure, 'kwargs'):
782- kwargs = failure.kwargs
783-
784- # NOTE(matiu): With cells, it's possible to re-raise remote, remote
785- # exceptions. Let's turn it back into the original exception type.
786- cls_name = str(failure.__class__.__name__)
787- mod_name = str(failure.__class__.__module__)
788- if (cls_name.endswith(_REMOTE_POSTFIX) and
789- mod_name.endswith(_REMOTE_POSTFIX)):
790- cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
791- mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
792-
793- data = {
794- 'class': cls_name,
795- 'module': mod_name,
796- 'message': six.text_type(failure),
797- 'tb': tb,
798- 'args': failure.args,
799- 'kwargs': kwargs
800- }
801-
802- json_data = jsonutils.dumps(data)
803-
804- return json_data
805-
806-
807-def deserialize_remote_exception(data, allowed_remote_exmods):
808- failure = jsonutils.loads(str(data))
809-
810- trace = failure.get('tb', [])
811- message = failure.get('message', "") + "\n" + "\n".join(trace)
812- name = failure.get('class')
813- module = failure.get('module')
814-
815- # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
816- # order to prevent arbitrary code execution.
817- if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
818- return messaging.RemoteError(name, failure.get('message'), trace)
819-
820- try:
821- mod = importutils.import_module(module)
822- klass = getattr(mod, name)
823- if not issubclass(klass, Exception):
824- raise TypeError("Can only deserialize Exceptions")
825-
826- failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
827- except (AttributeError, TypeError, ImportError):
828- return messaging.RemoteError(name, failure.get('message'), trace)
829-
830- ex_type = type(failure)
831- str_override = lambda self: message
832- new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
833- {'__str__': str_override, '__unicode__': str_override})
834- new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
835- try:
836- # NOTE(ameade): Dynamically create a new exception type and swap it in
837- # as the new type for the exception. This only works on user defined
838- # Exceptions and not core Python exceptions. This is important because
839- # we cannot necessarily change an exception message so we must override
840- # the __str__ method.
841- failure.__class__ = new_ex_type
842- except TypeError:
843- # NOTE(ameade): If a core exception then just add the traceback to the
844- # first exception argument.
845- failure.args = (message,) + failure.args[1:]
846- return failure
847-
848-
849-class CommonRpcContext(object):
850- def __init__(self, **kwargs):
851- self.values = kwargs
852-
853- def __getattr__(self, key):
854- try:
855- return self.values[key]
856- except KeyError:
857- raise AttributeError(key)
858-
859- def to_dict(self):
860- return copy.deepcopy(self.values)
861-
862- @classmethod
863- def from_dict(cls, values):
864- return cls(**values)
865-
866- def deepcopy(self):
867- return self.from_dict(self.to_dict())
868-
869- def update_store(self):
870- #local.store.context = self
871- pass
872-
873- def elevated(self, read_deleted=None, overwrite=False):
874- """Return a version of this context with admin flag set."""
875- # TODO(russellb) This method is a bit of a nova-ism. It makes
876- # some assumptions about the data in the request context sent
877- # across rpc, while the rest of this class does not. We could get
878- # rid of this if we changed the nova code that uses this to
879- # convert the RpcContext back to its native RequestContext doing
880- # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
881-
882- context = self.deepcopy()
883- context.values['is_admin'] = True
884-
885- context.values.setdefault('roles', [])
886-
887- if 'admin' not in context.values['roles']:
888- context.values['roles'].append('admin')
889-
890- if read_deleted is not None:
891- context.values['read_deleted'] = read_deleted
892-
893- return context
894-
895-
896-class ClientException(Exception):
897- """Encapsulates actual exception expected to be hit by a RPC proxy object.
898-
899- Merely instantiating it records the current exception information, which
900- will be passed back to the RPC client without exceptional logging.
901- """
902- def __init__(self):
903- self._exc_info = sys.exc_info()
904-
905-
906-def catch_client_exception(exceptions, func, *args, **kwargs):
907- try:
908- return func(*args, **kwargs)
909- except Exception as e:
910- if type(e) in exceptions:
911- raise ClientException()
912- else:
913- raise
914-
915-
916-def client_exceptions(*exceptions):
917- """Decorator for manager methods that raise expected exceptions.
918-
919- Marking a Manager method with this decorator allows the declaration
920- of expected exceptions that the RPC layer should not consider fatal,
921- and not log as if they were generated in a real error scenario. Note
922- that this will cause listed exceptions to be wrapped in a
923- ClientException, which is used internally by the RPC layer.
924- """
925- def outer(func):
926- def inner(*args, **kwargs):
927- return catch_client_exception(exceptions, func, *args, **kwargs)
928- return inner
929- return outer
930-
931-
932-def serialize_msg(raw_msg):
933- # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
934- # information about this format.
935- msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
936- _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
937-
938- return msg
939-
940-
941-def deserialize_msg(msg):
942- # NOTE(russellb): Hang on to your hats, this road is about to
943- # get a little bumpy.
944- #
945- # Robustness Principle:
946- # "Be strict in what you send, liberal in what you accept."
947- #
948- # At this point we have to do a bit of guessing about what it
949- # is we just received. Here is the set of possibilities:
950- #
951- # 1) We received a dict. This could be 2 things:
952- #
953- # a) Inspect it to see if it looks like a standard message envelope.
954- # If so, great!
955- #
956- # b) If it doesn't look like a standard message envelope, it could either
957- # be a notification, or a message from before we added a message
958- # envelope (referred to as version 1.0).
959- # Just return the message as-is.
960- #
961- # 2) It's any other non-dict type. Just return it and hope for the best.
962- # This case covers return values from rpc.call() from before message
963- # envelopes were used. (messages to call a method were always a dict)
964-
965- if not isinstance(msg, dict):
966- # See #2 above.
967- return msg
968-
969- base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
970- if not all(map(lambda key: key in msg, base_envelope_keys)):
971- # See #1.b above.
972- return msg
973-
974- # At this point we think we have the message envelope
975- # format we were expecting. (#1.a above)
976-
977- if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
978- msg[_VERSION_KEY]):
979- raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
980-
981- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
982-
983- return raw_msg
984
985=== removed file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py'
986--- .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-04-23 15:56:08 +0000
987+++ .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
988@@ -1,784 +0,0 @@
989-# Copyright 2011 OpenStack Foundation
990-#
991-# Licensed under the Apache License, Version 2.0 (the "License"); you may
992-# not use this file except in compliance with the License. You may obtain
993-# a copy of the License at
994-#
995-# http://www.apache.org/licenses/LICENSE-2.0
996-#
997-# Unless required by applicable law or agreed to in writing, software
998-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
999-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1000-# License for the specific language governing permissions and limitations
1001-# under the License.
1002-
1003-import functools
1004-import itertools
1005-import logging
1006-import socket
1007-import ssl
1008-import time
1009-import uuid
1010-
1011-import kombu
1012-import kombu.connection
1013-import kombu.entity
1014-import kombu.messaging
1015-from oslo.config import cfg
1016-import six
1017-
1018-from oslo.messaging._drivers import amqp as rpc_amqp
1019-from oslo.messaging._drivers import amqpdriver
1020-from oslo.messaging._drivers import common as rpc_common
1021-from oslo.messaging.openstack.common import network_utils
1022-
1023-# FIXME(markmc): remove this
1024-_ = lambda s: s
1025-
1026-rabbit_opts = [
1027- cfg.StrOpt('kombu_ssl_version',
1028- default='',
1029- help='SSL version to use (valid only if SSL enabled). '
1030- 'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
1031- 'be available on some distributions.'
1032- ),
1033- cfg.StrOpt('kombu_ssl_keyfile',
1034- default='',
1035- help='SSL key file (valid only if SSL enabled).'),
1036- cfg.StrOpt('kombu_ssl_certfile',
1037- default='',
1038- help='SSL cert file (valid only if SSL enabled).'),
1039- cfg.StrOpt('kombu_ssl_ca_certs',
1040- default='',
1041- help=('SSL certification authority file '
1042- '(valid only if SSL enabled).')),
1043- cfg.FloatOpt('kombu_reconnect_delay',
1044- default=1.0,
1045- help='How long to wait before reconnecting in response to an '
1046- 'AMQP consumer cancel notification.'),
1047- cfg.StrOpt('rabbit_host',
1048- default='localhost',
1049- help='The RabbitMQ broker address where a single node is '
1050- 'used.'),
1051- cfg.IntOpt('rabbit_port',
1052- default=5672,
1053- help='The RabbitMQ broker port where a single node is used.'),
1054- cfg.ListOpt('rabbit_hosts',
1055- default=['$rabbit_host:$rabbit_port'],
1056- help='RabbitMQ HA cluster host:port pairs.'),
1057- cfg.BoolOpt('rabbit_use_ssl',
1058- default=False,
1059- help='Connect over SSL for RabbitMQ.'),
1060- cfg.StrOpt('rabbit_userid',
1061- default='guest',
1062- help='The RabbitMQ userid.'),
1063- cfg.StrOpt('rabbit_password',
1064- default='guest',
1065- help='The RabbitMQ password.',
1066- secret=True),
1067- cfg.StrOpt('rabbit_login_method',
1068- default='AMQPLAIN',
1069- help='The RabbitMQ login method.'),
1070- cfg.StrOpt('rabbit_virtual_host',
1071- default='/',
1072- help='The RabbitMQ virtual host.'),
1073- cfg.IntOpt('rabbit_retry_interval',
1074- default=1,
1075- help='How frequently to retry connecting with RabbitMQ.'),
1076- cfg.IntOpt('rabbit_retry_backoff',
1077- default=2,
1078- help='How long to backoff for between retries when connecting '
1079- 'to RabbitMQ.'),
1080- cfg.IntOpt('rabbit_max_retries',
1081- default=0,
1082- help='Maximum number of RabbitMQ connection retries. '
1083- 'Default is 0 (infinite retry count).'),
1084- cfg.BoolOpt('rabbit_ha_queues',
1085- default=False,
1086- help='Use HA queues in RabbitMQ (x-ha-policy: all). '
1087- 'If you change this option, you must wipe the '
1088- 'RabbitMQ database.'),
1089-
1090- # FIXME(markmc): this was toplevel in openstack.common.rpc
1091- cfg.BoolOpt('fake_rabbit',
1092- default=False,
1093- help='If passed, use a fake RabbitMQ provider.'),
1094-]
1095-
1096-LOG = logging.getLogger(__name__)
1097-
1098-
1099-def _get_queue_arguments(conf):
1100- """Construct the arguments for declaring a queue.
1101-
1102- If the rabbit_ha_queues option is set, we declare a mirrored queue
1103- as described here:
1104-
1105- http://www.rabbitmq.com/ha.html
1106-
1107- Setting x-ha-policy to all means that the queue will be mirrored
1108- to all nodes in the cluster.
1109- """
1110- return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
1111-
1112-
1113-class RabbitMessage(dict):
1114- def __init__(self, raw_message):
1115- super(RabbitMessage, self).__init__(
1116- rpc_common.deserialize_msg(raw_message.payload))
1117- self._raw_message = raw_message
1118-
1119- def acknowledge(self):
1120- self._raw_message.ack()
1121-
1122- def requeue(self):
1123- self._raw_message.requeue()
1124-
1125-
1126-class ConsumerBase(object):
1127- """Consumer base class."""
1128-
1129- def __init__(self, channel, callback, tag, **kwargs):
1130- """Declare a queue on an amqp channel.
1131-
1132- 'channel' is the amqp channel to use
1133- 'callback' is the callback to call when messages are received
1134- 'tag' is a unique ID for the consumer on the channel
1135-
1136- queue name, exchange name, and other kombu options are
1137- passed in here as a dictionary.
1138- """
1139- self.callback = callback
1140- self.tag = str(tag)
1141- self.kwargs = kwargs
1142- self.queue = None
1143- self.reconnect(channel)
1144-
1145- def reconnect(self, channel):
1146- """Re-declare the queue after a rabbit reconnect."""
1147- self.channel = channel
1148- self.kwargs['channel'] = channel
1149- self.queue = kombu.entity.Queue(**self.kwargs)
1150- self.queue.declare()
1151-
1152- def _callback_handler(self, message, callback):
1153- """Call callback with deserialized message.
1154-
155- Messages are processed and then ack'ed.
1156- """
1157-
1158- try:
1159- callback(RabbitMessage(message))
1160- except Exception:
1161- LOG.exception(_("Failed to process message"
1162- " ... skipping it."))
1163- message.ack()
1164-
1165- def consume(self, *args, **kwargs):
1166- """Actually declare the consumer on the amqp channel. This will
1167- start the flow of messages from the queue. Using the
1168- Connection.iterconsume() iterator will process the messages,
1169- calling the appropriate callback.
1170-
1171- If a callback is specified in kwargs, use that. Otherwise,
1172- use the callback passed during __init__()
1173-
1174- If kwargs['nowait'] is True, then this call will block until
1175- a message is read.
1176-
1177- """
1178-
1179- options = {'consumer_tag': self.tag}
1180- options['nowait'] = kwargs.get('nowait', False)
1181- callback = kwargs.get('callback', self.callback)
1182- if not callback:
1183- raise ValueError("No callback defined")
1184-
1185- def _callback(raw_message):
1186- message = self.channel.message_to_python(raw_message)
1187- self._callback_handler(message, callback)
1188-
1189- self.queue.consume(*args, callback=_callback, **options)
1190-
1191- def cancel(self):
1192- """Cancel the consuming from the queue, if it has started."""
1193- try:
1194- self.queue.cancel(self.tag)
1195- except KeyError as e:
196- # NOTE(comstud): Kludge to get around an amqplib bug
1197- if str(e) != "u'%s'" % self.tag:
1198- raise
1199- self.queue = None
1200-
1201-
1202-class DirectConsumer(ConsumerBase):
1203- """Queue/consumer class for 'direct'."""
1204-
1205- def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
1206- """Init a 'direct' queue.
1207-
1208- 'channel' is the amqp channel to use
1209- 'msg_id' is the msg_id to listen on
1210- 'callback' is the callback to call when messages are received
1211- 'tag' is a unique ID for the consumer on the channel
1212-
1213- Other kombu options may be passed
1214- """
1215- # Default options
1216- options = {'durable': False,
1217- 'queue_arguments': _get_queue_arguments(conf),
1218- 'auto_delete': True,
1219- 'exclusive': False}
1220- options.update(kwargs)
1221- exchange = kombu.entity.Exchange(name=msg_id,
1222- type='direct',
1223- durable=options['durable'],
1224- auto_delete=options['auto_delete'])
1225- super(DirectConsumer, self).__init__(channel,
1226- callback,
1227- tag,
1228- name=msg_id,
1229- exchange=exchange,
1230- routing_key=msg_id,
1231- **options)
1232-
1233-
1234-class TopicConsumer(ConsumerBase):
1235- """Consumer class for 'topic'."""
1236-
1237- def __init__(self, conf, channel, topic, callback, tag, name=None,
1238- exchange_name=None, **kwargs):
1239- """Init a 'topic' queue.
1240-
1241- :param channel: the amqp channel to use
1242- :param topic: the topic to listen on
1243- :paramtype topic: str
1244- :param callback: the callback to call when messages are received
1245- :param tag: a unique ID for the consumer on the channel
1246- :param name: optional queue name, defaults to topic
1247- :paramtype name: str
1248-
1249- Other kombu options may be passed as keyword arguments
1250- """
1251- # Default options
1252- options = {'durable': conf.amqp_durable_queues,
1253- 'queue_arguments': _get_queue_arguments(conf),
1254- 'auto_delete': conf.amqp_auto_delete,
1255- 'exclusive': False}
1256- options.update(kwargs)
1257- exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
1258- exchange = kombu.entity.Exchange(name=exchange_name,
1259- type='topic',
1260- durable=options['durable'],
1261- auto_delete=options['auto_delete'])
1262- super(TopicConsumer, self).__init__(channel,
1263- callback,
1264- tag,
1265- name=name or topic,
1266- exchange=exchange,
1267- routing_key=topic,
1268- **options)
1269-
1270-
1271-class FanoutConsumer(ConsumerBase):
1272- """Consumer class for 'fanout'."""
1273-
1274- def __init__(self, conf, channel, topic, callback, tag, **kwargs):
1275- """Init a 'fanout' queue.
1276-
1277- 'channel' is the amqp channel to use
1278- 'topic' is the topic to listen on
1279- 'callback' is the callback to call when messages are received
1280- 'tag' is a unique ID for the consumer on the channel
1281-
1282- Other kombu options may be passed
1283- """
1284- unique = uuid.uuid4().hex
1285- exchange_name = '%s_fanout' % topic
1286- queue_name = '%s_fanout_%s' % (topic, unique)
1287-
1288- # Default options
1289- options = {'durable': False,
1290- 'queue_arguments': _get_queue_arguments(conf),
1291- 'auto_delete': True,
1292- 'exclusive': False}
1293- options.update(kwargs)
1294- exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
1295- durable=options['durable'],
1296- auto_delete=options['auto_delete'])
1297- super(FanoutConsumer, self).__init__(channel, callback, tag,
1298- name=queue_name,
1299- exchange=exchange,
1300- routing_key=topic,
1301- **options)
1302-
1303-
1304-class Publisher(object):
1305- """Base Publisher class."""
1306-
1307- def __init__(self, channel, exchange_name, routing_key, **kwargs):
1308- """Init the Publisher class with the exchange_name, routing_key,
1309- and other options
1310- """
1311- self.exchange_name = exchange_name
1312- self.routing_key = routing_key
1313- self.kwargs = kwargs
1314- self.reconnect(channel)
1315-
1316- def reconnect(self, channel):
1317- """Re-establish the Producer after a rabbit reconnection."""
1318- self.exchange = kombu.entity.Exchange(name=self.exchange_name,
1319- **self.kwargs)
1320- self.producer = kombu.messaging.Producer(exchange=self.exchange,
1321- channel=channel,
1322- routing_key=self.routing_key)
1323-
1324- def send(self, msg, timeout=None):
1325- """Send a message."""
1326- if timeout:
1327- #
1328- # AMQP TTL is in milliseconds when set in the header.
1329- #
1330- self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
1331- else:
1332- self.producer.publish(msg)
1333-
1334-
1335-class DirectPublisher(Publisher):
1336- """Publisher class for 'direct'."""
1337- def __init__(self, conf, channel, msg_id, **kwargs):
1338- """Init a 'direct' publisher.
1339-
1340- Kombu options may be passed as keyword args to override defaults
1341- """
1342-
1343- options = {'durable': False,
1344- 'auto_delete': True,
1345- 'exclusive': False}
1346- options.update(kwargs)
1347- super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
1348- type='direct', **options)
1349-
1350-
1351-class TopicPublisher(Publisher):
1352- """Publisher class for 'topic'."""
1353- def __init__(self, conf, channel, topic, **kwargs):
1354- """Init a 'topic' publisher.
1355-
1356- Kombu options may be passed as keyword args to override defaults
1357- """
1358- options = {'durable': conf.amqp_durable_queues,
1359- 'auto_delete': conf.amqp_auto_delete,
1360- 'exclusive': False}
1361- options.update(kwargs)
1362- exchange_name = rpc_amqp.get_control_exchange(conf)
1363- super(TopicPublisher, self).__init__(channel,
1364- exchange_name,
1365- topic,
1366- type='topic',
1367- **options)
1368-
1369-
1370-class FanoutPublisher(Publisher):
1371- """Publisher class for 'fanout'."""
1372- def __init__(self, conf, channel, topic, **kwargs):
1373- """Init a 'fanout' publisher.
1374-
1375- Kombu options may be passed as keyword args to override defaults
1376- """
1377- options = {'durable': False,
1378- 'auto_delete': True,
1379- 'exclusive': False}
1380- options.update(kwargs)
1381- super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
1382- None, type='fanout', **options)
1383-
1384-
1385-class NotifyPublisher(TopicPublisher):
1386- """Publisher class for 'notify'."""
1387-
1388- def __init__(self, conf, channel, topic, **kwargs):
1389- self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
1390- self.queue_arguments = _get_queue_arguments(conf)
1391- super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
1392-
1393- def reconnect(self, channel):
1394- super(NotifyPublisher, self).reconnect(channel)
1395-
1396- # NOTE(jerdfelt): Normally the consumer would create the queue, but
1397- # we do this to ensure that messages don't get dropped if the
1398- # consumer is started after we do
1399- queue = kombu.entity.Queue(channel=channel,
1400- exchange=self.exchange,
1401- durable=self.durable,
1402- name=self.routing_key,
1403- routing_key=self.routing_key,
1404- queue_arguments=self.queue_arguments)
1405- queue.declare()
1406-
1407-
1408-class Connection(object):
1409- """Connection object."""
1410-
1411- pool = None
1412-
1413- def __init__(self, conf, server_params=None):
1414- self.consumers = []
1415- self.conf = conf
1416- self.max_retries = self.conf.rabbit_max_retries
1417- # Try forever?
1418- if self.max_retries <= 0:
1419- self.max_retries = None
1420- self.interval_start = self.conf.rabbit_retry_interval
1421- self.interval_stepping = self.conf.rabbit_retry_backoff
1422- # max retry-interval = 30 seconds
1423- self.interval_max = 30
1424- self.memory_transport = False
1425-
1426- if server_params is None:
1427- server_params = {}
1428- # Keys to translate from server_params to kombu params
1429- server_params_to_kombu_params = {'username': 'userid'}
1430-
1431- ssl_params = self._fetch_ssl_params()
1432- params_list = []
1433- for adr in self.conf.rabbit_hosts:
1434- hostname, port = network_utils.parse_host_port(
1435- adr, default_port=self.conf.rabbit_port)
1436-
1437- params = {
1438- 'hostname': hostname,
1439- 'port': port,
1440- 'userid': self.conf.rabbit_userid,
1441- 'password': self.conf.rabbit_password,
1442- 'login_method': self.conf.rabbit_login_method,
1443- 'virtual_host': self.conf.rabbit_virtual_host,
1444- }
1445-
1446- for sp_key, value in six.iteritems(server_params):
1447- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
1448- params[p_key] = value
1449-
1450- if self.conf.fake_rabbit:
1451- params['transport'] = 'memory'
1452- if self.conf.rabbit_use_ssl:
1453- params['ssl'] = ssl_params
1454-
1455- params_list.append(params)
1456-
1457- self.params_list = itertools.cycle(params_list)
1458-
1459- self.memory_transport = self.conf.fake_rabbit
1460-
1461- self.connection = None
1462- self.do_consume = None
1463- self.reconnect()
1464-
1465- # FIXME(markmc): use oslo sslutils when it is available as a library
1466- _SSL_PROTOCOLS = {
1467- "tlsv1": ssl.PROTOCOL_TLSv1,
1468- "sslv23": ssl.PROTOCOL_SSLv23,
1469- "sslv3": ssl.PROTOCOL_SSLv3
1470- }
1471-
1472- try:
1473- _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
1474- except AttributeError:
1475- pass
1476-
1477- @classmethod
1478- def validate_ssl_version(cls, version):
1479- key = version.lower()
1480- try:
1481- return cls._SSL_PROTOCOLS[key]
1482- except KeyError:
1483- raise RuntimeError(_("Invalid SSL version : %s") % version)
1484-
1485- def _fetch_ssl_params(self):
1486- """Handles fetching what ssl params should be used for the connection
1487- (if any).
1488- """
1489- ssl_params = dict()
1490-
1491- # http://docs.python.org/library/ssl.html - ssl.wrap_socket
1492- if self.conf.kombu_ssl_version:
1493- ssl_params['ssl_version'] = self.validate_ssl_version(
1494- self.conf.kombu_ssl_version)
1495- if self.conf.kombu_ssl_keyfile:
1496- ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
1497- if self.conf.kombu_ssl_certfile:
1498- ssl_params['certfile'] = self.conf.kombu_ssl_certfile
1499- if self.conf.kombu_ssl_ca_certs:
1500- ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
1501- # We might want to allow variations in the
1502- # future with this?
1503- ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
1504-
1505- # Return the extended behavior or just have the default behavior
1506- return ssl_params or True
1507-
1508- def _connect(self, params):
1509- """Connect to rabbit. Re-establish any queues that may have
1510- been declared before if we are reconnecting. Exceptions should
1511- be handled by the caller.
1512- """
1513- if self.connection:
1514- LOG.info(_("Reconnecting to AMQP server on "
1515- "%(hostname)s:%(port)d") % params)
1516- try:
1517- # XXX(nic): when reconnecting to a RabbitMQ cluster
1518- # with mirrored queues in use, the attempt to release the
1519- # connection can hang "indefinitely" somewhere deep down
1520- # in Kombu. Blocking the thread for a bit prior to
1521- # release seems to kludge around the problem where it is
522- # otherwise reproducible.
1523- if self.conf.kombu_reconnect_delay > 0:
1524- LOG.info(_("Delaying reconnect for %1.1f seconds...") %
1525- self.conf.kombu_reconnect_delay)
1526- time.sleep(self.conf.kombu_reconnect_delay)
1527-
1528- self.connection.release()
1529- except self.connection_errors:
1530- pass
1531- # Setting this in case the next statement fails, though
1532- # it shouldn't be doing any network operations, yet.
1533- self.connection = None
1534- self.connection = kombu.connection.BrokerConnection(**params)
1535- self.connection_errors = self.connection.connection_errors
1536- self.channel_errors = self.connection.channel_errors
1537- if self.memory_transport:
1538- # Kludge to speed up tests.
1539- self.connection.transport.polling_interval = 0.0
1540- self.do_consume = True
1541- self.consumer_num = itertools.count(1)
1542- self.connection.connect()
1543- self.channel = self.connection.channel()
1544- # work around 'memory' transport bug in 1.1.3
1545- if self.memory_transport:
1546- self.channel._new_queue('ae.undeliver')
1547- for consumer in self.consumers:
1548- consumer.reconnect(self.channel)
1549- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
1550- params)
1551-
1552- def reconnect(self):
1553- """Handles reconnecting and re-establishing queues.
1554- Will retry up to self.max_retries times;
1555- self.max_retries = 0 means retry forever.
1556- Sleeps between attempts, starting at self.interval_start
1557- seconds and backing off by self.interval_stepping seconds
1558- on each attempt.
1559- """
1560-
1561- attempt = 0
1562- while True:
1563- params = six.next(self.params_list)
1564- attempt += 1
1565- try:
1566- self._connect(params)
1567- return
1568- except IOError as e:
1569- pass
1570- except self.connection_errors as e:
1571- pass
1572- except Exception as e:
1573- # NOTE(comstud): Unfortunately it's possible for amqplib
1574- # to return an error not covered by its transport
1575- # connection_errors in the case of a timeout waiting for
1576- # a protocol response. (See paste link in LP888621)
1577- # So, we check all exceptions for 'timeout' in their
1578- # text and try to reconnect in that case.
1579- if 'timeout' not in str(e):
1580- raise
1581-
1582- log_info = {}
1583- log_info['err_str'] = str(e)
1584- log_info['max_retries'] = self.max_retries
1585- log_info.update(params)
1586-
1587- if self.max_retries and attempt == self.max_retries:
1588- msg = _('Unable to connect to AMQP server on '
1589- '%(hostname)s:%(port)d after %(max_retries)d '
1590- 'tries: %(err_str)s') % log_info
1591- LOG.error(msg)
1592- raise rpc_common.RPCException(msg)
1593-
1594- if attempt == 1:
1595- sleep_time = self.interval_start or 1
1596- elif attempt > 1:
1597- sleep_time += self.interval_stepping
1598- if self.interval_max:
1599- sleep_time = min(sleep_time, self.interval_max)
1600-
1601- log_info['sleep_time'] = sleep_time
1602- LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
1603- 'unreachable: %(err_str)s. Trying again in '
1604- '%(sleep_time)d seconds.') % log_info)
1605- time.sleep(sleep_time)
1606-
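
The sleep schedule above is linear with an optional cap: the first attempt sleeps interval_start (or 1) seconds, and each later attempt adds interval_stepping, clamped to interval_max. A minimal sketch of just that arithmetic (the function name and defaults are illustrative; the attribute names mirror the ones used above):

    def backoff_schedule(attempts, interval_start=1, interval_stepping=2,
                         interval_max=30):
        """Yield the sleep time used before each retry, mirroring reconnect()."""
        sleep_time = 0
        for attempt in range(1, attempts + 1):
            if attempt == 1:
                sleep_time = interval_start or 1
            else:
                sleep_time += interval_stepping
            if interval_max:
                sleep_time = min(sleep_time, interval_max)
            yield sleep_time

    assert list(backoff_schedule(5)) == [1, 3, 5, 7, 9]
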
1607- def ensure(self, error_callback, method, *args, **kwargs):
1608- while True:
1609- try:
1610- return method(*args, **kwargs)
1611- except self.connection_errors as e:
1612- if error_callback:
1613- error_callback(e)
1614- except self.channel_errors as e:
1615- if error_callback:
1616- error_callback(e)
1617- except (socket.timeout, IOError) as e:
1618- if error_callback:
1619- error_callback(e)
1620- except Exception as e:
1621- # NOTE(comstud): Unfortunately it's possible for amqplib
1622- # to return an error not covered by its transport
1623- # connection_errors in the case of a timeout waiting for
1624- # a protocol response. (See paste link in LP888621)
1625- # So, we check all exceptions for 'timeout' in their
1626- # text and try to reconnect in that case.
1627- if 'timeout' not in str(e):
1628- raise
1629- if error_callback:
1630- error_callback(e)
1631- self.reconnect()
1632-
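
ensure() is the retry primitive the rest of the class is layered on: invoke the callable, and on any retryable error report it via the callback and reconnect before trying again. A simplified, hedged sketch of the same loop (the function, the retryable tuple, and the flaky callable are all placeholders):

    def ensure(method, reconnect, error_callback=None, retryable=(IOError,)):
        # Simplified mirror of Connection.ensure() above: retry forever,
        # reconnecting between failed attempts.
        while True:
            try:
                return method()
            except retryable as e:
                if error_callback:
                    error_callback(e)
            reconnect()

    attempts = []

    def flaky():
        attempts.append(1)
        if len(attempts) < 3:
            raise IOError("broker went away")
        return "ok"

    assert ensure(flaky, reconnect=lambda: None) == "ok"
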
1633- def get_channel(self):
1634- """Convenience call for bin/clear_rabbit_queues."""
1635- return self.channel
1636-
1637- def close(self):
1638- """Close/release this connection."""
1639- self.connection.release()
1640- self.connection = None
1641-
1642- def reset(self):
1643- """Reset a connection so it can be used again."""
1644- self.channel.close()
1645- self.channel = self.connection.channel()
1646- # work around 'memory' transport bug in 1.1.3
1647- if self.memory_transport:
1648- self.channel._new_queue('ae.undeliver')
1649- self.consumers = []
1650-
1651- def declare_consumer(self, consumer_cls, topic, callback):
1652- """Create a Consumer using the class that was passed in and
1653- add it to our list of consumers
1654- """
1655-
1656- def _connect_error(exc):
1657- log_info = {'topic': topic, 'err_str': str(exc)}
1658- LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
1659- "%(err_str)s") % log_info)
1660-
1661- def _declare_consumer():
1662- consumer = consumer_cls(self.conf, self.channel, topic, callback,
1663- six.next(self.consumer_num))
1664- self.consumers.append(consumer)
1665- return consumer
1666-
1667- return self.ensure(_connect_error, _declare_consumer)
1668-
1669- def iterconsume(self, limit=None, timeout=None):
1670- """Return an iterator that will consume from all queues/consumers."""
1671-
1672- def _error_callback(exc):
1673- if isinstance(exc, socket.timeout):
1674- LOG.debug(_('Timed out waiting for RPC response: %s') %
1675- str(exc))
1676- raise rpc_common.Timeout()
1677- else:
1678- LOG.exception(_('Failed to consume message from queue: %s') %
1679- str(exc))
1680- self.do_consume = True
1681-
1682- def _consume():
1683- if self.do_consume:
1684- queues_head = self.consumers[:-1] # not fanout.
1685- queues_tail = self.consumers[-1] # fanout
1686- for queue in queues_head:
1687- queue.consume(nowait=True)
1688- queues_tail.consume(nowait=False)
1689- self.do_consume = False
1690- return self.connection.drain_events(timeout=timeout)
1691-
1692- for iteration in itertools.count(0):
1693- if limit and iteration >= limit:
1694- return  # not 'raise StopIteration': PEP 479 forbids that in generators
1695- yield self.ensure(_error_callback, _consume)
1696-
1697- def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
1698- """Send to a publisher based on the publisher class."""
1699-
1700- def _error_callback(exc):
1701- log_info = {'topic': topic, 'err_str': str(exc)}
1702- LOG.exception(_("Failed to publish message to topic "
1703- "'%(topic)s': %(err_str)s") % log_info)
1704-
1705- def _publish():
1706- publisher = cls(self.conf, self.channel, topic, **kwargs)
1707- publisher.send(msg, timeout)
1708-
1709- self.ensure(_error_callback, _publish)
1710-
1711- def declare_direct_consumer(self, topic, callback):
1712- """Create a 'direct' queue.
1713- In nova's use, this is generally a msg_id queue used for
1714- responses to call/multicall.
1715- """
1716- self.declare_consumer(DirectConsumer, topic, callback)
1717-
1718- def declare_topic_consumer(self, topic, callback=None, queue_name=None,
1719- exchange_name=None):
1720- """Create a 'topic' consumer."""
1721- self.declare_consumer(functools.partial(TopicConsumer,
1722- name=queue_name,
1723- exchange_name=exchange_name,
1724- ),
1725- topic, callback)
1726-
1727- def declare_fanout_consumer(self, topic, callback):
1728- """Create a 'fanout' consumer."""
1729- self.declare_consumer(FanoutConsumer, topic, callback)
1730-
1731- def direct_send(self, msg_id, msg):
1732- """Send a 'direct' message."""
1733- self.publisher_send(DirectPublisher, msg_id, msg)
1734-
1735- def topic_send(self, topic, msg, timeout=None):
1736- """Send a 'topic' message."""
1737- self.publisher_send(TopicPublisher, topic, msg, timeout)
1738-
1739- def fanout_send(self, topic, msg):
1740- """Send a 'fanout' message."""
1741- self.publisher_send(FanoutPublisher, topic, msg)
1742-
1743- def notify_send(self, topic, msg, **kwargs):
1744- """Send a notify message on a topic."""
1745- self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
1746-
1747- def consume(self, limit=None, timeout=None):
1748- """Consume from all queues/consumers."""
1749- it = self.iterconsume(limit=limit, timeout=timeout)
1750- while True:
1751- try:
1752- six.next(it)
1753- except StopIteration:
1754- return
1755-
1756-
1757-class RabbitDriver(amqpdriver.AMQPDriverBase):
1758-
1759- def __init__(self, conf, url, default_exchange=None,
1760- allowed_remote_exmods=None):
1761- conf.register_opts(rabbit_opts)
1762- conf.register_opts(rpc_amqp.amqp_opts)
1763-
1764- connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
1765-
1766- super(RabbitDriver, self).__init__(conf, url,
1767- connection_pool,
1768- default_exchange,
1769- allowed_remote_exmods or [])
1770-
1771- def require_features(self, requeue=True):
1772- pass
1773
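
The *_send() helpers above all funnel into publisher_send() with a different publisher class (DirectPublisher, TopicPublisher, FanoutPublisher, NotifyPublisher). A toy illustration of that dispatch shape, with a hypothetical stand-in publisher:

    class EchoPublisher(object):
        """Hypothetical stand-in for DirectPublisher/TopicPublisher/etc."""
        def __init__(self, topic):
            self.topic = topic

        def send(self, msg, timeout=None):
            print("%s <- %r (timeout=%s)" % (self.topic, msg, timeout))

    def publisher_send(cls, topic, msg, timeout=None, **kwargs):
        # Mirrors Connection.publisher_send() above: build a publisher, send once.
        cls(topic, **kwargs).send(msg, timeout)

    publisher_send(EchoPublisher, "compute", {"method": "ping"})
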
1774=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch'
1775=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo'
1776=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging'
1777=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers'
1778=== removed file '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py'
1779--- .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py 2015-04-23 15:56:08 +0000
1780+++ .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
1781@@ -1,535 +0,0 @@
1782-# Copyright 2010 United States Government as represented by the
1783-# Administrator of the National Aeronautics and Space Administration.
1784-# All Rights Reserved.
1785-# Copyright 2011 Red Hat, Inc.
1786-#
1787-# Licensed under the Apache License, Version 2.0 (the "License"); you may
1788-# not use this file except in compliance with the License. You may obtain
1789-# a copy of the License at
1790-#
1791-# http://www.apache.org/licenses/LICENSE-2.0
1792-#
1793-# Unless required by applicable law or agreed to in writing, software
1794-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1795-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1796-# License for the specific language governing permissions and limitations
1797-# under the License.
1798-
1799-import copy
1800-import logging
1801-import sys
1802-import time
1803-import traceback
1804-
1805-from oslo.config import cfg
1806-from oslo import messaging
1807-import six
1808-
1809-from oslo.messaging import _utils as utils
1810-from oslo.messaging.openstack.common import importutils
1811-from oslo.messaging.openstack.common import jsonutils
1812-
1813-# FIXME(markmc): remove this
1814-_ = lambda s: s
1815-
1816-LOG = logging.getLogger(__name__)
1817-
1818-_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
1819-
1820-
1821-'''RPC Envelope Version.
1822-
1823-This version number applies to the top level structure of messages sent out.
1824-It does *not* apply to the message payload, which must be versioned
1825-independently. For example, when using rpc APIs, a version number is applied
1826-for changes to the API being exposed over rpc. This version number is handled
1827-in the rpc proxy and dispatcher modules.
1828-
1829-This version number applies to the message envelope that is used in the
1830-serialization done inside the rpc layer. See serialize_msg() and
1831-deserialize_msg().
1832-
1833-The current message format (version 2.0) is very simple. It is:
1834-
1835- {
1836- 'oslo.version': <RPC Envelope Version as a String>,
1837- 'oslo.message': <Application Message Payload, JSON encoded>
1838- }
1839-
1840-Message format version '1.0' is just considered to be the messages we sent
1841-without a message envelope.
1842-
1843-So, the current message envelope just includes the envelope version. It may
1844-eventually contain additional information, such as a signature for the message
1845-payload.
1846-
1847-We will JSON encode the application message payload. The message envelope,
1848-which includes the JSON encoded application message body, will be passed down
1849-to the messaging libraries as a dict.
1850-'''
1851-_RPC_ENVELOPE_VERSION = '2.0'
1852-
1853-_VERSION_KEY = 'oslo.version'
1854-_MESSAGE_KEY = 'oslo.message'
1855-
1856-_REMOTE_POSTFIX = '_Remote'
1857-
1858-_exception_opts = [
1859- cfg.ListOpt('allowed_rpc_exception_modules',
1860- default=['oslo.messaging.exceptions',
1861- 'nova.exception',
1862- 'cinder.exception',
1863- _EXCEPTIONS_MODULE,
1864- ],
1865- help='Modules of exceptions that are permitted to be '
1866- 'recreated upon receiving exception data from an rpc '
1867- 'call.'),
1868-]
1869-
1870-
1871-class RPCException(Exception):
1872- msg_fmt = _("An unknown RPC related exception occurred.")
1873-
1874- def __init__(self, message=None, **kwargs):
1875- self.kwargs = kwargs
1876-
1877- if not message:
1878- try:
1879- message = self.msg_fmt % kwargs
1880-
1881- except Exception:
1882- # kwargs doesn't match a variable in the message
1883- # log the issue and the kwargs
1884- LOG.exception(_('Exception in string format operation'))
1885- for name, value in six.iteritems(kwargs):
1886- LOG.error("%s: %s" % (name, value))
1887- # at least get the core message out if something happened
1888- message = self.msg_fmt
1889-
1890- super(RPCException, self).__init__(message)
1891-
1892-
1893-class RemoteError(RPCException):
1894- """Signifies that a remote class has raised an exception.
1895-
1896- Contains a string representation of the type of the original exception,
1897- the value of the original exception, and the traceback. These are
1898- sent to the parent as a joined string so printing the exception
1899- contains all of the relevant info.
1900-
1901- """
1902- msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
1903-
1904- def __init__(self, exc_type=None, value=None, traceback=None):
1905- self.exc_type = exc_type
1906- self.value = value
1907- self.traceback = traceback
1908- super(RemoteError, self).__init__(exc_type=exc_type,
1909- value=value,
1910- traceback=traceback)
1911-
1912-
1913-class Timeout(RPCException):
1914- """Signifies that a timeout has occurred.
1915-
1916- This exception is raised if the rpc_response_timeout is reached while
1917- waiting for a response from the remote side.
1918- """
1919- msg_fmt = _('Timeout while waiting on RPC response - '
1920- 'topic: "%(topic)s", RPC method: "%(method)s" '
1921- 'info: "%(info)s"')
1922-
1923- def __init__(self, info=None, topic=None, method=None):
1924- """Initializes a Timeout object.
1925-
1926- :param info: Extra info to convey to the user
1927- :param topic: The topic that the rpc call was sent to
1928- :param method: The name of the rpc method being
1929- called
1930- """
1931- self.info = info
1932- self.topic = topic
1933- self.method = method
1934- super(Timeout, self).__init__(
1935- None,
1936- info=info or _('<unknown>'),
1937- topic=topic or _('<unknown>'),
1938- method=method or _('<unknown>'))
1939-
1940-
1941-class DuplicateMessageError(RPCException):
1942- msg_fmt = _("Found duplicate message (%(msg_id)s). Skipping it.")
1943-
1944-
1945-class InvalidRPCConnectionReuse(RPCException):
1946- msg_fmt = _("Invalid reuse of an RPC connection.")
1947-
1948-
1949-class UnsupportedRpcVersion(RPCException):
1950- msg_fmt = _("Specified RPC version, %(version)s, not supported by "
1951- "this endpoint.")
1952-
1953-
1954-class UnsupportedRpcEnvelopeVersion(RPCException):
1955- msg_fmt = _("Specified RPC envelope version, %(version)s, "
1956- "not supported by this endpoint.")
1957-
1958-
1959-class RpcVersionCapError(RPCException):
1960- msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
1961-
1962-
1963-class Connection(object):
1964- """A connection, returned by rpc.create_connection().
1965-
1966- This class represents a connection to the message bus used for rpc.
1967- An instance of this class should never be created by users of the rpc API.
1968- Use rpc.create_connection() instead.
1969- """
1970- def close(self):
1971- """Close the connection.
1972-
1973- This method must be called when the connection will no longer be used.
1974- It will ensure that any resources associated with the connection, such
1975- as a network connection, are cleaned up.
1976- """
1977- raise NotImplementedError()
1978-
1979- def create_consumer(self, topic, proxy, fanout=False):
1980- """Create a consumer on this connection.
1981-
1982- A consumer is associated with a message queue on the backend message
1983- bus. The consumer will read messages from the queue, unpack them, and
1984- dispatch them to the proxy object. The contents of the message pulled
1985- off of the queue will determine which method gets called on the proxy
1986- object.
1987-
1988- :param topic: This is a name associated with what to consume from.
1989- Multiple instances of a service may consume from the same
1990- topic. For example, all instances of nova-compute consume
1991- from a queue called "compute". In that case, the
1992- messages will get distributed amongst the consumers in a
1993- round-robin fashion if fanout=False. If fanout=True,
1994- every consumer associated with this topic will get a
1995- copy of every message.
1996- :param proxy: The object that will handle all incoming messages.
1997- :param fanout: Whether or not this is a fanout topic. See the
1998- documentation for the topic parameter for some
1999- additional comments on this.
2000- """
2001- raise NotImplementedError()
2002-
2003- def create_worker(self, topic, proxy, pool_name):
2004- """Create a worker on this connection.
2005-
2006- A worker is like a regular consumer of messages directed to a
2007- topic, except that it is part of a set of such consumers (the
2008- "pool") which may run in parallel. Every pool of workers will
2009- receive a given message, but only one worker in the pool will
2010- be asked to process it. Load is distributed across the members
2011- of the pool in round-robin fashion.
2012-
2013- :param topic: This is a name associated with what to consume from.
2014- Multiple instances of a service may consume from the same
2015- topic.
2016- :param proxy: The object that will handle all incoming messages.
2017- :param pool_name: String containing the name of the pool of workers
2018- """
2019- raise NotImplementedError()
2020-
2021- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
2022- """Register as a member of a group of consumers.
2023-
2024- Uses given topic from the specified exchange.
2025- Exactly one member of a given pool will receive each message.
2026-
2027- A message will be delivered to multiple pools, if more than
2028- one is created.
2029-
2030- :param callback: Callable to be invoked for each message.
2031- :type callback: callable accepting one argument
2032- :param pool_name: The name of the consumer pool.
2033- :type pool_name: str
2034- :param topic: The routing topic for desired messages.
2035- :type topic: str
2036- :param exchange_name: The name of the message exchange where
2037- the client should attach. Defaults to
2038- the configured exchange.
2039- :type exchange_name: str
2040- """
2041- raise NotImplementedError()
2042-
2043- def consume_in_thread(self):
2044- """Spawn a thread to handle incoming messages.
2045-
2046- Spawn a thread that will be responsible for handling all incoming
2047- messages for consumers that were set up on this connection.
2048-
2049- Message dispatching inside of this is expected to be implemented in a
2050- non-blocking manner. An example implementation would be having this
2051- thread pull messages in for all of the consumers, but utilize a thread
2052- pool for dispatching the messages to the proxy objects.
2053- """
2054- raise NotImplementedError()
2055-
2056-
2057-def _safe_log(log_func, msg, msg_data):
2058- """Sanitizes the msg_data field before logging."""
2059- SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
2060-
2061- def _fix_passwords(d):
2062- """Sanitizes the password fields in the dictionary."""
2063- for k in six.iterkeys(d):
2064- if k.lower().find('password') != -1:
2065- d[k] = '<SANITIZED>'
2066- elif k.lower() in SANITIZE:
2067- d[k] = '<SANITIZED>'
2068- elif isinstance(d[k], dict):
2069- _fix_passwords(d[k])
2070- return d
2071-
2072- return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
2073-
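
_fix_passwords() above recurses through the payload and masks anything password-like before it is logged. A quick illustration of the intended effect (the standalone function name is illustrative; the SANITIZE list is the one defined above):

    import copy

    SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']

    def fix_passwords(d):
        # Same traversal as _fix_passwords() above: mask password-ish keys,
        # recurse into nested dicts.
        for k in list(d):
            if 'password' in k.lower() or k.lower() in SANITIZE:
                d[k] = '<SANITIZED>'
            elif isinstance(d[k], dict):
                fix_passwords(d[k])
        return d

    msg = {'user': 'bob', 'password': 's3cret',
           'args': {'auth_token': 'abc123', 'flavor': 'm1.small'}}
    clean = fix_passwords(copy.deepcopy(msg))
    assert clean['password'] == '<SANITIZED>'
    assert clean['args']['auth_token'] == '<SANITIZED>'
    assert msg['password'] == 's3cret'  # original untouched thanks to deepcopy
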
2074-
2075-def serialize_remote_exception(failure_info, log_failure=True):
2076- """Prepares exception data to be sent over rpc.
2077-
2078- Failure_info should be a sys.exc_info() tuple.
2079-
2080- """
2081- tb = traceback.format_exception(*failure_info)
2082- failure = failure_info[1]
2083- if log_failure:
2084- LOG.error(_("Returning exception %s to caller"),
2085- six.text_type(failure))
2086- LOG.error(tb)
2087-
2088- kwargs = {}
2089- if hasattr(failure, 'kwargs'):
2090- kwargs = failure.kwargs
2091-
2092- # NOTE(matiu): With cells, it's possible to re-raise exceptions that
2093- # are already remote. Let's turn them back into the original type.
2094- cls_name = str(failure.__class__.__name__)
2095- mod_name = str(failure.__class__.__module__)
2096- if (cls_name.endswith(_REMOTE_POSTFIX) and
2097- mod_name.endswith(_REMOTE_POSTFIX)):
2098- cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
2099- mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
2100-
2101- data = {
2102- 'class': cls_name,
2103- 'module': mod_name,
2104- 'message': six.text_type(failure),
2105- 'tb': tb,
2106- 'args': failure.args,
2107- 'kwargs': kwargs
2108- }
2109-
2110- json_data = jsonutils.dumps(data)
2111-
2112- return json_data
2113-
2114-
2115-def deserialize_remote_exception(data, allowed_remote_exmods):
2116- failure = jsonutils.loads(str(data))
2117-
2118- trace = failure.get('tb', [])
2119- message = failure.get('message', "") + "\n" + "\n".join(trace)
2120- name = failure.get('class')
2121- module = failure.get('module')
2122-
2123- # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
2124- # order to prevent arbitrary code execution.
2125- if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
2126- return messaging.RemoteError(name, failure.get('message'), trace)
2127-
2128- try:
2129- mod = importutils.import_module(module)
2130- klass = getattr(mod, name)
2131- if not issubclass(klass, Exception):
2132- raise TypeError("Can only deserialize Exceptions")
2133-
2134- failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
2135- except (AttributeError, TypeError, ImportError):
2136- return messaging.RemoteError(name, failure.get('message'), trace)
2137-
2138- ex_type = type(failure)
2139- str_override = lambda self: message
2140- new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
2141- {'__str__': str_override, '__unicode__': str_override})
2142- new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
2143- try:
2144- # NOTE(ameade): Dynamically create a new exception type and swap it in
2145- # as the new type for the exception. This only works on user defined
2146- # Exceptions and not core Python exceptions. This is important because
2147- # we cannot necessarily change an exception message so we must override
2148- # the __str__ method.
2149- failure.__class__ = new_ex_type
2150- except TypeError:
2151- # NOTE(ameade): If a core exception then just add the traceback to the
2152- # first exception argument.
2153- failure.args = (message,) + failure.args[1:]
2154- return failure
2155-
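
serialize_remote_exception()/deserialize_remote_exception() above move an exception across the wire as a JSON dict. A small self-contained sketch of the payload they agree on, using plain json in place of jsonutils (field values are simply whatever the raised exception produces):

    import json
    import sys
    import traceback

    try:
        raise ValueError("boom")
    except ValueError:
        exc_info = sys.exc_info()

    failure = exc_info[1]
    # The same fields serialize_remote_exception() above puts on the wire.
    data = {
        'class': failure.__class__.__name__,
        'module': failure.__class__.__module__,
        'message': str(failure),
        'tb': traceback.format_exception(*exc_info),
        'args': failure.args,
        'kwargs': getattr(failure, 'kwargs', {}),
    }
    wire = json.dumps(data)
    assert json.loads(wire)['class'] == 'ValueError'
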
2156-
2157-class CommonRpcContext(object):
2158- def __init__(self, **kwargs):
2159- self.values = kwargs
2160-
2161- def __getattr__(self, key):
2162- try:
2163- return self.values[key]
2164- except KeyError:
2165- raise AttributeError(key)
2166-
2167- def to_dict(self):
2168- return copy.deepcopy(self.values)
2169-
2170- @classmethod
2171- def from_dict(cls, values):
2172- return cls(**values)
2173-
2174- def deepcopy(self):
2175- return self.from_dict(self.to_dict())
2176-
2177- def update_store(self):
2178- #local.store.context = self
2179- pass
2180-
2181- def elevated(self, read_deleted=None, overwrite=False):
2182- """Return a version of this context with admin flag set."""
2183- # TODO(russellb) This method is a bit of a nova-ism. It makes
2184- # some assumptions about the data in the request context sent
2185- # across rpc, while the rest of this class does not. We could get
2186- # rid of this if we changed the nova code that uses this to
2187- # convert the RpcContext back to its native RequestContext doing
2188- # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
2189-
2190- context = self.deepcopy()
2191- context.values['is_admin'] = True
2192-
2193- context.values.setdefault('roles', [])
2194-
2195- if 'admin' not in context.values['roles']:
2196- context.values['roles'].append('admin')
2197-
2198- if read_deleted is not None:
2199- context.values['read_deleted'] = read_deleted
2200-
2201- return context
2202-
2203-
2204-class ClientException(Exception):
2205- """Encapsulates the actual exception expected to be hit by an RPC proxy object.
2206-
2207- Merely instantiating it records the current exception information, which
2208- will be passed back to the RPC client without exceptional logging.
2209- """
2210- def __init__(self):
2211- self._exc_info = sys.exc_info()
2212-
2213-
2214-def catch_client_exception(exceptions, func, *args, **kwargs):
2215- try:
2216- return func(*args, **kwargs)
2217- except Exception as e:
2218- if type(e) in exceptions:
2219- raise ClientException()
2220- else:
2221- raise
2222-
2223-
2224-def client_exceptions(*exceptions):
2225- """Decorator for manager methods that raise expected exceptions.
2226-
2227- Marking a Manager method with this decorator allows the declaration
2228- of expected exceptions that the RPC layer should not consider fatal,
2229- and not log as if they were generated in a real error scenario. Note
2230- that this will cause listed exceptions to be wrapped in a
2231- ClientException, which is used internally by the RPC layer.
2232- """
2233- def outer(func):
2234- def inner(*args, **kwargs):
2235- return catch_client_exception(exceptions, func, *args, **kwargs)
2236- return inner
2237- return outer
2238-
2239-
2240-def serialize_msg(raw_msg):
2241- # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
2242- # information about this format.
2243- msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
2244- _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
2245-
2246- return msg
2247-
2248-
2249-def deserialize_msg(msg):
2250- # NOTE(russellb): Hang on to your hats, this road is about to
2251- # get a little bumpy.
2252- #
2253- # Robustness Principle:
2254- # "Be strict in what you send, liberal in what you accept."
2255- #
2256- # At this point we have to do a bit of guessing about what it
2257- # is we just received. Here is the set of possibilities:
2258- #
2259- # 1) We received a dict. This could be 2 things:
2260- #
2261- # a) Inspect it to see if it looks like a standard message envelope.
2262- # If so, great!
2263- #
2264- # b) If it doesn't look like a standard message envelope, it could either
2265- # be a notification, or a message from before we added a message
2266- # envelope (referred to as version 1.0).
2267- # Just return the message as-is.
2268- #
2269- # 2) It's any other non-dict type. Just return it and hope for the best.
2270- # This case covers return values from rpc.call() from before message
2271- # envelopes were used. (messages to call a method were always a dict)
2272-
2273- if not isinstance(msg, dict):
2274- # See #2 above.
2275- return msg
2276-
2277- base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
2278- if not all(map(lambda key: key in msg, base_envelope_keys)):
2279- # See #1.b above.
2280- return msg
2281-
2282- # At this point we think we have the message envelope
2283- # format we were expecting. (#1.a above)
2284-
2285- if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
2286- msg[_VERSION_KEY]):
2287- raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
2288-
2289- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
2290-
2291- return raw_msg
2292-
2293-
2294-class DecayingTimer(object):
2295- def __init__(self, duration=None):
2296- self._duration = duration
2297- self._ends_at = None
2298-
2299- def start(self):
2300- if self._duration is not None:
2301- self._ends_at = time.time() + max(0, self._duration)
2302- return self
2303-
2304- def check_return(self, timeout_callback, *args, **kwargs):
2305- if self._duration is None:
2306- return None
2307- if self._ends_at is None:
2308- raise RuntimeError("Cannot check/return a timeout from a timer"
2309- " that has not been started")
2310-
2311- maximum = kwargs.pop('maximum', None)
2312- left = self._ends_at - time.time()
2313- if left <= 0:
2314- timeout_callback(*args, **kwargs)
2315-
2316- return left if maximum is None else min(left, maximum)
2317
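
To make the 2.0 envelope described in the docstring above concrete, here is a self-contained round trip using plain json in place of jsonutils (the envelope keys and version string come from the file above; the version-compatibility check is elided for brevity):

    import json

    _VERSION_KEY, _MESSAGE_KEY = 'oslo.version', 'oslo.message'
    _RPC_ENVELOPE_VERSION = '2.0'

    def serialize_msg(raw_msg):
        # Same 2.0 envelope as serialize_msg() above.
        return {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
                _MESSAGE_KEY: json.dumps(raw_msg)}

    def deserialize_msg(msg):
        # Non-dict and non-envelope payloads pass through unchanged (v1.0).
        if not isinstance(msg, dict):
            return msg
        if not all(key in msg for key in (_VERSION_KEY, _MESSAGE_KEY)):
            return msg
        return json.loads(msg[_MESSAGE_KEY])

    envelope = serialize_msg({'method': 'ping', 'args': {}})
    assert envelope[_VERSION_KEY] == '2.0'
    assert deserialize_msg(envelope) == {'method': 'ping', 'args': {}}
    assert deserialize_msg({'method': 'v1'}) == {'method': 'v1'}  # passthrough
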
2318=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests'
2319=== removed file '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py'
2320--- .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py 2015-04-23 15:56:08 +0000
2321+++ .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000
2322@@ -1,49 +0,0 @@
2323-
2324-# Copyright 2013 Red Hat, Inc.
2325-#
2326-# Licensed under the Apache License, Version 2.0 (the "License"); you may
2327-# not use this file except in compliance with the License. You may obtain
2328-# a copy of the License at
2329-#
2330-# http://www.apache.org/licenses/LICENSE-2.0
2331-#
2332-# Unless required by applicable law or agreed to in writing, software
2333-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
2334-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
2335-# License for the specific language governing permissions and limitations
2336-# under the License.
2337-
2338-from oslo.messaging import _utils as utils
2339-from tests import utils as test_utils
2340-
2341-
2342-class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
2343- def test_version_is_compatible_same(self):
2344- self.assertTrue(utils.version_is_compatible('1.23', '1.23'))
2345-
2346- def test_version_is_compatible_newer_minor(self):
2347- self.assertTrue(utils.version_is_compatible('1.24', '1.23'))
2348-
2349- def test_version_is_compatible_older_minor(self):
2350- self.assertFalse(utils.version_is_compatible('1.22', '1.23'))
2351-
2352- def test_version_is_compatible_major_difference1(self):
2353- self.assertFalse(utils.version_is_compatible('2.23', '1.23'))
2354-
2355- def test_version_is_compatible_major_difference2(self):
2356- self.assertFalse(utils.version_is_compatible('1.23', '2.23'))
2357-
2358- def test_version_is_compatible_newer_rev(self):
2359- self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))
2360-
2361- def test_version_is_compatible_newer_rev_both(self):
2362- self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))
2363-
2364- def test_version_is_compatible_older_rev_both(self):
2365- self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))
2366-
2367- def test_version_is_compatible_older_rev(self):
2368- self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))
2369-
2370- def test_version_is_compatible_no_rev_is_zero(self):
2371- self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
2372
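
The tests above pin down the compatibility semantics: the major versions must match, the offered minor must be at least the requested minor, and a missing revision counts as zero. A sketch consistent with every case above (this is not the actual oslo.messaging._utils implementation):

    def version_is_compatible(imp_version, version):
        """Return True if the offered 'imp_version' can serve 'version'."""
        def parse(v):
            parts = [int(p) for p in v.split('.')]
            while len(parts) < 3:
                parts.append(0)  # a missing revision counts as zero
            return parts

        imp_major, imp_minor, imp_rev = parse(imp_version)
        major, minor, rev = parse(version)
        if imp_major != major:
            return False
        if imp_minor < minor:
            return False
        if imp_minor == minor and imp_rev < rev:
            return False
        return True

    assert version_is_compatible('1.24', '1.23.1')
    assert not version_is_compatible('1.23', '2.23')
    assert version_is_compatible('1.23.0', '1.23')
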
2373=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch'
2374=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo'
2375=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging'
2376=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers'
2377=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py'
2378--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py 2015-04-23 15:56:08 +0000
2379+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
2380@@ -1,535 +0,0 @@
2381-# Copyright 2010 United States Government as represented by the
2382-# Administrator of the National Aeronautics and Space Administration.
2383-# All Rights Reserved.
2384-# Copyright 2011 Red Hat, Inc.
2385-#
2386-# Licensed under the Apache License, Version 2.0 (the "License"); you may
2387-# not use this file except in compliance with the License. You may obtain
2388-# a copy of the License at
2389-#
2390-# http://www.apache.org/licenses/LICENSE-2.0
2391-#
2392-# Unless required by applicable law or agreed to in writing, software
2393-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
2394-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
2395-# License for the specific language governing permissions and limitations
2396-# under the License.
2397-
2398-import copy
2399-import logging
2400-import sys
2401-import time
2402-import traceback
2403-
2404-from oslo.config import cfg
2405-from oslo import messaging
2406-import six
2407-
2408-from oslo.messaging import _utils as utils
2409-from oslo.messaging.openstack.common import importutils
2410-from oslo.messaging.openstack.common import jsonutils
2411-
2412-# FIXME(markmc): remove this
2413-_ = lambda s: s
2414-
2415-LOG = logging.getLogger(__name__)
2416-
2417-_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
2418-
2419-
2420-'''RPC Envelope Version.
2421-
2422-This version number applies to the top level structure of messages sent out.
2423-It does *not* apply to the message payload, which must be versioned
2424-independently. For example, when using rpc APIs, a version number is applied
2425-for changes to the API being exposed over rpc. This version number is handled
2426-in the rpc proxy and dispatcher modules.
2427-
2428-This version number applies to the message envelope that is used in the
2429-serialization done inside the rpc layer. See serialize_msg() and
2430-deserialize_msg().
2431-
2432-The current message format (version 2.0) is very simple. It is:
2433-
2434- {
2435- 'oslo.version': <RPC Envelope Version as a String>,
2436- 'oslo.message': <Application Message Payload, JSON encoded>
2437- }
2438-
2439-Message format version '1.0' is just considered to be the messages we sent
2440-without a message envelope.
2441-
2442-So, the current message envelope just includes the envelope version. It may
2443-eventually contain additional information, such as a signature for the message
2444-payload.
2445-
2446-We will JSON encode the application message payload. The message envelope,
2447-which includes the JSON encoded application message body, will be passed down
2448-to the messaging libraries as a dict.
2449-'''
2450-_RPC_ENVELOPE_VERSION = '2.0'
2451-
2452-_VERSION_KEY = 'oslo.version'
2453-_MESSAGE_KEY = 'oslo.message'
2454-
2455-_REMOTE_POSTFIX = '_Remote'
2456-
2457-_exception_opts = [
2458- cfg.ListOpt('allowed_rpc_exception_modules',
2459- default=['oslo.messaging.exceptions',
2460- 'nova.exception',
2461- 'cinder.exception',
2462- _EXCEPTIONS_MODULE,
2463- ],
2464- help='Modules of exceptions that are permitted to be '
2465- 'recreated upon receiving exception data from an rpc '
2466- 'call.'),
2467-]
2468-
2469-
2470-class RPCException(Exception):
2471- msg_fmt = _("An unknown RPC related exception occurred.")
2472-
2473- def __init__(self, message=None, **kwargs):
2474- self.kwargs = kwargs
2475-
2476- if not message:
2477- try:
2478- message = self.msg_fmt % kwargs
2479-
2480- except Exception:
2481- # kwargs doesn't match a variable in the message
2482- # log the issue and the kwargs
2483- LOG.exception(_('Exception in string format operation'))
2484- for name, value in six.iteritems(kwargs):
2485- LOG.error("%s: %s" % (name, value))
2486- # at least get the core message out if something happened
2487- message = self.msg_fmt
2488-
2489- super(RPCException, self).__init__(message)
2490-
2491-
2492-class RemoteError(RPCException):
2493- """Signifies that a remote class has raised an exception.
2494-
2495- Contains a string representation of the type of the original exception,
2496- the value of the original exception, and the traceback. These are
2497- sent to the parent as a joined string so printing the exception
2498- contains all of the relevant info.
2499-
2500- """
2501- msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
2502-
2503- def __init__(self, exc_type=None, value=None, traceback=None):
2504- self.exc_type = exc_type
2505- self.value = value
2506- self.traceback = traceback
2507- super(RemoteError, self).__init__(exc_type=exc_type,
2508- value=value,
2509- traceback=traceback)
2510-
2511-
2512-class Timeout(RPCException):
2513- """Signifies that a timeout has occurred.
2514-
2515- This exception is raised if the rpc_response_timeout is reached while
2516- waiting for a response from the remote side.
2517- """
2518- msg_fmt = _('Timeout while waiting on RPC response - '
2519- 'topic: "%(topic)s", RPC method: "%(method)s" '
2520- 'info: "%(info)s"')
2521-
2522- def __init__(self, info=None, topic=None, method=None):
2523- """Initializes a Timeout object.
2524-
2525- :param info: Extra info to convey to the user
2526- :param topic: The topic that the rpc call was sent to
2527- :param method: The name of the rpc method being
2528- called
2529- """
2530- self.info = info
2531- self.topic = topic
2532- self.method = method
2533- super(Timeout, self).__init__(
2534- None,
2535- info=info or _('<unknown>'),
2536- topic=topic or _('<unknown>'),
2537- method=method or _('<unknown>'))
2538-
2539-
2540-class DuplicateMessageError(RPCException):
2541- msg_fmt = _("Found duplicate message (%(msg_id)s). Skipping it.")
2542-
2543-
2544-class InvalidRPCConnectionReuse(RPCException):
2545- msg_fmt = _("Invalid reuse of an RPC connection.")
2546-
2547-
2548-class UnsupportedRpcVersion(RPCException):
2549- msg_fmt = _("Specified RPC version, %(version)s, not supported by "
2550- "this endpoint.")
2551-
2552-
2553-class UnsupportedRpcEnvelopeVersion(RPCException):
2554- msg_fmt = _("Specified RPC envelope version, %(version)s, "
2555- "not supported by this endpoint.")
2556-
2557-
2558-class RpcVersionCapError(RPCException):
2559- msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
2560-
2561-
2562-class Connection(object):
2563- """A connection, returned by rpc.create_connection().
2564-
2565- This class represents a connection to the message bus used for rpc.
2566- An instance of this class should never be created by users of the rpc API.
2567- Use rpc.create_connection() instead.
2568- """
2569- def close(self):
2570- """Close the connection.
2571-
2572- This method must be called when the connection will no longer be used.
2573- It will ensure that any resources associated with the connection, such
2574- as a network connection, are cleaned up.
2575- """
2576- raise NotImplementedError()
2577-
2578- def create_consumer(self, topic, proxy, fanout=False):
2579- """Create a consumer on this connection.
2580-
2581- A consumer is associated with a message queue on the backend message
2582- bus. The consumer will read messages from the queue, unpack them, and
2583- dispatch them to the proxy object. The contents of the message pulled
2584- off of the queue will determine which method gets called on the proxy
2585- object.
2586-
2587- :param topic: This is a name associated with what to consume from.
2588- Multiple instances of a service may consume from the same
2589- topic. For example, all instances of nova-compute consume
2590- from a queue called "compute". In that case, the
2591- messages will get distributed amongst the consumers in a
2592- round-robin fashion if fanout=False. If fanout=True,
2593- every consumer associated with this topic will get a
2594- copy of every message.
2595- :param proxy: The object that will handle all incoming messages.
2596- :param fanout: Whether or not this is a fanout topic. See the
2597- documentation for the topic parameter for some
2598- additional comments on this.
2599- """
2600- raise NotImplementedError()
2601-
2602- def create_worker(self, topic, proxy, pool_name):
2603- """Create a worker on this connection.
2604-
2605- A worker is like a regular consumer of messages directed to a
2606- topic, except that it is part of a set of such consumers (the
2607- "pool") which may run in parallel. Every pool of workers will
2608- receive a given message, but only one worker in the pool will
2609- be asked to process it. Load is distributed across the members
2610- of the pool in round-robin fashion.
2611-
2612- :param topic: This is a name associated with what to consume from.
2613- Multiple instances of a service may consume from the same
2614- topic.
2615- :param proxy: The object that will handle all incoming messages.
2616- :param pool_name: String containing the name of the pool of workers
2617- """
2618- raise NotImplementedError()
2619-
2620- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
2621- """Register as a member of a group of consumers.
2622-
2623- Uses given topic from the specified exchange.
2624- Exactly one member of a given pool will receive each message.
2625-
2626- A message will be delivered to multiple pools, if more than
2627- one is created.
2628-
2629- :param callback: Callable to be invoked for each message.
2630- :type callback: callable accepting one argument
2631- :param pool_name: The name of the consumer pool.
2632- :type pool_name: str
2633- :param topic: The routing topic for desired messages.
2634- :type topic: str
2635- :param exchange_name: The name of the message exchange where
2636- the client should attach. Defaults to
2637- the configured exchange.
2638- :type exchange_name: str
2639- """
2640- raise NotImplementedError()
2641-
2642- def consume_in_thread(self):
2643- """Spawn a thread to handle incoming messages.
2644-
2645- Spawn a thread that will be responsible for handling all incoming
2646- messages for consumers that were set up on this connection.
2647-
2648- Message dispatching inside of this is expected to be implemented in a
2649- non-blocking manner. An example implementation would be having this
2650- thread pull messages in for all of the consumers, but utilize a thread
2651- pool for dispatching the messages to the proxy objects.
2652- """
2653- raise NotImplementedError()
2654-
2655-
2656-def _safe_log(log_func, msg, msg_data):
2657- """Sanitizes the msg_data field before logging."""
2658- SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
2659-
2660- def _fix_passwords(d):
2661- """Sanitizes the password fields in the dictionary."""
2662- for k in six.iterkeys(d):
2663- if k.lower().find('password') != -1:
2664- d[k] = '<SANITIZED>'
2665- elif k.lower() in SANITIZE:
2666- d[k] = '<SANITIZED>'
2667- elif isinstance(d[k], dict):
2668- _fix_passwords(d[k])
2669- return d
2670-
2671- return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
2672-
2673-
2674-def serialize_remote_exception(failure_info, log_failure=True):
2675- """Prepares exception data to be sent over rpc.
2676-
2677- Failure_info should be a sys.exc_info() tuple.
2678-
2679- """
2680- tb = traceback.format_exception(*failure_info)
2681- failure = failure_info[1]
2682- if log_failure:
2683- LOG.error(_("Returning exception %s to caller"),
2684- six.text_type(failure))
2685- LOG.error(tb)
2686-
2687- kwargs = {}
2688- if hasattr(failure, 'kwargs'):
2689- kwargs = failure.kwargs
2690-
2691- # NOTE(matiu): With cells, it's possible to re-raise exceptions that
2692- # are already remote. Let's turn them back into the original type.
2693- cls_name = str(failure.__class__.__name__)
2694- mod_name = str(failure.__class__.__module__)
2695- if (cls_name.endswith(_REMOTE_POSTFIX) and
2696- mod_name.endswith(_REMOTE_POSTFIX)):
2697- cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
2698- mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
2699-
2700- data = {
2701- 'class': cls_name,
2702- 'module': mod_name,
2703- 'message': six.text_type(failure),
2704- 'tb': tb,
2705- 'args': failure.args,
2706- 'kwargs': kwargs
2707- }
2708-
2709- json_data = jsonutils.dumps(data)
2710-
2711- return json_data
2712-
2713-
2714-def deserialize_remote_exception(data, allowed_remote_exmods):
2715- failure = jsonutils.loads(str(data))
2716-
2717- trace = failure.get('tb', [])
2718- message = failure.get('message', "") + "\n" + "\n".join(trace)
2719- name = failure.get('class')
2720- module = failure.get('module')
2721-
2722- # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
2723- # order to prevent arbitrary code execution.
2724- if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
2725- return messaging.RemoteError(name, failure.get('message'), trace)
2726-
2727- try:
2728- mod = importutils.import_module(module)
2729- klass = getattr(mod, name)
2730- if not issubclass(klass, Exception):
2731- raise TypeError("Can only deserialize Exceptions")
2732-
2733- failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
2734- except (AttributeError, TypeError, ImportError):
2735- return messaging.RemoteError(name, failure.get('message'), trace)
2736-
2737- ex_type = type(failure)
2738- str_override = lambda self: message
2739- new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
2740- {'__str__': str_override, '__unicode__': str_override})
2741- new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
2742- try:
2743- # NOTE(ameade): Dynamically create a new exception type and swap it in
2744- # as the new type for the exception. This only works on user defined
2745- # Exceptions and not core Python exceptions. This is important because
2746- # we cannot necessarily change an exception message so we must override
2747- # the __str__ method.
2748- failure.__class__ = new_ex_type
2749- except TypeError:
2750- # NOTE(ameade): If a core exception then just add the traceback to the
2751- # first exception argument.
2752- failure.args = (message,) + failure.args[1:]
2753- return failure
2754-
2755-
2756-class CommonRpcContext(object):
2757- def __init__(self, **kwargs):
2758- self.values = kwargs
2759-
2760- def __getattr__(self, key):
2761- try:
2762- return self.values[key]
2763- except KeyError:
2764- raise AttributeError(key)
2765-
2766- def to_dict(self):
2767- return copy.deepcopy(self.values)
2768-
2769- @classmethod
2770- def from_dict(cls, values):
2771- return cls(**values)
2772-
2773- def deepcopy(self):
2774- return self.from_dict(self.to_dict())
2775-
2776- def update_store(self):
2777- #local.store.context = self
2778- pass
2779-
2780- def elevated(self, read_deleted=None, overwrite=False):
2781- """Return a version of this context with admin flag set."""
2782- # TODO(russellb) This method is a bit of a nova-ism. It makes
2783- # some assumptions about the data in the request context sent
2784- # across rpc, while the rest of this class does not. We could get
2785- # rid of this if we changed the nova code that uses this to
2786- # convert the RpcContext back to its native RequestContext doing
2787- # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
2788-
2789- context = self.deepcopy()
2790- context.values['is_admin'] = True
2791-
2792- context.values.setdefault('roles', [])
2793-
2794- if 'admin' not in context.values['roles']:
2795- context.values['roles'].append('admin')
2796-
2797- if read_deleted is not None:
2798- context.values['read_deleted'] = read_deleted
2799-
2800- return context
2801-
2802-
2803-class ClientException(Exception):
2804- """Encapsulates the actual exception expected to be hit by an RPC proxy object.
2805-
2806- Merely instantiating it records the current exception information, which
2807- will be passed back to the RPC client without exceptional logging.
2808- """
2809- def __init__(self):
2810- self._exc_info = sys.exc_info()
2811-
2812-
2813-def catch_client_exception(exceptions, func, *args, **kwargs):
2814- try:
2815- return func(*args, **kwargs)
2816- except Exception as e:
2817- if type(e) in exceptions:
2818- raise ClientException()
2819- else:
2820- raise
2821-
2822-
2823-def client_exceptions(*exceptions):
2824- """Decorator for manager methods that raise expected exceptions.
2825-
2826- Marking a Manager method with this decorator allows the declaration
2827- of expected exceptions that the RPC layer should not consider fatal,
2828- and not log as if they were generated in a real error scenario. Note
2829- that this will cause listed exceptions to be wrapped in a
2830- ClientException, which is used internally by the RPC layer.
2831- """
2832- def outer(func):
2833- def inner(*args, **kwargs):
2834- return catch_client_exception(exceptions, func, *args, **kwargs)
2835- return inner
2836- return outer
2837-
2838-
2839-def serialize_msg(raw_msg):
2840- # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
2841- # information about this format.
2842- msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
2843- _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
2844-
2845- return msg
2846-
2847-
2848-def deserialize_msg(msg):
2849- # NOTE(russellb): Hang on to your hats, this road is about to
2850- # get a little bumpy.
2851- #
2852- # Robustness Principle:
2853- # "Be strict in what you send, liberal in what you accept."
2854- #
2855- # At this point we have to do a bit of guessing about what it
2856- # is we just received. Here is the set of possibilities:
2857- #
2858- # 1) We received a dict. This could be 2 things:
2859- #
2860- # a) Inspect it to see if it looks like a standard message envelope.
2861- # If so, great!
2862- #
2863- # b) If it doesn't look like a standard message envelope, it could either
2864- # be a notification, or a message from before we added a message
2865- # envelope (referred to as version 1.0).
2866- # Just return the message as-is.
2867- #
2868- # 2) It's any other non-dict type. Just return it and hope for the best.
2869- # This case covers return values from rpc.call() from before message
2870- # envelopes were used. (messages to call a method were always a dict)
2871-
2872- if not isinstance(msg, dict):
2873- # See #2 above.
2874- return msg
2875-
2876- base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
2877- if not all(map(lambda key: key in msg, base_envelope_keys)):
2878- # See #1.b above.
2879- return msg
2880-
2881- # At this point we think we have the message envelope
2882- # format we were expecting. (#1.a above)
2883-
2884- if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
2885- msg[_VERSION_KEY]):
2886- raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
2887-
2888- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
2889-
2890- return raw_msg
2891-
2892-
2893-class DecayingTimer(object):
2894- def __init__(self, duration=None):
2895- self._duration = duration
2896- self._ends_at = None
2897-
2898- def start(self):
2899- if self._duration is not None:
2900- self._ends_at = time.time() + max(0, self._duration)
2901- return self
2902-
2903- def check_return(self, timeout_callback, *args, **kwargs):
2904- maximum = kwargs.pop('maximum', None)
2905- if self._duration is None:
2906- return None if maximum is None else maximum
2907- if self._ends_at is None:
2908- raise RuntimeError("Cannot check/return a timeout from a timer"
2909- " that has not been started")
2910-
2911- left = self._ends_at - time.time()
2912- if left <= 0:
2913- timeout_callback(*args, **kwargs)
2914-
2915- return left if maximum is None else min(left, maximum)
2916
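
This copy of common.py differs from the one above only in DecayingTimer.check_return(): with the 0002 patch applied, a timer with no duration honours the 'maximum' keyword instead of always returning None. A quick demonstration of the patched behaviour (the callback name is illustrative):

    import time

    class DecayingTimer(object):
        # Patched check_return() semantics from the hunk above.
        def __init__(self, duration=None):
            self._duration = duration
            self._ends_at = None

        def start(self):
            if self._duration is not None:
                self._ends_at = time.time() + max(0, self._duration)
            return self

        def check_return(self, timeout_callback, *args, **kwargs):
            maximum = kwargs.pop('maximum', None)
            if self._duration is None:
                return None if maximum is None else maximum
            if self._ends_at is None:
                raise RuntimeError("Cannot check/return a timeout from a"
                                   " timer that has not been started")
            left = self._ends_at - time.time()
            if left <= 0:
                timeout_callback(*args, **kwargs)
            return left if maximum is None else min(left, maximum)

    def on_timeout():
        raise RuntimeError("timed out")

    # With no duration, the cap is returned rather than None.
    assert DecayingTimer().check_return(on_timeout, maximum=0.5) == 0.5
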
2917=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py'
2918--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-04-23 15:56:08 +0000
2919+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
2920@@ -1,793 +0,0 @@
2921-# Copyright 2011 OpenStack Foundation
2922-#
2923-# Licensed under the Apache License, Version 2.0 (the "License"); you may
2924-# not use this file except in compliance with the License. You may obtain
2925-# a copy of the License at
2926-#
2927-# http://www.apache.org/licenses/LICENSE-2.0
2928-#
2929-# Unless required by applicable law or agreed to in writing, software
2930-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
2931-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
2932-# License for the specific language governing permissions and limitations
2933-# under the License.
2934-
2935-import functools
2936-import itertools
2937-import logging
2938-import socket
2939-import ssl
2940-import time
2941-import uuid
2942-
2943-import kombu
2944-import kombu.connection
2945-import kombu.entity
2946-import kombu.messaging
2947-from oslo.config import cfg
2948-import six
2949-
2950-from oslo.messaging._drivers import amqp as rpc_amqp
2951-from oslo.messaging._drivers import amqpdriver
2952-from oslo.messaging._drivers import common as rpc_common
2953-from oslo.messaging.openstack.common import network_utils
2954-
2955-# FIXME(markmc): remove this
2956-_ = lambda s: s
2957-
2958-rabbit_opts = [
2959- cfg.StrOpt('kombu_ssl_version',
2960- default='',
2961- help='SSL version to use (valid only if SSL enabled). '
2962- 'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
2963- 'be available on some distributions.'
2964- ),
2965- cfg.StrOpt('kombu_ssl_keyfile',
2966- default='',
2967- help='SSL key file (valid only if SSL enabled).'),
2968- cfg.StrOpt('kombu_ssl_certfile',
2969- default='',
2970- help='SSL cert file (valid only if SSL enabled).'),
2971- cfg.StrOpt('kombu_ssl_ca_certs',
2972- default='',
2973- help=('SSL certification authority file '
2974- '(valid only if SSL enabled).')),
2975- cfg.FloatOpt('kombu_reconnect_delay',
2976- default=1.0,
2977- help='How long to wait before reconnecting in response to an '
2978- 'AMQP consumer cancel notification.'),
2979- cfg.StrOpt('rabbit_host',
2980- default='localhost',
2981- help='The RabbitMQ broker address where a single node is '
2982- 'used.'),
2983- cfg.IntOpt('rabbit_port',
2984- default=5672,
2985- help='The RabbitMQ broker port where a single node is used.'),
2986- cfg.ListOpt('rabbit_hosts',
2987- default=['$rabbit_host:$rabbit_port'],
2988- help='RabbitMQ HA cluster host:port pairs.'),
2989- cfg.BoolOpt('rabbit_use_ssl',
2990- default=False,
2991- help='Connect over SSL for RabbitMQ.'),
2992- cfg.StrOpt('rabbit_userid',
2993- default='guest',
2994- help='The RabbitMQ userid.'),
2995- cfg.StrOpt('rabbit_password',
2996- default='guest',
2997- help='The RabbitMQ password.',
2998- secret=True),
2999- cfg.StrOpt('rabbit_login_method',
3000- default='AMQPLAIN',
3001- help='The RabbitMQ login method.'),
3002- cfg.StrOpt('rabbit_virtual_host',
3003- default='/',
3004- help='The RabbitMQ virtual host.'),
3005- cfg.IntOpt('rabbit_retry_interval',
3006- default=1,
3007- help='How frequently to retry connecting with RabbitMQ.'),
3008- cfg.IntOpt('rabbit_retry_backoff',
3009- default=2,
3010- help='How long to backoff for between retries when connecting '
3011- 'to RabbitMQ.'),
3012- cfg.IntOpt('rabbit_max_retries',
3013- default=0,
3014- help='Maximum number of RabbitMQ connection retries. '
3015- 'Default is 0 (infinite retry count).'),
3016- cfg.BoolOpt('rabbit_ha_queues',
3017- default=False,
3018- help='Use HA queues in RabbitMQ (x-ha-policy: all). '
3019- 'If you change this option, you must wipe the '
3020- 'RabbitMQ database.'),
3021-
3022- # FIXME(markmc): this was toplevel in openstack.common.rpc
3023- cfg.BoolOpt('fake_rabbit',
3024- default=False,
3025- help='If passed, use a fake RabbitMQ provider.'),
3026-]
3027-
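
These options are registered without a group, so they are set in the [DEFAULT] section of a service's config file. A minimal sketch of registering and overriding a couple of them with oslo.config (only two options are reproduced, with illustrative values):

    from oslo.config import cfg

    # Two of the options above, reproduced for a self-contained demo.
    opts = [
        cfg.ListOpt('rabbit_hosts', default=['localhost:5672']),
        cfg.BoolOpt('rabbit_ha_queues', default=False),
    ]

    conf = cfg.ConfigOpts()
    conf.register_opts(opts)
    # Equivalent to setting these in the [DEFAULT] section of a config file.
    conf.set_override('rabbit_hosts', ['rabbit1:5672', 'rabbit2:5672'])
    conf.set_override('rabbit_ha_queues', True)
    assert conf.rabbit_hosts == ['rabbit1:5672', 'rabbit2:5672']
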
3028-LOG = logging.getLogger(__name__)
3029-
3030-
3031-def _get_queue_arguments(conf):
3032- """Construct the arguments for declaring a queue.
3033-
3034- If the rabbit_ha_queues option is set, we declare a mirrored queue
3035- as described here:
3036-
3037- http://www.rabbitmq.com/ha.html
3038-
3039- Setting x-ha-policy to all means that the queue will be mirrored
3040- to all nodes in the cluster.
3041- """
3042- return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
3043-
3044-
3045-class RabbitMessage(dict):
3046- def __init__(self, raw_message):
3047- super(RabbitMessage, self).__init__(
3048- rpc_common.deserialize_msg(raw_message.payload))
3049- self._raw_message = raw_message
3050-
3051- def acknowledge(self):
3052- self._raw_message.ack()
3053-
3054- def requeue(self):
3055- self._raw_message.requeue()
3056-
3057-
3058-class ConsumerBase(object):
3059- """Consumer base class."""
3060-
3061- def __init__(self, channel, callback, tag, **kwargs):
3062- """Declare a queue on an amqp channel.
3063-
3064- 'channel' is the amqp channel to use
3065- 'callback' is the callback to call when messages are received
3066- 'tag' is a unique ID for the consumer on the channel
3067-
3068- queue name, exchange name, and other kombu options are
3069- passed in here as a dictionary.
3070- """
3071- self.callback = callback
3072- self.tag = str(tag)
3073- self.kwargs = kwargs
3074- self.queue = None
3075- self.reconnect(channel)
3076-
3077- def reconnect(self, channel):
3078- """Re-declare the queue after a rabbit reconnect."""
3079- self.channel = channel
3080- self.kwargs['channel'] = channel
3081- self.queue = kombu.entity.Queue(**self.kwargs)
3082- self.queue.declare()
3083-
3084- def _callback_handler(self, message, callback):
3085- """Call callback with deserialized message.
3086-
3087- The message is processed and then ack'ed, even if the callback fails.
3088- """
3089-
3090- try:
3091- callback(RabbitMessage(message))
3092- except Exception:
3093- LOG.exception(_("Failed to process message"
3094- " ... skipping it."))
3095- message.ack()
3096-
3097- def consume(self, *args, **kwargs):
3098- """Actually declare the consumer on the amqp channel. This will
3099- start the flow of messages from the queue. Using the
3100- Connection.iterconsume() iterator will process the messages,
3101- calling the appropriate callback.
3102-
3103- If a callback is specified in kwargs, use that. Otherwise,
3104- use the callback passed during __init__()
3105-
3106-        If kwargs['nowait'] is True, this call will return without
3107-        waiting for the broker to confirm the consumer.
3108-
3109- """
3110-
3111- options = {'consumer_tag': self.tag}
3112- options['nowait'] = kwargs.get('nowait', False)
3113- callback = kwargs.get('callback', self.callback)
3114- if not callback:
3115- raise ValueError("No callback defined")
3116-
3117- def _callback(raw_message):
3118- message = self.channel.message_to_python(raw_message)
3119- self._callback_handler(message, callback)
3120-
3121- self.queue.consume(*args, callback=_callback, **options)
3122-
3123- def cancel(self):
3124- """Cancel the consuming from the queue, if it has started."""
3125- try:
3126- self.queue.cancel(self.tag)
3127- except KeyError as e:
3128-            # NOTE(comstud): Kludge to get around an amqplib bug
3129- if str(e) != "u'%s'" % self.tag:
3130- raise
3131- self.queue = None
3132-
3133-
3134-class DirectConsumer(ConsumerBase):
3135- """Queue/consumer class for 'direct'."""
3136-
3137- def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
3138- """Init a 'direct' queue.
3139-
3140- 'channel' is the amqp channel to use
3141- 'msg_id' is the msg_id to listen on
3142- 'callback' is the callback to call when messages are received
3143- 'tag' is a unique ID for the consumer on the channel
3144-
3145- Other kombu options may be passed
3146- """
3147- # Default options
3148- options = {'durable': False,
3149- 'queue_arguments': _get_queue_arguments(conf),
3150- 'auto_delete': True,
3151- 'exclusive': False}
3152- options.update(kwargs)
3153- exchange = kombu.entity.Exchange(name=msg_id,
3154- type='direct',
3155- durable=options['durable'],
3156- auto_delete=options['auto_delete'])
3157- super(DirectConsumer, self).__init__(channel,
3158- callback,
3159- tag,
3160- name=msg_id,
3161- exchange=exchange,
3162- routing_key=msg_id,
3163- **options)
3164-
3165-
3166-class TopicConsumer(ConsumerBase):
3167- """Consumer class for 'topic'."""
3168-
3169- def __init__(self, conf, channel, topic, callback, tag, name=None,
3170- exchange_name=None, **kwargs):
3171- """Init a 'topic' queue.
3172-
3173- :param channel: the amqp channel to use
3174- :param topic: the topic to listen on
3175- :paramtype topic: str
3176- :param callback: the callback to call when messages are received
3177- :param tag: a unique ID for the consumer on the channel
3178- :param name: optional queue name, defaults to topic
3179- :paramtype name: str
3180-
3181- Other kombu options may be passed as keyword arguments
3182- """
3183- # Default options
3184- options = {'durable': conf.amqp_durable_queues,
3185- 'queue_arguments': _get_queue_arguments(conf),
3186- 'auto_delete': conf.amqp_auto_delete,
3187- 'exclusive': False}
3188- options.update(kwargs)
3189- exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
3190- exchange = kombu.entity.Exchange(name=exchange_name,
3191- type='topic',
3192- durable=options['durable'],
3193- auto_delete=options['auto_delete'])
3194- super(TopicConsumer, self).__init__(channel,
3195- callback,
3196- tag,
3197- name=name or topic,
3198- exchange=exchange,
3199- routing_key=topic,
3200- **options)
3201-
3202-
3203-class FanoutConsumer(ConsumerBase):
3204- """Consumer class for 'fanout'."""
3205-
3206- def __init__(self, conf, channel, topic, callback, tag, **kwargs):
3207- """Init a 'fanout' queue.
3208-
3209- 'channel' is the amqp channel to use
3210- 'topic' is the topic to listen on
3211- 'callback' is the callback to call when messages are received
3212- 'tag' is a unique ID for the consumer on the channel
3213-
3214- Other kombu options may be passed
3215- """
3216- unique = uuid.uuid4().hex
3217- exchange_name = '%s_fanout' % topic
3218- queue_name = '%s_fanout_%s' % (topic, unique)
3219-
3220- # Default options
3221- options = {'durable': False,
3222- 'queue_arguments': _get_queue_arguments(conf),
3223- 'auto_delete': True,
3224- 'exclusive': False}
3225- options.update(kwargs)
3226- exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
3227- durable=options['durable'],
3228- auto_delete=options['auto_delete'])
3229- super(FanoutConsumer, self).__init__(channel, callback, tag,
3230- name=queue_name,
3231- exchange=exchange,
3232- routing_key=topic,
3233- **options)
3234-
3235-
3236-class Publisher(object):
3237- """Base Publisher class."""
3238-
3239- def __init__(self, channel, exchange_name, routing_key, **kwargs):
3240- """Init the Publisher class with the exchange_name, routing_key,
3241-        and other options.
3242- """
3243- self.exchange_name = exchange_name
3244- self.routing_key = routing_key
3245- self.kwargs = kwargs
3246- self.reconnect(channel)
3247-
3248- def reconnect(self, channel):
3249- """Re-establish the Producer after a rabbit reconnection."""
3250- self.exchange = kombu.entity.Exchange(name=self.exchange_name,
3251- **self.kwargs)
3252- self.producer = kombu.messaging.Producer(exchange=self.exchange,
3253- channel=channel,
3254- routing_key=self.routing_key)
3255-
3256- def send(self, msg, timeout=None):
3257- """Send a message."""
3258- if timeout:
3259- #
3260- # AMQP TTL is in milliseconds when set in the header.
3261- #
3262- self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
3263- else:
3264- self.producer.publish(msg)
3265-
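The unit conversion in send() above is worth spelling out: AMQP expects a per-message TTL in milliseconds in the headers, while callers pass timeouts in seconds. A two-line sketch with a hypothetical value:

    timeout = 30                       # seconds, hypothetical caller value
    headers = {'ttl': timeout * 1000}  # AMQP header wants 30000 ms
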
3266-
3267-class DirectPublisher(Publisher):
3268- """Publisher class for 'direct'."""
3269- def __init__(self, conf, channel, msg_id, **kwargs):
3270- """Init a 'direct' publisher.
3271-
3272- Kombu options may be passed as keyword args to override defaults
3273- """
3274-
3275- options = {'durable': False,
3276- 'auto_delete': True,
3277- 'exclusive': False}
3278- options.update(kwargs)
3279- super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
3280- type='direct', **options)
3281-
3282-
3283-class TopicPublisher(Publisher):
3284- """Publisher class for 'topic'."""
3285- def __init__(self, conf, channel, topic, **kwargs):
3286- """Init a 'topic' publisher.
3287-
3288- Kombu options may be passed as keyword args to override defaults
3289- """
3290- options = {'durable': conf.amqp_durable_queues,
3291- 'auto_delete': conf.amqp_auto_delete,
3292- 'exclusive': False}
3293- options.update(kwargs)
3294- exchange_name = rpc_amqp.get_control_exchange(conf)
3295- super(TopicPublisher, self).__init__(channel,
3296- exchange_name,
3297- topic,
3298- type='topic',
3299- **options)
3300-
3301-
3302-class FanoutPublisher(Publisher):
3303- """Publisher class for 'fanout'."""
3304- def __init__(self, conf, channel, topic, **kwargs):
3305- """Init a 'fanout' publisher.
3306-
3307- Kombu options may be passed as keyword args to override defaults
3308- """
3309- options = {'durable': False,
3310- 'auto_delete': True,
3311- 'exclusive': False}
3312- options.update(kwargs)
3313- super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
3314- None, type='fanout', **options)
3315-
3316-
3317-class NotifyPublisher(TopicPublisher):
3318- """Publisher class for 'notify'."""
3319-
3320- def __init__(self, conf, channel, topic, **kwargs):
3321- self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
3322- self.queue_arguments = _get_queue_arguments(conf)
3323- super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
3324-
3325- def reconnect(self, channel):
3326- super(NotifyPublisher, self).reconnect(channel)
3327-
3328- # NOTE(jerdfelt): Normally the consumer would create the queue, but
3329- # we do this to ensure that messages don't get dropped if the
3330- # consumer is started after we do
3331- queue = kombu.entity.Queue(channel=channel,
3332- exchange=self.exchange,
3333- durable=self.durable,
3334- name=self.routing_key,
3335- routing_key=self.routing_key,
3336- queue_arguments=self.queue_arguments)
3337- queue.declare()
3338-
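The producer-side queue declaration in NotifyPublisher.reconnect() can be sketched in isolation using kombu's in-memory transport, the same transport the tests below use; the exchange and queue names here are illustrative:

    import kombu.connection
    import kombu.entity

    conn = kombu.connection.BrokerConnection(transport='memory')
    conn.connect()
    channel = conn.channel()
    exchange = kombu.entity.Exchange(name='notify_ex', type='topic',
                                     durable=False, auto_delete=True)
    # Declaring the queue before any consumer exists means messages
    # published now are queued rather than silently dropped.
    queue = kombu.entity.Queue(channel=channel, exchange=exchange,
                               name='notifications.info',
                               routing_key='notifications.info')
    queue.declare()
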
3339-
3340-class Connection(object):
3341- """Connection object."""
3342-
3343- pool = None
3344-
3345- def __init__(self, conf, server_params=None):
3346- self.consumers = []
3347- self.conf = conf
3348- self.max_retries = self.conf.rabbit_max_retries
3349- # Try forever?
3350- if self.max_retries <= 0:
3351- self.max_retries = None
3352- self.interval_start = self.conf.rabbit_retry_interval
3353- self.interval_stepping = self.conf.rabbit_retry_backoff
3354- # max retry-interval = 30 seconds
3355- self.interval_max = 30
3356- self.memory_transport = False
3357-
3358- if server_params is None:
3359- server_params = {}
3360- # Keys to translate from server_params to kombu params
3361- server_params_to_kombu_params = {'username': 'userid'}
3362-
3363- ssl_params = self._fetch_ssl_params()
3364- params_list = []
3365- for adr in self.conf.rabbit_hosts:
3366- hostname, port = network_utils.parse_host_port(
3367- adr, default_port=self.conf.rabbit_port)
3368-
3369- params = {
3370- 'hostname': hostname,
3371- 'port': port,
3372- 'userid': self.conf.rabbit_userid,
3373- 'password': self.conf.rabbit_password,
3374- 'login_method': self.conf.rabbit_login_method,
3375- 'virtual_host': self.conf.rabbit_virtual_host,
3376- }
3377-
3378- for sp_key, value in six.iteritems(server_params):
3379- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
3380- params[p_key] = value
3381-
3382- if self.conf.fake_rabbit:
3383- params['transport'] = 'memory'
3384- if self.conf.rabbit_use_ssl:
3385- params['ssl'] = ssl_params
3386-
3387- params_list.append(params)
3388-
3389- self.params_list = itertools.cycle(params_list)
3390-
3391- self.memory_transport = self.conf.fake_rabbit
3392-
3393- self.connection = None
3394- self.do_consume = None
3395- self.reconnect()
3396-
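One detail worth highlighting in __init__() above: params_list is wrapped in itertools.cycle(), so each reconnect() attempt pulls the next broker round-robin, indefinitely. In miniature:

    import itertools

    hosts = itertools.cycle(['host1:5672', 'host2:5672'])
    assert [next(hosts) for _ in range(3)] == \
        ['host1:5672', 'host2:5672', 'host1:5672']
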
3397- # FIXME(markmc): use oslo sslutils when it is available as a library
3398- _SSL_PROTOCOLS = {
3399- "tlsv1": ssl.PROTOCOL_TLSv1,
3400- "sslv23": ssl.PROTOCOL_SSLv23,
3401- "sslv3": ssl.PROTOCOL_SSLv3
3402- }
3403-
3404- try:
3405- _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
3406- except AttributeError:
3407- pass
3408-
3409- @classmethod
3410- def validate_ssl_version(cls, version):
3411- key = version.lower()
3412- try:
3413- return cls._SSL_PROTOCOLS[key]
3414- except KeyError:
3415-            raise RuntimeError(_("Invalid SSL version: %s") % version)
3416-
3417- def _fetch_ssl_params(self):
3418- """Handles fetching what ssl params should be used for the connection
3419- (if any).
3420- """
3421- ssl_params = dict()
3422-
3423- # http://docs.python.org/library/ssl.html - ssl.wrap_socket
3424- if self.conf.kombu_ssl_version:
3425- ssl_params['ssl_version'] = self.validate_ssl_version(
3426- self.conf.kombu_ssl_version)
3427- if self.conf.kombu_ssl_keyfile:
3428- ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
3429- if self.conf.kombu_ssl_certfile:
3430- ssl_params['certfile'] = self.conf.kombu_ssl_certfile
3431- if self.conf.kombu_ssl_ca_certs:
3432- ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
3433-            # We might want to allow other cert_reqs values
3434-            # in the future.
3435- ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
3436-
3437- # Return the extended behavior or just have the default behavior
3438- return ssl_params or True
3439-
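The return convention of _fetch_ssl_params() is easy to misread: an empty dict is falsy, so "ssl_params or True" yields True (use kombu's SSL defaults) when no kombu_ssl_* option is set, and the populated dict otherwise:

    ssl_params = {}
    assert (ssl_params or True) is True         # fall back to defaults
    ssl_params = {'ca_certs': '/tmp/ca.pem'}    # hypothetical path
    assert (ssl_params or True) == ssl_params   # explicit params win
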
3440- def _connect(self, params):
3441- """Connect to rabbit. Re-establish any queues that may have
3442- been declared before if we are reconnecting. Exceptions should
3443- be handled by the caller.
3444- """
3445- if self.connection:
3446- LOG.info(_("Reconnecting to AMQP server on "
3447- "%(hostname)s:%(port)d") % params)
3448- try:
3449- # XXX(nic): when reconnecting to a RabbitMQ cluster
3450- # with mirrored queues in use, the attempt to release the
3451- # connection can hang "indefinitely" somewhere deep down
3452- # in Kombu. Blocking the thread for a bit prior to
3453- # release seems to kludge around the problem where it is
3454-            # otherwise reproducible.
3455- if self.conf.kombu_reconnect_delay > 0:
3456- LOG.info(_("Delaying reconnect for %1.1f seconds...") %
3457- self.conf.kombu_reconnect_delay)
3458- time.sleep(self.conf.kombu_reconnect_delay)
3459-
3460- self.connection.release()
3461- except self.connection_errors:
3462- pass
3463- # Setting this in case the next statement fails, though
3464- # it shouldn't be doing any network operations, yet.
3465- self.connection = None
3466- self.connection = kombu.connection.BrokerConnection(**params)
3467- self.connection_errors = self.connection.connection_errors
3468- self.channel_errors = self.connection.channel_errors
3469- if self.memory_transport:
3470- # Kludge to speed up tests.
3471- self.connection.transport.polling_interval = 0.0
3472- self.do_consume = True
3473- self.consumer_num = itertools.count(1)
3474- self.connection.connect()
3475- self.channel = self.connection.channel()
3476- # work around 'memory' transport bug in 1.1.3
3477- if self.memory_transport:
3478- self.channel._new_queue('ae.undeliver')
3479- for consumer in self.consumers:
3480- consumer.reconnect(self.channel)
3481- LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
3482- params)
3483-
3484- def reconnect(self):
3485- """Handles reconnecting and re-establishing queues.
3486-        Will retry up to self.max_retries times.
3487-        self.max_retries = None means to retry forever.
3488-        Sleeps between attempts, starting at self.interval_start
3489-        seconds and backing off by self.interval_stepping seconds
3490-        on each subsequent attempt.
3491- """
3492-
3493- attempt = 0
3494- while True:
3495- params = six.next(self.params_list)
3496- attempt += 1
3497- try:
3498- self._connect(params)
3499- return
3500- except IOError as e:
3501- pass
3502- except self.connection_errors as e:
3503- pass
3504- except Exception as e:
3505- # NOTE(comstud): Unfortunately it's possible for amqplib
3506- # to return an error not covered by its transport
3507- # connection_errors in the case of a timeout waiting for
3508- # a protocol response. (See paste link in LP888621)
3509- # So, we check all exceptions for 'timeout' in them
3510- # and try to reconnect in this case.
3511- if 'timeout' not in str(e):
3512- raise
3513-
3514- log_info = {}
3515- log_info['err_str'] = str(e)
3516- log_info['max_retries'] = self.max_retries
3517- log_info.update(params)
3518-
3519- if self.max_retries and attempt == self.max_retries:
3520- msg = _('Unable to connect to AMQP server on '
3521- '%(hostname)s:%(port)d after %(max_retries)d '
3522- 'tries: %(err_str)s') % log_info
3523- LOG.error(msg)
3524- raise rpc_common.RPCException(msg)
3525-
3526- if attempt == 1:
3527- sleep_time = self.interval_start or 1
3528- elif attempt > 1:
3529- sleep_time += self.interval_stepping
3530- if self.interval_max:
3531- sleep_time = min(sleep_time, self.interval_max)
3532-
3533- log_info['sleep_time'] = sleep_time
3534- LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
3535- 'unreachable: %(err_str)s. Trying again in '
3536- '%(sleep_time)d seconds.') % log_info)
3537- time.sleep(sleep_time)
3538-
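With the defaults above (rabbit_retry_interval=1, rabbit_retry_backoff=2, hard cap of 30), the sleep schedule this loop produces is 1, 3, 5, 7, ... seconds, capped at 30. The same arithmetic as a self-contained generator:

    def backoff(start=1, step=2, maximum=30, attempts=6):
        sleep_time = 0
        for attempt in range(1, attempts + 1):
            if attempt == 1:
                sleep_time = start or 1
            else:
                sleep_time += step
            yield min(sleep_time, maximum)

    print(list(backoff()))  # -> [1, 3, 5, 7, 9, 11]
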
3539- def ensure(self, error_callback, method, *args, **kwargs):
3540- while True:
3541- try:
3542- return method(*args, **kwargs)
3543- except self.connection_errors as e:
3544- if error_callback:
3545- error_callback(e)
3546- except self.channel_errors as e:
3547- if error_callback:
3548- error_callback(e)
3549- except (socket.timeout, IOError) as e:
3550- if error_callback:
3551- error_callback(e)
3552- except Exception as e:
3553- # NOTE(comstud): Unfortunately it's possible for amqplib
3554- # to return an error not covered by its transport
3555- # connection_errors in the case of a timeout waiting for
3556- # a protocol response. (See paste link in LP888621)
3557- # So, we check all exceptions for 'timeout' in them
3558- # and try to reconnect in this case.
3559- if 'timeout' not in str(e):
3560- raise
3561- if error_callback:
3562- error_callback(e)
3563- self.reconnect()
3564-
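Stripped of driver specifics, ensure() is a plain retry wrapper: call the method, report any recoverable error, reconnect, and loop until the call succeeds. A minimal sketch with illustrative parameter names:

    def ensure(reconnect, method, recoverable_errors, error_callback=None):
        while True:
            try:
                return method()
            except recoverable_errors as exc:
                if error_callback:
                    error_callback(exc)
            reconnect()  # then loop and retry the call
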
3565- def get_channel(self):
3566- """Convenience call for bin/clear_rabbit_queues."""
3567- return self.channel
3568-
3569- def close(self):
3570- """Close/release this connection."""
3571- self.connection.release()
3572- self.connection = None
3573-
3574- def reset(self):
3575- """Reset a connection so it can be used again."""
3576- self.channel.close()
3577- self.channel = self.connection.channel()
3578- # work around 'memory' transport bug in 1.1.3
3579- if self.memory_transport:
3580- self.channel._new_queue('ae.undeliver')
3581- self.consumers = []
3582-
3583- def declare_consumer(self, consumer_cls, topic, callback):
3584- """Create a Consumer using the class that was passed in and
3585- add it to our list of consumers
3586- """
3587-
3588- def _connect_error(exc):
3589- log_info = {'topic': topic, 'err_str': str(exc)}
3590- LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
3591- "%(err_str)s") % log_info)
3592-
3593- def _declare_consumer():
3594- consumer = consumer_cls(self.conf, self.channel, topic, callback,
3595- six.next(self.consumer_num))
3596- self.consumers.append(consumer)
3597- return consumer
3598-
3599- return self.ensure(_connect_error, _declare_consumer)
3600-
3601- def iterconsume(self, limit=None, timeout=None):
3602- """Return an iterator that will consume from all queues/consumers."""
3603- timer = rpc_common.DecayingTimer(duration=timeout)
3604- timer.start()
3605-
3606- def _raise_timeout(exc):
3607- LOG.debug('Timed out waiting for RPC response: %s', exc)
3608- raise rpc_common.Timeout()
3609-
3610- def _error_callback(exc):
3611- self.do_consume = True
3612- timer.check_return(_raise_timeout, exc)
3613- LOG.exception(_('Failed to consume message from queue: %s'),
3614- exc)
3615-
3616- def _consume():
3617- if self.do_consume:
3618- queues_head = self.consumers[:-1] # not fanout.
3619- queues_tail = self.consumers[-1] # fanout
3620- for queue in queues_head:
3621- queue.consume(nowait=True)
3622- queues_tail.consume(nowait=False)
3623- self.do_consume = False
3624-
3625- poll_timeout = 1 if timeout is None else min(timeout, 1)
3626- while True:
3627- try:
3628- return self.connection.drain_events(timeout=poll_timeout)
3629- except socket.timeout as exc:
3630- poll_timeout = timer.check_return(_raise_timeout, exc,
3631- maximum=1)
3632-
3633- for iteration in itertools.count(0):
3634- if limit and iteration >= limit:
3635- raise StopIteration
3636- yield self.ensure(_error_callback, _consume)
3637-
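The inner loop of _consume() above bounds each drain_events() call to one second so the overall DecayingTimer deadline is re-checked between polls. Sketched on its own, with timer_check standing in for timer.check_return:

    import socket

    def drain_until(connection, timer_check, poll_timeout=1):
        while True:
            try:
                return connection.drain_events(timeout=poll_timeout)
            except socket.timeout as exc:
                # Re-check the deadline: raises on expiry, otherwise
                # returns the remaining time, capped at one second.
                poll_timeout = timer_check(exc, maximum=1)
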
3638- def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
3639- """Send to a publisher based on the publisher class."""
3640-
3641- def _error_callback(exc):
3642- log_info = {'topic': topic, 'err_str': str(exc)}
3643- LOG.exception(_("Failed to publish message to topic "
3644- "'%(topic)s': %(err_str)s") % log_info)
3645-
3646- def _publish():
3647- publisher = cls(self.conf, self.channel, topic, **kwargs)
3648- publisher.send(msg, timeout)
3649-
3650- self.ensure(_error_callback, _publish)
3651-
3652- def declare_direct_consumer(self, topic, callback):
3653- """Create a 'direct' queue.
3654- In nova's use, this is generally a msg_id queue used for
3655-        responses to call/multicall.
3656- """
3657- self.declare_consumer(DirectConsumer, topic, callback)
3658-
3659- def declare_topic_consumer(self, topic, callback=None, queue_name=None,
3660- exchange_name=None):
3661- """Create a 'topic' consumer."""
3662- self.declare_consumer(functools.partial(TopicConsumer,
3663- name=queue_name,
3664- exchange_name=exchange_name,
3665- ),
3666- topic, callback)
3667-
3668- def declare_fanout_consumer(self, topic, callback):
3669- """Create a 'fanout' consumer."""
3670- self.declare_consumer(FanoutConsumer, topic, callback)
3671-
3672- def direct_send(self, msg_id, msg):
3673- """Send a 'direct' message."""
3674- self.publisher_send(DirectPublisher, msg_id, msg)
3675-
3676- def topic_send(self, topic, msg, timeout=None):
3677- """Send a 'topic' message."""
3678- self.publisher_send(TopicPublisher, topic, msg, timeout)
3679-
3680- def fanout_send(self, topic, msg):
3681- """Send a 'fanout' message."""
3682- self.publisher_send(FanoutPublisher, topic, msg)
3683-
3684- def notify_send(self, topic, msg, **kwargs):
3685- """Send a notify message on a topic."""
3686- self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
3687-
3688- def consume(self, limit=None, timeout=None):
3689- """Consume from all queues/consumers."""
3690- it = self.iterconsume(limit=limit, timeout=timeout)
3691- while True:
3692- try:
3693- six.next(it)
3694- except StopIteration:
3695- return
3696-
3697-
3698-class RabbitDriver(amqpdriver.AMQPDriverBase):
3699-
3700- def __init__(self, conf, url, default_exchange=None,
3701- allowed_remote_exmods=[]):
3702- conf.register_opts(rabbit_opts)
3703- conf.register_opts(rpc_amqp.amqp_opts)
3704-
3705- connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
3706-
3707- super(RabbitDriver, self).__init__(conf, url,
3708- connection_pool,
3709- default_exchange,
3710- allowed_remote_exmods)
3711-
3712- def require_features(self, requeue=True):
3713- pass
3714
3715=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests'
3716=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py'
3717--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py 2015-04-23 15:56:08 +0000
3718+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py 1970-01-01 00:00:00 +0000
3719@@ -1,646 +0,0 @@
3720-# Copyright 2013 Red Hat, Inc.
3721-#
3722-# Licensed under the Apache License, Version 2.0 (the "License"); you may
3723-# not use this file except in compliance with the License. You may obtain
3724-# a copy of the License at
3725-#
3726-# http://www.apache.org/licenses/LICENSE-2.0
3727-#
3728-# Unless required by applicable law or agreed to in writing, software
3729-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
3730-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
3731-# License for the specific language governing permissions and limitations
3732-# under the License.
3733-
3734-import datetime
3735-import sys
3736-import threading
3737-import uuid
3738-
3739-import fixtures
3740-import kombu
3741-import testscenarios
3742-
3743-from oslo import messaging
3744-from oslo.messaging._drivers import amqpdriver
3745-from oslo.messaging._drivers import common as driver_common
3746-from oslo.messaging._drivers import impl_rabbit as rabbit_driver
3747-from oslo.messaging.openstack.common import jsonutils
3748-from tests import utils as test_utils
3749-
3750-load_tests = testscenarios.load_tests_apply_scenarios
3751-
3752-
3753-class TestRabbitDriverLoad(test_utils.BaseTestCase):
3754-
3755- def setUp(self):
3756- super(TestRabbitDriverLoad, self).setUp()
3757- self.messaging_conf.transport_driver = 'rabbit'
3758- self.messaging_conf.in_memory = True
3759-
3760- def test_driver_load(self):
3761- transport = messaging.get_transport(self.conf)
3762- self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
3763-
3764-
3765-class TestRabbitTransportURL(test_utils.BaseTestCase):
3766-
3767- scenarios = [
3768- ('none', dict(url=None, expected=None)),
3769- ('empty',
3770- dict(url='rabbit:///',
3771- expected=dict(virtual_host=''))),
3772- ('localhost',
3773- dict(url='rabbit://localhost/',
3774- expected=dict(hostname='localhost',
3775- username='',
3776- password='',
3777- virtual_host=''))),
3778- ('virtual_host',
3779- dict(url='rabbit:///vhost',
3780- expected=dict(virtual_host='vhost'))),
3781- ('no_creds',
3782- dict(url='rabbit://host/virtual_host',
3783- expected=dict(hostname='host',
3784- username='',
3785- password='',
3786- virtual_host='virtual_host'))),
3787- ('no_port',
3788- dict(url='rabbit://user:password@host/virtual_host',
3789- expected=dict(hostname='host',
3790- username='user',
3791- password='password',
3792- virtual_host='virtual_host'))),
3793- ('full_url',
3794- dict(url='rabbit://user:password@host:10/virtual_host',
3795- expected=dict(hostname='host',
3796- port=10,
3797- username='user',
3798- password='password',
3799- virtual_host='virtual_host'))),
3800- ]
3801-
3802- def setUp(self):
3803- super(TestRabbitTransportURL, self).setUp()
3804-
3805- self.messaging_conf.transport_driver = 'rabbit'
3806- self.messaging_conf.in_memory = True
3807-
3808- self._server_params = []
3809- cnx_init = rabbit_driver.Connection.__init__
3810-
3811- def record_params(cnx, conf, server_params=None):
3812- self._server_params.append(server_params)
3813- return cnx_init(cnx, conf, server_params)
3814-
3815- def dummy_send(cnx, topic, msg, timeout=None):
3816- pass
3817-
3818- self.stubs.Set(rabbit_driver.Connection, '__init__', record_params)
3819- self.stubs.Set(rabbit_driver.Connection, 'topic_send', dummy_send)
3820-
3821- self._driver = messaging.get_transport(self.conf, self.url)._driver
3822- self._target = messaging.Target(topic='testtopic')
3823-
3824- def test_transport_url_listen(self):
3825- self._driver.listen(self._target)
3826- self.assertEqual(self._server_params[0], self.expected)
3827-
3828- def test_transport_url_listen_for_notification(self):
3829- self._driver.listen_for_notifications(
3830- [(messaging.Target(topic='topic'), 'info')])
3831- self.assertEqual(self._server_params[0], self.expected)
3832-
3833- def test_transport_url_send(self):
3834- self._driver.send(self._target, {}, {})
3835- self.assertEqual(self._server_params[0], self.expected)
3836-
3837-
3838-class TestSendReceive(test_utils.BaseTestCase):
3839-
3840- _n_senders = [
3841- ('single_sender', dict(n_senders=1)),
3842- ('multiple_senders', dict(n_senders=10)),
3843- ]
3844-
3845- _context = [
3846- ('empty_context', dict(ctxt={})),
3847- ('with_context', dict(ctxt={'user': 'mark'})),
3848- ]
3849-
3850- _reply = [
3851- ('rx_id', dict(rx_id=True, reply=None)),
3852- ('none', dict(rx_id=False, reply=None)),
3853- ('empty_list', dict(rx_id=False, reply=[])),
3854- ('empty_dict', dict(rx_id=False, reply={})),
3855- ('false', dict(rx_id=False, reply=False)),
3856- ('zero', dict(rx_id=False, reply=0)),
3857- ]
3858-
3859- _failure = [
3860- ('success', dict(failure=False)),
3861- ('failure', dict(failure=True, expected=False)),
3862- ('expected_failure', dict(failure=True, expected=True)),
3863- ]
3864-
3865- _timeout = [
3866- ('no_timeout', dict(timeout=None)),
3867- ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
3868- ]
3869-
3870- @classmethod
3871- def generate_scenarios(cls):
3872- cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
3873- cls._context,
3874- cls._reply,
3875- cls._failure,
3876- cls._timeout)
3877-
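testscenarios.multiply_scenarios() takes the cross-product of the scenario lists above, so TestSendReceive expands to 2 x 2 x 6 x 3 x 2 = 144 generated test cases. A tiny illustration:

    import testscenarios

    letters = [('a', dict(x=1)), ('b', dict(x=2))]
    numbers = [('1', dict(y=9))]
    print(testscenarios.multiply_scenarios(letters, numbers))
    # -> [('a,1', {'x': 1, 'y': 9}), ('b,1', {'x': 2, 'y': 9})]
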
3878- def setUp(self):
3879- super(TestSendReceive, self).setUp()
3880- self.messaging_conf.transport_driver = 'rabbit'
3881- self.messaging_conf.in_memory = True
3882-
3883- def test_send_receive(self):
3884- transport = messaging.get_transport(self.conf)
3885- self.addCleanup(transport.cleanup)
3886-
3887- driver = transport._driver
3888-
3889- target = messaging.Target(topic='testtopic')
3890-
3891- listener = driver.listen(target)
3892-
3893- senders = []
3894- replies = []
3895- msgs = []
3896- errors = []
3897-
3898- def stub_error(msg, *a, **kw):
3899- if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
3900- a = a[0]
3901- errors.append(str(msg) % a)
3902-
3903- self.stubs.Set(driver_common.LOG, 'error', stub_error)
3904-
3905- def send_and_wait_for_reply(i):
3906- try:
3907- replies.append(driver.send(target,
3908- self.ctxt,
3909- {'tx_id': i},
3910- wait_for_reply=True,
3911- timeout=self.timeout))
3912- self.assertFalse(self.failure)
3913- self.assertIsNone(self.timeout)
3914- except (ZeroDivisionError, messaging.MessagingTimeout) as e:
3915- replies.append(e)
3916- self.assertTrue(self.failure or self.timeout is not None)
3917-
3918- while len(senders) < self.n_senders:
3919- senders.append(threading.Thread(target=send_and_wait_for_reply,
3920- args=(len(senders), )))
3921-
3922- for i in range(len(senders)):
3923- senders[i].start()
3924-
3925- received = listener.poll()
3926- self.assertIsNotNone(received)
3927- self.assertEqual(received.ctxt, self.ctxt)
3928- self.assertEqual(received.message, {'tx_id': i})
3929- msgs.append(received)
3930-
3931- # reply in reverse, except reply to the first guy second from last
3932- order = list(range(len(senders)-1, -1, -1))
3933- if len(order) > 1:
3934- order[-1], order[-2] = order[-2], order[-1]
3935-
3936- for i in order:
3937- if self.timeout is None:
3938- if self.failure:
3939- try:
3940- raise ZeroDivisionError
3941- except Exception:
3942- failure = sys.exc_info()
3943- msgs[i].reply(failure=failure,
3944- log_failure=not self.expected)
3945- elif self.rx_id:
3946- msgs[i].reply({'rx_id': i})
3947- else:
3948- msgs[i].reply(self.reply)
3949- senders[i].join()
3950-
3951- self.assertEqual(len(replies), len(senders))
3952- for i, reply in enumerate(replies):
3953- if self.timeout is not None:
3954- self.assertIsInstance(reply, messaging.MessagingTimeout)
3955- elif self.failure:
3956- self.assertIsInstance(reply, ZeroDivisionError)
3957- elif self.rx_id:
3958- self.assertEqual(reply, {'rx_id': order[i]})
3959- else:
3960- self.assertEqual(reply, self.reply)
3961-
3962- if not self.timeout and self.failure and not self.expected:
3963- self.assertTrue(len(errors) > 0, errors)
3964- else:
3965- self.assertEqual(len(errors), 0, errors)
3966-
3967-
3968-TestSendReceive.generate_scenarios()
3969-
3970-
3971-class TestRacyWaitForReply(test_utils.BaseTestCase):
3972-
3973- def setUp(self):
3974- super(TestRacyWaitForReply, self).setUp()
3975- self.messaging_conf.transport_driver = 'rabbit'
3976- self.messaging_conf.in_memory = True
3977-
3978- def test_send_receive(self):
3979- transport = messaging.get_transport(self.conf)
3980- self.addCleanup(transport.cleanup)
3981-
3982- driver = transport._driver
3983-
3984- target = messaging.Target(topic='testtopic')
3985-
3986- listener = driver.listen(target)
3987-
3988- senders = []
3989- replies = []
3990- msgs = []
3991-
3992- wait_conditions = []
3993- orig_reply_waiter = amqpdriver.ReplyWaiter.wait
3994-
3995- def reply_waiter(self, msg_id, timeout):
3996- if wait_conditions:
3997- with wait_conditions[0]:
3998- wait_conditions.pop().wait()
3999- return orig_reply_waiter(self, msg_id, timeout)
4000-
4001- self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
4002-
4003- def send_and_wait_for_reply(i):
4004- replies.append(driver.send(target,
4005- {},
4006- {'tx_id': i},
4007- wait_for_reply=True,
4008- timeout=None))
4009-
4010- while len(senders) < 2:
4011- t = threading.Thread(target=send_and_wait_for_reply,
4012- args=(len(senders), ))
4013- t.daemon = True
4014- senders.append(t)
4015-
4016- # Start the first guy, receive his message, but delay his polling
4017- notify_condition = threading.Condition()
4018- wait_conditions.append(notify_condition)
4019- senders[0].start()
4020-
4021- msgs.append(listener.poll())
4022- self.assertEqual(msgs[-1].message, {'tx_id': 0})
4023-
4024- # Start the second guy, receive his message
4025- senders[1].start()
4026-
4027- msgs.append(listener.poll())
4028- self.assertEqual(msgs[-1].message, {'tx_id': 1})
4029-
4030- # Reply to both in order, making the second thread queue
4031- # the reply meant for the first thread
4032- msgs[0].reply({'rx_id': 0})
4033- msgs[1].reply({'rx_id': 1})
4034-
4035- # Wait for the second thread to finish
4036- senders[1].join()
4037-
4038- # Let the first thread continue
4039- with notify_condition:
4040- notify_condition.notify()
4041-
4042- # Wait for the first thread to finish
4043- senders[0].join()
4044-
4045- # Verify replies were received out of order
4046- self.assertEqual(len(replies), len(senders))
4047- self.assertEqual(replies[0], {'rx_id': 1})
4048- self.assertEqual(replies[1], {'rx_id': 0})
4049-
4050-
4051-def _declare_queue(target):
4052- connection = kombu.connection.BrokerConnection(transport='memory')
4053-
4054- # Kludge to speed up tests.
4055- connection.transport.polling_interval = 0.0
4056-
4057- connection.connect()
4058- channel = connection.channel()
4059-
4060- # work around 'memory' transport bug in 1.1.3
4061- channel._new_queue('ae.undeliver')
4062-
4063- if target.fanout:
4064- exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
4065- type='fanout',
4066- durable=False,
4067- auto_delete=True)
4068- queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
4069- channel=channel,
4070- exchange=exchange,
4071- routing_key=target.topic)
4072- if target.server:
4073- exchange = kombu.entity.Exchange(name='openstack',
4074- type='topic',
4075- durable=False,
4076- auto_delete=False)
4077- topic = '%s.%s' % (target.topic, target.server)
4078- queue = kombu.entity.Queue(name=topic,
4079- channel=channel,
4080- exchange=exchange,
4081- routing_key=topic)
4082- else:
4083- exchange = kombu.entity.Exchange(name='openstack',
4084- type='topic',
4085- durable=False,
4086- auto_delete=False)
4087- queue = kombu.entity.Queue(name=target.topic,
4088- channel=channel,
4089- exchange=exchange,
4090- routing_key=target.topic)
4091-
4092- queue.declare()
4093-
4094- return connection, channel, queue
4095-
4096-
4097-class TestRequestWireFormat(test_utils.BaseTestCase):
4098-
4099- _target = [
4100- ('topic_target',
4101- dict(topic='testtopic', server=None, fanout=False)),
4102- ('server_target',
4103- dict(topic='testtopic', server='testserver', fanout=False)),
4104- # NOTE(markmc): https://github.com/celery/kombu/issues/195
4105- ('fanout_target',
4106- dict(topic='testtopic', server=None, fanout=True,
4107- skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
4108- ]
4109-
4110- _msg = [
4111- ('empty_msg',
4112- dict(msg={}, expected={})),
4113- ('primitive_msg',
4114- dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
4115- ('complex_msg',
4116- dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
4117- expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
4118- ]
4119-
4120- _context = [
4121- ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
4122- ('user_project_ctxt',
4123- dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
4124- expected_ctxt={'_context_user': 'mark',
4125- '_context_project': 'snarkybunch'})),
4126- ]
4127-
4128- @classmethod
4129- def generate_scenarios(cls):
4130- cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
4131- cls._context,
4132- cls._target)
4133-
4134- def setUp(self):
4135- super(TestRequestWireFormat, self).setUp()
4136- self.messaging_conf.transport_driver = 'rabbit'
4137- self.messaging_conf.in_memory = True
4138-
4139- self.uuids = []
4140- self.orig_uuid4 = uuid.uuid4
4141- self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
4142-
4143- def mock_uuid4(self):
4144- self.uuids.append(self.orig_uuid4())
4145- return self.uuids[-1]
4146-
4147- def test_request_wire_format(self):
4148- if hasattr(self, 'skip_msg'):
4149- self.skipTest(self.skip_msg)
4150-
4151- transport = messaging.get_transport(self.conf)
4152- self.addCleanup(transport.cleanup)
4153-
4154- driver = transport._driver
4155-
4156- target = messaging.Target(topic=self.topic,
4157- server=self.server,
4158- fanout=self.fanout)
4159-
4160- connection, channel, queue = _declare_queue(target)
4161- self.addCleanup(connection.release)
4162-
4163- driver.send(target, self.ctxt, self.msg)
4164-
4165- msgs = []
4166-
4167- def callback(msg):
4168- msg = channel.message_to_python(msg)
4169- msg.ack()
4170- msgs.append(msg.payload)
4171-
4172- queue.consume(callback=callback,
4173- consumer_tag='1',
4174- nowait=False)
4175-
4176- connection.drain_events()
4177-
4178- self.assertEqual(1, len(msgs))
4179- self.assertIn('oslo.message', msgs[0])
4180-
4181- received = msgs[0]
4182- received['oslo.message'] = jsonutils.loads(received['oslo.message'])
4183-
4184- # FIXME(markmc): add _msg_id and _reply_q check
4185- expected_msg = {
4186- '_unique_id': self.uuids[0].hex,
4187- }
4188- expected_msg.update(self.expected)
4189- expected_msg.update(self.expected_ctxt)
4190-
4191- expected = {
4192- 'oslo.version': '2.0',
4193- 'oslo.message': expected_msg,
4194- }
4195-
4196- self.assertEqual(expected, received)
4197-
4198-
4199-TestRequestWireFormat.generate_scenarios()
4200-
4201-
4202-def _create_producer(target):
4203- connection = kombu.connection.BrokerConnection(transport='memory')
4204-
4205- # Kludge to speed up tests.
4206- connection.transport.polling_interval = 0.0
4207-
4208- connection.connect()
4209- channel = connection.channel()
4210-
4211- # work around 'memory' transport bug in 1.1.3
4212- channel._new_queue('ae.undeliver')
4213-
4214- if target.fanout:
4215- exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
4216- type='fanout',
4217- durable=False,
4218- auto_delete=True)
4219- producer = kombu.messaging.Producer(exchange=exchange,
4220- channel=channel,
4221- routing_key=target.topic)
4222- elif target.server:
4223- exchange = kombu.entity.Exchange(name='openstack',
4224- type='topic',
4225- durable=False,
4226- auto_delete=False)
4227- topic = '%s.%s' % (target.topic, target.server)
4228- producer = kombu.messaging.Producer(exchange=exchange,
4229- channel=channel,
4230- routing_key=topic)
4231- else:
4232- exchange = kombu.entity.Exchange(name='openstack',
4233- type='topic',
4234- durable=False,
4235- auto_delete=False)
4236- producer = kombu.messaging.Producer(exchange=exchange,
4237- channel=channel,
4238- routing_key=target.topic)
4239-
4240- return connection, producer
4241-
4242-
4243-class TestReplyWireFormat(test_utils.BaseTestCase):
4244-
4245- _target = [
4246- ('topic_target',
4247- dict(topic='testtopic', server=None, fanout=False)),
4248- ('server_target',
4249- dict(topic='testtopic', server='testserver', fanout=False)),
4250- # NOTE(markmc): https://github.com/celery/kombu/issues/195
4251- ('fanout_target',
4252- dict(topic='testtopic', server=None, fanout=True,
4253- skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
4254- ]
4255-
4256- _msg = [
4257- ('empty_msg',
4258- dict(msg={}, expected={})),
4259- ('primitive_msg',
4260- dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
4261- ('complex_msg',
4262- dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
4263- expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
4264- ]
4265-
4266- _context = [
4267- ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
4268- ('user_project_ctxt',
4269- dict(ctxt={'_context_user': 'mark',
4270- '_context_project': 'snarkybunch'},
4271- expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
4272- ]
4273-
4274- @classmethod
4275- def generate_scenarios(cls):
4276- cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
4277- cls._context,
4278- cls._target)
4279-
4280- def setUp(self):
4281- super(TestReplyWireFormat, self).setUp()
4282- self.messaging_conf.transport_driver = 'rabbit'
4283- self.messaging_conf.in_memory = True
4284-
4285- def test_reply_wire_format(self):
4286- if hasattr(self, 'skip_msg'):
4287- self.skipTest(self.skip_msg)
4288-
4289- transport = messaging.get_transport(self.conf)
4290- self.addCleanup(transport.cleanup)
4291-
4292- driver = transport._driver
4293-
4294- target = messaging.Target(topic=self.topic,
4295- server=self.server,
4296- fanout=self.fanout)
4297-
4298- listener = driver.listen(target)
4299-
4300- connection, producer = _create_producer(target)
4301- self.addCleanup(connection.release)
4302-
4303- msg = {
4304- 'oslo.version': '2.0',
4305- 'oslo.message': {}
4306- }
4307-
4308- msg['oslo.message'].update(self.msg)
4309- msg['oslo.message'].update(self.ctxt)
4310-
4311- msg['oslo.message'].update({
4312- '_msg_id': uuid.uuid4().hex,
4313- '_unique_id': uuid.uuid4().hex,
4314- '_reply_q': 'reply_' + uuid.uuid4().hex,
4315- })
4316-
4317- msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
4318-
4319- producer.publish(msg)
4320-
4321- received = listener.poll()
4322- self.assertIsNotNone(received)
4323- self.assertEqual(self.expected_ctxt, received.ctxt)
4324- self.assertEqual(self.expected, received.message)
4325-
4326-
4327-TestReplyWireFormat.generate_scenarios()
4328-
4329-
4330-class RpcKombuHATestCase(test_utils.BaseTestCase):
4331-
4332- def test_reconnect_order(self):
4333- brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
4334- brokers_count = len(brokers)
4335-
4336- self.conf.rabbit_hosts = brokers
4337- self.conf.rabbit_max_retries = 1
4338-
4339- info = {'attempt': 0}
4340-
4341- def _connect(myself, params):
4342-            # do just enough work for the connection attempt to succeed
4343- myself.connection = kombu.connection.BrokerConnection(**params)
4344- myself.connection_errors = myself.connection.connection_errors
4345-
4346- expected_broker = brokers[info['attempt'] % brokers_count]
4347- self.assertEqual(params['hostname'], expected_broker)
4348-
4349- info['attempt'] += 1
4350-
4351- # just make sure connection instantiation does not fail with an
4352- # exception
4353- self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
4354-
4355- # starting from the first broker in the list
4356- connection = rabbit_driver.Connection(self.conf)
4357-
4358- # now that we have connection object, revert to the real 'connect'
4359- # implementation
4360- self.stubs.UnsetAll()
4361-
4362- for i in range(len(brokers)):
4363- self.assertRaises(driver_common.RPCException, connection.reconnect)
4364-
4365- connection.close()
4366
4367=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py'
4368--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py 2015-04-23 15:56:08 +0000
4369+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000
4370@@ -1,64 +0,0 @@
4371-
4372-# Copyright 2013 Red Hat, Inc.
4373-#
4374-# Licensed under the Apache License, Version 2.0 (the "License"); you may
4375-# not use this file except in compliance with the License. You may obtain
4376-# a copy of the License at
4377-#
4378-# http://www.apache.org/licenses/LICENSE-2.0
4379-#
4380-# Unless required by applicable law or agreed to in writing, software
4381-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
4382-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
4383-# License for the specific language governing permissions and limitations
4384-# under the License.
4385-
4386-from oslo.messaging._drivers import common
4387-from oslo.messaging import _utils as utils
4388-from tests import utils as test_utils
4389-
4390-
4391-class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
4392- def test_version_is_compatible_same(self):
4393- self.assertTrue(utils.version_is_compatible('1.23', '1.23'))
4394-
4395- def test_version_is_compatible_newer_minor(self):
4396- self.assertTrue(utils.version_is_compatible('1.24', '1.23'))
4397-
4398- def test_version_is_compatible_older_minor(self):
4399- self.assertFalse(utils.version_is_compatible('1.22', '1.23'))
4400-
4401- def test_version_is_compatible_major_difference1(self):
4402- self.assertFalse(utils.version_is_compatible('2.23', '1.23'))
4403-
4404- def test_version_is_compatible_major_difference2(self):
4405- self.assertFalse(utils.version_is_compatible('1.23', '2.23'))
4406-
4407- def test_version_is_compatible_newer_rev(self):
4408- self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))
4409-
4410- def test_version_is_compatible_newer_rev_both(self):
4411- self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))
4412-
4413- def test_version_is_compatible_older_rev_both(self):
4414- self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))
4415-
4416- def test_version_is_compatible_older_rev(self):
4417- self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))
4418-
4419- def test_version_is_compatible_no_rev_is_zero(self):
4420- self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
4421-
4422-
4423-class TimerTestCase(test_utils.BaseTestCase):
4424- def test_duration_is_none(self):
4425- t = common.DecayingTimer()
4426- t.start()
4427- remaining = t.check_return(None)
4428- self.assertEqual(None, remaining)
4429-
4430- def test_duration_is_none_and_maximun_set(self):
4431- t = common.DecayingTimer()
4432- t.start()
4433- remaining = t.check_return(None, maximum=2)
4434- self.assertEqual(2, remaining)
4435
4436=== renamed file '.pc/applied-patches' => '.pc/applied-patches.THIS'
4437=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch'
4438=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo'
4439=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging'
4440=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers'
4441=== removed file '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py'
4442--- .pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-06-25 09:59:42 +0000
4443+++ .pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
4444@@ -1,819 +0,0 @@
4445-# Copyright 2011 OpenStack Foundation
4446-#
4447-# Licensed under the Apache License, Version 2.0 (the "License"); you may
4448-# not use this file except in compliance with the License. You may obtain
4449-# a copy of the License at
4450-#
4451-# http://www.apache.org/licenses/LICENSE-2.0
4452-#
4453-# Unless required by applicable law or agreed to in writing, software
4454-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
4455-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
4456-# License for the specific language governing permissions and limitations
4457-# under the License.
4458-
4459-import functools
4460-import itertools
4461-import logging
4462-import socket
4463-import ssl
4464-import time
4465-import uuid
4466-
4467-import kombu
4468-import kombu.connection
4469-import kombu.entity
4470-import kombu.messaging
4471-from oslo.config import cfg
4472-import six
4473-
4474-from oslo.messaging._drivers import amqp as rpc_amqp
4475-from oslo.messaging._drivers import amqpdriver
4476-from oslo.messaging._drivers import common as rpc_common
4477-from oslo.messaging.openstack.common import network_utils
4478-
4479-# FIXME(markmc): remove this
4480-_ = lambda s: s
4481-
4482-rabbit_opts = [
4483- cfg.StrOpt('kombu_ssl_version',
4484- default='',
4485- help='SSL version to use (valid only if SSL enabled). '
4486- 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
4487-                    'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
4488- ),
4489- cfg.StrOpt('kombu_ssl_keyfile',
4490- default='',
4491- help='SSL key file (valid only if SSL enabled).'),
4492- cfg.StrOpt('kombu_ssl_certfile',
4493- default='',
4494- help='SSL cert file (valid only if SSL enabled).'),
4495- cfg.StrOpt('kombu_ssl_ca_certs',
4496- default='',
4497- help=('SSL certification authority file '
4498- '(valid only if SSL enabled).')),
4499- cfg.FloatOpt('kombu_reconnect_delay',
4500- default=1.0,
4501- help='How long to wait before reconnecting in response to an '
4502- 'AMQP consumer cancel notification.'),
4503- cfg.StrOpt('rabbit_host',
4504- default='localhost',
4505- help='The RabbitMQ broker address where a single node is '
4506- 'used.'),
4507- cfg.IntOpt('rabbit_port',
4508- default=5672,
4509- help='The RabbitMQ broker port where a single node is used.'),
4510- cfg.ListOpt('rabbit_hosts',
4511- default=['$rabbit_host:$rabbit_port'],
4512- help='RabbitMQ HA cluster host:port pairs.'),
4513- cfg.BoolOpt('rabbit_use_ssl',
4514- default=False,
4515- help='Connect over SSL for RabbitMQ.'),
4516- cfg.StrOpt('rabbit_userid',
4517- default='guest',
4518- help='The RabbitMQ userid.'),
4519- cfg.StrOpt('rabbit_password',
4520- default='guest',
4521- help='The RabbitMQ password.',
4522- secret=True),
4523- cfg.StrOpt('rabbit_login_method',
4524- default='AMQPLAIN',
4525-               help='The RabbitMQ login method.'),
4526- cfg.StrOpt('rabbit_virtual_host',
4527- default='/',
4528- help='The RabbitMQ virtual host.'),
4529- cfg.IntOpt('rabbit_retry_interval',
4530- default=1,
4531- help='How frequently to retry connecting with RabbitMQ.'),
4532- cfg.IntOpt('rabbit_retry_backoff',
4533- default=2,
4534-               help='How long to back off between retries when connecting '
4535- 'to RabbitMQ.'),
4536- cfg.IntOpt('rabbit_max_retries',
4537- default=0,
4538- help='Maximum number of RabbitMQ connection retries. '
4539- 'Default is 0 (infinite retry count).'),
4540- cfg.BoolOpt('rabbit_ha_queues',
4541- default=False,
4542- help='Use HA queues in RabbitMQ (x-ha-policy: all). '
4543- 'If you change this option, you must wipe the '
4544- 'RabbitMQ database.'),
4545-
4546- # FIXME(markmc): this was toplevel in openstack.common.rpc
4547- cfg.BoolOpt('fake_rabbit',
4548- default=False,
4549- help='If passed, use a fake RabbitMQ provider.'),
4550-]
4551-
4552-LOG = logging.getLogger(__name__)
4553-
4554-
4555-def _get_queue_arguments(conf):
4556- """Construct the arguments for declaring a queue.
4557-
4558- If the rabbit_ha_queues option is set, we declare a mirrored queue
4559- as described here:
4560-
4561- http://www.rabbitmq.com/ha.html
4562-
4563- Setting x-ha-policy to all means that the queue will be mirrored
4564- to all nodes in the cluster.
4565- """
4566- return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
4567-
4568-
4569-class RabbitMessage(dict):
4570- def __init__(self, raw_message):
4571- super(RabbitMessage, self).__init__(
4572- rpc_common.deserialize_msg(raw_message.payload))
4573- self._raw_message = raw_message
4574-
4575- def acknowledge(self):
4576- self._raw_message.ack()
4577-
4578- def requeue(self):
4579- self._raw_message.requeue()
4580-
4581-
4582-class ConsumerBase(object):
4583- """Consumer base class."""
4584-
4585- def __init__(self, channel, callback, tag, **kwargs):
4586- """Declare a queue on an amqp channel.
4587-
4588- 'channel' is the amqp channel to use
4589- 'callback' is the callback to call when messages are received
4590- 'tag' is a unique ID for the consumer on the channel
4591-
4592- queue name, exchange name, and other kombu options are
4593- passed in here as a dictionary.
4594- """
4595- self.callback = callback
4596- self.tag = str(tag)
4597- self.kwargs = kwargs
4598- self.queue = None
4599- self.reconnect(channel)
4600-
4601- def reconnect(self, channel):
4602- """Re-declare the queue after a rabbit reconnect."""
4603- self.channel = channel
4604- self.kwargs['channel'] = channel
4605- self.queue = kombu.entity.Queue(**self.kwargs)
4606- self.queue.declare()
4607-
4608- def _callback_handler(self, message, callback):
4609- """Call callback with deserialized message.
4610-
4611- Messages that are processed and ack'ed.
4612-        Messages are processed and then ack'ed.
4613-
4614- try:
4615- callback(RabbitMessage(message))
4616- except Exception:
4617- LOG.exception(_("Failed to process message"
4618- " ... skipping it."))
4619- message.ack()
4620-
4621- def consume(self, *args, **kwargs):
4622- """Actually declare the consumer on the amqp channel. This will
4623- start the flow of messages from the queue. Using the
4624- Connection.iterconsume() iterator will process the messages,
4625- calling the appropriate callback.
4626-
4627- If a callback is specified in kwargs, use that. Otherwise,
4628- use the callback passed during __init__()
4629-
4630-        If kwargs['nowait'] is True, this call will return without
4631-        waiting for the broker to confirm the consumer.
4632-
4633- """
4634-
4635- options = {'consumer_tag': self.tag}
4636- options['nowait'] = kwargs.get('nowait', False)
4637- callback = kwargs.get('callback', self.callback)
4638- if not callback:
4639- raise ValueError("No callback defined")
4640-
4641- def _callback(raw_message):
4642- message = self.channel.message_to_python(raw_message)
4643- self._callback_handler(message, callback)
4644-
4645- self.queue.consume(*args, callback=_callback, **options)
4646-
4647- def cancel(self):
4648- """Cancel the consuming from the queue, if it has started."""
4649- try:
4650- self.queue.cancel(self.tag)
4651- except KeyError as e:
4652-            # NOTE(comstud): Kludge to get around an amqplib bug
4653- if str(e) != "u'%s'" % self.tag:
4654- raise
4655- self.queue = None
4656-
4657-
4658-class DirectConsumer(ConsumerBase):
4659- """Queue/consumer class for 'direct'."""
4660-
4661- def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
4662- """Init a 'direct' queue.
4663-
4664- 'channel' is the amqp channel to use
4665- 'msg_id' is the msg_id to listen on
4666- 'callback' is the callback to call when messages are received
4667- 'tag' is a unique ID for the consumer on the channel
4668-
4669- Other kombu options may be passed
4670- """
4671- # Default options
4672- options = {'durable': False,
4673- 'queue_arguments': _get_queue_arguments(conf),
4674- 'auto_delete': True,
4675- 'exclusive': False}
4676- options.update(kwargs)
4677- exchange = kombu.entity.Exchange(name=msg_id,
4678- type='direct',
4679- durable=options['durable'],
4680- auto_delete=options['auto_delete'])
4681- super(DirectConsumer, self).__init__(channel,
4682- callback,
4683- tag,
4684- name=msg_id,
4685- exchange=exchange,
4686- routing_key=msg_id,
4687- **options)
4688-
4689-
4690-class TopicConsumer(ConsumerBase):
4691- """Consumer class for 'topic'."""
4692-
4693- def __init__(self, conf, channel, topic, callback, tag, name=None,
4694- exchange_name=None, **kwargs):
4695- """Init a 'topic' queue.
4696-
4697- :param channel: the amqp channel to use
4698- :param topic: the topic to listen on
4699- :paramtype topic: str
4700- :param callback: the callback to call when messages are received
4701- :param tag: a unique ID for the consumer on the channel
4702- :param name: optional queue name, defaults to topic
4703- :paramtype name: str
4704-
4705- Other kombu options may be passed as keyword arguments
4706- """
4707- # Default options
4708- options = {'durable': conf.amqp_durable_queues,
4709- 'queue_arguments': _get_queue_arguments(conf),
4710- 'auto_delete': conf.amqp_auto_delete,
4711- 'exclusive': False}
4712- options.update(kwargs)
4713- exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
4714- exchange = kombu.entity.Exchange(name=exchange_name,
4715- type='topic',
4716- durable=options['durable'],
4717- auto_delete=options['auto_delete'])
4718- super(TopicConsumer, self).__init__(channel,
4719- callback,
4720- tag,
4721- name=name or topic,
4722- exchange=exchange,
4723- routing_key=topic,
4724- **options)
4725-
4726-
4727-class FanoutConsumer(ConsumerBase):
4728- """Consumer class for 'fanout'."""
4729-
4730- def __init__(self, conf, channel, topic, callback, tag, **kwargs):
4731- """Init a 'fanout' queue.
4732-
4733- 'channel' is the amqp channel to use
4734- 'topic' is the topic to listen on
4735- 'callback' is the callback to call when messages are received
4736- 'tag' is a unique ID for the consumer on the channel
4737-
4738- Other kombu options may be passed
4739- """
4740- unique = uuid.uuid4().hex
4741- exchange_name = '%s_fanout' % topic
4742- queue_name = '%s_fanout_%s' % (topic, unique)
4743-
4744- # Default options
4745- options = {'durable': False,
4746- 'queue_arguments': _get_queue_arguments(conf),
4747- 'auto_delete': True,
4748- 'exclusive': False}
4749- options.update(kwargs)
4750- exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
4751- durable=options['durable'],
4752- auto_delete=options['auto_delete'])
4753- super(FanoutConsumer, self).__init__(channel, callback, tag,
4754- name=queue_name,
4755- exchange=exchange,
4756- routing_key=topic,
4757- **options)
4758-
4759-
4760-class Publisher(object):
4761- """Base Publisher class."""
4762-
4763- def __init__(self, channel, exchange_name, routing_key, **kwargs):
4764- """Init the Publisher class with the exchange_name, routing_key,
4765-        and other options.
4766- """
4767- self.exchange_name = exchange_name
4768- self.routing_key = routing_key
4769- self.kwargs = kwargs
4770- self.reconnect(channel)
4771-
4772- def reconnect(self, channel):
4773- """Re-establish the Producer after a rabbit reconnection."""
4774- self.exchange = kombu.entity.Exchange(name=self.exchange_name,
4775- **self.kwargs)
4776- self.producer = kombu.messaging.Producer(exchange=self.exchange,
4777- channel=channel,
4778- routing_key=self.routing_key)
4779-
4780- def send(self, msg, timeout=None):
4781- """Send a message."""
4782- if timeout:
4783- #
4784- # AMQP TTL is in milliseconds when set in the header.
4785- #
4786- self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
4787- else:
4788- self.producer.publish(msg)
4789-
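Worth noting for reviewers: the per-message AMQP TTL header is in milliseconds while send() takes a timeout in seconds, hence the multiplication by 1000. A sketch of the equivalent bare kombu call (producer and msg are placeholders):

    # A 60-second timeout becomes a 60000 ms AMQP TTL header.
    producer.publish(msg, headers={'ttl': 60 * 1000})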
4790-
4791-class DirectPublisher(Publisher):
4792- """Publisher class for 'direct'."""
4793- def __init__(self, conf, channel, msg_id, **kwargs):
4794- """Init a 'direct' publisher.
4795-
4796- Kombu options may be passed as keyword args to override defaults
4797- """
4798-
4799- options = {'durable': False,
4800- 'auto_delete': True,
4801- 'exclusive': False,
4802- 'passive': True}
4803- options.update(kwargs)
4804- super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
4805- type='direct', **options)
4806-
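The 'passive': True default means the publisher only verifies that the reply exchange already exists rather than creating it: if the requesting consumer has gone away (taking its auto-delete exchange with it), publishing fails fast instead of creating an exchange whose messages nobody will ever drain. A rough equivalent in bare kombu, with a hypothetical exchange name:

    import kombu

    # With passive=True, declaring this exchange only checks for its
    # existence; a missing exchange surfaces as a 404 channel error
    # rather than a silent create on the publisher's side.
    exchange = kombu.Exchange('reply_abc123', type='direct', passive=True)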
4807-
4808-class TopicPublisher(Publisher):
4809- """Publisher class for 'topic'."""
4810- def __init__(self, conf, channel, topic, **kwargs):
4811- """Init a 'topic' publisher.
4812-
4813- Kombu options may be passed as keyword args to override defaults
4814- """
4815- options = {'durable': conf.amqp_durable_queues,
4816- 'auto_delete': conf.amqp_auto_delete,
4817- 'exclusive': False}
4818-
4819- options.update(kwargs)
4820- exchange_name = rpc_amqp.get_control_exchange(conf)
4821- super(TopicPublisher, self).__init__(channel,
4822- exchange_name,
4823- topic,
4824- type='topic',
4825- **options)
4826-
4827-
4828-class FanoutPublisher(Publisher):
4829- """Publisher class for 'fanout'."""
4830- def __init__(self, conf, channel, topic, **kwargs):
4831- """Init a 'fanout' publisher.
4832-
4833- Kombu options may be passed as keyword args to override defaults
4834- """
4835- options = {'durable': False,
4836- 'auto_delete': True,
4837- 'exclusive': False}
4838- options.update(kwargs)
4839- super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
4840- None, type='fanout', **options)
4841-
4842-
4843-class NotifyPublisher(TopicPublisher):
4844- """Publisher class for 'notify'."""
4845-
4846- def __init__(self, conf, channel, topic, **kwargs):
4847- self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
4848- self.queue_arguments = _get_queue_arguments(conf)
4849- super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
4850-
4851- def reconnect(self, channel):
4852- super(NotifyPublisher, self).reconnect(channel)
4853-
4854- # NOTE(jerdfelt): Normally the consumer would create the queue, but
4855- # we do this to ensure that messages don't get dropped if the
4856- # consumer is started after we do
4857- queue = kombu.entity.Queue(channel=channel,
4858- exchange=self.exchange,
4859- durable=self.durable,
4860- name=self.routing_key,
4861- routing_key=self.routing_key,
4862- queue_arguments=self.queue_arguments)
4863- queue.declare()
4864-
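The detail that matters above: an AMQP exchange with no bound queue simply drops messages, so NotifyPublisher declares and binds the notification queue itself instead of waiting for a consumer. A rough bare-kombu equivalent, with placeholder names and an assumed open channel:

    import kombu

    exchange = kombu.Exchange('nova', type='topic', durable=True)
    queue = kombu.Queue('notifications.info', exchange=exchange,
                        routing_key='notifications.info',
                        durable=True, channel=channel)
    queue.declare()  # creates and binds the queue so early messages are kept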
4865-
4866-class Connection(object):
4867- """Connection object."""
4868-
4869- pool = None
4870-
4871- def __init__(self, conf, server_params=None):
4872- self.consumers = []
4873- self.conf = conf
4874- self.max_retries = self.conf.rabbit_max_retries
4875-        # A max_retries of 0 or less means retry forever (kombu takes None)
4876- if self.max_retries <= 0:
4877- self.max_retries = None
4878- self.interval_start = self.conf.rabbit_retry_interval
4879- self.interval_stepping = self.conf.rabbit_retry_backoff
4880- # max retry-interval = 30 seconds
4881- self.interval_max = 30
4882- self.memory_transport = False
4883-
4884- if server_params is None:
4885- server_params = {}
4886- # Keys to translate from server_params to kombu params
4887- server_params_to_kombu_params = {'username': 'userid'}
4888-
4889- ssl_params = self._fetch_ssl_params()
4890- params_list = []
4891- for adr in self.conf.rabbit_hosts:
4892- hostname, port = network_utils.parse_host_port(
4893- adr, default_port=self.conf.rabbit_port)
4894-
4895- params = {
4896- 'hostname': hostname,
4897- 'port': port,
4898- 'userid': self.conf.rabbit_userid,
4899- 'password': self.conf.rabbit_password,
4900- 'login_method': self.conf.rabbit_login_method,
4901- 'virtual_host': self.conf.rabbit_virtual_host,
4902- }
4903-
4904- for sp_key, value in six.iteritems(server_params):
4905- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
4906- params[p_key] = value
4907-
4908- if self.conf.fake_rabbit:
4909- params['transport'] = 'memory'
4910- if self.conf.rabbit_use_ssl:
4911- params['ssl'] = ssl_params
4912-
4913- params_list.append(params)
4914-
4915- self.params_list = itertools.cycle(params_list)
4916-
4917- self.memory_transport = self.conf.fake_rabbit
4918-
4919- self.connection = None
4920- self.do_consume = None
4921- self.reconnect()
4922-
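Since params_list is wrapped in itertools.cycle, reconnect attempts walk the configured brokers round-robin indefinitely. A minimal sketch of the pattern (hypothetical hosts):

    import itertools

    params_list = [{'hostname': 'rabbit1', 'port': 5672},
                   {'hostname': 'rabbit2', 'port': 5672}]
    brokers = itertools.cycle(params_list)
    next(brokers)   # rabbit1
    next(brokers)   # rabbit2
    next(brokers)   # rabbit1 again -- the cycle never ends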
4923- # FIXME(markmc): use oslo sslutils when it is available as a library
4924- _SSL_PROTOCOLS = {
4925- "tlsv1": ssl.PROTOCOL_TLSv1,
4926- "sslv23": ssl.PROTOCOL_SSLv23,
4927- "sslv3": ssl.PROTOCOL_SSLv3
4928- }
4929-
4930- try:
4931- _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
4932- except AttributeError:
4933- pass
4934-
4935- @classmethod
4936- def validate_ssl_version(cls, version):
4937- key = version.lower()
4938- try:
4939- return cls._SSL_PROTOCOLS[key]
4940- except KeyError:
4941-            raise RuntimeError(_("Invalid SSL version: %s") % version)
4942-
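For example, the lookup is case-insensitive and unknown versions are rejected:

    Connection.validate_ssl_version('TLSv1')   # -> ssl.PROTOCOL_TLSv1
    Connection.validate_ssl_version('sslv9')   # raises RuntimeError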
4943- def _fetch_ssl_params(self):
4944-        """Fetch the SSL parameters to use for the connection,
4945-        if any are configured.
4946-        """
4947- ssl_params = dict()
4948-
4949- # http://docs.python.org/library/ssl.html - ssl.wrap_socket
4950- if self.conf.kombu_ssl_version:
4951- ssl_params['ssl_version'] = self.validate_ssl_version(
4952- self.conf.kombu_ssl_version)
4953- if self.conf.kombu_ssl_keyfile:
4954- ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
4955- if self.conf.kombu_ssl_certfile:
4956- ssl_params['certfile'] = self.conf.kombu_ssl_certfile
4957- if self.conf.kombu_ssl_ca_certs:
4958- ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
4959-            # Require certificate verification when a CA bundle is
4960-            # given; making this configurable is a possible future change
4961- ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
4962-
4963-        # Return explicit params if any were set; True selects default SSL
4964- return ssl_params or True
4965-
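Note the falsy-dict trick in the return value: with no kombu_ssl_* options set, the method returns True, which tells kombu to enable SSL with library defaults, while a populated dict is passed through as explicit ssl.wrap_socket arguments. A sketch of how the caller consumes it (mirroring __init__ above):

    ssl_params = self._fetch_ssl_params()
    # True           -> SSL on, library defaults
    # non-empty dict -> explicit ssl.wrap_socket arguments
    params['ssl'] = ssl_params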
4966- def _connect(self, params):
4967- """Connect to rabbit. Re-establish any queues that may have
4968- been declared before if we are reconnecting. Exceptions should
4969- be handled by the caller.
4970- """
4971- if self.connection:
4972- LOG.info(_("Reconnecting to AMQP server on "
4973- "%(hostname)s:%(port)d") % params)
4974- try:
4975- # XXX(nic): when reconnecting to a RabbitMQ cluster
4976- # with mirrored queues in use, the attempt to release the
4977- # connection can hang "indefinitely" somewhere deep down
4978- # in Kombu. Blocking the thread for a bit prior to
4979- # release seems to kludge around the problem where it is
4980-            # otherwise reproducible.
4981- if self.conf.kombu_reconnect_delay > 0:
4982- LOG.info(_("Delaying reconnect for %1.1f seconds...") %
4983- self.conf.kombu_reconnect_delay)
4984- time.sleep(self.conf.kombu_reconnect_delay)
4985-
4986- self.connection.release()
4987- except self.connection_errors:
4988- pass
4989- # Setting this in case the next statement fails, though
4990-            # it shouldn't be doing any network operations yet.
4991- self.connection = None
4992- self.connection = kombu.connection.BrokerConnection(**params)
4993- self.connection_errors = self.connection.connection_errors
4994- self.channel_errors = self.connection.channel_errors
4995- if self.memory_transport:
4996- # Kludge to speed up tests.
4997- self.connection.transport.polling_interval = 0.0
4998- self.do_consume = True
4999- self.consumer_num = itertools.count(1)
5000- self.connection.connect()
The diff has been truncated for viewing.
