Merge lp:~xianghui/ubuntu/trusty/oslo.messaging/lp1318721 into lp:ubuntu/trusty-proposed/oslo.messaging

Proposed by Xiang Hui
Status: Merged
Merge reported by: Marc Deslauriers
Merged at revision: not available
Proposed branch: lp:~xianghui/ubuntu/trusty/oslo.messaging/lp1318721
Merge into: lp:ubuntu/trusty-proposed/oslo.messaging
Diff against target: 992 lines (+936/-1)
6 files modified
.pc/applied-patches (+1/-0)
.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py (+834/-0)
debian/changelog (+8/-0)
debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+78/-0)
debian/patches/series (+1/-0)
oslo/messaging/_drivers/impl_rabbit.py (+14/-1)
To merge this branch: bzr merge lp:~xianghui/ubuntu/trusty/oslo.messaging/lp1318721
Reviewer Review Type Date Requested Status
Ubuntu Development Team Pending
Review via email: mp+280836@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Xiang Hui (xianghui) wrote :

* Backport upstream release. (LP: #1318721)
    - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Redeclare if exception is caught after self.queue.declare() failed.

12. By Xiang Hui

Add dep3 header

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/.timestamp'
2=== added file '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/.timestamp'
3=== added file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/.timestamp'
4=== added file '.pc/0004-fix-lp-1362863.patch/.timestamp'
5=== modified file '.pc/applied-patches'
6--- .pc/applied-patches 2015-07-13 15:59:14 +0000
7+++ .pc/applied-patches 2016-01-08 06:17:57 +0000
8@@ -4,3 +4,4 @@
9 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch
10 redeclare-consumers-when-ack-requeue-fails.patch
11 0004-fix-lp-1362863.patch
12+fix-reconnect-race-condition-with-rabbitmq-cluster.patch
13
14=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch'
15=== added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/.timestamp'
16=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo'
17=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging'
18=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers'
19=== added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py'
20--- .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
21+++ .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/oslo/messaging/_drivers/impl_rabbit.py 2016-01-08 06:17:57 +0000
22@@ -0,0 +1,834 @@
23+# Copyright 2011 OpenStack Foundation
24+#
25+# Licensed under the Apache License, Version 2.0 (the "License"); you may
26+# not use this file except in compliance with the License. You may obtain
27+# a copy of the License at
28+#
29+# http://www.apache.org/licenses/LICENSE-2.0
30+#
31+# Unless required by applicable law or agreed to in writing, software
32+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
33+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
34+# License for the specific language governing permissions and limitations
35+# under the License.
36+
37+import functools
38+import itertools
39+import logging
40+import socket
41+import ssl
42+import time
43+import uuid
44+
45+import kombu
46+import kombu.connection
47+import kombu.entity
48+import kombu.messaging
49+from oslo.config import cfg
50+import six
51+
52+from oslo.messaging._drivers import amqp as rpc_amqp
53+from oslo.messaging._drivers import amqpdriver
54+from oslo.messaging._drivers import common as rpc_common
55+from oslo.messaging.openstack.common import network_utils
56+
57+# FIXME(markmc): remove this
58+_ = lambda s: s
59+
60+rabbit_opts = [
61+ cfg.StrOpt('kombu_ssl_version',
62+ default='',
63+ help='SSL version to use (valid only if SSL enabled). '
64+ 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
65+ 'be available on some distributions.'
66+ ),
67+ cfg.StrOpt('kombu_ssl_keyfile',
68+ default='',
69+ help='SSL key file (valid only if SSL enabled).'),
70+ cfg.StrOpt('kombu_ssl_certfile',
71+ default='',
72+ help='SSL cert file (valid only if SSL enabled).'),
73+ cfg.StrOpt('kombu_ssl_ca_certs',
74+ default='',
75+ help=('SSL certification authority file '
76+ '(valid only if SSL enabled).')),
77+ cfg.FloatOpt('kombu_reconnect_delay',
78+ default=1.0,
79+ help='How long to wait before reconnecting in response to an '
80+ 'AMQP consumer cancel notification.'),
81+ cfg.StrOpt('rabbit_host',
82+ default='localhost',
83+ help='The RabbitMQ broker address where a single node is '
84+ 'used.'),
85+ cfg.IntOpt('rabbit_port',
86+ default=5672,
87+ help='The RabbitMQ broker port where a single node is used.'),
88+ cfg.ListOpt('rabbit_hosts',
89+ default=['$rabbit_host:$rabbit_port'],
90+ help='RabbitMQ HA cluster host:port pairs.'),
91+ cfg.BoolOpt('rabbit_use_ssl',
92+ default=False,
93+ help='Connect over SSL for RabbitMQ.'),
94+ cfg.StrOpt('rabbit_userid',
95+ default='guest',
96+ help='The RabbitMQ userid.'),
97+ cfg.StrOpt('rabbit_password',
98+ default='guest',
99+ help='The RabbitMQ password.',
100+ secret=True),
101+ cfg.StrOpt('rabbit_login_method',
102+ default='AMQPLAIN',
103+ help='the RabbitMQ login method'),
104+ cfg.StrOpt('rabbit_virtual_host',
105+ default='/',
106+ help='The RabbitMQ virtual host.'),
107+ cfg.IntOpt('rabbit_retry_interval',
108+ default=1,
109+ help='How frequently to retry connecting with RabbitMQ.'),
110+ cfg.IntOpt('rabbit_retry_backoff',
111+ default=2,
112+ help='How long to backoff for between retries when connecting '
113+ 'to RabbitMQ.'),
114+ cfg.IntOpt('rabbit_max_retries',
115+ default=0,
116+ help='Maximum number of RabbitMQ connection retries. '
117+ 'Default is 0 (infinite retry count).'),
118+ cfg.BoolOpt('rabbit_ha_queues',
119+ default=False,
120+ help='Use HA queues in RabbitMQ (x-ha-policy: all). '
121+ 'If you change this option, you must wipe the '
122+ 'RabbitMQ database.'),
123+
124+ # FIXME(markmc): this was toplevel in openstack.common.rpc
125+ cfg.BoolOpt('fake_rabbit',
126+ default=False,
127+ help='If passed, use a fake RabbitMQ provider.'),
128+]
129+
130+LOG = logging.getLogger(__name__)
131+
132+
133+def _get_queue_arguments(conf):
134+ """Construct the arguments for declaring a queue.
135+
136+ If the rabbit_ha_queues option is set, we declare a mirrored queue
137+ as described here:
138+
139+ http://www.rabbitmq.com/ha.html
140+
141+ Setting x-ha-policy to all means that the queue will be mirrored
142+ to all nodes in the cluster.
143+ """
144+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
145+
146+
147+class RabbitMessage(dict):
148+ def __init__(self, raw_message):
149+ super(RabbitMessage, self).__init__(
150+ rpc_common.deserialize_msg(raw_message.payload))
151+ self._raw_message = raw_message
152+
153+ def acknowledge(self):
154+ self._raw_message.ack()
155+
156+ def requeue(self):
157+ self._raw_message.requeue()
158+
159+
160+class ConsumerBase(object):
161+ """Consumer base class."""
162+
163+ def __init__(self, channel, callback, tag, **kwargs):
164+ """Declare a queue on an amqp channel.
165+
166+ 'channel' is the amqp channel to use
167+ 'callback' is the callback to call when messages are received
168+ 'tag' is a unique ID for the consumer on the channel
169+
170+ queue name, exchange name, and other kombu options are
171+ passed in here as a dictionary.
172+ """
173+ self.callback = callback
174+ self.tag = str(tag)
175+ self.kwargs = kwargs
176+ self.queue = None
177+ self.reconnect(channel)
178+
179+ def reconnect(self, channel):
180+ """Re-declare the queue after a rabbit reconnect."""
181+ self.channel = channel
182+ self.kwargs['channel'] = channel
183+ self.queue = kombu.entity.Queue(**self.kwargs)
184+ self.queue.declare()
185+
186+ def _callback_handler(self, message, callback):
187+ """Call callback with deserialized message.
188+
189+ Messages that are processed and ack'ed.
190+ """
191+
192+ try:
193+ callback(RabbitMessage(message))
194+ except Exception:
195+ LOG.exception(_("Failed to process message"
196+ " ... skipping it."))
197+ message.ack()
198+
199+ def consume(self, *args, **kwargs):
200+ """Actually declare the consumer on the amqp channel. This will
201+ start the flow of messages from the queue. Using the
202+ Connection.iterconsume() iterator will process the messages,
203+ calling the appropriate callback.
204+
205+ If a callback is specified in kwargs, use that. Otherwise,
206+ use the callback passed during __init__()
207+
208+ If kwargs['nowait'] is True, then this call will block until
209+ a message is read.
210+
211+ """
212+
213+ options = {'consumer_tag': self.tag}
214+ options['nowait'] = kwargs.get('nowait', False)
215+ callback = kwargs.get('callback', self.callback)
216+ if not callback:
217+ raise ValueError("No callback defined")
218+
219+ def _callback(raw_message):
220+ message = self.channel.message_to_python(raw_message)
221+ self._callback_handler(message, callback)
222+
223+ self.queue.consume(*args, callback=_callback, **options)
224+
225+ def cancel(self):
226+ """Cancel the consuming from the queue, if it has started."""
227+ try:
228+ self.queue.cancel(self.tag)
229+ except KeyError as e:
230+ # NOTE(comstud): Kludge to get around a amqplib bug
231+ if str(e) != "u'%s'" % self.tag:
232+ raise
233+ self.queue = None
234+
235+
236+class DirectConsumer(ConsumerBase):
237+ """Queue/consumer class for 'direct'."""
238+
239+ def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
240+ """Init a 'direct' queue.
241+
242+ 'channel' is the amqp channel to use
243+ 'msg_id' is the msg_id to listen on
244+ 'callback' is the callback to call when messages are received
245+ 'tag' is a unique ID for the consumer on the channel
246+
247+ Other kombu options may be passed
248+ """
249+ # Default options
250+ options = {'durable': False,
251+ 'queue_arguments': _get_queue_arguments(conf),
252+ 'auto_delete': True,
253+ 'exclusive': False}
254+ options.update(kwargs)
255+ exchange = kombu.entity.Exchange(name=msg_id,
256+ type='direct',
257+ durable=options['durable'],
258+ auto_delete=options['auto_delete'])
259+ super(DirectConsumer, self).__init__(channel,
260+ callback,
261+ tag,
262+ name=msg_id,
263+ exchange=exchange,
264+ routing_key=msg_id,
265+ **options)
266+
267+
268+class TopicConsumer(ConsumerBase):
269+ """Consumer class for 'topic'."""
270+
271+ def __init__(self, conf, channel, topic, callback, tag, name=None,
272+ exchange_name=None, **kwargs):
273+ """Init a 'topic' queue.
274+
275+ :param channel: the amqp channel to use
276+ :param topic: the topic to listen on
277+ :paramtype topic: str
278+ :param callback: the callback to call when messages are received
279+ :param tag: a unique ID for the consumer on the channel
280+ :param name: optional queue name, defaults to topic
281+ :paramtype name: str
282+
283+ Other kombu options may be passed as keyword arguments
284+ """
285+ # Default options
286+ options = {'durable': conf.amqp_durable_queues,
287+ 'queue_arguments': _get_queue_arguments(conf),
288+ 'auto_delete': conf.amqp_auto_delete,
289+ 'exclusive': False}
290+ options.update(kwargs)
291+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
292+ exchange = kombu.entity.Exchange(name=exchange_name,
293+ type='topic',
294+ durable=options['durable'],
295+ auto_delete=options['auto_delete'])
296+ super(TopicConsumer, self).__init__(channel,
297+ callback,
298+ tag,
299+ name=name or topic,
300+ exchange=exchange,
301+ routing_key=topic,
302+ **options)
303+
304+
305+class FanoutConsumer(ConsumerBase):
306+ """Consumer class for 'fanout'."""
307+
308+ def __init__(self, conf, channel, topic, callback, tag, **kwargs):
309+ """Init a 'fanout' queue.
310+
311+ 'channel' is the amqp channel to use
312+ 'topic' is the topic to listen on
313+ 'callback' is the callback to call when messages are received
314+ 'tag' is a unique ID for the consumer on the channel
315+
316+ Other kombu options may be passed
317+ """
318+ unique = uuid.uuid4().hex
319+ exchange_name = '%s_fanout' % topic
320+ queue_name = '%s_fanout_%s' % (topic, unique)
321+
322+ # Default options
323+ options = {'durable': False,
324+ 'queue_arguments': _get_queue_arguments(conf),
325+ 'auto_delete': True,
326+ 'exclusive': False}
327+ options.update(kwargs)
328+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
329+ durable=options['durable'],
330+ auto_delete=options['auto_delete'])
331+ super(FanoutConsumer, self).__init__(channel, callback, tag,
332+ name=queue_name,
333+ exchange=exchange,
334+ routing_key=topic,
335+ **options)
336+
337+
338+class Publisher(object):
339+ """Base Publisher class."""
340+
341+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
342+ """Init the Publisher class with the exchange_name, routing_key,
343+ and other options
344+ """
345+ self.exchange_name = exchange_name
346+ self.routing_key = routing_key
347+ self.kwargs = kwargs
348+ self.reconnect(channel)
349+
350+ def reconnect(self, channel):
351+ """Re-establish the Producer after a rabbit reconnection."""
352+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
353+ **self.kwargs)
354+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
355+ channel=channel,
356+ routing_key=self.routing_key)
357+
358+ def send(self, msg, timeout=None):
359+ """Send a message."""
360+ if timeout:
361+ #
362+ # AMQP TTL is in milliseconds when set in the header.
363+ #
364+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
365+ else:
366+ self.producer.publish(msg)
367+
368+
369+class DirectPublisher(Publisher):
370+ """Publisher class for 'direct'."""
371+ def __init__(self, conf, channel, msg_id, **kwargs):
372+ """Init a 'direct' publisher.
373+
374+ Kombu options may be passed as keyword args to override defaults
375+ """
376+
377+ options = {'durable': False,
378+ 'auto_delete': True,
379+ 'exclusive': False,
380+ 'passive': True}
381+ options.update(kwargs)
382+ super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
383+ type='direct', **options)
384+
385+
386+class TopicPublisher(Publisher):
387+ """Publisher class for 'topic'."""
388+ def __init__(self, conf, channel, topic, **kwargs):
389+ """Init a 'topic' publisher.
390+
391+ Kombu options may be passed as keyword args to override defaults
392+ """
393+ options = {'durable': conf.amqp_durable_queues,
394+ 'auto_delete': conf.amqp_auto_delete,
395+ 'exclusive': False}
396+
397+ options.update(kwargs)
398+ exchange_name = rpc_amqp.get_control_exchange(conf)
399+ super(TopicPublisher, self).__init__(channel,
400+ exchange_name,
401+ topic,
402+ type='topic',
403+ **options)
404+
405+
406+class FanoutPublisher(Publisher):
407+ """Publisher class for 'fanout'."""
408+ def __init__(self, conf, channel, topic, **kwargs):
409+ """Init a 'fanout' publisher.
410+
411+ Kombu options may be passed as keyword args to override defaults
412+ """
413+ options = {'durable': False,
414+ 'auto_delete': True,
415+ 'exclusive': False}
416+ options.update(kwargs)
417+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
418+ None, type='fanout', **options)
419+
420+
421+class NotifyPublisher(TopicPublisher):
422+ """Publisher class for 'notify'."""
423+
424+ def __init__(self, conf, channel, topic, **kwargs):
425+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
426+ self.queue_arguments = _get_queue_arguments(conf)
427+ super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
428+
429+ def reconnect(self, channel):
430+ super(NotifyPublisher, self).reconnect(channel)
431+
432+ # NOTE(jerdfelt): Normally the consumer would create the queue, but
433+ # we do this to ensure that messages don't get dropped if the
434+ # consumer is started after we do
435+ queue = kombu.entity.Queue(channel=channel,
436+ exchange=self.exchange,
437+ durable=self.durable,
438+ name=self.routing_key,
439+ routing_key=self.routing_key,
440+ queue_arguments=self.queue_arguments)
441+ queue.declare()
442+
443+
444+class Connection(object):
445+ """Connection object."""
446+
447+ pool = None
448+
449+ def __init__(self, conf, server_params=None):
450+ self.consumers = []
451+ self.conf = conf
452+ self.max_retries = self.conf.rabbit_max_retries
453+ # Try forever?
454+ if self.max_retries <= 0:
455+ self.max_retries = None
456+ self.interval_start = self.conf.rabbit_retry_interval
457+ self.interval_stepping = self.conf.rabbit_retry_backoff
458+ # max retry-interval = 30 seconds
459+ self.interval_max = 30
460+ self.memory_transport = False
461+
462+ if server_params is None:
463+ server_params = {}
464+ # Keys to translate from server_params to kombu params
465+ server_params_to_kombu_params = {'username': 'userid'}
466+
467+ ssl_params = self._fetch_ssl_params()
468+ params_list = []
469+ for adr in self.conf.rabbit_hosts:
470+ hostname, port = network_utils.parse_host_port(
471+ adr, default_port=self.conf.rabbit_port)
472+
473+ params = {
474+ 'hostname': hostname,
475+ 'port': port,
476+ 'userid': self.conf.rabbit_userid,
477+ 'password': self.conf.rabbit_password,
478+ 'login_method': self.conf.rabbit_login_method,
479+ 'virtual_host': self.conf.rabbit_virtual_host,
480+ }
481+
482+ for sp_key, value in six.iteritems(server_params):
483+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
484+ params[p_key] = value
485+
486+ if self.conf.fake_rabbit:
487+ params['transport'] = 'memory'
488+ if self.conf.rabbit_use_ssl:
489+ params['ssl'] = ssl_params
490+
491+ params_list.append(params)
492+
493+ self.params_list = itertools.cycle(params_list)
494+
495+ self.memory_transport = self.conf.fake_rabbit
496+
497+ self.connection = None
498+ self.do_consume = None
499+ self._consume_loop_stopped = False
500+
501+ self.reconnect()
502+
503+ # FIXME(markmc): use oslo sslutils when it is available as a library
504+ _SSL_PROTOCOLS = {
505+ "tlsv1": ssl.PROTOCOL_TLSv1,
506+ "sslv23": ssl.PROTOCOL_SSLv23,
507+ "sslv3": ssl.PROTOCOL_SSLv3
508+ }
509+
510+ try:
511+ _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
512+ except AttributeError:
513+ pass
514+
515+ @classmethod
516+ def validate_ssl_version(cls, version):
517+ key = version.lower()
518+ try:
519+ return cls._SSL_PROTOCOLS[key]
520+ except KeyError:
521+ raise RuntimeError(_("Invalid SSL version : %s") % version)
522+
523+ def _fetch_ssl_params(self):
524+ """Handles fetching what ssl params should be used for the connection
525+ (if any).
526+ """
527+ ssl_params = dict()
528+
529+ # http://docs.python.org/library/ssl.html - ssl.wrap_socket
530+ if self.conf.kombu_ssl_version:
531+ ssl_params['ssl_version'] = self.validate_ssl_version(
532+ self.conf.kombu_ssl_version)
533+ if self.conf.kombu_ssl_keyfile:
534+ ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
535+ if self.conf.kombu_ssl_certfile:
536+ ssl_params['certfile'] = self.conf.kombu_ssl_certfile
537+ if self.conf.kombu_ssl_ca_certs:
538+ ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
539+ # We might want to allow variations in the
540+ # future with this?
541+ ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
542+
543+ # Return the extended behavior or just have the default behavior
544+ return ssl_params or True
545+
546+ def _connect(self, params):
547+ """Connect to rabbit. Re-establish any queues that may have
548+ been declared before if we are reconnecting. Exceptions should
549+ be handled by the caller.
550+ """
551+ if self.connection:
552+ LOG.info(_("Reconnecting to AMQP server on "
553+ "%(hostname)s:%(port)d") % params)
554+ try:
555+ # XXX(nic): when reconnecting to a RabbitMQ cluster
556+ # with mirrored queues in use, the attempt to release the
557+ # connection can hang "indefinitely" somewhere deep down
558+ # in Kombu. Blocking the thread for a bit prior to
559+ # release seems to kludge around the problem where it is
560+ # otherwise reproducible.
561+ if self.conf.kombu_reconnect_delay > 0:
562+ LOG.info(_("Delaying reconnect for %1.1f seconds...") %
563+ self.conf.kombu_reconnect_delay)
564+ time.sleep(self.conf.kombu_reconnect_delay)
565+
566+ self.connection.release()
567+ except self.connection_errors:
568+ pass
569+ # Setting this in case the next statement fails, though
570+ # it shouldn't be doing any network operations, yet.
571+ self.connection = None
572+ self.connection = kombu.connection.BrokerConnection(**params)
573+ self.connection_errors = self.connection.connection_errors
574+ self.channel_errors = self.connection.channel_errors
575+ if self.memory_transport:
576+ # Kludge to speed up tests.
577+ self.connection.transport.polling_interval = 0.0
578+ self.do_consume = True
579+ self.consumer_num = itertools.count(1)
580+ self.connection.connect()
581+ self.channel = self.connection.channel()
582+ # work around 'memory' transport bug in 1.1.3
583+ if self.memory_transport:
584+ self.channel._new_queue('ae.undeliver')
585+ for consumer in self.consumers:
586+ consumer.reconnect(self.channel)
587+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
588+ params)
589+
590+ def reconnect(self):
591+ """Handles reconnecting and re-establishing queues.
592+ Will retry up to self.max_retries number of times.
593+ self.max_retries = 0 means to retry forever.
594+ Sleep between tries, starting at self.interval_start
595+ seconds, backing off self.interval_stepping number of seconds
596+ each attempt.
597+ """
598+
599+ attempt = 0
600+ while True:
601+ params = six.next(self.params_list)
602+ attempt += 1
603+ try:
604+ self._connect(params)
605+ return
606+ except IOError as e:
607+ pass
608+ except self.connection_errors as e:
609+ pass
610+ except Exception as e:
611+ # NOTE(comstud): Unfortunately it's possible for amqplib
612+ # to return an error not covered by its transport
613+ # connection_errors in the case of a timeout waiting for
614+ # a protocol response. (See paste link in LP888621)
615+ # So, we check all exceptions for 'timeout' in them
616+ # and try to reconnect in this case.
617+ if 'timeout' not in str(e):
618+ raise
619+
620+ log_info = {}
621+ log_info['err_str'] = str(e)
622+ log_info['max_retries'] = self.max_retries
623+ log_info.update(params)
624+
625+ if self.max_retries and attempt == self.max_retries:
626+ msg = _('Unable to connect to AMQP server on '
627+ '%(hostname)s:%(port)d after %(max_retries)d '
628+ 'tries: %(err_str)s') % log_info
629+ LOG.error(msg)
630+ raise rpc_common.RPCException(msg)
631+
632+ if attempt == 1:
633+ sleep_time = self.interval_start or 1
634+ elif attempt > 1:
635+ sleep_time += self.interval_stepping
636+ if self.interval_max:
637+ sleep_time = min(sleep_time, self.interval_max)
638+
639+ log_info['sleep_time'] = sleep_time
640+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
641+ 'unreachable: %(err_str)s. Trying again in '
642+ '%(sleep_time)d seconds.') % log_info)
643+ time.sleep(sleep_time)
644+
645+ def ensure(self, error_callback, method, *args, **kwargs):
646+ while True:
647+ try:
648+ return method(*args, **kwargs)
649+ except self.connection_errors as e:
650+ if error_callback:
651+ error_callback(e)
652+ except self.channel_errors as e:
653+ if error_callback:
654+ error_callback(e)
655+ except (socket.timeout, IOError) as e:
656+ if error_callback:
657+ error_callback(e)
658+ except Exception as e:
659+ # NOTE(comstud): Unfortunately it's possible for amqplib
660+ # to return an error not covered by its transport
661+ # connection_errors in the case of a timeout waiting for
662+ # a protocol response. (See paste link in LP888621)
663+ # So, we check all exceptions for 'timeout' in them
664+ # and try to reconnect in this case.
665+ if 'timeout' not in str(e):
666+ raise
667+ if error_callback:
668+ error_callback(e)
669+ self.reconnect()
670+
671+ def get_channel(self):
672+ """Convenience call for bin/clear_rabbit_queues."""
673+ return self.channel
674+
675+ def close(self):
676+ """Close/release this connection."""
677+ self.connection.release()
678+ self.connection = None
679+
680+ def reset(self):
681+ """Reset a connection so it can be used again."""
682+ self.channel.close()
683+ self.channel = self.connection.channel()
684+ # work around 'memory' transport bug in 1.1.3
685+ if self.memory_transport:
686+ self.channel._new_queue('ae.undeliver')
687+ self.consumers = []
688+
689+ def declare_consumer(self, consumer_cls, topic, callback):
690+ """Create a Consumer using the class that was passed in and
691+ add it to our list of consumers
692+ """
693+
694+ def _connect_error(exc):
695+ log_info = {'topic': topic, 'err_str': str(exc)}
696+ LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
697+ "%(err_str)s") % log_info)
698+
699+ def _declare_consumer():
700+ consumer = consumer_cls(self.conf, self.channel, topic, callback,
701+ six.next(self.consumer_num))
702+ self.consumers.append(consumer)
703+ return consumer
704+
705+ return self.ensure(_connect_error, _declare_consumer)
706+
707+ def iterconsume(self, limit=None, timeout=None):
708+ """Return an iterator that will consume from all queues/consumers."""
709+ timer = rpc_common.DecayingTimer(duration=timeout)
710+ timer.start()
711+
712+ def _raise_timeout(exc):
713+ LOG.debug('Timed out waiting for RPC response: %s', exc)
714+ raise rpc_common.Timeout()
715+
716+ def _error_callback(exc):
717+ self.do_consume = True
718+ timer.check_return(_raise_timeout, exc)
719+ LOG.exception(_('Failed to consume message from queue: %s'),
720+ exc)
721+
722+ def _consume():
723+ # NOTE(sileht): in case the acknowledgement or requeue of a
724+ # message fail, the kombu transport can be disconnected
725+ # In this case, we must redeclare our consumers, so raise
726+ # a recoverable error to trigger the reconnection code.
727+ if not self.connection.connected:
728+ raise self.connection.recoverable_connection_errors[0]
729+
730+ if self.do_consume:
731+ queues_head = self.consumers[:-1] # not fanout.
732+ queues_tail = self.consumers[-1] # fanout
733+ for queue in queues_head:
734+ queue.consume(nowait=True)
735+ queues_tail.consume(nowait=False)
736+ self.do_consume = False
737+
738+ poll_timeout = 1 if timeout is None else min(timeout, 1)
739+ while True:
740+ if self._consume_loop_stopped:
741+ self._consume_loop_stopped = False
742+ raise StopIteration
743+ try:
744+ return self.connection.drain_events(timeout=poll_timeout)
745+ except socket.timeout as exc:
746+ poll_timeout = timer.check_return(_raise_timeout, exc,
747+ maximum=1)
748+
749+ for iteration in itertools.count(0):
750+ if limit and iteration >= limit:
751+ raise StopIteration
752+ yield self.ensure(_error_callback, _consume)
753+
754+ def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
755+ """Send to a publisher based on the publisher class."""
756+
757+ def _error_callback(exc):
758+ log_info = {'topic': topic, 'err_str': str(exc)}
759+ LOG.exception(_("Failed to publish message to topic "
760+ "'%(topic)s': %(err_str)s") % log_info)
761+
762+ def _publish():
763+ publisher = cls(self.conf, self.channel, topic, **kwargs)
764+ publisher.send(msg, timeout)
765+
766+ self.ensure(_error_callback, _publish)
767+
768+ def declare_direct_consumer(self, topic, callback):
769+ """Create a 'direct' queue.
770+ In nova's use, this is generally a msg_id queue used for
771+ responses for call/multicall
772+ """
773+ self.declare_consumer(DirectConsumer, topic, callback)
774+
775+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
776+ exchange_name=None):
777+ """Create a 'topic' consumer."""
778+ self.declare_consumer(functools.partial(TopicConsumer,
779+ name=queue_name,
780+ exchange_name=exchange_name,
781+ ),
782+ topic, callback)
783+
784+ def declare_fanout_consumer(self, topic, callback):
785+ """Create a 'fanout' consumer."""
786+ self.declare_consumer(FanoutConsumer, topic, callback)
787+
788+ def direct_send(self, msg_id, msg):
789+ """Send a 'direct' message."""
790+
791+ timer = rpc_common.DecayingTimer(duration=60)
792+ timer.start()
793+ # NOTE(sileht): retry at least 60sec, after we have a good chance
794+ # that the caller is really dead too...
795+
796+ while True:
797+ try:
798+ self.publisher_send(DirectPublisher, msg_id, msg)
799+ except self.connection.channel_errors as exc:
800+ # NOTE(noelbk/sileht):
801+ # If rabbit dies, the consumer can be disconnected before the
802+ # publisher sends, and if the consumer hasn't declared the
803+ # queue, the publisher will send a message to an exchange
804+ # that's not bound to a queue, and the message will be lost.
805+ # So we set passive=True to the publisher exchange and catch
806+ # the 404 kombu ChannelError and retry until the exchange
807+ # appears
808+ if exc.code == 404 and timer.check_return() > 0:
809+ LOG.info(_("The exchange to reply to %s doesn't "
810+ "exist yet, retrying...") % msg_id)
811+ time.sleep(1)
812+ continue
813+ raise
814+ return
815+
816+ def topic_send(self, topic, msg, timeout=None):
817+ """Send a 'topic' message."""
818+ self.publisher_send(TopicPublisher, topic, msg, timeout)
819+
820+ def fanout_send(self, topic, msg):
821+ """Send a 'fanout' message."""
822+ self.publisher_send(FanoutPublisher, topic, msg)
823+
824+ def notify_send(self, topic, msg, **kwargs):
825+ """Send a notify message on a topic."""
826+ self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
827+
828+ def consume(self, limit=None, timeout=None):
829+ """Consume from all queues/consumers."""
830+ it = self.iterconsume(limit=limit, timeout=timeout)
831+ while True:
832+ try:
833+ six.next(it)
834+ except StopIteration:
835+ return
836+
837+ def stop_consuming(self):
838+ self._consume_loop_stopped = True
839+
840+
841+class RabbitDriver(amqpdriver.AMQPDriverBase):
842+
843+ def __init__(self, conf, url, default_exchange=None,
844+ allowed_remote_exmods=[]):
845+ conf.register_opts(rabbit_opts)
846+ conf.register_opts(rpc_amqp.amqp_opts)
847+
848+ connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
849+
850+ super(RabbitDriver, self).__init__(conf, url,
851+ connection_pool,
852+ default_exchange,
853+ allowed_remote_exmods)
854+
855+ def require_features(self, requeue=True):
856+ pass
857
858=== added file '.pc/redeclare-consumers-when-ack-requeue-fails.patch/.timestamp'
859=== added file '.pc/skip-qpid-tests.patch/.timestamp'
860=== modified file 'debian/changelog'
861--- debian/changelog 2015-07-13 15:59:14 +0000
862+++ debian/changelog 2016-01-08 06:17:57 +0000
863@@ -1,3 +1,11 @@
864+oslo.messaging (1.3.0-0ubuntu1.4) trusty; urgency=medium
865+
866+ * Backport upstream release. (LP: #1318721):
867+ - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
868+ Redeclare if exception is caught after self.queue.declare() failed.
869+
870+ -- Hui Xiang <hui.xiang@canonical.com> Thu, 17 Dec 2015 18:18:50 +0800
871+
872 oslo.messaging (1.3.0-0ubuntu1.3) trusty; urgency=medium
873
874 * Backport various fixes for AMQP listener/executor. (LP: #1362863).
875
876=== added file 'debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch'
877--- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 00:00:00 +0000
878+++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-08 06:17:57 +0000
879@@ -0,0 +1,78 @@
880+--- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-07 13:21:32.000000000 +0800
881++++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 08:00:00.000000000 +0800
882+@@ -1,75 +0,0 @@
883+-Description: Fix reconnect race condition with RabbitMQ cluster
884+-
885+- commit 7ad0d7eaf9cb095a14b07a08c814d9f1f9c8ff12
886+- Author: Jens Rosenboom <j.rosenboom@x-ion.de>
887+- Date: Fri Jun 27 16:46:47 2014 +0200
888+-
889+- Retry Queue creation to workaround race condition
890+- that may happen when both the client and broker race over
891+- exchange creation and deletion respectively which happen only
892+- when the Queue/Exchange were created with auto-delete flag.
893+-
894+- Queues/Exchange declared with auto-delete instruct the Broker to
895+- delete the Queue when the last Consumer disconnects from it, and
896+- the Exchange when the last Queue is deleted from this Exchange.
897+-
898+- Now in a RabbitMQ cluster setup, if the cluster node that we are
899+- connected to go down, 2 things will happen:
900+-
901+- 1. From RabbitMQ side, the Queues w/ auto-delete will be deleted
902+- from the other cluster nodes and then the Exchanges that the
903+- Queues are bound to if they were also created w/ auto-delete.
904+- 2. From client side, client will reconnect to another cluster
905+- node and call queue.declare() which create Exchanges then
906+- Queues then Binding in that order.
907+-
908+- Now in a happy path the queues/exchanges will be deleted from the
909+- broker before client start re-creating them again, but it is also
910+- possible that the client first start by creating queues/exchange
911+- as part of the queue.declare() call, which are no-op operations
912+- b/c they already existed, but before it could bind Queue to
913+- Exchange, RabbitMQ nodes just received the 'signal' that the
914+- queue doesn't have any consumer so it should be deleted, and the
915+- same with exchanges, which will lead to binding fail with
916+- NotFound error.
917+-
918+- Illustration of the time line from Client and RabbitMQ cluster
919+- respectively when the race condition happens:
920+-
921+- e-declare(E) q-declare(Q) q-bind(Q, E)
922+- -----+------------------+----------------+----------->
923+- e-delete(E)
924+- ------------------------------+---------------------->
925+-
926+- Change-Id: Ideb73af6f246a8282780cdb204d675d5d4555bf0
927+- Closes-Bug: #1318721
928+-
929+-Author: Jens Rosenboom <j.rosenboom@x-ion.de>
930+-Origin: backport, https://review.openstack.org/#/c/103157/
931+-Bug: https://bugs.launchpad.net/neutron/+bug/1318721
932+----
933+-This patch header follows DEP-3: http://dep.debian.net/deps/dep3/
934+---- a/oslo/messaging/_drivers/impl_rabbit.py
935+-+++ b/oslo/messaging/_drivers/impl_rabbit.py
936+-@@ -159,7 +159,20 @@
937+- self.channel = channel
938+- self.kwargs['channel'] = channel
939+- self.queue = kombu.entity.Queue(**self.kwargs)
940+-- self.queue.declare()
941+-+ try:
942+-+ self.queue.declare()
943+-+ except Exception as e:
944+-+ # NOTE: This exception may be triggered by a race condition.
945+-+ # Simply retrying will solve the error most of the time and
946+-+ # should work well enough as a workaround until the race condition
947+-+ # itself can be fixed.
948+-+ # TODO(jrosenboom): In order to be able to match the Exception
949+-+ # more specifically, we have to refactor ConsumerBase to use
950+-+ # 'channel_errors' of the kombu connection object that
951+-+ # has created the channel.
952+-+ # See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
953+-+ LOG.exception(_("Declaring queue failed with (%s), retrying"), e)
954+-+ self.queue.declare()
955+-
956+- def _callback_handler(self, message, callback):
957+- """Call callback with deserialized message.
958
959=== modified file 'debian/patches/series'
960--- debian/patches/series 2015-07-13 15:59:14 +0000
961+++ debian/patches/series 2016-01-08 06:17:57 +0000
962@@ -4,3 +4,4 @@
963 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch
964 redeclare-consumers-when-ack-requeue-fails.patch
965 0004-fix-lp-1362863.patch
966+fix-reconnect-race-condition-with-rabbitmq-cluster.patch
967
968=== modified file 'oslo/messaging/_drivers/impl_rabbit.py'
969--- oslo/messaging/_drivers/impl_rabbit.py 2015-07-13 15:59:14 +0000
970+++ oslo/messaging/_drivers/impl_rabbit.py 2016-01-08 06:17:57 +0000
971@@ -159,7 +159,20 @@
972 self.channel = channel
973 self.kwargs['channel'] = channel
974 self.queue = kombu.entity.Queue(**self.kwargs)
975- self.queue.declare()
976+ try:
977+ self.queue.declare()
978+ except Exception as e:
979+ # NOTE: This exception may be triggered by a race condition.
980+ # Simply retrying will solve the error most of the time and
981+ # should work well enough as a workaround until the race condition
982+ # itself can be fixed.
983+ # TODO(jrosenboom): In order to be able to match the Exception
984+ # more specifically, we have to refactor ConsumerBase to use
985+ # 'channel_errors' of the kombu connection object that
986+ # has created the channel.
987+ # See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
988+ LOG.exception(_("Declaring queue failed with (%s), retrying"), e)
989+ self.queue.declare()
990
991 def _callback_handler(self, message, callback):
992 """Call callback with deserialized message.

Subscribers

People subscribed via source and target branches

to all changes: