Merge lp:~cbehrens/nova/rpc-kombu into lp:~hudson-openstack/nova/trunk

Proposed by Chris Behrens
Status: Merged
Approved by: Soren Hansen
Approved revision: 1529
Merged at revision: 1513
Proposed branch: lp:~cbehrens/nova/rpc-kombu
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 1853 lines (+1262/-330)
17 files modified
bin/nova-ajax-console-proxy (+6/-6)
contrib/nova.sh (+1/-1)
nova/flags.py (+6/-2)
nova/rpc/__init__.py (+17/-26)
nova/rpc/common.py (+6/-0)
nova/rpc/impl_carrot.py (+81/-21)
nova/rpc/impl_kombu.py (+781/-0)
nova/service.py (+11/-21)
nova/tests/test_adminapi.py (+0/-2)
nova/tests/test_cloud.py (+0/-2)
nova/tests/test_rpc.py (+6/-158)
nova/tests/test_rpc_amqp.py (+0/-88)
nova/tests/test_rpc_carrot.py (+45/-0)
nova/tests/test_rpc_common.py (+189/-0)
nova/tests/test_rpc_kombu.py (+110/-0)
nova/tests/test_test.py (+2/-3)
tools/pip-requires (+1/-0)
To merge this branch: bzr merge lp:~cbehrens/nova/rpc-kombu
Reviewer Review Type Date Requested Status
Dan Prince (community) Approve
Zed A. Shaw (community) Approve
Brian Lamar (community) Needs Information
Vish Ishaya (community) Approve
Joseph Heck (community) Approve
Dave Walker Pending
Review via email: mp+73096@code.launchpad.net

Description of the change

Implements lp:798876, 'switch carrot to kombu'. Leaves carrot as the default for now; a decision will be made later to switch the default to kombu after further testing. There's a lot of code duplication between carrot and kombu, but I left it that way in preparation for ripping carrot out later and to keep the changes to carrot minimal.

This also fixes bug lp:794627 (re-establish connections to rabbit when it restarts), but only fixes it in kombu.
This also fixes bug lp:803168 (msg-id response queues being left around), again only in kombu.

See those bugs for comments.

Revision history for this message
Chris Behrens (cbehrens) wrote :

One question I have is... carrot would call sys.exit() when it couldn't connect to rabbit after so many attempts. I left this in when handling lp:794627, but it seems I should probably make it try to re-connect forever. Thoughts?

Revision history for this message
Joseph Heck (heckj) wrote :

+1 for reconnecting forever, but with some back-off on the retry interval so that a failure doesn't end up spamming the network with attempts. An exponential retry interval that caps at a maximum of roughly every 5 minutes seems reasonable to me.
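A capped exponential schedule like the one suggested here could be sketched as follows. The function name and default values are illustrative only, not nova flags or code from this branch:

```python
def retry_delays(start=1.0, factor=2.0, maximum=300.0):
    """Yield capped exponential backoff delays.

    Sketch of the suggestion above: double the wait after each failed
    attempt, capping at `maximum` seconds (roughly 5 minutes).
    """
    delay = start
    while True:
        yield min(delay, maximum)
        delay *= factor
```

A reconnect loop would sleep for each yielded delay before its next attempt, so repeated failures settle into one retry per cap interval instead of hammering the broker.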

Revision history for this message
Chris Behrens (cbehrens) wrote :

yeah, there's some backoff built into the ensure_connection() call I'm making to kombu. I'll look at reconnecting forever.

Also appears I broke what Zed was doing. :( Going to restore the old interfaces and make this work.

Revision history for this message
Chris Behrens (cbehrens) wrote :

Restores most of the original interfaces with only slight changes to satisfy work Zed is doing.

This keeps carrot as the default for now. Carrot needed slight modifications to support an interface change, as well as to handle changes to FLAG values regarding rabbit retries.

To use kombu, use --rpc_backend=nova.rpc.impl_kombu

Moved the rpc tests into a common class so that tests for the default module, carrot, and kombu can all be shared. There's a test for fanout for kombu that is SKIP'd, because the 'memory' transport built into kombu that I'm using when FLAGS.fake_rabbit=True seems to be buggy. When I run the test against a real rabbit server (FLAGS.fake_rabbit=False), that test passes fine.

Please give the tests a run. They all pass for me. zedas said he might have seen a failure somewhere in his scrollback, but I'm unable to see one.

Revision history for this message
Chris Behrens (cbehrens) wrote :

heckj: I changed FLAGS.rabbit_max_retries to default to 0, which now means reconnect forever. There's also a FLAGS.rabbit_retry_backoff, which is the number of seconds added to the delay on each cycle. FLAGS.rabbit_retry_interval is now the delay before the first reconnect attempt.
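With those flag defaults (interval 1, backoff 2) and the 30-second cap visible in the diff below, the delay sequence works out roughly as this sketch shows. The function itself is illustrative, not code from the branch:

```python
def rabbit_retry_schedule(interval=1, backoff=2, cap=30, attempts=8):
    """Sketch of the linear backoff described above.

    The first retry waits `interval` seconds, each later retry adds
    `backoff` more seconds, and the delay never exceeds `cap`.
    Defaults mirror the new flag defaults (rabbit_retry_interval=1,
    rabbit_retry_backoff=2).
    """
    delays = []
    sleep_time = interval
    for _ in range(attempts):
        delays.append(sleep_time)
        # back off for the next attempt, capped at `cap` seconds
        sleep_time = min(sleep_time + backoff, cap)
    return delays
```

So with rabbit_max_retries=0 the connection code keeps stepping through this schedule forever, plateauing at 30-second waits.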

Revision history for this message
Joseph Heck (heckj) wrote :

Noticed the kombu memory/fanout test skip - ran into that myself

lgtm

review: Approve
Revision history for this message
Vish Ishaya (vishvananda) wrote :

Still runs fine with carrot, but not with kombu:

(nova): TRACE: File "/tmp/bzr/rpc-kombu/nova/rpc/impl_kombu.py", line 325, in reconnect
(nova): TRACE: self.connection = kombu.connection.Connection(**self.params)
(nova): TRACE: AttributeError: 'module' object has no attribute 'Connection'
(nova): TRACE:
root@nova:/tmp/bzr# pip freeze | grep kombu
kombu==1.0.4

is a specific version of kombu required?

review: Needs Information
Revision history for this message
Vish Ishaya (vishvananda) wrote :

So it works fine with 1.2.1

If this isn't going to work with 1.0.4 we should:
a) update pip-requires to require kombu >= 1.2
b) provide a kombu package for the ppa

Vish

Revision history for this message
Chris Behrens (cbehrens) wrote :

Should be fixed for new and old versions of kombu.

I was using kombu.connection.Connection which in at least 1.1.2 is == connection.BrokerConnection, but 1.0.4 is missing that. Reverted to using kombu.connection.BrokerConnection and it seems to work in 1.0.4 and 1.2.1 now.
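The compatibility issue boils down to picking a class name that exists in every kombu release in play. A generic sketch of that pattern, using a stand-in argument for the `kombu.connection` module (this is not the branch's code):

```python
def pick_connection_cls(connection_module):
    """Return a broker-connection class present on old and new kombu.

    kombu >= 1.1 adds `Connection` as an alias of `BrokerConnection`,
    while 1.0.4 only has `BrokerConnection`, so prefer the name that
    exists everywhere. `connection_module` stands in for
    `kombu.connection`.
    """
    cls = getattr(connection_module, 'BrokerConnection', None)
    if cls is None:
        cls = getattr(connection_module, 'Connection', None)
    if cls is None:
        raise AttributeError('no usable connection class found')
    return cls
```

Preferring `BrokerConnection` outright, as the branch now does, is the simplest form of this: it sidesteps the version check entirely.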

Revision history for this message
Vish Ishaya (vishvananda) wrote :

Works for me too. I'm actually happy making this the default, so we make sure and get lots of people banging on it for rbp. If there is an issue we can always revert the default.

review: Approve
Revision history for this message
Chris Behrens (cbehrens) wrote :

Agree. Switched default to kombu, then.

Revision history for this message
Brian Lamar (blamar) wrote :

Ubuntu only has a package for kombu in 11.10 (oneiric), unless I'm missing something. I don't really know what the policies are, but has this been taken into account?

review: Needs Information
Revision history for this message
Vish Ishaya (vishvananda) wrote :

There is a version in the ppa (since it is a requirement of glance now), so I don't know that it is a huge deal that packages aren't provided in the older distros. We have a lot of dependencies that aren't in older distros, so I don't think this needs to be a blocker.

On Aug 31, 2011, at 10:55 AM, Brian Lamar wrote:

> Review: Needs Information
> Ubuntu only has a package for kombu in 11.10 (oneric), unless I'm missing something. I don't know really what the policies are, but has this been taken into account?

Revision history for this message
Dan Prince (dan-prince) wrote :

Chris.

This runs great for me functionally with both kombu and carrot, using kombu 1.0.4. (I'm able to boot instances, etc.)

Using python-kombu-1.1.3-1.fc15.noarch on my Fedora box I'm getting errors when running unit tests:

http://paste.openstack.org/show/2322/ (KeyError: 'ae.undeliver')

Is it possible to support both kombu 1.0 and 1.1 versions?

review: Needs Fixing
Revision history for this message
Chuck Short (zulcss) wrote :

Ubuntu has 1.0.4

Revision history for this message
Zed A. Shaw (zedshaw) wrote :

I have no problems with this patch as I worked with Chris so we could coordinate my other pending patch. The tests all pass on my side, and the sooner this is in the sooner I can finish my work.

review: Approve
Revision history for this message
Chris Behrens (cbehrens) wrote :

Dan: Hm. I just installed 1.1.3. test_rpc_kombu passes... I'm starting the run of all tests now. What's a test that was failing for you? I can't tell from the paste.

Revision history for this message
Chris Behrens (cbehrens) wrote :

Nevermind... that was quick:

FAIL: test_associate (nova.tests.api.openstack.contrib.test_security_groups.TestSecurityGroups)

Looking at it.

Revision history for this message
Chris Behrens (cbehrens) wrote :

Dan: It appears to be a bug in 1.1.3's 'memory' transport, which is used for tests only. I've kludged in a fix for when we're using that transport. 1.1.3 tests pass now, and 1.2.1 tests still pass with this. I'm running the 1.0.4 tests right now. Will put this back into 'needs review' if that succeeds.

Revision history for this message
Chris Behrens (cbehrens) wrote :

Ok, passes on 1.0.4, 1.1.3, and 1.2.1.

Revision history for this message
Dan Prince (dan-prince) wrote :

> Ok, passes on 1.0.4, 1.1.3, and 1.2.1.

Running tests now...

Revision history for this message
Dan Prince (dan-prince) wrote :

Looks great.

review: Approve
Revision history for this message
OpenStack Infra (hudson-openstack) wrote :

The attempt to merge lp:~cbehrens/nova/rpc-kombu into lp:nova failed. Below is the output from the failed tests.

CreateserverextTest
    test_create_instance_with_duplicate_networks OK 0.30
    test_create_instance_with_duplicate_networks_xml OK 0.42
    test_create_instance_with_network_empty_fixed_ip OK 0.28
    test_create_instance_with_network_empty_fixed_ip_xml OK 0.28
    test_create_instance_with_network_invalid_id OK 0.44
    test_create_instance_with_network_invalid_id_xml OK 0.28
    test_create_instance_with_network_no_fixed_ip OK 0.29
    test_create_instance_with_network_no_fixed_ip_xml OK 0.47
    test_create_instance_with_network_no_id OK 0.28
    test_create_instance_with_network_no_id_xml OK 0.28
    test_create_instance_with_network_non_string_fixed_ip OK 0.45
    test_create_instance_with_no_networks OK 0.29
    test_create_instance_with_no_networks_xml OK 0.29
    test_create_instance_with_one_network OK 0.45
    test_create_instance_with_one_network_xml OK 0.30
    test_create_instance_with_two_networks OK 0.29
    test_create_instance_with_two_networks_xml OK 0.47
FloatingIpTest
    test_add_floating_ip_to_instance OK 0.31
    test_bad_address_param_in_add_floating_ip OK 0.30
    test_bad_address_param_in_remove_floating_ip OK 0.47
    test_floating_ip_allocate OK 0.31
    test_floating_ip_release OK 0.31
    test_floating_ip_show OK 0.47
    test_floating_ips_list OK 0.31
    test_missing_dict_param_in_add_floating_ip OK 0.45
    test_missing_dict_param_in_remove_floating_ip OK 0.31
    test_remove_floating_ip_from_instance OK 0.30
    test_translate_floating_ip_view OK 0.04
    test_translate_floating_ip_view_dict OK 0.02
KeypairsTest
    test_keypair_create OK 0.64
    test_keypair_delete OK 0.32
    test_keypair_import OK 0.37
    test_keypair_list OK 0.55
FixedIpTest
    test_add_fixed_ip OK 0.29
    test_add_fixed_ip_no_network OK 0.78
    test_remove_fixed_ip OK 0.46
    test_remove_fixed_ip_no_address OK 0.28
QuotaSetsTest
    test_format_quota_set OK 0.00
    test_quotas_defaults OK 0.29
    test_quotas_show_as_admin OK 0.45
    test_quotas_show_as_unauthorized_user OK 0.28
    test_quotas_upd...

Revision history for this message
Chris Behrens (cbehrens) wrote :

Well shoot. I'm gonna need some help to figure out why kombu couldn't be loaded. It should be on the server because of glance's dependency... and I've tested this branch using the version from the ppa.

Revision history for this message
Chris Behrens (cbehrens) wrote :

hrm. soren added python-kombu... i'm a little confused why it was not there, since nova depends on glance, which already depends on kombu.. let's try this again.

Preview Diff

1=== modified file 'bin/nova-ajax-console-proxy'
2--- bin/nova-ajax-console-proxy 2011-08-18 17:55:39 +0000
3+++ bin/nova-ajax-console-proxy 2011-08-31 18:56:22 +0000
4@@ -113,11 +113,10 @@
5 AjaxConsoleProxy.tokens[kwargs['token']] = \
6 {'args': kwargs, 'last_activity': time.time()}
7
8- conn = rpc.create_connection(new=True)
9- consumer = rpc.create_consumer(
10- conn,
11- FLAGS.ajax_console_proxy_topic,
12- TopicProxy)
13+ self.conn = rpc.create_connection(new=True)
14+ self.conn.create_consumer(
15+ FLAGS.ajax_console_proxy_topic,
16+ TopicProxy)
17
18 def delete_expired_tokens():
19 now = time.time()
20@@ -129,7 +128,7 @@
21 for k in to_delete:
22 del AjaxConsoleProxy.tokens[k]
23
24- utils.LoopingCall(consumer.fetch, enable_callbacks=True).start(0.1)
25+ self.conn.consume_in_thread()
26 utils.LoopingCall(delete_expired_tokens).start(1)
27
28 if __name__ == '__main__':
29@@ -142,3 +141,4 @@
30 server = wsgi.Server("AJAX Console Proxy", acp, port=acp_port)
31 service.serve(server)
32 service.wait()
33+ self.conn.close()
34
35=== modified file 'contrib/nova.sh'
36--- contrib/nova.sh 2011-08-05 16:59:28 +0000
37+++ contrib/nova.sh 2011-08-31 18:56:22 +0000
38@@ -81,7 +81,7 @@
39 sudo apt-get install -y python-netaddr python-pastedeploy python-eventlet
40 sudo apt-get install -y python-novaclient python-glance python-cheetah
41 sudo apt-get install -y python-carrot python-tempita python-sqlalchemy
42- sudo apt-get install -y python-suds
43+ sudo apt-get install -y python-suds python-kombu
44
45
46 if [ "$USE_IPV6" == 1 ]; then
47
48=== modified file 'nova/flags.py'
49--- nova/flags.py 2011-08-24 23:48:04 +0000
50+++ nova/flags.py 2011-08-31 18:56:22 +0000
51@@ -303,8 +303,12 @@
52 DEFINE_string('rabbit_userid', 'guest', 'rabbit userid')
53 DEFINE_string('rabbit_password', 'guest', 'rabbit password')
54 DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
55-DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval')
56-DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts')
57+DEFINE_integer('rabbit_retry_interval', 1,
58+ 'rabbit connection retry interval to start')
59+DEFINE_integer('rabbit_retry_backoff', 2,
60+ 'rabbit connection retry backoff in seconds')
61+DEFINE_integer('rabbit_max_retries', 0,
62+ 'maximum rabbit connection attempts (0=try forever)')
63 DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
64 DEFINE_boolean('rabbit_durable_queues', False, 'use durable queues')
65 DEFINE_list('enabled_apis', ['ec2', 'osapi'],
66
67=== modified file 'nova/rpc/__init__.py'
68--- nova/rpc/__init__.py 2011-07-29 19:08:59 +0000
69+++ nova/rpc/__init__.py 2011-08-31 18:56:22 +0000
70@@ -23,44 +23,35 @@
71
72 FLAGS = flags.FLAGS
73 flags.DEFINE_string('rpc_backend',
74- 'nova.rpc.amqp',
75- "The messaging module to use, defaults to AMQP.")
76-
77-RPCIMPL = import_object(FLAGS.rpc_backend)
78+ 'nova.rpc.impl_kombu',
79+ "The messaging module to use, defaults to kombu.")
80+
81+_RPCIMPL = None
82+
83+
84+def get_impl():
85+ """Delay import of rpc_backend until FLAGS are loaded."""
86+ global _RPCIMPL
87+ if _RPCIMPL is None:
88+ _RPCIMPL = import_object(FLAGS.rpc_backend)
89+ return _RPCIMPL
90
91
92 def create_connection(new=True):
93- return RPCIMPL.Connection.instance(new=True)
94-
95-
96-def create_consumer(conn, topic, proxy, fanout=False):
97- if fanout:
98- return RPCIMPL.FanoutAdapterConsumer(
99- connection=conn,
100- topic=topic,
101- proxy=proxy)
102- else:
103- return RPCIMPL.TopicAdapterConsumer(
104- connection=conn,
105- topic=topic,
106- proxy=proxy)
107-
108-
109-def create_consumer_set(conn, consumers):
110- return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
111+ return get_impl().create_connection(new=new)
112
113
114 def call(context, topic, msg):
115- return RPCIMPL.call(context, topic, msg)
116+ return get_impl().call(context, topic, msg)
117
118
119 def cast(context, topic, msg):
120- return RPCIMPL.cast(context, topic, msg)
121+ return get_impl().cast(context, topic, msg)
122
123
124 def fanout_cast(context, topic, msg):
125- return RPCIMPL.fanout_cast(context, topic, msg)
126+ return get_impl().fanout_cast(context, topic, msg)
127
128
129 def multicall(context, topic, msg):
130- return RPCIMPL.multicall(context, topic, msg)
131+ return get_impl().multicall(context, topic, msg)
132
133=== modified file 'nova/rpc/common.py'
134--- nova/rpc/common.py 2011-07-29 19:08:59 +0000
135+++ nova/rpc/common.py 2011-08-31 18:56:22 +0000
136@@ -1,8 +1,14 @@
137 from nova import exception
138+from nova import flags
139 from nova import log as logging
140
141 LOG = logging.getLogger('nova.rpc')
142
143+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
144+ 'Size of RPC thread pool')
145+flags.DEFINE_integer('rpc_conn_pool_size', 30,
146+ 'Size of RPC connection pool')
147+
148
149 class RemoteError(exception.Error):
150 """Signifies that a remote class has raised an exception.
151
152=== renamed file 'nova/rpc/amqp.py' => 'nova/rpc/impl_carrot.py'
153--- nova/rpc/amqp.py 2011-08-16 00:30:13 +0000
154+++ nova/rpc/impl_carrot.py 2011-08-31 18:56:22 +0000
155@@ -33,6 +33,7 @@
156
157 from carrot import connection as carrot_connection
158 from carrot import messaging
159+import eventlet
160 from eventlet import greenpool
161 from eventlet import pools
162 from eventlet import queue
163@@ -42,21 +43,22 @@
164 from nova import exception
165 from nova import fakerabbit
166 from nova import flags
167-from nova import log as logging
168-from nova import utils
169 from nova.rpc.common import RemoteError, LOG
170
171+# Needed for tests
172+eventlet.monkey_patch()
173
174 FLAGS = flags.FLAGS
175-flags.DEFINE_integer('rpc_thread_pool_size', 1024,
176- 'Size of RPC thread pool')
177-flags.DEFINE_integer('rpc_conn_pool_size', 30,
178- 'Size of RPC connection pool')
179
180
181 class Connection(carrot_connection.BrokerConnection):
182 """Connection instance object."""
183
184+ def __init__(self, *args, **kwargs):
185+ super(Connection, self).__init__(*args, **kwargs)
186+ self._rpc_consumers = []
187+ self._rpc_consumer_thread = None
188+
189 @classmethod
190 def instance(cls, new=True):
191 """Returns the instance."""
192@@ -94,13 +96,63 @@
193 pass
194 return cls.instance()
195
196+ def close(self):
197+ self.cancel_consumer_thread()
198+ for consumer in self._rpc_consumers:
199+ try:
200+ consumer.close()
201+ except Exception:
202+ # ignore all errors
203+ pass
204+ self._rpc_consumers = []
205+ super(Connection, self).close()
206+
207+ def consume_in_thread(self):
208+ """Consume from all queues/consumers in a greenthread"""
209+
210+ consumer_set = ConsumerSet(connection=self,
211+ consumer_list=self._rpc_consumers)
212+
213+ def _consumer_thread():
214+ try:
215+ consumer_set.wait()
216+ except greenlet.GreenletExit:
217+ return
218+ if self._rpc_consumer_thread is None:
219+ self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
220+ return self._rpc_consumer_thread
221+
222+ def cancel_consumer_thread(self):
223+ """Cancel a consumer thread"""
224+ if self._rpc_consumer_thread is not None:
225+ self._rpc_consumer_thread.kill()
226+ try:
227+ self._rpc_consumer_thread.wait()
228+ except greenlet.GreenletExit:
229+ pass
230+ self._rpc_consumer_thread = None
231+
232+ def create_consumer(self, topic, proxy, fanout=False):
233+ """Create a consumer that calls methods in the proxy"""
234+ if fanout:
235+ consumer = FanoutAdapterConsumer(
236+ connection=self,
237+ topic=topic,
238+ proxy=proxy)
239+ else:
240+ consumer = TopicAdapterConsumer(
241+ connection=self,
242+ topic=topic,
243+ proxy=proxy)
244+ self._rpc_consumers.append(consumer)
245+
246
247 class Pool(pools.Pool):
248 """Class that implements a Pool of Connections."""
249
250 # TODO(comstud): Timeout connections not used in a while
251 def create(self):
252- LOG.debug('Creating new connection')
253+ LOG.debug('Pool creating new connection')
254 return Connection.instance(new=True)
255
256 # Create a ConnectionPool to use for RPC calls. We'll order the
257@@ -119,25 +171,34 @@
258 """
259
260 def __init__(self, *args, **kwargs):
261- for i in xrange(FLAGS.rabbit_max_retries):
262- if i > 0:
263- time.sleep(FLAGS.rabbit_retry_interval)
264+ max_retries = FLAGS.rabbit_max_retries
265+ sleep_time = FLAGS.rabbit_retry_interval
266+ tries = 0
267+ while True:
268+ tries += 1
269+ if tries > 1:
270+ time.sleep(sleep_time)
271+ # backoff for next retry attempt.. if there is one
272+ sleep_time += FLAGS.rabbit_retry_backoff
273+ if sleep_time > 30:
274+ sleep_time = 30
275 try:
276 super(Consumer, self).__init__(*args, **kwargs)
277 self.failed_connection = False
278 break
279 except Exception as e: # Catching all because carrot sucks
280+ self.failed_connection = True
281+ if max_retries > 0 and tries == max_retries:
282+ break
283 fl_host = FLAGS.rabbit_host
284 fl_port = FLAGS.rabbit_port
285- fl_intv = FLAGS.rabbit_retry_interval
286+ fl_intv = sleep_time
287 LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
288 ' unreachable: %(e)s. Trying again in %(fl_intv)d'
289 ' seconds.') % locals())
290- self.failed_connection = True
291 if self.failed_connection:
292 LOG.error(_('Unable to connect to AMQP server '
293- 'after %d tries. Shutting down.'),
294- FLAGS.rabbit_max_retries)
295+ 'after %(tries)d tries. Shutting down.') % locals())
296 sys.exit(1)
297
298 def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
299@@ -166,12 +227,6 @@
300 LOG.exception(_('Failed to fetch message from queue: %s' % e))
301 self.failed_connection = True
302
303- def attach_to_eventlet(self):
304- """Only needed for unit tests!"""
305- timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
306- timer.start(0.1)
307- return timer
308-
309
310 class AdapterConsumer(Consumer):
311 """Calls methods on a proxy object based on method and args."""
312@@ -242,7 +297,7 @@
313 # NOTE(vish): this iterates through the generator
314 list(rval)
315 except Exception as e:
316- logging.exception('Exception during message handling')
317+ LOG.exception('Exception during message handling')
318 if msg_id:
319 msg_reply(msg_id, None, sys.exc_info())
320 return
321@@ -520,6 +575,11 @@
322 yield result
323
324
325+def create_connection(new=True):
326+ """Create a connection"""
327+ return Connection.instance(new=new)
328+
329+
330 def call(context, topic, msg):
331 """Sends a message on a topic and wait for a response."""
332 rv = multicall(context, topic, msg)
333
334=== added file 'nova/rpc/impl_kombu.py'
335--- nova/rpc/impl_kombu.py 1970-01-01 00:00:00 +0000
336+++ nova/rpc/impl_kombu.py 2011-08-31 18:56:22 +0000
337@@ -0,0 +1,781 @@
338+# vim: tabstop=4 shiftwidth=4 softtabstop=4
339+
340+# Copyright 2011 OpenStack LLC
341+#
342+# Licensed under the Apache License, Version 2.0 (the "License"); you may
343+# not use this file except in compliance with the License. You may obtain
344+# a copy of the License at
345+#
346+# http://www.apache.org/licenses/LICENSE-2.0
347+#
348+# Unless required by applicable law or agreed to in writing, software
349+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
350+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
351+# License for the specific language governing permissions and limitations
352+# under the License.
353+
354+import kombu
355+import kombu.entity
356+import kombu.messaging
357+import kombu.connection
358+import itertools
359+import sys
360+import time
361+import traceback
362+import types
363+import uuid
364+
365+import eventlet
366+from eventlet import greenpool
367+from eventlet import pools
368+import greenlet
369+
370+from nova import context
371+from nova import exception
372+from nova import flags
373+from nova.rpc.common import RemoteError, LOG
374+
375+# Needed for tests
376+eventlet.monkey_patch()
377+
378+FLAGS = flags.FLAGS
379+
380+
381+class ConsumerBase(object):
382+ """Consumer base class."""
383+
384+ def __init__(self, channel, callback, tag, **kwargs):
385+ """Declare a queue on an amqp channel.
386+
387+ 'channel' is the amqp channel to use
388+ 'callback' is the callback to call when messages are received
389+ 'tag' is a unique ID for the consumer on the channel
390+
391+ queue name, exchange name, and other kombu options are
392+ passed in here as a dictionary.
393+ """
394+ self.callback = callback
395+ self.tag = str(tag)
396+ self.kwargs = kwargs
397+ self.queue = None
398+ self.reconnect(channel)
399+
400+ def reconnect(self, channel):
401+ """Re-declare the queue after a rabbit reconnect"""
402+ self.channel = channel
403+ self.kwargs['channel'] = channel
404+ self.queue = kombu.entity.Queue(**self.kwargs)
405+ self.queue.declare()
406+
407+ def consume(self, *args, **kwargs):
408+ """Actually declare the consumer on the amqp channel. This will
409+ start the flow of messages from the queue. Using the
410+ Connection.iterconsume() iterator will process the messages,
411+ calling the appropriate callback.
412+
413+ If a callback is specified in kwargs, use that. Otherwise,
414+ use the callback passed during __init__()
415+
416+ If kwargs['nowait'] is False, then this call will block until
417+ the broker confirms the consumer registration.
418+
419+ Messages will automatically be acked if the callback doesn't
420+ raise an exception
421+ """
422+
423+ options = {'consumer_tag': self.tag}
424+ options['nowait'] = kwargs.get('nowait', False)
425+ callback = kwargs.get('callback', self.callback)
426+ if not callback:
427+ raise ValueError("No callback defined")
428+
429+ def _callback(raw_message):
430+ message = self.channel.message_to_python(raw_message)
431+ callback(message.payload)
432+ message.ack()
433+
434+ self.queue.consume(*args, callback=_callback, **options)
435+
436+ def cancel(self):
437+ """Cancel the consuming from the queue, if it has started"""
438+ try:
439+ self.queue.cancel(self.tag)
440+ except KeyError, e:
441+ # NOTE(comstud): Kludge to get around an amqplib bug
442+ if str(e) != "u'%s'" % self.tag:
443+ raise
444+ self.queue = None
445+
446+
447+class DirectConsumer(ConsumerBase):
448+ """Queue/consumer class for 'direct'"""
449+
450+ def __init__(self, channel, msg_id, callback, tag, **kwargs):
451+ """Init a 'direct' queue.
452+
453+ 'channel' is the amqp channel to use
454+ 'msg_id' is the msg_id to listen on
455+ 'callback' is the callback to call when messages are received
456+ 'tag' is a unique ID for the consumer on the channel
457+
458+ Other kombu options may be passed
459+ """
460+ # Default options
461+ options = {'durable': False,
462+ 'auto_delete': True,
463+ 'exclusive': True}
464+ options.update(kwargs)
465+ exchange = kombu.entity.Exchange(
466+ name=msg_id,
467+ type='direct',
468+ durable=options['durable'],
469+ auto_delete=options['auto_delete'])
470+ super(DirectConsumer, self).__init__(
471+ channel,
472+ callback,
473+ tag,
474+ name=msg_id,
475+ exchange=exchange,
476+ routing_key=msg_id,
477+ **options)
478+
479+
480+class TopicConsumer(ConsumerBase):
481+ """Consumer class for 'topic'"""
482+
483+ def __init__(self, channel, topic, callback, tag, **kwargs):
484+ """Init a 'topic' queue.
485+
486+ 'channel' is the amqp channel to use
487+ 'topic' is the topic to listen on
488+ 'callback' is the callback to call when messages are received
489+ 'tag' is a unique ID for the consumer on the channel
490+
491+ Other kombu options may be passed
492+ """
493+ # Default options
494+ options = {'durable': FLAGS.rabbit_durable_queues,
495+ 'auto_delete': False,
496+ 'exclusive': False}
497+ options.update(kwargs)
498+ exchange = kombu.entity.Exchange(
499+ name=FLAGS.control_exchange,
500+ type='topic',
501+ durable=options['durable'],
502+ auto_delete=options['auto_delete'])
503+ super(TopicConsumer, self).__init__(
504+ channel,
505+ callback,
506+ tag,
507+ name=topic,
508+ exchange=exchange,
509+ routing_key=topic,
510+ **options)
511+
512+
513+class FanoutConsumer(ConsumerBase):
514+ """Consumer class for 'fanout'"""
515+
516+ def __init__(self, channel, topic, callback, tag, **kwargs):
517+ """Init a 'fanout' queue.
518+
519+ 'channel' is the amqp channel to use
520+ 'topic' is the topic to listen on
521+ 'callback' is the callback to call when messages are received
522+ 'tag' is a unique ID for the consumer on the channel
523+
524+ Other kombu options may be passed
525+ """
526+ unique = uuid.uuid4().hex
527+ exchange_name = '%s_fanout' % topic
528+ queue_name = '%s_fanout_%s' % (topic, unique)
529+
530+ # Default options
531+ options = {'durable': False,
532+ 'auto_delete': True,
533+ 'exclusive': True}
534+ options.update(kwargs)
535+ exchange = kombu.entity.Exchange(
536+ name=exchange_name,
537+ type='fanout',
538+ durable=options['durable'],
539+ auto_delete=options['auto_delete'])
540+ super(FanoutConsumer, self).__init__(
541+ channel,
542+ callback,
543+ tag,
544+ name=queue_name,
545+ exchange=exchange,
546+ routing_key=topic,
547+ **options)
548+
549+
550+class Publisher(object):
551+ """Base Publisher class"""
552+
553+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
554+ """Init the Publisher class with the exchange_name, routing_key,
555+ and other options
556+ """
557+ self.exchange_name = exchange_name
558+ self.routing_key = routing_key
559+ self.kwargs = kwargs
560+ self.reconnect(channel)
561+
562+ def reconnect(self, channel):
563+ """Re-establish the Producer after a rabbit reconnection"""
564+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
565+ **self.kwargs)
566+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
567+ channel=channel, routing_key=self.routing_key)
568+
569+ def send(self, msg):
570+ """Send a message"""
571+ self.producer.publish(msg)
572+
573+
574+class DirectPublisher(Publisher):
575+ """Publisher class for 'direct'"""
576+ def __init__(self, channel, msg_id, **kwargs):
577+ """init a 'direct' publisher.
578+
579+ Kombu options may be passed as keyword args to override defaults
580+ """
581+
582+ options = {'durable': False,
583+ 'auto_delete': True,
584+ 'exclusive': True}
585+ options.update(kwargs)
586+ super(DirectPublisher, self).__init__(channel,
587+ msg_id,
588+ msg_id,
589+ type='direct',
590+ **options)
591+
592+
593+class TopicPublisher(Publisher):
594+ """Publisher class for 'topic'"""
595+ def __init__(self, channel, topic, **kwargs):
596+ """init a 'topic' publisher.
597+
598+ Kombu options may be passed as keyword args to override defaults
599+ """
600+ options = {'durable': FLAGS.rabbit_durable_queues,
601+ 'auto_delete': False,
602+ 'exclusive': False}
603+ options.update(kwargs)
604+ super(TopicPublisher, self).__init__(channel,
605+ FLAGS.control_exchange,
606+ topic,
607+ type='topic',
608+ **options)
609+
610+
611+class FanoutPublisher(Publisher):
612+ """Publisher class for 'fanout'"""
613+ def __init__(self, channel, topic, **kwargs):
614+ """init a 'fanout' publisher.
615+
616+ Kombu options may be passed as keyword args to override defaults
617+ """
618+ options = {'durable': False,
619+ 'auto_delete': True,
620+ 'exclusive': True}
621+ options.update(kwargs)
622+ super(FanoutPublisher, self).__init__(channel,
623+ '%s_fanout' % topic,
624+ None,
625+ type='fanout',
626+ **options)
627+
628+
629+class Connection(object):
630+ """Connection object."""
631+
632+ def __init__(self):
633+ self.consumers = []
634+ self.consumer_thread = None
635+ self.max_retries = FLAGS.rabbit_max_retries
636+ # Try forever?
637+ if self.max_retries <= 0:
638+ self.max_retries = None
639+ self.interval_start = FLAGS.rabbit_retry_interval
640+ self.interval_stepping = FLAGS.rabbit_retry_backoff
641+ # max retry-interval = 30 seconds
642+ self.interval_max = 30
643+ self.memory_transport = False
644+
645+ self.params = dict(hostname=FLAGS.rabbit_host,
646+ port=FLAGS.rabbit_port,
647+ userid=FLAGS.rabbit_userid,
648+ password=FLAGS.rabbit_password,
649+ virtual_host=FLAGS.rabbit_virtual_host)
650+ if FLAGS.fake_rabbit:
651+ self.params['transport'] = 'memory'
652+ self.memory_transport = True
653+ else:
654+ self.memory_transport = False
655+ self.connection = None
656+ self.reconnect()
657+
658+ def reconnect(self):
659+ """Handles reconnecting and re-establishing queues"""
660+ if self.connection:
661+ try:
662+ self.connection.close()
663+ except self.connection.connection_errors:
664+ pass
665+ time.sleep(1)
666+ self.connection = kombu.connection.BrokerConnection(**self.params)
667+ if self.memory_transport:
668+ # Kludge to speed up tests.
669+ self.connection.transport.polling_interval = 0.0
670+ self.consumer_num = itertools.count(1)
671+
672+ try:
673+ self.connection.ensure_connection(errback=self.connect_error,
674+ max_retries=self.max_retries,
675+ interval_start=self.interval_start,
676+ interval_step=self.interval_stepping,
677+ interval_max=self.interval_max)
678+ except self.connection.connection_errors, e:
679+ # We should only get here if max_retries is set. We'll go
680+ # ahead and exit in this case.
681+ err_str = str(e)
682+ max_retries = self.max_retries
683+ LOG.error(_('Unable to connect to AMQP server '
684+ 'after %(max_retries)d tries: %(err_str)s') % locals())
685+ sys.exit(1)
 686+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
 687+ self.params)
688+ self.channel = self.connection.channel()
689+ # work around 'memory' transport bug in 1.1.3
690+ if self.memory_transport:
691+ self.channel._new_queue('ae.undeliver')
692+ for consumer in self.consumers:
693+ consumer.reconnect(self.channel)
694+ if self.consumers:
695+ LOG.debug(_("Re-established AMQP queues"))
696+
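
The retry cadence handed to kombu's `ensure_connection` above (start at `interval_start`, grow by `interval_stepping`, cap at `interval_max`) can be sketched as a plain generator; the numbers below are illustrative, not Nova's flag defaults:

```python
def retry_intervals(start, step, maximum, retries):
    """Yield the wait (in seconds) before each reconnect attempt,
    growing linearly from `start` by `step`, capped at `maximum`."""
    interval = start
    for _ in range(retries):
        yield min(interval, maximum)
        interval += step

# e.g. start=1, step=2, cap=30 over six attempts:
assert list(retry_intervals(1, 2, 30, 6)) == [1, 3, 5, 7, 9, 11]
# the cap kicks in once the linear schedule exceeds it:
assert list(retry_intervals(25, 10, 30, 3)) == [25, 30, 30]
```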
697+ def get_channel(self):
698+ """Convenience call for bin/clear_rabbit_queues"""
699+ return self.channel
700+
701+ def connect_error(self, exc, interval):
702+ """Callback when there are connection re-tries by kombu"""
703+ info = self.params.copy()
704+ info['intv'] = interval
705+ info['e'] = exc
706+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is'
707+ ' unreachable: %(e)s. Trying again in %(intv)d'
708+ ' seconds.') % info)
709+
710+ def close(self):
711+ """Close/release this connection"""
712+ self.cancel_consumer_thread()
713+ self.connection.release()
714+ self.connection = None
715+
716+ def reset(self):
717+ """Reset a connection so it can be used again"""
718+ self.cancel_consumer_thread()
719+ self.channel.close()
720+ self.channel = self.connection.channel()
721+ # work around 'memory' transport bug in 1.1.3
722+ if self.memory_transport:
723+ self.channel._new_queue('ae.undeliver')
724+ self.consumers = []
725+
726+ def declare_consumer(self, consumer_cls, topic, callback):
727+ """Create a Consumer using the class that was passed in and
728+ add it to our list of consumers
729+ """
730+ consumer = consumer_cls(self.channel, topic, callback,
731+ self.consumer_num.next())
732+ self.consumers.append(consumer)
733+ return consumer
734+
735+ def iterconsume(self, limit=None):
736+ """Return an iterator that will consume from all queues/consumers"""
737+ while True:
738+ try:
739+ queues_head = self.consumers[:-1]
740+ queues_tail = self.consumers[-1]
741+ for queue in queues_head:
742+ queue.consume(nowait=True)
743+ queues_tail.consume(nowait=False)
744+
745+ for iteration in itertools.count(0):
746+ if limit and iteration >= limit:
747+ raise StopIteration
748+ yield self.connection.drain_events()
749+ except self.connection.connection_errors, e:
 750+ LOG.exception(_('Failed to consume message from queue: '
 751+ '%s') % str(e))
752+ self.reconnect()
753+
754+ def cancel_consumer_thread(self):
755+ """Cancel a consumer thread"""
756+ if self.consumer_thread is not None:
757+ self.consumer_thread.kill()
758+ try:
759+ self.consumer_thread.wait()
760+ except greenlet.GreenletExit:
761+ pass
762+ self.consumer_thread = None
763+
764+ def publisher_send(self, cls, topic, msg):
765+ """Send to a publisher based on the publisher class"""
766+ while True:
767+ publisher = None
768+ try:
769+ publisher = cls(self.channel, topic)
770+ publisher.send(msg)
771+ return
772+ except self.connection.connection_errors, e:
 773+ LOG.exception(_('Failed to publish message %s') % str(e))
774+ try:
775+ self.reconnect()
776+ if publisher:
777+ publisher.reconnect(self.channel)
778+ except self.connection.connection_errors, e:
779+ pass
780+
781+ def declare_direct_consumer(self, topic, callback):
782+ """Create a 'direct' queue.
783+ In nova's use, this is generally a msg_id queue used for
784+ responses for call/multicall
785+ """
786+ self.declare_consumer(DirectConsumer, topic, callback)
787+
788+ def declare_topic_consumer(self, topic, callback=None):
789+ """Create a 'topic' consumer."""
790+ self.declare_consumer(TopicConsumer, topic, callback)
791+
792+ def declare_fanout_consumer(self, topic, callback):
793+ """Create a 'fanout' consumer"""
794+ self.declare_consumer(FanoutConsumer, topic, callback)
795+
796+ def direct_send(self, msg_id, msg):
797+ """Send a 'direct' message"""
798+ self.publisher_send(DirectPublisher, msg_id, msg)
799+
800+ def topic_send(self, topic, msg):
801+ """Send a 'topic' message"""
802+ self.publisher_send(TopicPublisher, topic, msg)
803+
804+ def fanout_send(self, topic, msg):
805+ """Send a 'fanout' message"""
806+ self.publisher_send(FanoutPublisher, topic, msg)
807+
808+ def consume(self, limit=None):
809+ """Consume from all queues/consumers"""
810+ it = self.iterconsume(limit=limit)
811+ while True:
812+ try:
813+ it.next()
814+ except StopIteration:
815+ return
816+
817+ def consume_in_thread(self):
818+ """Consumer from all queues/consumers in a greenthread"""
819+ def _consumer_thread():
820+ try:
821+ self.consume()
822+ except greenlet.GreenletExit:
823+ return
824+ if self.consumer_thread is None:
825+ self.consumer_thread = eventlet.spawn(_consumer_thread)
826+ return self.consumer_thread
827+
828+ def create_consumer(self, topic, proxy, fanout=False):
829+ """Create a consumer that calls a method in a proxy object"""
830+ if fanout:
831+ self.declare_fanout_consumer(topic, ProxyCallback(proxy))
832+ else:
833+ self.declare_topic_consumer(topic, ProxyCallback(proxy))
834+
835+
836+class Pool(pools.Pool):
837+ """Class that implements a Pool of Connections."""
838+
839+ # TODO(comstud): Timeout connections not used in a while
840+ def create(self):
841+ LOG.debug('Pool creating new connection')
842+ return Connection()
843+
844+# Create a ConnectionPool to use for RPC calls. We'll order the
845+# pool as a stack (LIFO), so that we can potentially loop through and
846+# timeout old unused connections at some point
847+ConnectionPool = Pool(
848+ max_size=FLAGS.rpc_conn_pool_size,
849+ order_as_stack=True)
850+
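
The stack ordering requested above (`order_as_stack=True`) means the most recently returned connection is handed out first, so idle connections sink to the bottom where a future sweep could time them out. A minimal stand-alone sketch of that behavior (not eventlet's actual `pools.Pool`):

```python
class LIFOPool:
    """Minimal sketch of a stack-ordered (LIFO) connection pool."""

    def __init__(self, create, max_size=30):
        self.create = create
        self.max_size = max_size
        self.free = []  # top of the stack is the end of the list

    def get(self):
        # Reuse the most recently returned item, else create a new one.
        return self.free.pop() if self.free else self.create()

    def put(self, item):
        # Drop the item on the floor if the pool is already full.
        if len(self.free) < self.max_size:
            self.free.append(item)

pool = LIFOPool(create=object)
a = pool.get()
pool.put(a)
b = pool.get()
assert a is b  # LIFO: the same connection is recycled
```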
851+
852+class ConnectionContext(object):
853+ """The class that is actually returned to the caller of
 854+ create_connection(). This is essentially a wrapper around
855+ Connection that supports 'with' and can return a new Connection or
856+ one from a pool. It will also catch when an instance of this class
857+ is to be deleted so that we can return Connections to the pool on
858+ exceptions and so forth without making the caller be responsible for
859+ catching all exceptions and making sure to return a connection to
860+ the pool.
861+ """
862+
863+ def __init__(self, pooled=True):
864+ """Create a new connection, or get one from the pool"""
865+ self.connection = None
866+ if pooled:
867+ self.connection = ConnectionPool.get()
868+ else:
869+ self.connection = Connection()
870+ self.pooled = pooled
871+
872+ def __enter__(self):
873+ """with ConnectionContext() should return self"""
874+ return self
875+
876+ def _done(self):
877+ """If the connection came from a pool, clean it up and put it back.
878+ If it did not come from a pool, close it.
879+ """
880+ if self.connection:
881+ if self.pooled:
882+ # Reset the connection so it's ready for the next caller
883+ # to grab from the pool
884+ self.connection.reset()
885+ ConnectionPool.put(self.connection)
886+ else:
887+ try:
888+ self.connection.close()
889+ except Exception:
890+ # There's apparently a bug in kombu 'memory' transport
891+ # which causes an assert failure.
892+ # But, we probably want to ignore all exceptions when
893+ # trying to close a connection, anyway...
894+ pass
895+ self.connection = None
896+
897+ def __exit__(self, t, v, tb):
898+ """end of 'with' statement. We're done here."""
899+ self._done()
900+
901+ def __del__(self):
902+ """Caller is done with this connection. Make sure we cleaned up."""
903+ self._done()
904+
905+ def close(self):
906+ """Caller is done with this connection."""
907+ self._done()
908+
909+ def __getattr__(self, key):
910+ """Proxy all other calls to the Connection instance"""
911+ if self.connection:
912+ return getattr(self.connection, key)
913+ else:
914+ raise exception.InvalidRPCConnectionReuse()
915+
916+
917+class ProxyCallback(object):
918+ """Calls methods on a proxy object based on method and args."""
919+
920+ def __init__(self, proxy):
921+ self.proxy = proxy
922+ self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
923+
924+ def __call__(self, message_data):
925+ """Consumer callback to call a method on a proxy object.
926+
927+ Parses the message for validity and fires off a thread to call the
928+ proxy object method.
929+
930+ Message data should be a dictionary with two keys:
931+ method: string representing the method to call
932+ args: dictionary of arg: value
933+
934+ Example: {'method': 'echo', 'args': {'value': 42}}
935+
936+ """
937+ LOG.debug(_('received %s') % message_data)
938+ ctxt = _unpack_context(message_data)
939+ method = message_data.get('method')
940+ args = message_data.get('args', {})
941+ if not method:
942+ LOG.warn(_('no method for message: %s') % message_data)
943+ ctxt.reply(_('No method for message: %s') % message_data)
944+ return
945+ self.pool.spawn_n(self._process_data, ctxt, method, args)
946+
947+ @exception.wrap_exception()
948+ def _process_data(self, ctxt, method, args):
949+ """Thread that maigcally looks for a method on the proxy
950+ object and calls it.
951+ """
952+
953+ node_func = getattr(self.proxy, str(method))
954+ node_args = dict((str(k), v) for k, v in args.iteritems())
955+ # NOTE(vish): magic is fun!
956+ try:
957+ rval = node_func(context=ctxt, **node_args)
958+ # Check if the result was a generator
959+ if isinstance(rval, types.GeneratorType):
960+ for x in rval:
961+ ctxt.reply(x, None)
962+ else:
963+ ctxt.reply(rval, None)
964+ # This final None tells multicall that it is done.
965+ ctxt.reply(None, None)
966+ except Exception as e:
967+ LOG.exception('Exception during message handling')
968+ ctxt.reply(None, sys.exc_info())
969+ return
970+
971+
972+def _unpack_context(msg):
973+ """Unpack context from msg."""
974+ context_dict = {}
975+ for key in list(msg.keys()):
976+ # NOTE(vish): Some versions of python don't like unicode keys
977+ # in kwargs.
978+ key = str(key)
979+ if key.startswith('_context_'):
980+ value = msg.pop(key)
981+ context_dict[key[9:]] = value
982+ context_dict['msg_id'] = msg.pop('_msg_id', None)
983+ LOG.debug(_('unpacked context: %s'), context_dict)
984+ return RpcContext.from_dict(context_dict)
985+
986+
987+def _pack_context(msg, context):
988+ """Pack context into msg.
989+
990+ Values for message keys need to be less than 255 chars, so we pull
991+ context out into a bunch of separate keys. If we want to support
992+ more arguments in rabbit messages, we may want to do the same
993+ for args at some point.
994+
995+ """
996+ context_d = dict([('_context_%s' % key, value)
997+ for (key, value) in context.to_dict().iteritems()])
998+ msg.update(context_d)
999+
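
The `_pack_context`/`_unpack_context` pair above flattens the request context into per-key `_context_` message entries and restores it on the other side. A simplified round-trip sketch (plain dicts standing in for `RequestContext`):

```python
def pack_context(msg, context_dict):
    """Flatten context into individual '_context_<key>' message keys."""
    msg.update(('_context_%s' % k, v) for k, v in context_dict.items())

def unpack_context(msg):
    """Pop '_context_' keys (and '_msg_id') back out of the message."""
    ctxt = {}
    for key in list(msg):
        if key.startswith('_context_'):
            ctxt[key[len('_context_'):]] = msg.pop(key)
    ctxt['msg_id'] = msg.pop('_msg_id', None)
    return ctxt

msg = {'method': 'echo', 'args': {'value': 42}, '_msg_id': 'abc'}
pack_context(msg, {'user': 'admin', 'project': 'demo'})
assert unpack_context(msg) == {'user': 'admin',
                               'project': 'demo',
                               'msg_id': 'abc'}
# The context keys are consumed; only the call payload remains.
assert msg == {'method': 'echo', 'args': {'value': 42}}
```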
1000+
1001+class RpcContext(context.RequestContext):
1002+ """Context that supports replying to a rpc.call"""
1003+ def __init__(self, *args, **kwargs):
1004+ msg_id = kwargs.pop('msg_id', None)
1005+ self.msg_id = msg_id
1006+ super(RpcContext, self).__init__(*args, **kwargs)
1007+
1008+ def reply(self, *args, **kwargs):
1009+ if self.msg_id:
1010+ msg_reply(self.msg_id, *args, **kwargs)
1011+
1012+
1013+class MulticallWaiter(object):
1014+ def __init__(self, connection):
1015+ self._connection = connection
1016+ self._iterator = connection.iterconsume()
1017+ self._result = None
1018+ self._done = False
1019+
1020+ def done(self):
1021+ self._done = True
1022+ self._connection.close()
1023+
1024+ def __call__(self, data):
1025+ """The consume() callback will call this. Store the result."""
1026+ if data['failure']:
1027+ self._result = RemoteError(*data['failure'])
1028+ else:
1029+ self._result = data['result']
1030+
1031+ def __iter__(self):
1032+ """Return a result until we get a 'None' response from consumer"""
1033+ if self._done:
1034+ raise StopIteration
1035+ while True:
1036+ self._iterator.next()
1037+ result = self._result
1038+ if isinstance(result, Exception):
1039+ self.done()
1040+ raise result
 1041+ if result is None:
1042+ self.done()
1043+ raise StopIteration
1044+ yield result
1045+
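
The iteration contract `MulticallWaiter` implements (stream results until a final `None` sentinel, raising any exception reply) can be sketched independently of the AMQP plumbing:

```python
def iter_until_none(results):
    """Sketch of the multicall contract: replies stream back until a
    final None marks completion; an exception reply is raised."""
    for result in results:
        if isinstance(result, Exception):
            raise result
        if result is None:
            return
        yield result

# A three-reply multicall followed by the completion sentinel:
assert list(iter_until_none([42, 43, 44, None])) == [42, 43, 44]
```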
1046+
1047+def create_connection(new=True):
1048+ """Create a connection"""
1049+ return ConnectionContext(pooled=not new)
1050+
1051+
1052+def multicall(context, topic, msg):
1053+ """Make a call that returns multiple times."""
1054+ # Can't use 'with' for multicall, as it returns an iterator
1055+ # that will continue to use the connection. When it's done,
1056+ # connection.close() will get called which will put it back into
1057+ # the pool
1058+ LOG.debug(_('Making asynchronous call on %s ...'), topic)
1059+ msg_id = uuid.uuid4().hex
1060+ msg.update({'_msg_id': msg_id})
1061+ LOG.debug(_('MSG_ID is %s') % (msg_id))
1062+ _pack_context(msg, context)
1063+
1064+ conn = ConnectionContext()
1065+ wait_msg = MulticallWaiter(conn)
1066+ conn.declare_direct_consumer(msg_id, wait_msg)
1067+ conn.topic_send(topic, msg)
1068+
1069+ return wait_msg
1070+
1071+
1072+def call(context, topic, msg):
1073+ """Sends a message on a topic and wait for a response."""
1074+ rv = multicall(context, topic, msg)
1075+ # NOTE(vish): return the last result from the multicall
1076+ rv = list(rv)
1077+ if not rv:
1078+ return
1079+ return rv[-1]
1080+
1081+
1082+def cast(context, topic, msg):
1083+ """Sends a message on a topic without waiting for a response."""
1084+ LOG.debug(_('Making asynchronous cast on %s...'), topic)
1085+ _pack_context(msg, context)
1086+ with ConnectionContext() as conn:
1087+ conn.topic_send(topic, msg)
1088+
1089+
1090+def fanout_cast(context, topic, msg):
1091+ """Sends a message on a fanout exchange without waiting for a response."""
1092+ LOG.debug(_('Making asynchronous fanout cast...'))
1093+ _pack_context(msg, context)
1094+ with ConnectionContext() as conn:
1095+ conn.fanout_send(topic, msg)
1096+
1097+
1098+def msg_reply(msg_id, reply=None, failure=None):
1099+ """Sends a reply or an error on the channel signified by msg_id.
1100+
1101+ Failure should be a sys.exc_info() tuple.
1102+
1103+ """
1104+ with ConnectionContext() as conn:
1105+ if failure:
1106+ message = str(failure[1])
1107+ tb = traceback.format_exception(*failure)
1108+ LOG.error(_("Returning exception %s to caller"), message)
1109+ LOG.error(tb)
1110+ failure = (failure[0].__name__, str(failure[1]), tb)
1111+
1112+ try:
1113+ msg = {'result': reply, 'failure': failure}
1114+ except TypeError:
1115+ msg = {'result': dict((k, repr(v))
1116+ for k, v in reply.__dict__.iteritems()),
1117+ 'failure': failure}
1118+ conn.direct_send(msg_id, msg)
1119
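
The failure path in `msg_reply` above flattens a `sys.exc_info()` tuple into a serializable triple before sending it back to the caller. A stand-alone sketch of that conversion:

```python
import sys
import traceback

def serialize_failure(failure):
    """Sketch of flattening a sys.exc_info() tuple into a
    JSON-friendly (exc_name, message, traceback_lines) triple."""
    tb = traceback.format_exception(*failure)
    return (failure[0].__name__, str(failure[1]), tb)

try:
    raise ValueError('42')
except ValueError:
    name, message, tb = serialize_failure(sys.exc_info())

assert (name, message) == ('ValueError', '42')
assert tb[-1].startswith('ValueError')
```

On the receiving side this triple is what `RemoteError` is constructed from, which is why the test suite converts the string `value` back to an int.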
1120=== modified file 'nova/service.py'
1121--- nova/service.py 2011-08-18 18:28:02 +0000
1122+++ nova/service.py 2011-08-31 18:56:22 +0000
1123@@ -153,26 +153,15 @@
1124 self.topic)
1125
1126 # Share this same connection for these Consumers
1127- consumer_all = rpc.create_consumer(self.conn, self.topic, self,
1128- fanout=False)
1129+ self.conn.create_consumer(self.topic, self, fanout=False)
1130
1131 node_topic = '%s.%s' % (self.topic, self.host)
1132- consumer_node = rpc.create_consumer(self.conn, node_topic, self,
1133- fanout=False)
1134-
1135- fanout = rpc.create_consumer(self.conn, self.topic, self, fanout=True)
1136-
1137- consumers = [consumer_all, consumer_node, fanout]
1138- consumer_set = rpc.create_consumer_set(self.conn, consumers)
1139-
1140- # Wait forever, processing these consumers
1141- def _wait():
1142- try:
1143- consumer_set.wait()
1144- finally:
1145- consumer_set.close()
1146-
1147- self.consumer_set_thread = eventlet.spawn(_wait)
1148+ self.conn.create_consumer(node_topic, self, fanout=False)
1149+
1150+ self.conn.create_consumer(self.topic, self, fanout=True)
1151+
1152+ # Consume from all consumers in a thread
1153+ self.conn.consume_in_thread()
1154
1155 if self.report_interval:
1156 pulse = utils.LoopingCall(self.report_state)
1157@@ -237,10 +226,11 @@
1158 logging.warn(_('Service killed that has no database entry'))
1159
1160 def stop(self):
1161- self.consumer_set_thread.kill()
1162+ # Try to shut the connection down, but if we get any sort of
1163+ # errors, go ahead and ignore them.. as we're shutting down anyway
1164 try:
1165- self.consumer_set_thread.wait()
1166- except greenlet.GreenletExit:
1167+ self.conn.close()
1168+ except Exception:
1169 pass
1170 for x in self.timers:
1171 try:
1172
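
The service change above replaces the old consumer-set dance with three `create_consumer` calls on one connection plus `consume_in_thread`. A stub sketch of the shape of that API (the class below is a stand-in, not Nova's Connection):

```python
class StubConnection:
    """Stand-in recording what the service layer now asks of a
    Connection: three consumers sharing one connection, then a
    single background consume thread."""

    def __init__(self):
        self.consumers = []

    def create_consumer(self, topic, proxy, fanout=False):
        self.consumers.append((topic, fanout))

    def consume_in_thread(self):
        return 'thread-handle'

conn = StubConnection()
topic, host = 'compute', 'node1'
conn.create_consumer(topic, None, fanout=False)           # shared topic
conn.create_consumer('%s.%s' % (topic, host), None,
                     fanout=False)                        # host-specific
conn.create_consumer(topic, None, fanout=True)            # fanout
assert conn.consumers == [('compute', False),
                          ('compute.node1', False),
                          ('compute', True)]
```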
1173=== modified file 'nova/tests/test_adminapi.py'
1174--- nova/tests/test_adminapi.py 2011-07-29 19:36:37 +0000
1175+++ nova/tests/test_adminapi.py 2011-08-31 18:56:22 +0000
1176@@ -38,8 +38,6 @@
1177 super(AdminApiTestCase, self).setUp()
1178 self.flags(connection_type='fake')
1179
1180- self.conn = rpc.create_connection()
1181-
1182 # set up our cloud
1183 self.api = admin.AdminController()
1184
1185
1186=== modified file 'nova/tests/test_cloud.py'
1187--- nova/tests/test_cloud.py 2011-08-16 16:18:13 +0000
1188+++ nova/tests/test_cloud.py 2011-08-31 18:56:22 +0000
1189@@ -51,8 +51,6 @@
1190 self.flags(connection_type='fake',
1191 stub_network=True)
1192
1193- self.conn = rpc.create_connection()
1194-
1195 # set up our cloud
1196 self.cloud = cloud.CloudController()
1197
1198
1199=== modified file 'nova/tests/test_rpc.py'
1200--- nova/tests/test_rpc.py 2011-08-03 19:22:58 +0000
1201+++ nova/tests/test_rpc.py 2011-08-31 18:56:22 +0000
1202@@ -22,168 +22,16 @@
1203 from nova import context
1204 from nova import log as logging
1205 from nova import rpc
1206-from nova import test
1207+from nova.tests import test_rpc_common
1208
1209
1210 LOG = logging.getLogger('nova.tests.rpc')
1211
1212
1213-class RpcTestCase(test.TestCase):
1214+class RpcTestCase(test_rpc_common._BaseRpcTestCase):
1215 def setUp(self):
1216+ self.rpc = rpc
1217 super(RpcTestCase, self).setUp()
1218- self.conn = rpc.create_connection(True)
1219- self.receiver = TestReceiver()
1220- self.consumer = rpc.create_consumer(self.conn,
1221- 'test',
1222- self.receiver,
1223- False)
1224- self.consumer.attach_to_eventlet()
1225- self.context = context.get_admin_context()
1226-
1227- def test_call_succeed(self):
1228- value = 42
1229- result = rpc.call(self.context, 'test', {"method": "echo",
1230- "args": {"value": value}})
1231- self.assertEqual(value, result)
1232-
1233- def test_call_succeed_despite_multiple_returns(self):
1234- value = 42
1235- result = rpc.call(self.context, 'test', {"method": "echo_three_times",
1236- "args": {"value": value}})
1237- self.assertEqual(value + 2, result)
1238-
1239- def test_call_succeed_despite_multiple_returns_yield(self):
1240- value = 42
1241- result = rpc.call(self.context, 'test',
1242- {"method": "echo_three_times_yield",
1243- "args": {"value": value}})
1244- self.assertEqual(value + 2, result)
1245-
1246- def test_multicall_succeed_once(self):
1247- value = 42
1248- result = rpc.multicall(self.context,
1249- 'test',
1250- {"method": "echo",
1251- "args": {"value": value}})
1252- for i, x in enumerate(result):
1253- if i > 0:
1254- self.fail('should only receive one response')
1255- self.assertEqual(value + i, x)
1256-
1257- def test_multicall_succeed_three_times(self):
1258- value = 42
1259- result = rpc.multicall(self.context,
1260- 'test',
1261- {"method": "echo_three_times",
1262- "args": {"value": value}})
1263- for i, x in enumerate(result):
1264- self.assertEqual(value + i, x)
1265-
1266- def test_multicall_succeed_three_times_yield(self):
1267- value = 42
1268- result = rpc.multicall(self.context,
1269- 'test',
1270- {"method": "echo_three_times_yield",
1271- "args": {"value": value}})
1272- for i, x in enumerate(result):
1273- self.assertEqual(value + i, x)
1274-
1275- def test_context_passed(self):
1276- """Makes sure a context is passed through rpc call."""
1277- value = 42
1278- result = rpc.call(self.context,
1279- 'test', {"method": "context",
1280- "args": {"value": value}})
1281- self.assertEqual(self.context.to_dict(), result)
1282-
1283- def test_call_exception(self):
1284- """Test that exception gets passed back properly.
1285-
1286- rpc.call returns a RemoteError object. The value of the
1287- exception is converted to a string, so we convert it back
1288- to an int in the test.
1289-
1290- """
1291- value = 42
1292- self.assertRaises(rpc.RemoteError,
1293- rpc.call,
1294- self.context,
1295- 'test',
1296- {"method": "fail",
1297- "args": {"value": value}})
1298- try:
1299- rpc.call(self.context,
1300- 'test',
1301- {"method": "fail",
1302- "args": {"value": value}})
1303- self.fail("should have thrown rpc.RemoteError")
1304- except rpc.RemoteError as exc:
1305- self.assertEqual(int(exc.value), value)
1306-
1307- def test_nested_calls(self):
1308- """Test that we can do an rpc.call inside another call."""
1309- class Nested(object):
1310- @staticmethod
1311- def echo(context, queue, value):
1312- """Calls echo in the passed queue"""
1313- LOG.debug(_("Nested received %(queue)s, %(value)s")
1314- % locals())
1315- # TODO: so, it will replay the context and use the same REQID?
1316- # that's bizarre.
1317- ret = rpc.call(context,
1318- queue,
1319- {"method": "echo",
1320- "args": {"value": value}})
1321- LOG.debug(_("Nested return %s"), ret)
1322- return value
1323-
1324- nested = Nested()
1325- conn = rpc.create_connection(True)
1326- consumer = rpc.create_consumer(conn,
1327- 'nested',
1328- nested,
1329- False)
1330- consumer.attach_to_eventlet()
1331- value = 42
1332- result = rpc.call(self.context,
1333- 'nested', {"method": "echo",
1334- "args": {"queue": "test",
1335- "value": value}})
1336- self.assertEqual(value, result)
1337-
1338-
1339-class TestReceiver(object):
1340- """Simple Proxy class so the consumer has methods to call.
1341-
1342- Uses static methods because we aren't actually storing any state.
1343-
1344- """
1345-
1346- @staticmethod
1347- def echo(context, value):
1348- """Simply returns whatever value is sent in."""
1349- LOG.debug(_("Received %s"), value)
1350- return value
1351-
1352- @staticmethod
1353- def context(context, value):
1354- """Returns dictionary version of context."""
1355- LOG.debug(_("Received %s"), context)
1356- return context.to_dict()
1357-
1358- @staticmethod
1359- def echo_three_times(context, value):
1360- context.reply(value)
1361- context.reply(value + 1)
1362- context.reply(value + 2)
1363-
1364- @staticmethod
1365- def echo_three_times_yield(context, value):
1366- yield value
1367- yield value + 1
1368- yield value + 2
1369-
1370- @staticmethod
1371- def fail(context, value):
1372- """Raises an exception with the value sent in."""
1373- raise Exception(value)
1374+
1375+ def tearDown(self):
1376+ super(RpcTestCase, self).tearDown()
1377
1378=== removed file 'nova/tests/test_rpc_amqp.py'
1379--- nova/tests/test_rpc_amqp.py 2011-08-03 19:22:58 +0000
1380+++ nova/tests/test_rpc_amqp.py 1970-01-01 00:00:00 +0000
1381@@ -1,88 +0,0 @@
1382-# vim: tabstop=4 shiftwidth=4 softtabstop=4
1383-
1384-# Copyright (c) 2010 Openstack, LLC.
1385-# Administrator of the National Aeronautics and Space Administration.
1386-# All Rights Reserved.
1387-#
1388-# Licensed under the Apache License, Version 2.0 (the "License"); you may
1389-# not use this file except in compliance with the License. You may obtain
1390-# a copy of the License at
1391-#
1392-# http://www.apache.org/licenses/LICENSE-2.0
1393-#
1394-# Unless required by applicable law or agreed to in writing, software
1395-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1396-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1397-# License for the specific language governing permissions and limitations
1398-# under the License.
1399-
1400-"""
1401-Tests For RPC AMQP.
1402-"""
1403-
1404-from nova import context
1405-from nova import log as logging
1406-from nova import rpc
1407-from nova.rpc import amqp
1408-from nova import test
1409-
1410-
1411-LOG = logging.getLogger('nova.tests.rpc')
1412-
1413-
1414-class RpcAMQPTestCase(test.TestCase):
1415- def setUp(self):
1416- super(RpcAMQPTestCase, self).setUp()
1417- self.conn = rpc.create_connection(True)
1418- self.receiver = TestReceiver()
1419- self.consumer = rpc.create_consumer(self.conn,
1420- 'test',
1421- self.receiver,
1422- False)
1423- self.consumer.attach_to_eventlet()
1424- self.context = context.get_admin_context()
1425-
1426- def test_connectionpool_single(self):
1427- """Test that ConnectionPool recycles a single connection."""
1428- conn1 = amqp.ConnectionPool.get()
1429- amqp.ConnectionPool.put(conn1)
1430- conn2 = amqp.ConnectionPool.get()
1431- amqp.ConnectionPool.put(conn2)
1432- self.assertEqual(conn1, conn2)
1433-
1434-
1435-class TestReceiver(object):
1436- """Simple Proxy class so the consumer has methods to call.
1437-
1438- Uses static methods because we aren't actually storing any state.
1439-
1440- """
1441-
1442- @staticmethod
1443- def echo(context, value):
1444- """Simply returns whatever value is sent in."""
1445- LOG.debug(_("Received %s"), value)
1446- return value
1447-
1448- @staticmethod
1449- def context(context, value):
1450- """Returns dictionary version of context."""
1451- LOG.debug(_("Received %s"), context)
1452- return context.to_dict()
1453-
1454- @staticmethod
1455- def echo_three_times(context, value):
1456- context.reply(value)
1457- context.reply(value + 1)
1458- context.reply(value + 2)
1459-
1460- @staticmethod
1461- def echo_three_times_yield(context, value):
1462- yield value
1463- yield value + 1
1464- yield value + 2
1465-
1466- @staticmethod
1467- def fail(context, value):
1468- """Raises an exception with the value sent in."""
1469- raise Exception(value)
1470
1471=== added file 'nova/tests/test_rpc_carrot.py'
1472--- nova/tests/test_rpc_carrot.py 1970-01-01 00:00:00 +0000
1473+++ nova/tests/test_rpc_carrot.py 2011-08-31 18:56:22 +0000
1474@@ -0,0 +1,45 @@
1475+# vim: tabstop=4 shiftwidth=4 softtabstop=4
1476+
1477+# Copyright 2010 United States Government as represented by the
1478+# Administrator of the National Aeronautics and Space Administration.
1479+# All Rights Reserved.
1480+#
1481+# Licensed under the Apache License, Version 2.0 (the "License"); you may
1482+# not use this file except in compliance with the License. You may obtain
1483+# a copy of the License at
1484+#
1485+# http://www.apache.org/licenses/LICENSE-2.0
1486+#
1487+# Unless required by applicable law or agreed to in writing, software
1488+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1489+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1490+# License for the specific language governing permissions and limitations
1491+# under the License.
1492+"""
1493+Unit Tests for remote procedure calls using carrot
1494+"""
1495+
1496+from nova import context
1497+from nova import log as logging
1498+from nova.rpc import impl_carrot
1499+from nova.tests import test_rpc_common
1500+
1501+
1502+LOG = logging.getLogger('nova.tests.rpc')
1503+
1504+
1505+class RpcCarrotTestCase(test_rpc_common._BaseRpcTestCase):
1506+ def setUp(self):
1507+ self.rpc = impl_carrot
1508+ super(RpcCarrotTestCase, self).setUp()
1509+
1510+ def tearDown(self):
1511+ super(RpcCarrotTestCase, self).tearDown()
1512+
1513+ def test_connectionpool_single(self):
1514+ """Test that ConnectionPool recycles a single connection."""
1515+ conn1 = self.rpc.ConnectionPool.get()
1516+ self.rpc.ConnectionPool.put(conn1)
1517+ conn2 = self.rpc.ConnectionPool.get()
1518+ self.rpc.ConnectionPool.put(conn2)
1519+ self.assertEqual(conn1, conn2)
1520
1521=== added file 'nova/tests/test_rpc_common.py'
1522--- nova/tests/test_rpc_common.py 1970-01-01 00:00:00 +0000
1523+++ nova/tests/test_rpc_common.py 2011-08-31 18:56:22 +0000
1524@@ -0,0 +1,189 @@
1525+# vim: tabstop=4 shiftwidth=4 softtabstop=4
1526+
1527+# Copyright 2010 United States Government as represented by the
1528+# Administrator of the National Aeronautics and Space Administration.
1529+# All Rights Reserved.
1530+#
1531+# Licensed under the Apache License, Version 2.0 (the "License"); you may
1532+# not use this file except in compliance with the License. You may obtain
1533+# a copy of the License at
1534+#
1535+# http://www.apache.org/licenses/LICENSE-2.0
1536+#
1537+# Unless required by applicable law or agreed to in writing, software
1538+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1539+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1540+# License for the specific language governing permissions and limitations
1541+# under the License.
1542+"""
1543+Unit Tests for remote procedure calls shared between all implementations
1544+"""
1545+
1546+from nova import context
1547+from nova import log as logging
1548+from nova.rpc.common import RemoteError
1549+from nova import test
1550+
1551+
1552+LOG = logging.getLogger('nova.tests.rpc')
1553+
1554+
1555+class _BaseRpcTestCase(test.TestCase):
1556+ def setUp(self):
1557+ super(_BaseRpcTestCase, self).setUp()
1558+ self.conn = self.rpc.create_connection(True)
1559+ self.receiver = TestReceiver()
1560+ self.conn.create_consumer('test', self.receiver, False)
1561+ self.conn.consume_in_thread()
1562+ self.context = context.get_admin_context()
1563+
1564+ def tearDown(self):
1565+ self.conn.close()
1566+ super(_BaseRpcTestCase, self).tearDown()
1567+
1568+ def test_call_succeed(self):
1569+ value = 42
1570+ result = self.rpc.call(self.context, 'test', {"method": "echo",
1571+ "args": {"value": value}})
1572+ self.assertEqual(value, result)
1573+
1574+ def test_call_succeed_despite_multiple_returns(self):
1575+ value = 42
1576+ result = self.rpc.call(self.context, 'test',
1577+ {"method": "echo_three_times",
1578+ "args": {"value": value}})
1579+ self.assertEqual(value + 2, result)
1580+
1581+ def test_call_succeed_despite_multiple_returns_yield(self):
1582+ value = 42
1583+ result = self.rpc.call(self.context, 'test',
1584+ {"method": "echo_three_times_yield",
1585+ "args": {"value": value}})
1586+ self.assertEqual(value + 2, result)
1587+
1588+ def test_multicall_succeed_once(self):
1589+ value = 42
1590+ result = self.rpc.multicall(self.context,
1591+ 'test',
1592+ {"method": "echo",
1593+ "args": {"value": value}})
1594+ for i, x in enumerate(result):
1595+ if i > 0:
1596+ self.fail('should only receive one response')
1597+ self.assertEqual(value + i, x)
1598+
1599+ def test_multicall_succeed_three_times(self):
1600+ value = 42
1601+ result = self.rpc.multicall(self.context,
1602+ 'test',
1603+ {"method": "echo_three_times",
1604+ "args": {"value": value}})
1605+ for i, x in enumerate(result):
1606+ self.assertEqual(value + i, x)
1607+
1608+ def test_multicall_succeed_three_times_yield(self):
1609+ value = 42
1610+ result = self.rpc.multicall(self.context,
1611+ 'test',
1612+ {"method": "echo_three_times_yield",
1613+ "args": {"value": value}})
1614+ for i, x in enumerate(result):
1615+ self.assertEqual(value + i, x)
1616+
1617+ def test_context_passed(self):
1618+ """Makes sure a context is passed through rpc call."""
1619+ value = 42
1620+ result = self.rpc.call(self.context,
1621+ 'test', {"method": "context",
1622+ "args": {"value": value}})
1623+ self.assertEqual(self.context.to_dict(), result)
1624+
1625+ def test_call_exception(self):
1626+ """Test that exception gets passed back properly.
1627+
1628+ rpc.call returns a RemoteError object. The value of the
1629+ exception is converted to a string, so we convert it back
1630+ to an int in the test.
1631+
1632+ """
1633+ value = 42
1634+ self.assertRaises(RemoteError,
1635+ self.rpc.call,
1636+ self.context,
1637+ 'test',
1638+ {"method": "fail",
1639+ "args": {"value": value}})
1640+ try:
1641+ self.rpc.call(self.context,
1642+ 'test',
1643+ {"method": "fail",
1644+ "args": {"value": value}})
1645+ self.fail("should have thrown RemoteError")
1646+ except RemoteError as exc:
1647+ self.assertEqual(int(exc.value), value)
1648+
1649+ def test_nested_calls(self):
1650+ """Test that we can do an rpc.call inside another call."""
1651+ class Nested(object):
1652+ @staticmethod
1653+ def echo(context, queue, value):
1654+ """Calls echo in the passed queue"""
1655+ LOG.debug(_("Nested received %(queue)s, %(value)s")
1656+ % locals())
1657+ # TODO: the nested call reuses the caller's context (and
1658+ # request ID); confirm that is the intended behavior.
1659+ ret = self.rpc.call(context,
1660+ queue,
1661+ {"method": "echo",
1662+ "args": {"value": value}})
1663+ LOG.debug(_("Nested return %s"), ret)
1664+ return value
1665+
1666+ nested = Nested()
1667+ conn = self.rpc.create_connection(True)
1668+ conn.create_consumer('nested', nested, False)
1669+ conn.consume_in_thread()
1670+ value = 42
1671+ result = self.rpc.call(self.context,
1672+ 'nested', {"method": "echo",
1673+ "args": {"queue": "test",
1674+ "value": value}})
1675+ conn.close()
1676+ self.assertEqual(value, result)
1677+
1678+
1679+class TestReceiver(object):
1680+ """Simple Proxy class so the consumer has methods to call.
1681+
1682+ Uses static methods because we aren't actually storing any state.
1683+
1684+ """
1685+
1686+ @staticmethod
1687+ def echo(context, value):
1688+ """Simply returns whatever value is sent in."""
1689+ LOG.debug(_("Received %s"), value)
1690+ return value
1691+
1692+ @staticmethod
1693+ def context(context, value):
1694+ """Returns dictionary version of context."""
1695+ LOG.debug(_("Received %s"), context)
1696+ return context.to_dict()
1697+
1698+ @staticmethod
1699+ def echo_three_times(context, value):
1700+ context.reply(value)
1701+ context.reply(value + 1)
1702+ context.reply(value + 2)
1703+
1704+ @staticmethod
1705+ def echo_three_times_yield(context, value):
1706+ yield value
1707+ yield value + 1
1708+ yield value + 2
1709+
1710+ @staticmethod
1711+ def fail(context, value):
1712+ """Raises an exception with the value sent in."""
1713+ raise Exception(value)
1714
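The multicall tests above exercise an rpc API where one call can produce several replies, either via explicit `context.reply()` calls or by the remote method being a generator. The pattern can be sketched with no broker at all; `FakeContext` and `multicall` below are illustrative stand-ins, not Nova's real rpc implementation:

```python
# Minimal sketch of the multicall reply pattern exercised above.
# FakeContext and multicall are hypothetical names for illustration.

class FakeContext(object):
    def __init__(self):
        self.replies = []

    def reply(self, value):
        # Each reply() becomes one item in the multicall result iterator.
        self.replies.append(value)


def echo_three_times(context, value):
    context.reply(value)
    context.reply(value + 1)
    context.reply(value + 2)


def multicall(method, value):
    """Invoke method and yield each reply in order, like rpc.multicall."""
    ctxt = FakeContext()
    method(ctxt, value)
    for reply in ctxt.replies:
        yield reply


results = list(multicall(echo_three_times, 42))
```

This mirrors why `test_multicall_succeed_three_times` asserts `value + i` for each item it iterates.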
1715=== added file 'nova/tests/test_rpc_kombu.py'
1716--- nova/tests/test_rpc_kombu.py 1970-01-01 00:00:00 +0000
1717+++ nova/tests/test_rpc_kombu.py 2011-08-31 18:56:22 +0000
1718@@ -0,0 +1,110 @@
1719+# vim: tabstop=4 shiftwidth=4 softtabstop=4
1720+
1721+# Copyright 2010 United States Government as represented by the
1722+# Administrator of the National Aeronautics and Space Administration.
1723+# All Rights Reserved.
1724+#
1725+# Licensed under the Apache License, Version 2.0 (the "License"); you may
1726+# not use this file except in compliance with the License. You may obtain
1727+# a copy of the License at
1728+#
1729+# http://www.apache.org/licenses/LICENSE-2.0
1730+#
1731+# Unless required by applicable law or agreed to in writing, software
1732+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1733+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1734+# License for the specific language governing permissions and limitations
1735+# under the License.
1736+"""
1737+Unit Tests for remote procedure calls using kombu
1738+"""
1739+
1740+from nova import context
1741+from nova import log as logging
1742+from nova import test
1743+from nova.rpc import impl_kombu
1744+from nova.tests import test_rpc_common
1745+
1746+
1747+LOG = logging.getLogger('nova.tests.rpc')
1748+
1749+
1750+class RpcKombuTestCase(test_rpc_common._BaseRpcTestCase):
1751+ def setUp(self):
1752+ self.rpc = impl_kombu
1753+ super(RpcKombuTestCase, self).setUp()
1754+
1755+ def tearDown(self):
1756+ super(RpcKombuTestCase, self).tearDown()
1757+
1758+ def test_reusing_connection(self):
1759+ """Test that reusing a connection returns same one."""
1760+ conn_context = self.rpc.create_connection(new=False)
1761+ conn1 = conn_context.connection
1762+ conn_context.close()
1763+ conn_context = self.rpc.create_connection(new=False)
1764+ conn2 = conn_context.connection
1765+ conn_context.close()
1766+ self.assertEqual(conn1, conn2)
1767+
1768+ def test_topic_send_receive(self):
1769+ """Test sending to a topic exchange/queue"""
1770+
1771+ conn = self.rpc.create_connection()
1772+ message = 'topic test message'
1773+
1774+ self.received_message = None
1775+
1776+ def _callback(message):
1777+ self.received_message = message
1778+
1779+ conn.declare_topic_consumer('a_topic', _callback)
1780+ conn.topic_send('a_topic', message)
1781+ conn.consume(limit=1)
1782+ conn.close()
1783+
1784+ self.assertEqual(self.received_message, message)
1785+
1786+ def test_direct_send_receive(self):
1787+ """Test sending to a direct exchange/queue"""
1788+ conn = self.rpc.create_connection()
1789+ message = 'direct test message'
1790+
1791+ self.received_message = None
1792+
1793+ def _callback(message):
1794+ self.received_message = message
1795+
1796+ conn.declare_direct_consumer('a_direct', _callback)
1797+ conn.direct_send('a_direct', message)
1798+ conn.consume(limit=1)
1799+ conn.close()
1800+
1801+ self.assertEqual(self.received_message, message)
1802+
1803+ @test.skip_test("kombu memory transport seems buggy with fanout "
1804+ "queues; this test passes against real rabbit (fake_rabbit=False)")
1805+ def test_fanout_send_receive(self):
1806+ """Test sending to a fanout exchange and consuming from 2 queues"""
1807+
1808+ conn = self.rpc.create_connection()
1809+ conn2 = self.rpc.create_connection()
1810+ message = 'fanout test message'
1811+
1812+ self.received_message = None
1813+
1814+ def _callback(message):
1815+ self.received_message = message
1816+
1817+ conn.declare_fanout_consumer('a_fanout', _callback)
1818+ conn2.declare_fanout_consumer('a_fanout', _callback)
1819+ conn.fanout_send('a_fanout', message)
1820+
1821+ conn.consume(limit=1)
1822+ conn.close()
1823+ self.assertEqual(self.received_message, message)
1824+
1825+ self.received_message = None
1826+ conn2.consume(limit=1)
1827+ conn2.close()
1828+ self.assertEqual(self.received_message, message)
1829
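The topic and direct tests above all follow the same round trip: declare a consumer with a callback, send a message, then `consume(limit=1)` to drain exactly one delivery. A real `impl_kombu` connection talks to an AMQP broker; the sketch below substitutes a dict of in-memory queues for the exchange, purely to illustrate the call sequence (`FakeConnection` is not the real class):

```python
# In-memory sketch of the declare/send/consume round trip tested above.
import collections


class FakeConnection(object):
    def __init__(self):
        self.queues = collections.defaultdict(collections.deque)
        self.consumers = {}  # topic -> callback

    def declare_topic_consumer(self, topic, callback):
        self.consumers[topic] = callback

    def topic_send(self, topic, message):
        self.queues[topic].append(message)

    def consume(self, limit=None):
        # Deliver queued messages to registered callbacks, stopping
        # after 'limit' deliveries, like the tests' consume(limit=1).
        delivered = 0
        for topic, callback in self.consumers.items():
            while self.queues[topic]:
                callback(self.queues[topic].popleft())
                delivered += 1
                if limit is not None and delivered >= limit:
                    return

    def close(self):
        self.consumers.clear()


received = []
conn = FakeConnection()
conn.declare_topic_consumer('a_topic', received.append)
conn.topic_send('a_topic', 'topic test message')
conn.consume(limit=1)
conn.close()
```

The fanout case differs only in that every declared consumer gets its own copy of each message, which is what the skipped test checks against two connections.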
1830=== modified file 'nova/tests/test_test.py'
1831--- nova/tests/test_test.py 2011-07-26 23:29:50 +0000
1832+++ nova/tests/test_test.py 2011-08-31 18:56:22 +0000
1833@@ -40,6 +40,5 @@
1834
1835 connection = rpc.create_connection(new=True)
1836 proxy = NeverCalled()
1837- consumer = rpc.create_consumer(connection, 'compute',
1838- proxy, fanout=False)
1839- consumer.attach_to_eventlet()
1840+ connection.create_consumer('compute', proxy, fanout=False)
1841+ connection.consume_in_thread()
1842
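The hunk above captures the API change this branch makes: consumers are now created on the connection object itself and then served from a background thread via `consume_in_thread()`, instead of building a standalone consumer and attaching it to eventlet. A minimal modern-Python sketch of that shape (the real code uses eventlet; `FakeConnection` and its `send` method are hypothetical stand-ins):

```python
# Sketch of the connection-centric consumer API: register a proxy on
# the connection, serve it from a thread, dispatch by method name.
import queue
import threading


class FakeConnection(object):
    def __init__(self):
        self._inbox = queue.Queue()
        self._consumers = {}

    def create_consumer(self, topic, proxy, fanout=False):
        self._consumers[topic] = proxy

    def send(self, topic, method, **kwargs):
        self._inbox.put((topic, method, kwargs))

    def consume_in_thread(self):
        def _worker():
            while True:
                item = self._inbox.get()
                if item is None:  # sentinel enqueued by close()
                    return
                topic, method, kwargs = item
                getattr(self._consumers[topic], method)(**kwargs)
        self._thread = threading.Thread(target=_worker)
        self._thread.start()

    def close(self):
        self._inbox.put(None)
        self._thread.join()


class Echo(object):
    def __init__(self):
        self.seen = []

    def record(self, value):
        self.seen.append(value)


proxy = Echo()
conn = FakeConnection()
conn.create_consumer('compute', proxy, fanout=False)
conn.consume_in_thread()
conn.send('compute', 'record', value=42)
conn.close()
```

Because the inbox is FIFO, `close()`'s sentinel is processed only after the pending message, so joining the thread guarantees delivery before the assertion in a test like `test_test.py`'s.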
1843=== modified file 'tools/pip-requires'
1844--- tools/pip-requires 2011-08-22 21:24:37 +0000
1845+++ tools/pip-requires 2011-08-31 18:56:22 +0000
1846@@ -8,6 +8,7 @@
1847 boto==1.9b
1848 carrot==0.10.5
1849 eventlet
1850+kombu
1851 lockfile==0.8
1852 lxml==2.3
1853 python-novaclient==2.6.0