Merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721 into lp:ubuntu/trusty-proposed/neutron
- Trusty (14.04)
- lp1318721
- Merge into trusty-proposed
Proposed by
Xiang Hui
| Status | Approved |
|---|---|
| Approved by | Serge Hallyn |
| Approved revision | 37 |
| Proposed branch | lp:~xianghui/ubuntu/trusty/neutron/lp1318721 |
| Merge into | lp:ubuntu/trusty-proposed/neutron |
Diff against target: |
1047 lines (+992/-1) 6 files modified
.pc/applied-patches (+1/-0) .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py (+858/-0) debian/changelog (+8/-0) debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+77/-0) debian/patches/series (+1/-0) neutron/openstack/common/rpc/impl_kombu.py (+47/-1) |
||||
To merge this branch: | bzr merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721 | ||||
Related bugs: |
|
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Ubuntu Development Team | | | Pending |
Review via email: mp+280835@code.launchpad.net |
Commit message
Description of the change
* Backport of upstream release. (LP: #1318721):-
- d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
Redeclare the queue if an exception is caught after self.queue.declare().
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file '.pc/applied-patches' | |||
2 | --- .pc/applied-patches 2014-08-08 08:30:52 +0000 | |||
3 | +++ .pc/applied-patches 2016-01-08 08:41:27 +0000 | |||
4 | @@ -2,3 +2,4 @@ | |||
5 | 2 | disable-udev-tests.patch | 2 | disable-udev-tests.patch |
6 | 3 | skip-ipv6-tests.patch | 3 | skip-ipv6-tests.patch |
7 | 4 | use-concurrency.patch | 4 | use-concurrency.patch |
8 | 5 | fix-reconnect-race-condition-with-rabbitmq-cluster.patch | ||
9 | 5 | 6 | ||
10 | === added file '.pc/disable-udev-tests.patch/.timestamp' | |||
11 | === added file '.pc/fix-quantum-configuration.patch/.timestamp' | |||
12 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch' | |||
13 | === added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/.timestamp' | |||
14 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron' | |||
15 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack' | |||
16 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common' | |||
17 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc' | |||
18 | === added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py' | |||
19 | --- .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 1970-01-01 00:00:00 +0000 | |||
20 | +++ .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000 | |||
21 | @@ -0,0 +1,858 @@ | |||
22 | 1 | # Copyright 2011 OpenStack Foundation | ||
23 | 2 | # | ||
24 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
25 | 4 | # not use this file except in compliance with the License. You may obtain | ||
26 | 5 | # a copy of the License at | ||
27 | 6 | # | ||
28 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
29 | 8 | # | ||
30 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
31 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
32 | 11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
33 | 12 | # License for the specific language governing permissions and limitations | ||
34 | 13 | # under the License. | ||
35 | 14 | |||
36 | 15 | import functools | ||
37 | 16 | import itertools | ||
38 | 17 | import socket | ||
39 | 18 | import ssl | ||
40 | 19 | import time | ||
41 | 20 | import uuid | ||
42 | 21 | |||
43 | 22 | import eventlet | ||
44 | 23 | import greenlet | ||
45 | 24 | import kombu | ||
46 | 25 | import kombu.connection | ||
47 | 26 | import kombu.entity | ||
48 | 27 | import kombu.messaging | ||
49 | 28 | from oslo.config import cfg | ||
50 | 29 | import six | ||
51 | 30 | |||
52 | 31 | from neutron.openstack.common import excutils | ||
53 | 32 | from neutron.openstack.common.gettextutils import _, _LE, _LI | ||
54 | 33 | from neutron.openstack.common import network_utils | ||
55 | 34 | from neutron.openstack.common.rpc import amqp as rpc_amqp | ||
56 | 35 | from neutron.openstack.common.rpc import common as rpc_common | ||
57 | 36 | from neutron.openstack.common import sslutils | ||
58 | 37 | |||
59 | 38 | kombu_opts = [ | ||
60 | 39 | cfg.StrOpt('kombu_ssl_version', | ||
61 | 40 | default='', | ||
62 | 41 | help='If SSL is enabled, the SSL version to use. Valid ' | ||
63 | 42 | 'values are TLSv1, SSLv23 and SSLv3. SSLv2 might ' | ||
64 | 43 | 'be available on some distributions.' | ||
65 | 44 | ), | ||
66 | 45 | cfg.StrOpt('kombu_ssl_keyfile', | ||
67 | 46 | default='', | ||
68 | 47 | help='SSL key file (valid only if SSL enabled)'), | ||
69 | 48 | cfg.StrOpt('kombu_ssl_certfile', | ||
70 | 49 | default='', | ||
71 | 50 | help='SSL cert file (valid only if SSL enabled)'), | ||
72 | 51 | cfg.StrOpt('kombu_ssl_ca_certs', | ||
73 | 52 | default='', | ||
74 | 53 | help=('SSL certification authority file ' | ||
75 | 54 | '(valid only if SSL enabled)')), | ||
76 | 55 | cfg.StrOpt('rabbit_host', | ||
77 | 56 | default='localhost', | ||
78 | 57 | help='The RabbitMQ broker address where a single node is used'), | ||
79 | 58 | cfg.IntOpt('rabbit_port', | ||
80 | 59 | default=5672, | ||
81 | 60 | help='The RabbitMQ broker port where a single node is used'), | ||
82 | 61 | cfg.ListOpt('rabbit_hosts', | ||
83 | 62 | default=['$rabbit_host:$rabbit_port'], | ||
84 | 63 | help='RabbitMQ HA cluster host:port pairs'), | ||
85 | 64 | cfg.BoolOpt('rabbit_use_ssl', | ||
86 | 65 | default=False, | ||
87 | 66 | help='Connect over SSL for RabbitMQ'), | ||
88 | 67 | cfg.StrOpt('rabbit_userid', | ||
89 | 68 | default='guest', | ||
90 | 69 | help='The RabbitMQ userid'), | ||
91 | 70 | cfg.StrOpt('rabbit_password', | ||
92 | 71 | default='guest', | ||
93 | 72 | help='The RabbitMQ password', | ||
94 | 73 | secret=True), | ||
95 | 74 | cfg.StrOpt('rabbit_virtual_host', | ||
96 | 75 | default='/', | ||
97 | 76 | help='The RabbitMQ virtual host'), | ||
98 | 77 | cfg.IntOpt('rabbit_retry_interval', | ||
99 | 78 | default=1, | ||
100 | 79 | help='How frequently to retry connecting with RabbitMQ'), | ||
101 | 80 | cfg.IntOpt('rabbit_retry_backoff', | ||
102 | 81 | default=2, | ||
103 | 82 | help='How long to backoff for between retries when connecting ' | ||
104 | 83 | 'to RabbitMQ'), | ||
105 | 84 | cfg.IntOpt('rabbit_max_retries', | ||
106 | 85 | default=0, | ||
107 | 86 | help='Maximum number of RabbitMQ connection retries. ' | ||
108 | 87 | 'Default is 0 (infinite retry count)'), | ||
109 | 88 | cfg.BoolOpt('rabbit_ha_queues', | ||
110 | 89 | default=False, | ||
111 | 90 | help='Use HA queues in RabbitMQ (x-ha-policy: all). ' | ||
112 | 91 | 'If you change this option, you must wipe the ' | ||
113 | 92 | 'RabbitMQ database.'), | ||
114 | 93 | |||
115 | 94 | ] | ||
116 | 95 | |||
117 | 96 | cfg.CONF.register_opts(kombu_opts) | ||
118 | 97 | |||
119 | 98 | LOG = rpc_common.LOG | ||
120 | 99 | |||
121 | 100 | |||
122 | 101 | def _get_queue_arguments(conf): | ||
123 | 102 | """Construct the arguments for declaring a queue. | ||
124 | 103 | |||
125 | 104 | If the rabbit_ha_queues option is set, we declare a mirrored queue | ||
126 | 105 | as described here: | ||
127 | 106 | |||
128 | 107 | http://www.rabbitmq.com/ha.html | ||
129 | 108 | |||
130 | 109 | Setting x-ha-policy to all means that the queue will be mirrored | ||
131 | 110 | to all nodes in the cluster. | ||
132 | 111 | """ | ||
133 | 112 | return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} | ||
134 | 113 | |||
135 | 114 | |||
136 | 115 | class ConsumerBase(object): | ||
137 | 116 | """Consumer base class.""" | ||
138 | 117 | |||
139 | 118 | def __init__(self, channel, callback, tag, **kwargs): | ||
140 | 119 | """Declare a queue on an amqp channel. | ||
141 | 120 | |||
142 | 121 | 'channel' is the amqp channel to use | ||
143 | 122 | 'callback' is the callback to call when messages are received | ||
144 | 123 | 'tag' is a unique ID for the consumer on the channel | ||
145 | 124 | |||
146 | 125 | queue name, exchange name, and other kombu options are | ||
147 | 126 | passed in here as a dictionary. | ||
148 | 127 | """ | ||
149 | 128 | self.callback = callback | ||
150 | 129 | self.tag = str(tag) | ||
151 | 130 | self.kwargs = kwargs | ||
152 | 131 | self.queue = None | ||
153 | 132 | self.ack_on_error = kwargs.get('ack_on_error', True) | ||
154 | 133 | self.reconnect(channel) | ||
155 | 134 | |||
156 | 135 | def reconnect(self, channel): | ||
157 | 136 | """Re-declare the queue after a rabbit reconnect.""" | ||
158 | 137 | self.channel = channel | ||
159 | 138 | self.kwargs['channel'] = channel | ||
160 | 139 | self.queue = kombu.entity.Queue(**self.kwargs) | ||
161 | 140 | self.queue.declare() | ||
162 | 141 | |||
163 | 142 | def _callback_handler(self, message, callback): | ||
164 | 143 | """Call callback with deserialized message. | ||
165 | 144 | |||
166 | 145 | Messages that are processed without exception are ack'ed. | ||
167 | 146 | |||
168 | 147 | If the message processing generates an exception, it will be | ||
169 | 148 | ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed. | ||
170 | 149 | """ | ||
171 | 150 | |||
172 | 151 | try: | ||
173 | 152 | msg = rpc_common.deserialize_msg(message.payload) | ||
174 | 153 | callback(msg) | ||
175 | 154 | except Exception: | ||
176 | 155 | if self.ack_on_error: | ||
177 | 156 | LOG.exception(_LE("Failed to process message" | ||
178 | 157 | " ... skipping it.")) | ||
179 | 158 | message.ack() | ||
180 | 159 | else: | ||
181 | 160 | LOG.exception(_LE("Failed to process message" | ||
182 | 161 | " ... will requeue.")) | ||
183 | 162 | message.requeue() | ||
184 | 163 | else: | ||
185 | 164 | message.ack() | ||
186 | 165 | |||
187 | 166 | def consume(self, *args, **kwargs): | ||
188 | 167 | """Actually declare the consumer on the amqp channel. This will | ||
189 | 168 | start the flow of messages from the queue. Using the | ||
190 | 169 | Connection.iterconsume() iterator will process the messages, | ||
191 | 170 | calling the appropriate callback. | ||
192 | 171 | |||
193 | 172 | If a callback is specified in kwargs, use that. Otherwise, | ||
194 | 173 | use the callback passed during __init__() | ||
195 | 174 | |||
196 | 175 | If kwargs['nowait'] is True, then this call will block until | ||
197 | 176 | a message is read. | ||
198 | 177 | |||
199 | 178 | """ | ||
200 | 179 | |||
201 | 180 | options = {'consumer_tag': self.tag} | ||
202 | 181 | options['nowait'] = kwargs.get('nowait', False) | ||
203 | 182 | callback = kwargs.get('callback', self.callback) | ||
204 | 183 | if not callback: | ||
205 | 184 | raise ValueError("No callback defined") | ||
206 | 185 | |||
207 | 186 | def _callback(raw_message): | ||
208 | 187 | message = self.channel.message_to_python(raw_message) | ||
209 | 188 | self._callback_handler(message, callback) | ||
210 | 189 | |||
211 | 190 | self.queue.consume(*args, callback=_callback, **options) | ||
212 | 191 | |||
213 | 192 | def cancel(self): | ||
214 | 193 | """Cancel the consuming from the queue, if it has started.""" | ||
215 | 194 | try: | ||
216 | 195 | self.queue.cancel(self.tag) | ||
217 | 196 | except KeyError as e: | ||
218 | 197 | # NOTE(comstud): Kludge to get around a amqplib bug | ||
219 | 198 | if str(e) != "u'%s'" % self.tag: | ||
220 | 199 | raise | ||
221 | 200 | self.queue = None | ||
222 | 201 | |||
223 | 202 | |||
224 | 203 | class DirectConsumer(ConsumerBase): | ||
225 | 204 | """Queue/consumer class for 'direct'.""" | ||
226 | 205 | |||
227 | 206 | def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): | ||
228 | 207 | """Init a 'direct' queue. | ||
229 | 208 | |||
230 | 209 | 'channel' is the amqp channel to use | ||
231 | 210 | 'msg_id' is the msg_id to listen on | ||
232 | 211 | 'callback' is the callback to call when messages are received | ||
233 | 212 | 'tag' is a unique ID for the consumer on the channel | ||
234 | 213 | |||
235 | 214 | Other kombu options may be passed | ||
236 | 215 | """ | ||
237 | 216 | # Default options | ||
238 | 217 | options = {'durable': False, | ||
239 | 218 | 'queue_arguments': _get_queue_arguments(conf), | ||
240 | 219 | 'auto_delete': True, | ||
241 | 220 | 'exclusive': False} | ||
242 | 221 | options.update(kwargs) | ||
243 | 222 | exchange = kombu.entity.Exchange(name=msg_id, | ||
244 | 223 | type='direct', | ||
245 | 224 | durable=options['durable'], | ||
246 | 225 | auto_delete=options['auto_delete']) | ||
247 | 226 | super(DirectConsumer, self).__init__(channel, | ||
248 | 227 | callback, | ||
249 | 228 | tag, | ||
250 | 229 | name=msg_id, | ||
251 | 230 | exchange=exchange, | ||
252 | 231 | routing_key=msg_id, | ||
253 | 232 | **options) | ||
254 | 233 | |||
255 | 234 | |||
256 | 235 | class TopicConsumer(ConsumerBase): | ||
257 | 236 | """Consumer class for 'topic'.""" | ||
258 | 237 | |||
259 | 238 | def __init__(self, conf, channel, topic, callback, tag, name=None, | ||
260 | 239 | exchange_name=None, **kwargs): | ||
261 | 240 | """Init a 'topic' queue. | ||
262 | 241 | |||
263 | 242 | :param channel: the amqp channel to use | ||
264 | 243 | :param topic: the topic to listen on | ||
265 | 244 | :paramtype topic: str | ||
266 | 245 | :param callback: the callback to call when messages are received | ||
267 | 246 | :param tag: a unique ID for the consumer on the channel | ||
268 | 247 | :param name: optional queue name, defaults to topic | ||
269 | 248 | :paramtype name: str | ||
270 | 249 | |||
271 | 250 | Other kombu options may be passed as keyword arguments | ||
272 | 251 | """ | ||
273 | 252 | # Default options | ||
274 | 253 | options = {'durable': conf.amqp_durable_queues, | ||
275 | 254 | 'queue_arguments': _get_queue_arguments(conf), | ||
276 | 255 | 'auto_delete': conf.amqp_auto_delete, | ||
277 | 256 | 'exclusive': False} | ||
278 | 257 | options.update(kwargs) | ||
279 | 258 | exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) | ||
280 | 259 | exchange = kombu.entity.Exchange(name=exchange_name, | ||
281 | 260 | type='topic', | ||
282 | 261 | durable=options['durable'], | ||
283 | 262 | auto_delete=options['auto_delete']) | ||
284 | 263 | super(TopicConsumer, self).__init__(channel, | ||
285 | 264 | callback, | ||
286 | 265 | tag, | ||
287 | 266 | name=name or topic, | ||
288 | 267 | exchange=exchange, | ||
289 | 268 | routing_key=topic, | ||
290 | 269 | **options) | ||
291 | 270 | |||
292 | 271 | |||
293 | 272 | class FanoutConsumer(ConsumerBase): | ||
294 | 273 | """Consumer class for 'fanout'.""" | ||
295 | 274 | |||
296 | 275 | def __init__(self, conf, channel, topic, callback, tag, **kwargs): | ||
297 | 276 | """Init a 'fanout' queue. | ||
298 | 277 | |||
299 | 278 | 'channel' is the amqp channel to use | ||
300 | 279 | 'topic' is the topic to listen on | ||
301 | 280 | 'callback' is the callback to call when messages are received | ||
302 | 281 | 'tag' is a unique ID for the consumer on the channel | ||
303 | 282 | |||
304 | 283 | Other kombu options may be passed | ||
305 | 284 | """ | ||
306 | 285 | unique = uuid.uuid4().hex | ||
307 | 286 | exchange_name = '%s_fanout' % topic | ||
308 | 287 | queue_name = '%s_fanout_%s' % (topic, unique) | ||
309 | 288 | |||
310 | 289 | # Default options | ||
311 | 290 | options = {'durable': False, | ||
312 | 291 | 'queue_arguments': _get_queue_arguments(conf), | ||
313 | 292 | 'auto_delete': True, | ||
314 | 293 | 'exclusive': False} | ||
315 | 294 | options.update(kwargs) | ||
316 | 295 | exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', | ||
317 | 296 | durable=options['durable'], | ||
318 | 297 | auto_delete=options['auto_delete']) | ||
319 | 298 | super(FanoutConsumer, self).__init__(channel, callback, tag, | ||
320 | 299 | name=queue_name, | ||
321 | 300 | exchange=exchange, | ||
322 | 301 | routing_key=topic, | ||
323 | 302 | **options) | ||
324 | 303 | |||
325 | 304 | |||
326 | 305 | class Publisher(object): | ||
327 | 306 | """Base Publisher class.""" | ||
328 | 307 | |||
329 | 308 | def __init__(self, channel, exchange_name, routing_key, **kwargs): | ||
330 | 309 | """Init the Publisher class with the exchange_name, routing_key, | ||
331 | 310 | and other options | ||
332 | 311 | """ | ||
333 | 312 | self.exchange_name = exchange_name | ||
334 | 313 | self.routing_key = routing_key | ||
335 | 314 | self.kwargs = kwargs | ||
336 | 315 | self.reconnect(channel) | ||
337 | 316 | |||
338 | 317 | def reconnect(self, channel): | ||
339 | 318 | """Re-establish the Producer after a rabbit reconnection.""" | ||
340 | 319 | self.exchange = kombu.entity.Exchange(name=self.exchange_name, | ||
341 | 320 | **self.kwargs) | ||
342 | 321 | self.producer = kombu.messaging.Producer(exchange=self.exchange, | ||
343 | 322 | channel=channel, | ||
344 | 323 | routing_key=self.routing_key) | ||
345 | 324 | |||
346 | 325 | def send(self, msg, timeout=None): | ||
347 | 326 | """Send a message.""" | ||
348 | 327 | if timeout: | ||
349 | 328 | # | ||
350 | 329 | # AMQP TTL is in milliseconds when set in the header. | ||
351 | 330 | # | ||
352 | 331 | self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) | ||
353 | 332 | else: | ||
354 | 333 | self.producer.publish(msg) | ||
355 | 334 | |||
356 | 335 | |||
357 | 336 | class DirectPublisher(Publisher): | ||
358 | 337 | """Publisher class for 'direct'.""" | ||
359 | 338 | def __init__(self, conf, channel, msg_id, **kwargs): | ||
360 | 339 | """init a 'direct' publisher. | ||
361 | 340 | |||
362 | 341 | Kombu options may be passed as keyword args to override defaults | ||
363 | 342 | """ | ||
364 | 343 | |||
365 | 344 | options = {'durable': False, | ||
366 | 345 | 'auto_delete': True, | ||
367 | 346 | 'exclusive': False} | ||
368 | 347 | options.update(kwargs) | ||
369 | 348 | super(DirectPublisher, self).__init__(channel, msg_id, msg_id, | ||
370 | 349 | type='direct', **options) | ||
371 | 350 | |||
372 | 351 | |||
373 | 352 | class TopicPublisher(Publisher): | ||
374 | 353 | """Publisher class for 'topic'.""" | ||
375 | 354 | def __init__(self, conf, channel, topic, **kwargs): | ||
376 | 355 | """init a 'topic' publisher. | ||
377 | 356 | |||
378 | 357 | Kombu options may be passed as keyword args to override defaults | ||
379 | 358 | """ | ||
380 | 359 | options = {'durable': conf.amqp_durable_queues, | ||
381 | 360 | 'auto_delete': conf.amqp_auto_delete, | ||
382 | 361 | 'exclusive': False} | ||
383 | 362 | options.update(kwargs) | ||
384 | 363 | exchange_name = rpc_amqp.get_control_exchange(conf) | ||
385 | 364 | super(TopicPublisher, self).__init__(channel, | ||
386 | 365 | exchange_name, | ||
387 | 366 | topic, | ||
388 | 367 | type='topic', | ||
389 | 368 | **options) | ||
390 | 369 | |||
391 | 370 | |||
392 | 371 | class FanoutPublisher(Publisher): | ||
393 | 372 | """Publisher class for 'fanout'.""" | ||
394 | 373 | def __init__(self, conf, channel, topic, **kwargs): | ||
395 | 374 | """init a 'fanout' publisher. | ||
396 | 375 | |||
397 | 376 | Kombu options may be passed as keyword args to override defaults | ||
398 | 377 | """ | ||
399 | 378 | options = {'durable': False, | ||
400 | 379 | 'auto_delete': True, | ||
401 | 380 | 'exclusive': False} | ||
402 | 381 | options.update(kwargs) | ||
403 | 382 | super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, | ||
404 | 383 | None, type='fanout', **options) | ||
405 | 384 | |||
406 | 385 | |||
407 | 386 | class NotifyPublisher(TopicPublisher): | ||
408 | 387 | """Publisher class for 'notify'.""" | ||
409 | 388 | |||
410 | 389 | def __init__(self, conf, channel, topic, **kwargs): | ||
411 | 390 | self.durable = kwargs.pop('durable', conf.amqp_durable_queues) | ||
412 | 391 | self.queue_arguments = _get_queue_arguments(conf) | ||
413 | 392 | super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) | ||
414 | 393 | |||
415 | 394 | def reconnect(self, channel): | ||
416 | 395 | super(NotifyPublisher, self).reconnect(channel) | ||
417 | 396 | |||
418 | 397 | # NOTE(jerdfelt): Normally the consumer would create the queue, but | ||
419 | 398 | # we do this to ensure that messages don't get dropped if the | ||
420 | 399 | # consumer is started after we do | ||
421 | 400 | queue = kombu.entity.Queue(channel=channel, | ||
422 | 401 | exchange=self.exchange, | ||
423 | 402 | durable=self.durable, | ||
424 | 403 | name=self.routing_key, | ||
425 | 404 | routing_key=self.routing_key, | ||
426 | 405 | queue_arguments=self.queue_arguments) | ||
427 | 406 | queue.declare() | ||
428 | 407 | |||
429 | 408 | |||
430 | 409 | class Connection(object): | ||
431 | 410 | """Connection object.""" | ||
432 | 411 | |||
433 | 412 | pool = None | ||
434 | 413 | |||
435 | 414 | def __init__(self, conf, server_params=None): | ||
436 | 415 | self.consumers = [] | ||
437 | 416 | self.consumer_thread = None | ||
438 | 417 | self.proxy_callbacks = [] | ||
439 | 418 | self.conf = conf | ||
440 | 419 | self.max_retries = self.conf.rabbit_max_retries | ||
441 | 420 | # Try forever? | ||
442 | 421 | if self.max_retries <= 0: | ||
443 | 422 | self.max_retries = None | ||
444 | 423 | self.interval_start = self.conf.rabbit_retry_interval | ||
445 | 424 | self.interval_stepping = self.conf.rabbit_retry_backoff | ||
446 | 425 | # max retry-interval = 30 seconds | ||
447 | 426 | self.interval_max = 30 | ||
448 | 427 | self.memory_transport = False | ||
449 | 428 | |||
450 | 429 | if server_params is None: | ||
451 | 430 | server_params = {} | ||
452 | 431 | # Keys to translate from server_params to kombu params | ||
453 | 432 | server_params_to_kombu_params = {'username': 'userid'} | ||
454 | 433 | |||
455 | 434 | ssl_params = self._fetch_ssl_params() | ||
456 | 435 | params_list = [] | ||
457 | 436 | for adr in self.conf.rabbit_hosts: | ||
458 | 437 | hostname, port = network_utils.parse_host_port( | ||
459 | 438 | adr, default_port=self.conf.rabbit_port) | ||
460 | 439 | |||
461 | 440 | params = { | ||
462 | 441 | 'hostname': hostname, | ||
463 | 442 | 'port': port, | ||
464 | 443 | 'userid': self.conf.rabbit_userid, | ||
465 | 444 | 'password': self.conf.rabbit_password, | ||
466 | 445 | 'virtual_host': self.conf.rabbit_virtual_host, | ||
467 | 446 | } | ||
468 | 447 | |||
469 | 448 | for sp_key, value in six.iteritems(server_params): | ||
470 | 449 | p_key = server_params_to_kombu_params.get(sp_key, sp_key) | ||
471 | 450 | params[p_key] = value | ||
472 | 451 | |||
473 | 452 | if self.conf.fake_rabbit: | ||
474 | 453 | params['transport'] = 'memory' | ||
475 | 454 | if self.conf.rabbit_use_ssl: | ||
476 | 455 | params['ssl'] = ssl_params | ||
477 | 456 | |||
478 | 457 | params_list.append(params) | ||
479 | 458 | |||
480 | 459 | self.params_list = params_list | ||
481 | 460 | |||
482 | 461 | brokers_count = len(self.params_list) | ||
483 | 462 | self.next_broker_indices = itertools.cycle(range(brokers_count)) | ||
484 | 463 | |||
485 | 464 | self.memory_transport = self.conf.fake_rabbit | ||
486 | 465 | |||
487 | 466 | self.connection = None | ||
488 | 467 | self.reconnect() | ||
489 | 468 | |||
490 | 469 | def _fetch_ssl_params(self): | ||
491 | 470 | """Handles fetching what ssl params should be used for the connection | ||
492 | 471 | (if any). | ||
493 | 472 | """ | ||
494 | 473 | ssl_params = dict() | ||
495 | 474 | |||
496 | 475 | # http://docs.python.org/library/ssl.html - ssl.wrap_socket | ||
497 | 476 | if self.conf.kombu_ssl_version: | ||
498 | 477 | ssl_params['ssl_version'] = sslutils.validate_ssl_version( | ||
499 | 478 | self.conf.kombu_ssl_version) | ||
500 | 479 | if self.conf.kombu_ssl_keyfile: | ||
501 | 480 | ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile | ||
502 | 481 | if self.conf.kombu_ssl_certfile: | ||
503 | 482 | ssl_params['certfile'] = self.conf.kombu_ssl_certfile | ||
504 | 483 | if self.conf.kombu_ssl_ca_certs: | ||
505 | 484 | ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs | ||
506 | 485 | # We might want to allow variations in the | ||
507 | 486 | # future with this? | ||
508 | 487 | ssl_params['cert_reqs'] = ssl.CERT_REQUIRED | ||
509 | 488 | |||
510 | 489 | # Return the extended behavior or just have the default behavior | ||
511 | 490 | return ssl_params or True | ||
512 | 491 | |||
513 | 492 | def _connect(self, params): | ||
514 | 493 | """Connect to rabbit. Re-establish any queues that may have | ||
515 | 494 | been declared before if we are reconnecting. Exceptions should | ||
516 | 495 | be handled by the caller. | ||
517 | 496 | """ | ||
518 | 497 | if self.connection: | ||
519 | 498 | LOG.info(_LI("Reconnecting to AMQP server on " | ||
520 | 499 | "%(hostname)s:%(port)d") % params) | ||
521 | 500 | try: | ||
522 | 501 | self.connection.release() | ||
523 | 502 | except self.connection_errors: | ||
524 | 503 | pass | ||
525 | 504 | # Setting this in case the next statement fails, though | ||
526 | 505 | # it shouldn't be doing any network operations, yet. | ||
527 | 506 | self.connection = None | ||
528 | 507 | self.connection = kombu.connection.BrokerConnection(**params) | ||
529 | 508 | self.connection_errors = self.connection.connection_errors | ||
530 | 509 | if self.memory_transport: | ||
531 | 510 | # Kludge to speed up tests. | ||
532 | 511 | self.connection.transport.polling_interval = 0.0 | ||
533 | 512 | self.consumer_num = itertools.count(1) | ||
534 | 513 | self.connection.connect() | ||
535 | 514 | self.channel = self.connection.channel() | ||
536 | 515 | # work around 'memory' transport bug in 1.1.3 | ||
537 | 516 | if self.memory_transport: | ||
538 | 517 | self.channel._new_queue('ae.undeliver') | ||
539 | 518 | for consumer in self.consumers: | ||
540 | 519 | consumer.reconnect(self.channel) | ||
541 | 520 | LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') % | ||
542 | 521 | params) | ||
543 | 522 | |||
544 | 523 | def reconnect(self): | ||
545 | 524 | """Handles reconnecting and re-establishing queues. | ||
546 | 525 | Will retry up to self.max_retries number of times. | ||
547 | 526 | self.max_retries = 0 means to retry forever. | ||
548 | 527 | Sleep between tries, starting at self.interval_start | ||
549 | 528 | seconds, backing off self.interval_stepping number of seconds | ||
550 | 529 | each attempt. | ||
551 | 530 | """ | ||
552 | 531 | |||
553 | 532 | attempt = 0 | ||
554 | 533 | while True: | ||
555 | 534 | params = self.params_list[next(self.next_broker_indices)] | ||
556 | 535 | attempt += 1 | ||
557 | 536 | try: | ||
558 | 537 | self._connect(params) | ||
559 | 538 | return | ||
560 | 539 | except (IOError, self.connection_errors) as e: | ||
561 | 540 | pass | ||
562 | 541 | except Exception as e: | ||
563 | 542 | # NOTE(comstud): Unfortunately it's possible for amqplib | ||
564 | 543 | # to return an error not covered by its transport | ||
565 | 544 | # connection_errors in the case of a timeout waiting for | ||
566 | 545 | # a protocol response. (See paste link in LP888621) | ||
567 | 546 | # So, we check all exceptions for 'timeout' in them | ||
568 | 547 | # and try to reconnect in this case. | ||
569 | 548 | if 'timeout' not in str(e): | ||
570 | 549 | raise | ||
571 | 550 | |||
572 | 551 | log_info = {} | ||
573 | 552 | log_info['err_str'] = str(e) | ||
574 | 553 | log_info['max_retries'] = self.max_retries | ||
575 | 554 | log_info.update(params) | ||
576 | 555 | |||
577 | 556 | if self.max_retries and attempt == self.max_retries: | ||
578 | 557 | msg = _('Unable to connect to AMQP server on ' | ||
579 | 558 | '%(hostname)s:%(port)d after %(max_retries)d ' | ||
580 | 559 | 'tries: %(err_str)s') % log_info | ||
581 | 560 | LOG.error(msg) | ||
582 | 561 | raise rpc_common.RPCException(msg) | ||
583 | 562 | |||
584 | 563 | if attempt == 1: | ||
585 | 564 | sleep_time = self.interval_start or 1 | ||
586 | 565 | elif attempt > 1: | ||
587 | 566 | sleep_time += self.interval_stepping | ||
588 | 567 | if self.interval_max: | ||
589 | 568 | sleep_time = min(sleep_time, self.interval_max) | ||
590 | 569 | |||
591 | 570 | log_info['sleep_time'] = sleep_time | ||
592 | 571 | LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is ' | ||
593 | 572 | 'unreachable: %(err_str)s. Trying again in ' | ||
594 | 573 | '%(sleep_time)d seconds.') % log_info) | ||
595 | 574 | time.sleep(sleep_time) | ||
596 | 575 | |||
597 | 576 | def ensure(self, error_callback, method, *args, **kwargs): | ||
598 | 577 | while True: | ||
599 | 578 | try: | ||
600 | 579 | return method(*args, **kwargs) | ||
601 | 580 | except (self.connection_errors, socket.timeout, IOError) as e: | ||
602 | 581 | if error_callback: | ||
603 | 582 | error_callback(e) | ||
604 | 583 | except Exception as e: | ||
605 | 584 | # NOTE(comstud): Unfortunately it's possible for amqplib | ||
606 | 585 | # to return an error not covered by its transport | ||
607 | 586 | # connection_errors in the case of a timeout waiting for | ||
608 | 587 | # a protocol response. (See paste link in LP888621) | ||
609 | 588 | # So, we check all exceptions for 'timeout' in them | ||
610 | 589 | # and try to reconnect in this case. | ||
611 | 590 | if 'timeout' not in str(e): | ||
612 | 591 | raise | ||
613 | 592 | if error_callback: | ||
614 | 593 | error_callback(e) | ||
615 | 594 | self.reconnect() | ||
616 | 595 | |||
617 | 596 | def get_channel(self): | ||
618 | 597 | """Convenience call for bin/clear_rabbit_queues.""" | ||
619 | 598 | return self.channel | ||
620 | 599 | |||
621 | 600 | def close(self): | ||
622 | 601 | """Close/release this connection.""" | ||
623 | 602 | self.cancel_consumer_thread() | ||
624 | 603 | self.wait_on_proxy_callbacks() | ||
625 | 604 | self.connection.release() | ||
626 | 605 | self.connection = None | ||
627 | 606 | |||
628 | 607 | def reset(self): | ||
629 | 608 | """Reset a connection so it can be used again.""" | ||
630 | 609 | self.cancel_consumer_thread() | ||
631 | 610 | self.wait_on_proxy_callbacks() | ||
632 | 611 | self.channel.close() | ||
633 | 612 | self.channel = self.connection.channel() | ||
634 | 613 | # work around 'memory' transport bug in 1.1.3 | ||
635 | 614 | if self.memory_transport: | ||
636 | 615 | self.channel._new_queue('ae.undeliver') | ||
637 | 616 | self.consumers = [] | ||
638 | 617 | |||
639 | 618 | def declare_consumer(self, consumer_cls, topic, callback): | ||
640 | 619 | """Create a Consumer using the class that was passed in and | ||
641 | 620 | add it to our list of consumers | ||
642 | 621 | """ | ||
643 | 622 | |||
644 | 623 | def _connect_error(exc): | ||
645 | 624 | log_info = {'topic': topic, 'err_str': str(exc)} | ||
646 | 625 | LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " | ||
647 | 626 | "%(err_str)s") % log_info) | ||
648 | 627 | |||
649 | 628 | def _declare_consumer(): | ||
650 | 629 | consumer = consumer_cls(self.conf, self.channel, topic, callback, | ||
651 | 630 | six.next(self.consumer_num)) | ||
652 | 631 | self.consumers.append(consumer) | ||
653 | 632 | return consumer | ||
654 | 633 | |||
655 | 634 | return self.ensure(_connect_error, _declare_consumer) | ||
656 | 635 | |||
657 | 636 | def iterconsume(self, limit=None, timeout=None): | ||
658 | 637 | """Return an iterator that will consume from all queues/consumers.""" | ||
659 | 638 | |||
660 | 639 | info = {'do_consume': True} | ||
661 | 640 | |||
662 | 641 | def _error_callback(exc): | ||
663 | 642 | if isinstance(exc, socket.timeout): | ||
664 | 643 | LOG.debug('Timed out waiting for RPC response: %s' % | ||
665 | 644 | str(exc)) | ||
666 | 645 | raise rpc_common.Timeout() | ||
667 | 646 | else: | ||
668 | 647 | LOG.exception(_LE('Failed to consume message from queue: %s') % | ||
669 | 648 | str(exc)) | ||
670 | 649 | info['do_consume'] = True | ||
671 | 650 | |||
672 | 651 | def _consume(): | ||
673 | 652 | if info['do_consume']: | ||
674 | 653 | queues_head = self.consumers[:-1] # not fanout. | ||
675 | 654 | queues_tail = self.consumers[-1] # fanout | ||
676 | 655 | for queue in queues_head: | ||
677 | 656 | queue.consume(nowait=True) | ||
678 | 657 | queues_tail.consume(nowait=False) | ||
679 | 658 | info['do_consume'] = False | ||
680 | 659 | return self.connection.drain_events(timeout=timeout) | ||
681 | 660 | |||
682 | 661 | for iteration in itertools.count(0): | ||
683 | 662 | if limit and iteration >= limit: | ||
684 | 663 | raise StopIteration | ||
685 | 664 | yield self.ensure(_error_callback, _consume) | ||
686 | 665 | |||
687 | 666 | def cancel_consumer_thread(self): | ||
688 | 667 | """Cancel a consumer thread.""" | ||
689 | 668 | if self.consumer_thread is not None: | ||
690 | 669 | self.consumer_thread.kill() | ||
691 | 670 | try: | ||
692 | 671 | self.consumer_thread.wait() | ||
693 | 672 | except greenlet.GreenletExit: | ||
694 | 673 | pass | ||
695 | 674 | self.consumer_thread = None | ||
696 | 675 | |||
697 | 676 | def wait_on_proxy_callbacks(self): | ||
698 | 677 | """Wait for all proxy callback threads to exit.""" | ||
699 | 678 | for proxy_cb in self.proxy_callbacks: | ||
700 | 679 | proxy_cb.wait() | ||
701 | 680 | |||
702 | 681 | def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): | ||
703 | 682 | """Send to a publisher based on the publisher class.""" | ||
704 | 683 | |||
705 | 684 | def _error_callback(exc): | ||
706 | 685 | log_info = {'topic': topic, 'err_str': str(exc)} | ||
707 | 686 | LOG.exception(_LE("Failed to publish message to topic " | ||
708 | 687 | "'%(topic)s': %(err_str)s") % log_info) | ||
709 | 688 | |||
710 | 689 | def _publish(): | ||
711 | 690 | publisher = cls(self.conf, self.channel, topic, **kwargs) | ||
712 | 691 | publisher.send(msg, timeout) | ||
713 | 692 | |||
714 | 693 | self.ensure(_error_callback, _publish) | ||
715 | 694 | |||
716 | 695 | def declare_direct_consumer(self, topic, callback): | ||
717 | 696 | """Create a 'direct' queue. | ||
718 | 697 | In nova's use, this is generally a msg_id queue used for | ||
719 | 698 | responses for call/multicall | ||
720 | 699 | """ | ||
721 | 700 | self.declare_consumer(DirectConsumer, topic, callback) | ||
722 | 701 | |||
723 | 702 | def declare_topic_consumer(self, topic, callback=None, queue_name=None, | ||
724 | 703 | exchange_name=None, ack_on_error=True): | ||
725 | 704 | """Create a 'topic' consumer.""" | ||
726 | 705 | self.declare_consumer(functools.partial(TopicConsumer, | ||
727 | 706 | name=queue_name, | ||
728 | 707 | exchange_name=exchange_name, | ||
729 | 708 | ack_on_error=ack_on_error, | ||
730 | 709 | ), | ||
731 | 710 | topic, callback) | ||
732 | 711 | |||
733 | 712 | def declare_fanout_consumer(self, topic, callback): | ||
734 | 713 | """Create a 'fanout' consumer.""" | ||
735 | 714 | self.declare_consumer(FanoutConsumer, topic, callback) | ||
736 | 715 | |||
737 | 716 | def direct_send(self, msg_id, msg): | ||
738 | 717 | """Send a 'direct' message.""" | ||
739 | 718 | self.publisher_send(DirectPublisher, msg_id, msg) | ||
740 | 719 | |||
741 | 720 | def topic_send(self, topic, msg, timeout=None): | ||
742 | 721 | """Send a 'topic' message.""" | ||
743 | 722 | self.publisher_send(TopicPublisher, topic, msg, timeout) | ||
744 | 723 | |||
745 | 724 | def fanout_send(self, topic, msg): | ||
746 | 725 | """Send a 'fanout' message.""" | ||
747 | 726 | self.publisher_send(FanoutPublisher, topic, msg) | ||
748 | 727 | |||
749 | 728 | def notify_send(self, topic, msg, **kwargs): | ||
750 | 729 | """Send a notify message on a topic.""" | ||
751 | 730 | self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) | ||
752 | 731 | |||
753 | 732 | def consume(self, limit=None): | ||
754 | 733 | """Consume from all queues/consumers.""" | ||
755 | 734 | it = self.iterconsume(limit=limit) | ||
756 | 735 | while True: | ||
757 | 736 | try: | ||
758 | 737 | six.next(it) | ||
759 | 738 | except StopIteration: | ||
760 | 739 | return | ||
761 | 740 | |||
762 | 741 | def consume_in_thread(self): | ||
763 | 742 | """Consumer from all queues/consumers in a greenthread.""" | ||
764 | 743 | @excutils.forever_retry_uncaught_exceptions | ||
765 | 744 | def _consumer_thread(): | ||
766 | 745 | try: | ||
767 | 746 | self.consume() | ||
768 | 747 | except greenlet.GreenletExit: | ||
769 | 748 | return | ||
770 | 749 | if self.consumer_thread is None: | ||
771 | 750 | self.consumer_thread = eventlet.spawn(_consumer_thread) | ||
772 | 751 | return self.consumer_thread | ||
773 | 752 | |||
774 | 753 | def create_consumer(self, topic, proxy, fanout=False): | ||
775 | 754 | """Create a consumer that calls a method in a proxy object.""" | ||
776 | 755 | proxy_cb = rpc_amqp.ProxyCallback( | ||
777 | 756 | self.conf, proxy, | ||
778 | 757 | rpc_amqp.get_connection_pool(self.conf, Connection)) | ||
779 | 758 | self.proxy_callbacks.append(proxy_cb) | ||
780 | 759 | |||
781 | 760 | if fanout: | ||
782 | 761 | self.declare_fanout_consumer(topic, proxy_cb) | ||
783 | 762 | else: | ||
784 | 763 | self.declare_topic_consumer(topic, proxy_cb) | ||
785 | 764 | |||
786 | 765 | def create_worker(self, topic, proxy, pool_name): | ||
787 | 766 | """Create a worker that calls a method in a proxy object.""" | ||
788 | 767 | proxy_cb = rpc_amqp.ProxyCallback( | ||
789 | 768 | self.conf, proxy, | ||
790 | 769 | rpc_amqp.get_connection_pool(self.conf, Connection)) | ||
791 | 770 | self.proxy_callbacks.append(proxy_cb) | ||
792 | 771 | self.declare_topic_consumer(topic, proxy_cb, pool_name) | ||
793 | 772 | |||
794 | 773 | def join_consumer_pool(self, callback, pool_name, topic, | ||
795 | 774 | exchange_name=None, ack_on_error=True): | ||
796 | 775 | """Register as a member of a group of consumers for a given topic from | ||
797 | 776 | the specified exchange. | ||
798 | 777 | |||
799 | 778 | Exactly one member of a given pool will receive each message. | ||
800 | 779 | |||
801 | 780 | A message will be delivered to multiple pools, if more than | ||
802 | 781 | one is created. | ||
803 | 782 | """ | ||
804 | 783 | callback_wrapper = rpc_amqp.CallbackWrapper( | ||
805 | 784 | conf=self.conf, | ||
806 | 785 | callback=callback, | ||
807 | 786 | connection_pool=rpc_amqp.get_connection_pool(self.conf, | ||
808 | 787 | Connection), | ||
809 | 788 | wait_for_consumers=not ack_on_error | ||
810 | 789 | ) | ||
811 | 790 | self.proxy_callbacks.append(callback_wrapper) | ||
812 | 791 | self.declare_topic_consumer( | ||
813 | 792 | queue_name=pool_name, | ||
814 | 793 | topic=topic, | ||
815 | 794 | exchange_name=exchange_name, | ||
816 | 795 | callback=callback_wrapper, | ||
817 | 796 | ack_on_error=ack_on_error, | ||
818 | 797 | ) | ||
819 | 798 | |||
820 | 799 | |||
821 | 800 | def create_connection(conf, new=True): | ||
822 | 801 | """Create a connection.""" | ||
823 | 802 | return rpc_amqp.create_connection( | ||
824 | 803 | conf, new, | ||
825 | 804 | rpc_amqp.get_connection_pool(conf, Connection)) | ||
826 | 805 | |||
827 | 806 | |||
828 | 807 | def multicall(conf, context, topic, msg, timeout=None): | ||
829 | 808 | """Make a call that returns multiple times.""" | ||
830 | 809 | return rpc_amqp.multicall( | ||
831 | 810 | conf, context, topic, msg, timeout, | ||
832 | 811 | rpc_amqp.get_connection_pool(conf, Connection)) | ||
833 | 812 | |||
834 | 813 | |||
835 | 814 | def call(conf, context, topic, msg, timeout=None): | ||
836 | 815 | """Sends a message on a topic and wait for a response.""" | ||
837 | 816 | return rpc_amqp.call( | ||
838 | 817 | conf, context, topic, msg, timeout, | ||
839 | 818 | rpc_amqp.get_connection_pool(conf, Connection)) | ||
840 | 819 | |||
841 | 820 | |||
842 | 821 | def cast(conf, context, topic, msg): | ||
843 | 822 | """Sends a message on a topic without waiting for a response.""" | ||
844 | 823 | return rpc_amqp.cast( | ||
845 | 824 | conf, context, topic, msg, | ||
846 | 825 | rpc_amqp.get_connection_pool(conf, Connection)) | ||
847 | 826 | |||
848 | 827 | |||
849 | 828 | def fanout_cast(conf, context, topic, msg): | ||
850 | 829 | """Sends a message on a fanout exchange without waiting for a response.""" | ||
851 | 830 | return rpc_amqp.fanout_cast( | ||
852 | 831 | conf, context, topic, msg, | ||
853 | 832 | rpc_amqp.get_connection_pool(conf, Connection)) | ||
854 | 833 | |||
855 | 834 | |||
856 | 835 | def cast_to_server(conf, context, server_params, topic, msg): | ||
857 | 836 | """Sends a message on a topic to a specific server.""" | ||
858 | 837 | return rpc_amqp.cast_to_server( | ||
859 | 838 | conf, context, server_params, topic, msg, | ||
860 | 839 | rpc_amqp.get_connection_pool(conf, Connection)) | ||
861 | 840 | |||
862 | 841 | |||
863 | 842 | def fanout_cast_to_server(conf, context, server_params, topic, msg): | ||
864 | 843 | """Sends a message on a fanout exchange to a specific server.""" | ||
865 | 844 | return rpc_amqp.fanout_cast_to_server( | ||
866 | 845 | conf, context, server_params, topic, msg, | ||
867 | 846 | rpc_amqp.get_connection_pool(conf, Connection)) | ||
868 | 847 | |||
869 | 848 | |||
870 | 849 | def notify(conf, context, topic, msg, envelope): | ||
871 | 850 | """Sends a notification event on a topic.""" | ||
872 | 851 | return rpc_amqp.notify( | ||
873 | 852 | conf, context, topic, msg, | ||
874 | 853 | rpc_amqp.get_connection_pool(conf, Connection), | ||
875 | 854 | envelope) | ||
876 | 855 | |||
877 | 856 | |||
878 | 857 | def cleanup(): | ||
879 | 858 | return rpc_amqp.cleanup(Connection.pool) | ||
880 | 0 | 859 | ||
881 | === added file '.pc/skip-ipv6-tests.patch/.timestamp' | |||
882 | === added file '.pc/use-concurrency.patch/.timestamp' | |||
883 | === modified file 'debian/changelog' | |||
884 | --- debian/changelog 2015-06-22 10:14:52 +0000 | |||
885 | +++ debian/changelog 2016-01-08 08:41:27 +0000 | |||
886 | @@ -1,3 +1,11 @@ | |||
887 | 1 | neutron (1:2014.1.5-0ubuntu2) trusty; urgency=medium | ||
888 | 2 | |||
889 | 3 | * Backport upstream release. (LP: #1318721): | ||
890 | 4 | - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch: | ||
891 | 5 | Redeclare if exception is caught after self.queue.declare() failed. | ||
892 | 6 | |||
893 | 7 | -- Hui Xiang <hui.xiang@canonical.com> Thu, 17 Dec 2015 16:35:40 +0800 | ||
894 | 8 | |||
895 | 1 | neutron (1:2014.1.5-0ubuntu1) trusty; urgency=medium | 9 | neutron (1:2014.1.5-0ubuntu1) trusty; urgency=medium |
896 | 2 | 10 | ||
897 | 3 | * Resynchronize with stable/icehouse (877df58) (LP: #1467533): | 11 | * Resynchronize with stable/icehouse (877df58) (LP: #1467533): |
898 | 4 | 12 | ||
899 | === added file 'debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch' | |||
900 | --- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 00:00:00 +0000 | |||
901 | +++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-08 08:41:27 +0000 | |||
902 | @@ -0,0 +1,77 @@ | |||
903 | 1 | --- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-07 15:15:26.000000000 +0800 | ||
904 | 2 | +++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 08:00:00.000000000 +0800 | ||
905 | 3 | @@ -1,74 +0,0 @@ | ||
906 | 4 | -Description: Fix reconnect race condition with RabbitMQ cluster | ||
907 | 5 | - | ||
908 | 6 | - Retry declaring Queue if it fails to work around the race condition | ||
909 | 7 | - that may happen when both Neutron and RabbitMQ cluster are trying | ||
910 | 8 | - to create and delete (Respectively) Queues and Exchanges that have | ||
911 | 9 | - auto-delete flag set. | ||
912 | 10 | - | ||
913 | 11 | - Change-Id: I90febd7c8358b7cccf852c272810e2fad3d91dcd | ||
914 | 12 | - Closes-Bug: #1318721 | ||
915 | 13 | - | ||
916 | 14 | -Author: Mouad Benchchaoui <m.benchchaoui@x-ion.de> | ||
917 | 15 | -Origin: backport, https://review.openstack.org/#/c/93399/1 | ||
918 | 16 | -Bug: https://bugs.launchpad.net/neutron/+bug/1318721 | ||
919 | 17 | ---- | ||
920 | 18 | -This patch header follows DEP-3: http://dep.debian.net/deps/dep3/ | ||
921 | 19 | -Index: neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py | ||
922 | 20 | -=================================================================== | ||
923 | 21 | ---- neutron-2014.1.5.orig/neutron/openstack/common/rpc/impl_kombu.py 2015-06-18 22:26:11.000000000 +0000 | ||
924 | 22 | -+++ neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py 2015-12-01 02:40:27.806902000 +0000 | ||
925 | 23 | -@@ -137,7 +137,53 @@ | ||
926 | 24 | - self.channel = channel | ||
927 | 25 | - self.kwargs['channel'] = channel | ||
928 | 26 | - self.queue = kombu.entity.Queue(**self.kwargs) | ||
929 | 27 | -- self.queue.declare() | ||
930 | 28 | -+ try: | ||
931 | 29 | -+ self.queue.declare() | ||
932 | 30 | -+ except Exception: | ||
933 | 31 | -+ # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper | ||
934 | 32 | -+ # error; instead it raises what the underlying transport library | ||
935 | 33 | -+ # raises, which can be either 'amqp.NotFound' or | ||
936 | 34 | -+ # 'librabbitmq.ChannelError' depending on which transport library | ||
937 | 35 | -+ # is used. | ||
938 | 36 | -+ LOG.exception("Declaring queue fail retrying ...") | ||
939 | 37 | -+ # NOTE(Mouad): We need to re-try Queue creation in case the Queue | ||
940 | 38 | -+ # was created with auto-delete this instruct the Broker to delete | ||
941 | 39 | -+ # the Queue when the last Consumer disconnect from it, and the | ||
942 | 40 | -+ # Exchange when the last Queue is deleted from this Exchange. | ||
943 | 41 | -+ # | ||
944 | 42 | -+ # Now in a RabbitMQ cluster setup, if the cluster node that we are | ||
945 | 43 | -+ # connected to go down, 2 things will happen: | ||
946 | 44 | -+ # | ||
947 | 45 | -+ # 1. From RabbitMQ side the Queues will be deleted from the other | ||
948 | 46 | -+ # cluster nodes and then the corresponding Exchanges will also | ||
949 | 47 | -+ # be deleted. | ||
950 | 48 | -+ # 2. From Neutron side, Neutron will reconnect to another cluster | ||
951 | 49 | -+ # node and start creating Exchanges then Queues then Binding | ||
952 | 50 | -+ # (The order is important to understand the problem). | ||
953 | 51 | -+ # | ||
954 | 52 | -+ # Now this may lead to a race condition, especially if Neutron creates | ||
955 | 53 | -+ # Exchange and Queue and before Neutron could bind Queue to Exchange | ||
956 | 54 | -+ # RabbitMQ nodes just received the 'signal' that they need to delete | ||
957 | 55 | -+ # Exchanges with auto-delete that belong to the down node, so they | ||
958 | 56 | -+ # will delete the Exchanges that was just created, and when Neutron | ||
959 | 57 | -+ # try to bind Queue with Exchange, the Binding will fail b/c the | ||
960 | 58 | -+ # Exchange is not found. | ||
961 | 59 | -+ # | ||
962 | 60 | -+ # But if the first Queue declaration works and binding was created we | ||
963 | 61 | -+ # suppose that RabbitMQ will not try to delete the Exchange even if | ||
964 | 62 | -+ # the auto-delete propagation wasn't received yet, b/c the Queue | ||
965 | 63 | -+ # have a Consumer now and a Binding exist with the Exchange. | ||
966 | 64 | -+ # | ||
967 | 65 | -+ # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges | ||
968 | 66 | -+ # but according to [1] it doesn't seem that RabbitMQ will delete | ||
969 | 67 | -+ # it: | ||
970 | 68 | -+ # | ||
971 | 69 | -+ # The 'auto-delete' flag on 'exchange.declare' got deprecated in | ||
972 | 70 | -+ # 0-9-1. Auto-delete exchanges are actually quite useful, so this | ||
973 | 71 | -+ # flag should be restored. | ||
974 | 72 | -+ # | ||
975 | 73 | -+ # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html | ||
976 | 74 | -+ self.queue.declare() | ||
977 | 75 | - | ||
978 | 76 | - def _callback_handler(self, message, callback): | ||
979 | 77 | - """Call callback with deserialized message. | ||
980 | 0 | 78 | ||
981 | === modified file 'debian/patches/series' | |||
982 | --- debian/patches/series 2014-08-08 08:30:52 +0000 | |||
983 | +++ debian/patches/series 2016-01-08 08:41:27 +0000 | |||
984 | @@ -2,3 +2,4 @@ | |||
985 | 2 | disable-udev-tests.patch | 2 | disable-udev-tests.patch |
986 | 3 | skip-ipv6-tests.patch | 3 | skip-ipv6-tests.patch |
987 | 4 | use-concurrency.patch | 4 | use-concurrency.patch |
988 | 5 | fix-reconnect-race-condition-with-rabbitmq-cluster.patch | ||
989 | 5 | 6 | ||
990 | === modified file 'neutron/openstack/common/rpc/impl_kombu.py' | |||
991 | --- neutron/openstack/common/rpc/impl_kombu.py 2014-04-01 16:22:54 +0000 | |||
992 | +++ neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000 | |||
993 | @@ -137,7 +137,53 @@ | |||
994 | 137 | self.channel = channel | 137 | self.channel = channel |
995 | 138 | self.kwargs['channel'] = channel | 138 | self.kwargs['channel'] = channel |
996 | 139 | self.queue = kombu.entity.Queue(**self.kwargs) | 139 | self.queue = kombu.entity.Queue(**self.kwargs) |
998 | 140 | self.queue.declare() | 140 | try: |
999 | 141 | self.queue.declare() | ||
1000 | 142 | except Exception: | ||
1001 | 143 | # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper | ||
1002 | 144 | # error; instead it raises what the underlying transport library | ||
1003 | 145 | # raises, which can be either 'amqp.NotFound' or | ||
1004 | 146 | # 'librabbitmq.ChannelError' depending on which transport library | ||
1005 | 147 | # is used. | ||
1006 | 148 | LOG.exception("Declaring queue fail retrying ...") | ||
1007 | 149 | # NOTE(Mouad): We need to re-try Queue creation in case the Queue | ||
1008 | 150 | # was created with auto-delete this instruct the Broker to delete | ||
1009 | 151 | # the Queue when the last Consumer disconnect from it, and the | ||
1010 | 152 | # Exchange when the last Queue is deleted from this Exchange. | ||
1011 | 153 | # | ||
1012 | 154 | # Now in a RabbitMQ cluster setup, if the cluster node that we are | ||
1013 | 155 | # connected to go down, 2 things will happen: | ||
1014 | 156 | # | ||
1015 | 157 | # 1. From RabbitMQ side the Queues will be deleted from the other | ||
1016 | 158 | # cluster nodes and then the corresponding Exchanges will also | ||
1017 | 159 | # be deleted. | ||
1018 | 160 | # 2. From Neutron side, Neutron will reconnect to another cluster | ||
1019 | 161 | # node and start creating Exchanges then Queues then Binding | ||
1020 | 162 | # (The order is important to understand the problem). | ||
1021 | 163 | # | ||
1022 | 164 | # Now this may lead to a race condition, especially if Neutron creates | ||
1023 | 165 | # Exchange and Queue and before Neutron could bind Queue to Exchange | ||
1024 | 166 | # RabbitMQ nodes just received the 'signal' that they need to delete | ||
1025 | 167 | # Exchanges with auto-delete that belong to the down node, so they | ||
1026 | 168 | # will delete the Exchanges that was just created, and when Neutron | ||
1027 | 169 | # try to bind Queue with Exchange, the Binding will fail b/c the | ||
1028 | 170 | # Exchange is not found. | ||
1029 | 171 | # | ||
1030 | 172 | # But if the first Queue declaration works and binding was created we | ||
1031 | 173 | # suppose that RabbitMQ will not try to delete the Exchange even if | ||
1032 | 174 | # the auto-delete propagation wasn't received yet, b/c the Queue | ||
1033 | 175 | # have a Consumer now and a Binding exist with the Exchange. | ||
1034 | 176 | # | ||
1035 | 177 | # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges | ||
1036 | 178 | # but according to [1] it doesn't seem that RabbitMQ will delete | ||
1037 | 179 | # it: | ||
1038 | 180 | # | ||
1039 | 181 | # The 'auto-delete' flag on 'exchange.declare' got deprecated in | ||
1040 | 182 | # 0-9-1. Auto-delete exchanges are actually quite useful, so this | ||
1041 | 183 | # flag should be restored. | ||
1042 | 184 | # | ||
1043 | 185 | # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html | ||
1044 | 186 | self.queue.declare() | ||
1045 | 141 | 187 | ||
1046 | 142 | def _callback_handler(self, message, callback): | 188 | def _callback_handler(self, message, callback): |
1047 | 143 | """Call callback with deserialized message. | 189 | """Call callback with deserialized message. |