Merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721 into lp:ubuntu/trusty-proposed/neutron

Proposed by Xiang Hui
Status: Approved
Approved by: Serge Hallyn
Approved revision: 37
Proposed branch: lp:~xianghui/ubuntu/trusty/neutron/lp1318721
Merge into: lp:ubuntu/trusty-proposed/neutron
Diff against target: 1047 lines (+992/-1)
6 files modified
.pc/applied-patches (+1/-0)
.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py (+858/-0)
debian/changelog (+8/-0)
debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+77/-0)
debian/patches/series (+1/-0)
neutron/openstack/common/rpc/impl_kombu.py (+47/-1)
To merge this branch: bzr merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721
Reviewer: Ubuntu Development Team (status: Pending)
Review via email: mp+280835@code.launchpad.net

Description of the change

  * Backport of an upstream fix (LP: #1318721):
    - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Redeclare the queue if an exception is caught after self.queue.declare() fails.
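
In effect, the patch wraps the queue declaration in a retry-once guard: if the first declare fails because the cluster is still tearing down auto-delete exchanges that belonged to a dead node, the second attempt simply recreates them. A minimal standalone sketch of the pattern (using the same kombu Queue API as the patch; the helper name is illustrative, not part of the proposal):

    import logging

    import kombu.entity

    LOG = logging.getLogger(__name__)


    def declare_with_retry(channel, **queue_kwargs):
        """Declare a kombu queue, retrying once if the first attempt fails.

        During a RabbitMQ cluster failover the broker may delete an
        auto-delete exchange between exchange.declare and queue.bind;
        a second declare after the failure recreates it.
        """
        queue = kombu.entity.Queue(channel=channel, **queue_kwargs)
        try:
            queue.declare()
        except Exception:
            # Kombu surfaces transport-specific errors (amqp.NotFound,
            # librabbitmq.ChannelError, ...), so Exception is the only
            # portable thing to catch here.
            LOG.exception("Declaring queue failed, retrying ...")
            queue.declare()
        return queue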

Unmerged revisions

37. By Xiang Hui

Add dep3 header

36. By Xiang Hui

  * Backport of upstream release. (LP: #1318721):-
    - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Redeclare if exception is catched after self.queue.declare() failed.

Preview Diff

=== modified file '.pc/applied-patches'
--- .pc/applied-patches 2014-08-08 08:30:52 +0000
+++ .pc/applied-patches 2016-01-08 08:41:27 +0000
@@ -2,3 +2,4 @@
 disable-udev-tests.patch
 skip-ipv6-tests.patch
 use-concurrency.patch
+fix-reconnect-race-condition-with-rabbitmq-cluster.patch
=== added file '.pc/disable-udev-tests.patch/.timestamp'
=== added file '.pc/fix-quantum-configuration.patch/.timestamp'
=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch'
=== added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/.timestamp'
=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron'
=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack'
=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common'
=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc'
=== added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py'
--- .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 1970-01-01 00:00:00 +0000
+++ .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000
@@ -0,0 +1,858 @@
1# Copyright 2011 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import functools
16import itertools
17import socket
18import ssl
19import time
20import uuid
21
22import eventlet
23import greenlet
24import kombu
25import kombu.connection
26import kombu.entity
27import kombu.messaging
28from oslo.config import cfg
29import six
30
31from neutron.openstack.common import excutils
32from neutron.openstack.common.gettextutils import _, _LE, _LI
33from neutron.openstack.common import network_utils
34from neutron.openstack.common.rpc import amqp as rpc_amqp
35from neutron.openstack.common.rpc import common as rpc_common
36from neutron.openstack.common import sslutils
37
38kombu_opts = [
39 cfg.StrOpt('kombu_ssl_version',
40 default='',
41 help='If SSL is enabled, the SSL version to use. Valid '
42 'values are TLSv1, SSLv23 and SSLv3. SSLv2 might '
43 'be available on some distributions.'
44 ),
45 cfg.StrOpt('kombu_ssl_keyfile',
46 default='',
47 help='SSL key file (valid only if SSL enabled)'),
48 cfg.StrOpt('kombu_ssl_certfile',
49 default='',
50 help='SSL cert file (valid only if SSL enabled)'),
51 cfg.StrOpt('kombu_ssl_ca_certs',
52 default='',
53 help=('SSL certification authority file '
54 '(valid only if SSL enabled)')),
55 cfg.StrOpt('rabbit_host',
56 default='localhost',
57 help='The RabbitMQ broker address where a single node is used'),
58 cfg.IntOpt('rabbit_port',
59 default=5672,
60 help='The RabbitMQ broker port where a single node is used'),
61 cfg.ListOpt('rabbit_hosts',
62 default=['$rabbit_host:$rabbit_port'],
63 help='RabbitMQ HA cluster host:port pairs'),
64 cfg.BoolOpt('rabbit_use_ssl',
65 default=False,
66 help='Connect over SSL for RabbitMQ'),
67 cfg.StrOpt('rabbit_userid',
68 default='guest',
69 help='The RabbitMQ userid'),
70 cfg.StrOpt('rabbit_password',
71 default='guest',
72 help='The RabbitMQ password',
73 secret=True),
74 cfg.StrOpt('rabbit_virtual_host',
75 default='/',
76 help='The RabbitMQ virtual host'),
77 cfg.IntOpt('rabbit_retry_interval',
78 default=1,
79 help='How frequently to retry connecting with RabbitMQ'),
80 cfg.IntOpt('rabbit_retry_backoff',
81 default=2,
82 help='How long to backoff for between retries when connecting '
83 'to RabbitMQ'),
84 cfg.IntOpt('rabbit_max_retries',
85 default=0,
86 help='Maximum number of RabbitMQ connection retries. '
87 'Default is 0 (infinite retry count)'),
88 cfg.BoolOpt('rabbit_ha_queues',
89 default=False,
90 help='Use HA queues in RabbitMQ (x-ha-policy: all). '
91 'If you change this option, you must wipe the '
92 'RabbitMQ database.'),
93
94]
95
96cfg.CONF.register_opts(kombu_opts)
97
98LOG = rpc_common.LOG
99
100
101def _get_queue_arguments(conf):
102 """Construct the arguments for declaring a queue.
103
104 If the rabbit_ha_queues option is set, we declare a mirrored queue
105 as described here:
106
107 http://www.rabbitmq.com/ha.html
108
109 Setting x-ha-policy to all means that the queue will be mirrored
110 to all nodes in the cluster.
111 """
112 return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
113
114
115class ConsumerBase(object):
116 """Consumer base class."""
117
118 def __init__(self, channel, callback, tag, **kwargs):
119 """Declare a queue on an amqp channel.
120
121 'channel' is the amqp channel to use
122 'callback' is the callback to call when messages are received
123 'tag' is a unique ID for the consumer on the channel
124
125 queue name, exchange name, and other kombu options are
126 passed in here as a dictionary.
127 """
128 self.callback = callback
129 self.tag = str(tag)
130 self.kwargs = kwargs
131 self.queue = None
132 self.ack_on_error = kwargs.get('ack_on_error', True)
133 self.reconnect(channel)
134
135 def reconnect(self, channel):
136 """Re-declare the queue after a rabbit reconnect."""
137 self.channel = channel
138 self.kwargs['channel'] = channel
139 self.queue = kombu.entity.Queue(**self.kwargs)
140 self.queue.declare()
141
142 def _callback_handler(self, message, callback):
143 """Call callback with deserialized message.
144
145 Messages that are processed without exception are ack'ed.
146
147 If the message processing generates an exception, it will be
148 ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
149 """
150
151 try:
152 msg = rpc_common.deserialize_msg(message.payload)
153 callback(msg)
154 except Exception:
155 if self.ack_on_error:
156 LOG.exception(_LE("Failed to process message"
157 " ... skipping it."))
158 message.ack()
159 else:
160 LOG.exception(_LE("Failed to process message"
161 " ... will requeue."))
162 message.requeue()
163 else:
164 message.ack()
165
166 def consume(self, *args, **kwargs):
167 """Actually declare the consumer on the amqp channel. This will
168 start the flow of messages from the queue. Using the
169 Connection.iterconsume() iterator will process the messages,
170 calling the appropriate callback.
171
172 If a callback is specified in kwargs, use that. Otherwise,
173 use the callback passed during __init__()
174
175 If kwargs['nowait'] is True, then this call will block until
176 a message is read.
177
178 """
179
180 options = {'consumer_tag': self.tag}
181 options['nowait'] = kwargs.get('nowait', False)
182 callback = kwargs.get('callback', self.callback)
183 if not callback:
184 raise ValueError("No callback defined")
185
186 def _callback(raw_message):
187 message = self.channel.message_to_python(raw_message)
188 self._callback_handler(message, callback)
189
190 self.queue.consume(*args, callback=_callback, **options)
191
192 def cancel(self):
193 """Cancel the consuming from the queue, if it has started."""
194 try:
195 self.queue.cancel(self.tag)
196 except KeyError as e:
197 # NOTE(comstud): Kludge to get around a amqplib bug
198 if str(e) != "u'%s'" % self.tag:
199 raise
200 self.queue = None
201
202
203class DirectConsumer(ConsumerBase):
204 """Queue/consumer class for 'direct'."""
205
206 def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
207 """Init a 'direct' queue.
208
209 'channel' is the amqp channel to use
210 'msg_id' is the msg_id to listen on
211 'callback' is the callback to call when messages are received
212 'tag' is a unique ID for the consumer on the channel
213
214 Other kombu options may be passed
215 """
216 # Default options
217 options = {'durable': False,
218 'queue_arguments': _get_queue_arguments(conf),
219 'auto_delete': True,
220 'exclusive': False}
221 options.update(kwargs)
222 exchange = kombu.entity.Exchange(name=msg_id,
223 type='direct',
224 durable=options['durable'],
225 auto_delete=options['auto_delete'])
226 super(DirectConsumer, self).__init__(channel,
227 callback,
228 tag,
229 name=msg_id,
230 exchange=exchange,
231 routing_key=msg_id,
232 **options)
233
234
235class TopicConsumer(ConsumerBase):
236 """Consumer class for 'topic'."""
237
238 def __init__(self, conf, channel, topic, callback, tag, name=None,
239 exchange_name=None, **kwargs):
240 """Init a 'topic' queue.
241
242 :param channel: the amqp channel to use
243 :param topic: the topic to listen on
244 :paramtype topic: str
245 :param callback: the callback to call when messages are received
246 :param tag: a unique ID for the consumer on the channel
247 :param name: optional queue name, defaults to topic
248 :paramtype name: str
249
250 Other kombu options may be passed as keyword arguments
251 """
252 # Default options
253 options = {'durable': conf.amqp_durable_queues,
254 'queue_arguments': _get_queue_arguments(conf),
255 'auto_delete': conf.amqp_auto_delete,
256 'exclusive': False}
257 options.update(kwargs)
258 exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
259 exchange = kombu.entity.Exchange(name=exchange_name,
260 type='topic',
261 durable=options['durable'],
262 auto_delete=options['auto_delete'])
263 super(TopicConsumer, self).__init__(channel,
264 callback,
265 tag,
266 name=name or topic,
267 exchange=exchange,
268 routing_key=topic,
269 **options)
270
271
272class FanoutConsumer(ConsumerBase):
273 """Consumer class for 'fanout'."""
274
275 def __init__(self, conf, channel, topic, callback, tag, **kwargs):
276 """Init a 'fanout' queue.
277
278 'channel' is the amqp channel to use
279 'topic' is the topic to listen on
280 'callback' is the callback to call when messages are received
281 'tag' is a unique ID for the consumer on the channel
282
283 Other kombu options may be passed
284 """
285 unique = uuid.uuid4().hex
286 exchange_name = '%s_fanout' % topic
287 queue_name = '%s_fanout_%s' % (topic, unique)
288
289 # Default options
290 options = {'durable': False,
291 'queue_arguments': _get_queue_arguments(conf),
292 'auto_delete': True,
293 'exclusive': False}
294 options.update(kwargs)
295 exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
296 durable=options['durable'],
297 auto_delete=options['auto_delete'])
298 super(FanoutConsumer, self).__init__(channel, callback, tag,
299 name=queue_name,
300 exchange=exchange,
301 routing_key=topic,
302 **options)
303
304
305class Publisher(object):
306 """Base Publisher class."""
307
308 def __init__(self, channel, exchange_name, routing_key, **kwargs):
309 """Init the Publisher class with the exchange_name, routing_key,
310 and other options
311 """
312 self.exchange_name = exchange_name
313 self.routing_key = routing_key
314 self.kwargs = kwargs
315 self.reconnect(channel)
316
317 def reconnect(self, channel):
318 """Re-establish the Producer after a rabbit reconnection."""
319 self.exchange = kombu.entity.Exchange(name=self.exchange_name,
320 **self.kwargs)
321 self.producer = kombu.messaging.Producer(exchange=self.exchange,
322 channel=channel,
323 routing_key=self.routing_key)
324
325 def send(self, msg, timeout=None):
326 """Send a message."""
327 if timeout:
328 #
329 # AMQP TTL is in milliseconds when set in the header.
330 #
331 self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
332 else:
333 self.producer.publish(msg)
334
335
336class DirectPublisher(Publisher):
337 """Publisher class for 'direct'."""
338 def __init__(self, conf, channel, msg_id, **kwargs):
339 """init a 'direct' publisher.
340
341 Kombu options may be passed as keyword args to override defaults
342 """
343
344 options = {'durable': False,
345 'auto_delete': True,
346 'exclusive': False}
347 options.update(kwargs)
348 super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
349 type='direct', **options)
350
351
352class TopicPublisher(Publisher):
353 """Publisher class for 'topic'."""
354 def __init__(self, conf, channel, topic, **kwargs):
355 """init a 'topic' publisher.
356
357 Kombu options may be passed as keyword args to override defaults
358 """
359 options = {'durable': conf.amqp_durable_queues,
360 'auto_delete': conf.amqp_auto_delete,
361 'exclusive': False}
362 options.update(kwargs)
363 exchange_name = rpc_amqp.get_control_exchange(conf)
364 super(TopicPublisher, self).__init__(channel,
365 exchange_name,
366 topic,
367 type='topic',
368 **options)
369
370
371class FanoutPublisher(Publisher):
372 """Publisher class for 'fanout'."""
373 def __init__(self, conf, channel, topic, **kwargs):
374 """init a 'fanout' publisher.
375
376 Kombu options may be passed as keyword args to override defaults
377 """
378 options = {'durable': False,
379 'auto_delete': True,
380 'exclusive': False}
381 options.update(kwargs)
382 super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
383 None, type='fanout', **options)
384
385
386class NotifyPublisher(TopicPublisher):
387 """Publisher class for 'notify'."""
388
389 def __init__(self, conf, channel, topic, **kwargs):
390 self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
391 self.queue_arguments = _get_queue_arguments(conf)
392 super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
393
394 def reconnect(self, channel):
395 super(NotifyPublisher, self).reconnect(channel)
396
397 # NOTE(jerdfelt): Normally the consumer would create the queue, but
398 # we do this to ensure that messages don't get dropped if the
399 # consumer is started after we do
400 queue = kombu.entity.Queue(channel=channel,
401 exchange=self.exchange,
402 durable=self.durable,
403 name=self.routing_key,
404 routing_key=self.routing_key,
405 queue_arguments=self.queue_arguments)
406 queue.declare()
407
408
409class Connection(object):
410 """Connection object."""
411
412 pool = None
413
414 def __init__(self, conf, server_params=None):
415 self.consumers = []
416 self.consumer_thread = None
417 self.proxy_callbacks = []
418 self.conf = conf
419 self.max_retries = self.conf.rabbit_max_retries
420 # Try forever?
421 if self.max_retries <= 0:
422 self.max_retries = None
423 self.interval_start = self.conf.rabbit_retry_interval
424 self.interval_stepping = self.conf.rabbit_retry_backoff
425 # max retry-interval = 30 seconds
426 self.interval_max = 30
427 self.memory_transport = False
428
429 if server_params is None:
430 server_params = {}
431 # Keys to translate from server_params to kombu params
432 server_params_to_kombu_params = {'username': 'userid'}
433
434 ssl_params = self._fetch_ssl_params()
435 params_list = []
436 for adr in self.conf.rabbit_hosts:
437 hostname, port = network_utils.parse_host_port(
438 adr, default_port=self.conf.rabbit_port)
439
440 params = {
441 'hostname': hostname,
442 'port': port,
443 'userid': self.conf.rabbit_userid,
444 'password': self.conf.rabbit_password,
445 'virtual_host': self.conf.rabbit_virtual_host,
446 }
447
448 for sp_key, value in six.iteritems(server_params):
449 p_key = server_params_to_kombu_params.get(sp_key, sp_key)
450 params[p_key] = value
451
452 if self.conf.fake_rabbit:
453 params['transport'] = 'memory'
454 if self.conf.rabbit_use_ssl:
455 params['ssl'] = ssl_params
456
457 params_list.append(params)
458
459 self.params_list = params_list
460
461 brokers_count = len(self.params_list)
462 self.next_broker_indices = itertools.cycle(range(brokers_count))
463
464 self.memory_transport = self.conf.fake_rabbit
465
466 self.connection = None
467 self.reconnect()
468
469 def _fetch_ssl_params(self):
470 """Handles fetching what ssl params should be used for the connection
471 (if any).
472 """
473 ssl_params = dict()
474
475 # http://docs.python.org/library/ssl.html - ssl.wrap_socket
476 if self.conf.kombu_ssl_version:
477 ssl_params['ssl_version'] = sslutils.validate_ssl_version(
478 self.conf.kombu_ssl_version)
479 if self.conf.kombu_ssl_keyfile:
480 ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
481 if self.conf.kombu_ssl_certfile:
482 ssl_params['certfile'] = self.conf.kombu_ssl_certfile
483 if self.conf.kombu_ssl_ca_certs:
484 ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
485 # We might want to allow variations in the
486 # future with this?
487 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
488
489 # Return the extended behavior or just have the default behavior
490 return ssl_params or True
491
492 def _connect(self, params):
493 """Connect to rabbit. Re-establish any queues that may have
494 been declared before if we are reconnecting. Exceptions should
495 be handled by the caller.
496 """
497 if self.connection:
498 LOG.info(_LI("Reconnecting to AMQP server on "
499 "%(hostname)s:%(port)d") % params)
500 try:
501 self.connection.release()
502 except self.connection_errors:
503 pass
504 # Setting this in case the next statement fails, though
505 # it shouldn't be doing any network operations, yet.
506 self.connection = None
507 self.connection = kombu.connection.BrokerConnection(**params)
508 self.connection_errors = self.connection.connection_errors
509 if self.memory_transport:
510 # Kludge to speed up tests.
511 self.connection.transport.polling_interval = 0.0
512 self.consumer_num = itertools.count(1)
513 self.connection.connect()
514 self.channel = self.connection.channel()
515 # work around 'memory' transport bug in 1.1.3
516 if self.memory_transport:
517 self.channel._new_queue('ae.undeliver')
518 for consumer in self.consumers:
519 consumer.reconnect(self.channel)
520 LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
521 params)
522
523 def reconnect(self):
524 """Handles reconnecting and re-establishing queues.
525 Will retry up to self.max_retries number of times.
526 self.max_retries = 0 means to retry forever.
527 Sleep between tries, starting at self.interval_start
528 seconds, backing off self.interval_stepping number of seconds
529 each attempt.
530 """
531
532 attempt = 0
533 while True:
534 params = self.params_list[next(self.next_broker_indices)]
535 attempt += 1
536 try:
537 self._connect(params)
538 return
539 except (IOError, self.connection_errors) as e:
540 pass
541 except Exception as e:
542 # NOTE(comstud): Unfortunately it's possible for amqplib
543 # to return an error not covered by its transport
544 # connection_errors in the case of a timeout waiting for
545 # a protocol response. (See paste link in LP888621)
546 # So, we check all exceptions for 'timeout' in them
547 # and try to reconnect in this case.
548 if 'timeout' not in str(e):
549 raise
550
551 log_info = {}
552 log_info['err_str'] = str(e)
553 log_info['max_retries'] = self.max_retries
554 log_info.update(params)
555
556 if self.max_retries and attempt == self.max_retries:
557 msg = _('Unable to connect to AMQP server on '
558 '%(hostname)s:%(port)d after %(max_retries)d '
559 'tries: %(err_str)s') % log_info
560 LOG.error(msg)
561 raise rpc_common.RPCException(msg)
562
563 if attempt == 1:
564 sleep_time = self.interval_start or 1
565 elif attempt > 1:
566 sleep_time += self.interval_stepping
567 if self.interval_max:
568 sleep_time = min(sleep_time, self.interval_max)
569
570 log_info['sleep_time'] = sleep_time
571 LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
572 'unreachable: %(err_str)s. Trying again in '
573 '%(sleep_time)d seconds.') % log_info)
574 time.sleep(sleep_time)
575
576 def ensure(self, error_callback, method, *args, **kwargs):
577 while True:
578 try:
579 return method(*args, **kwargs)
580 except (self.connection_errors, socket.timeout, IOError) as e:
581 if error_callback:
582 error_callback(e)
583 except Exception as e:
584 # NOTE(comstud): Unfortunately it's possible for amqplib
585 # to return an error not covered by its transport
586 # connection_errors in the case of a timeout waiting for
587 # a protocol response. (See paste link in LP888621)
588 # So, we check all exceptions for 'timeout' in them
589 # and try to reconnect in this case.
590 if 'timeout' not in str(e):
591 raise
592 if error_callback:
593 error_callback(e)
594 self.reconnect()
595
596 def get_channel(self):
597 """Convenience call for bin/clear_rabbit_queues."""
598 return self.channel
599
600 def close(self):
601 """Close/release this connection."""
602 self.cancel_consumer_thread()
603 self.wait_on_proxy_callbacks()
604 self.connection.release()
605 self.connection = None
606
607 def reset(self):
608 """Reset a connection so it can be used again."""
609 self.cancel_consumer_thread()
610 self.wait_on_proxy_callbacks()
611 self.channel.close()
612 self.channel = self.connection.channel()
613 # work around 'memory' transport bug in 1.1.3
614 if self.memory_transport:
615 self.channel._new_queue('ae.undeliver')
616 self.consumers = []
617
618 def declare_consumer(self, consumer_cls, topic, callback):
619 """Create a Consumer using the class that was passed in and
620 add it to our list of consumers
621 """
622
623 def _connect_error(exc):
624 log_info = {'topic': topic, 'err_str': str(exc)}
625 LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
626 "%(err_str)s") % log_info)
627
628 def _declare_consumer():
629 consumer = consumer_cls(self.conf, self.channel, topic, callback,
630 six.next(self.consumer_num))
631 self.consumers.append(consumer)
632 return consumer
633
634 return self.ensure(_connect_error, _declare_consumer)
635
636 def iterconsume(self, limit=None, timeout=None):
637 """Return an iterator that will consume from all queues/consumers."""
638
639 info = {'do_consume': True}
640
641 def _error_callback(exc):
642 if isinstance(exc, socket.timeout):
643 LOG.debug('Timed out waiting for RPC response: %s' %
644 str(exc))
645 raise rpc_common.Timeout()
646 else:
647 LOG.exception(_LE('Failed to consume message from queue: %s') %
648 str(exc))
649 info['do_consume'] = True
650
651 def _consume():
652 if info['do_consume']:
653 queues_head = self.consumers[:-1] # not fanout.
654 queues_tail = self.consumers[-1] # fanout
655 for queue in queues_head:
656 queue.consume(nowait=True)
657 queues_tail.consume(nowait=False)
658 info['do_consume'] = False
659 return self.connection.drain_events(timeout=timeout)
660
661 for iteration in itertools.count(0):
662 if limit and iteration >= limit:
663 raise StopIteration
664 yield self.ensure(_error_callback, _consume)
665
666 def cancel_consumer_thread(self):
667 """Cancel a consumer thread."""
668 if self.consumer_thread is not None:
669 self.consumer_thread.kill()
670 try:
671 self.consumer_thread.wait()
672 except greenlet.GreenletExit:
673 pass
674 self.consumer_thread = None
675
676 def wait_on_proxy_callbacks(self):
677 """Wait for all proxy callback threads to exit."""
678 for proxy_cb in self.proxy_callbacks:
679 proxy_cb.wait()
680
681 def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
682 """Send to a publisher based on the publisher class."""
683
684 def _error_callback(exc):
685 log_info = {'topic': topic, 'err_str': str(exc)}
686 LOG.exception(_LE("Failed to publish message to topic "
687 "'%(topic)s': %(err_str)s") % log_info)
688
689 def _publish():
690 publisher = cls(self.conf, self.channel, topic, **kwargs)
691 publisher.send(msg, timeout)
692
693 self.ensure(_error_callback, _publish)
694
695 def declare_direct_consumer(self, topic, callback):
696 """Create a 'direct' queue.
697 In nova's use, this is generally a msg_id queue used for
698 responses for call/multicall
699 """
700 self.declare_consumer(DirectConsumer, topic, callback)
701
702 def declare_topic_consumer(self, topic, callback=None, queue_name=None,
703 exchange_name=None, ack_on_error=True):
704 """Create a 'topic' consumer."""
705 self.declare_consumer(functools.partial(TopicConsumer,
706 name=queue_name,
707 exchange_name=exchange_name,
708 ack_on_error=ack_on_error,
709 ),
710 topic, callback)
711
712 def declare_fanout_consumer(self, topic, callback):
713 """Create a 'fanout' consumer."""
714 self.declare_consumer(FanoutConsumer, topic, callback)
715
716 def direct_send(self, msg_id, msg):
717 """Send a 'direct' message."""
718 self.publisher_send(DirectPublisher, msg_id, msg)
719
720 def topic_send(self, topic, msg, timeout=None):
721 """Send a 'topic' message."""
722 self.publisher_send(TopicPublisher, topic, msg, timeout)
723
724 def fanout_send(self, topic, msg):
725 """Send a 'fanout' message."""
726 self.publisher_send(FanoutPublisher, topic, msg)
727
728 def notify_send(self, topic, msg, **kwargs):
729 """Send a notify message on a topic."""
730 self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
731
732 def consume(self, limit=None):
733 """Consume from all queues/consumers."""
734 it = self.iterconsume(limit=limit)
735 while True:
736 try:
737 six.next(it)
738 except StopIteration:
739 return
740
741 def consume_in_thread(self):
742 """Consumer from all queues/consumers in a greenthread."""
743 @excutils.forever_retry_uncaught_exceptions
744 def _consumer_thread():
745 try:
746 self.consume()
747 except greenlet.GreenletExit:
748 return
749 if self.consumer_thread is None:
750 self.consumer_thread = eventlet.spawn(_consumer_thread)
751 return self.consumer_thread
752
753 def create_consumer(self, topic, proxy, fanout=False):
754 """Create a consumer that calls a method in a proxy object."""
755 proxy_cb = rpc_amqp.ProxyCallback(
756 self.conf, proxy,
757 rpc_amqp.get_connection_pool(self.conf, Connection))
758 self.proxy_callbacks.append(proxy_cb)
759
760 if fanout:
761 self.declare_fanout_consumer(topic, proxy_cb)
762 else:
763 self.declare_topic_consumer(topic, proxy_cb)
764
765 def create_worker(self, topic, proxy, pool_name):
766 """Create a worker that calls a method in a proxy object."""
767 proxy_cb = rpc_amqp.ProxyCallback(
768 self.conf, proxy,
769 rpc_amqp.get_connection_pool(self.conf, Connection))
770 self.proxy_callbacks.append(proxy_cb)
771 self.declare_topic_consumer(topic, proxy_cb, pool_name)
772
773 def join_consumer_pool(self, callback, pool_name, topic,
774 exchange_name=None, ack_on_error=True):
775 """Register as a member of a group of consumers for a given topic from
776 the specified exchange.
777
778 Exactly one member of a given pool will receive each message.
779
780 A message will be delivered to multiple pools, if more than
781 one is created.
782 """
783 callback_wrapper = rpc_amqp.CallbackWrapper(
784 conf=self.conf,
785 callback=callback,
786 connection_pool=rpc_amqp.get_connection_pool(self.conf,
787 Connection),
788 wait_for_consumers=not ack_on_error
789 )
790 self.proxy_callbacks.append(callback_wrapper)
791 self.declare_topic_consumer(
792 queue_name=pool_name,
793 topic=topic,
794 exchange_name=exchange_name,
795 callback=callback_wrapper,
796 ack_on_error=ack_on_error,
797 )
798
799
800def create_connection(conf, new=True):
801 """Create a connection."""
802 return rpc_amqp.create_connection(
803 conf, new,
804 rpc_amqp.get_connection_pool(conf, Connection))
805
806
807def multicall(conf, context, topic, msg, timeout=None):
808 """Make a call that returns multiple times."""
809 return rpc_amqp.multicall(
810 conf, context, topic, msg, timeout,
811 rpc_amqp.get_connection_pool(conf, Connection))
812
813
814def call(conf, context, topic, msg, timeout=None):
815 """Sends a message on a topic and wait for a response."""
816 return rpc_amqp.call(
817 conf, context, topic, msg, timeout,
818 rpc_amqp.get_connection_pool(conf, Connection))
819
820
821def cast(conf, context, topic, msg):
822 """Sends a message on a topic without waiting for a response."""
823 return rpc_amqp.cast(
824 conf, context, topic, msg,
825 rpc_amqp.get_connection_pool(conf, Connection))
826
827
828def fanout_cast(conf, context, topic, msg):
829 """Sends a message on a fanout exchange without waiting for a response."""
830 return rpc_amqp.fanout_cast(
831 conf, context, topic, msg,
832 rpc_amqp.get_connection_pool(conf, Connection))
833
834
835def cast_to_server(conf, context, server_params, topic, msg):
836 """Sends a message on a topic to a specific server."""
837 return rpc_amqp.cast_to_server(
838 conf, context, server_params, topic, msg,
839 rpc_amqp.get_connection_pool(conf, Connection))
840
841
842def fanout_cast_to_server(conf, context, server_params, topic, msg):
843 """Sends a message on a fanout exchange to a specific server."""
844 return rpc_amqp.fanout_cast_to_server(
845 conf, context, server_params, topic, msg,
846 rpc_amqp.get_connection_pool(conf, Connection))
847
848
849def notify(conf, context, topic, msg, envelope):
850 """Sends a notification event on a topic."""
851 return rpc_amqp.notify(
852 conf, context, topic, msg,
853 rpc_amqp.get_connection_pool(conf, Connection),
854 envelope)
855
856
857def cleanup():
858 return rpc_amqp.cleanup(Connection.pool)
=== added file '.pc/skip-ipv6-tests.patch/.timestamp'
=== added file '.pc/use-concurrency.patch/.timestamp'
=== modified file 'debian/changelog'
--- debian/changelog 2015-06-22 10:14:52 +0000
+++ debian/changelog 2016-01-08 08:41:27 +0000
@@ -1,3 +1,11 @@
+neutron (1:2014.1.5-0ubuntu2) trusty; urgency=medium
+
+  * Backport upstream release. (LP: #1318721):
+    - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
+      Redeclare if exception is catched after self.queue.declare() failed.
+
+ -- Hui Xiang <hui.xiang@canonical.com>  Thu, 17 Dec 2015 16:35:40 +0800
+
 neutron (1:2014.1.5-0ubuntu1) trusty; urgency=medium
 
   * Resynchronize with stable/icehouse (877df58) (LP: #1467533):
 
=== added file 'debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch'
--- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 00:00:00 +0000
+++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-08 08:41:27 +0000
@@ -0,0 +1,77 @@
1--- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-07 15:15:26.000000000 +0800
2+++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 08:00:00.000000000 +0800
3@@ -1,74 +0,0 @@
4-Description: Fix reconnect race condition with RabbitMQ cluster
5-
6- Retry declaring Queue if it fail to workaround the race condition
7- that may happen when both Neutron and RabbitMQ cluster are trying
8- to create and delete (Respectively) Queues and Exchanges that have
9- auto-delete flag set.
10-
11- Change-Id: I90febd7c8358b7cccf852c272810e2fad3d91dcd
12- Closes-Bug: #1318721
13-
14-Author: Mouad Benchchaoui <m.benchchaoui@x-ion.de>
15-Origin: backport, https://review.openstack.org/#/c/93399/1
16-Bug: https://bugs.launchpad.net/neutron/+bug/1318721
17----
18-This patch header follows DEP-3: http://dep.debian.net/deps/dep3/
19-Index: neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py
20-===================================================================
21---- neutron-2014.1.5.orig/neutron/openstack/common/rpc/impl_kombu.py 2015-06-18 22:26:11.000000000 +0000
22-+++ neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py 2015-12-01 02:40:27.806902000 +0000
23-@@ -137,7 +137,53 @@
24- self.channel = channel
25- self.kwargs['channel'] = channel
26- self.queue = kombu.entity.Queue(**self.kwargs)
27-- self.queue.declare()
28-+ try:
29-+ self.queue.declare()
30-+ except Exception:
31-+ # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper
32-+ # error instead it's raise what the underlying transport library
33-+ # raise which can be either 'ampq.NotFound' or
34-+ # 'librabbitmq.ChannelError' depending on which transport library
35-+ # is used.
36-+ LOG.exception("Declaring queue fail retrying ...")
37-+ # NOTE(Mouad): We need to re-try Queue creation in case the Queue
38-+ # was created with auto-delete this instruct the Broker to delete
39-+ # the Queue when the last Consumer disconnect from it, and the
40-+ # Exchange when the last Queue is deleted from this Exchange.
41-+ #
42-+ # Now in a RabbitMQ cluster setup, if the cluster node that we are
43-+ # connected to go down, 2 things will happen:
44-+ #
45-+ # 1. From RabbitMQ side the Queues will be deleted from the other
46-+ # cluster nodes and then the correspanding Exchanges will also
47-+ # be deleted.
48-+ # 2. From Neutron side, Neutron will reconnect to another cluster
49-+ # node and start creating Exchanges then Queues then Binding
50-+ # (The order is important to understand the problem).
51-+ #
52-+ # Now this may lead to a race condition, specially if Neutron create
53-+ # Exchange and Queue and before Neutron could bind Queue to Exchange
54-+ # RabbitMQ nodes just received the 'signal' that they need to delete
55-+ # Exchanges with auto-delete that belong to the down node, so they
56-+ # will delete the Exchanges that was just created, and when Neutron
57-+ # try to bind Queue with Exchange, the Binding will fail b/c the
58-+ # Exchange is not found.
59-+ #
60-+ # But if the first Queue declartion work and binding was created we
61-+ # suppose that RabbitMQ will not try to delete the Exchange even if
62-+ # the auto-delete propagation wasn't received yet, b/c the Queue
63-+ # have a Consumer now and a Binding exist with the Exchange.
64-+ #
65-+ # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges
66-+ # but according to here[1] RabbitMQ doesn't seem that he will delete
67-+ # it:
68-+ #
69-+ # The 'auto-delete' flag on 'exchange.declare' got deprecated in
70-+ # 0-9-1. Auto-delete exchanges are actually quite useful, so this
71-+ # flag should be restored.
72-+ #
73-+ # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html
74-+ self.queue.declare()
75-
76- def _callback_handler(self, message, callback):
77- """Call callback with deserialized message.
=== modified file 'debian/patches/series'
--- debian/patches/series 2014-08-08 08:30:52 +0000
+++ debian/patches/series 2016-01-08 08:41:27 +0000
@@ -2,3 +2,4 @@
 disable-udev-tests.patch
 skip-ipv6-tests.patch
 use-concurrency.patch
+fix-reconnect-race-condition-with-rabbitmq-cluster.patch
=== modified file 'neutron/openstack/common/rpc/impl_kombu.py'
--- neutron/openstack/common/rpc/impl_kombu.py 2014-04-01 16:22:54 +0000
+++ neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000
@@ -137,7 +137,53 @@
         self.channel = channel
         self.kwargs['channel'] = channel
         self.queue = kombu.entity.Queue(**self.kwargs)
-        self.queue.declare()
+        try:
+            self.queue.declare()
+        except Exception:
+            # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper
+            # error instead it's raise what the underlying transport library
+            # raise which can be either 'ampq.NotFound' or
+            # 'librabbitmq.ChannelError' depending on which transport library
+            # is used.
+            LOG.exception("Declaring queue fail retrying ...")
+            # NOTE(Mouad): We need to re-try Queue creation in case the Queue
+            # was created with auto-delete this instruct the Broker to delete
+            # the Queue when the last Consumer disconnect from it, and the
+            # Exchange when the last Queue is deleted from this Exchange.
+            #
+            # Now in a RabbitMQ cluster setup, if the cluster node that we are
+            # connected to go down, 2 things will happen:
+            #
+            # 1. From RabbitMQ side the Queues will be deleted from the other
+            #    cluster nodes and then the correspanding Exchanges will also
+            #    be deleted.
+            # 2. From Neutron side, Neutron will reconnect to another cluster
+            #    node and start creating Exchanges then Queues then Binding
+            #    (The order is important to understand the problem).
+            #
+            # Now this may lead to a race condition, specially if Neutron create
+            # Exchange and Queue and before Neutron could bind Queue to Exchange
+            # RabbitMQ nodes just received the 'signal' that they need to delete
+            # Exchanges with auto-delete that belong to the down node, so they
+            # will delete the Exchanges that was just created, and when Neutron
+            # try to bind Queue with Exchange, the Binding will fail b/c the
+            # Exchange is not found.
+            #
+            # But if the first Queue declartion work and binding was created we
+            # suppose that RabbitMQ will not try to delete the Exchange even if
+            # the auto-delete propagation wasn't received yet, b/c the Queue
+            # have a Consumer now and a Binding exist with the Exchange.
+            #
+            # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges
+            # but according to here[1] RabbitMQ doesn't seem that he will delete
+            # it:
+            #
+            #   The 'auto-delete' flag on 'exchange.declare' got deprecated in
+            #   0-9-1. Auto-delete exchanges are actually quite useful, so this
+            #   flag should be restored.
+            #
+            # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html
+            self.queue.declare()
 
     def _callback_handler(self, message, callback):
         """Call callback with deserialized message.
