Merge lp:~vishvananda/nova/no-db-messaging into lp:~hudson-openstack/nova/trunk

Proposed by Vish Ishaya
Status: Rejected
Rejected by: Vish Ishaya
Proposed branch: lp:~vishvananda/nova/no-db-messaging
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 1133 lines (+486/-214)
12 files modified
nova/db/sqlalchemy/models.py (+12/-4)
nova/fakerabbit.py (+23/-8)
nova/rpc.py (+195/-70)
nova/scheduler/manager.py (+7/-5)
nova/service.py (+38/-24)
nova/test.py (+6/-3)
nova/tests/integrated/integrated_helpers.py (+1/-4)
nova/tests/test_cloud.py (+9/-18)
nova/tests/test_rpc.py (+65/-0)
nova/tests/test_service.py (+51/-6)
nova/volume/api.py (+53/-19)
nova/volume/manager.py (+26/-53)
To merge this branch: bzr merge lp:~vishvananda/nova/no-db-messaging
Reviewer Review Type Date Requested Status
Dan Prince (community) Needs Fixing
Review via email: mp+61189@code.launchpad.net

Description of the change

This is an initial proposal for feedback. This branch is an attempt to start on this blueprint:
https://blueprints.launchpad.net/nova/+spec/no-db-messaging
which will allow for the implementation of this blueprint:
https://blueprints.launchpad.net/nova/+spec/separate-code-for-services
and will ultimately make it easy to replace our various services with external projects.

This prototype changes volume_create to pass data through the queue instead of writing information to the database and reading it on the other end. It attempts to make minimal changes. It includes:
 * a small change to model code to allow for conversion into dicts
 * scheduler uses call instead of cast
 * volume.manager provides a volume_get to query the state of a volume
 * volume.api creates the volume with a call and then spawns a greenthread to poll its status
 * volume.manager returns the initial data immediately and then spawns a greenthread to do the further work

This will allow us to communicate with an external REST API very easily, simply by turning the two rpc calls into a POST and a GET. It also allows us to separate out the database for volumes, so the volume worker doesn't need to write to the shared zone database.
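
To make that concrete, here is a minimal sketch (not code from this branch) of the call-then-poll pattern described above: the create is a synchronous rpc.call that hands back the worker's initial record, and a greenthread keeps polling the worker's volume_get until the status settles. The method names, status values, and polling interval are illustrative.

import eventlet

from nova import flags
from nova import rpc

FLAGS = flags.FLAGS


def create_volume(context, db, options):
    # The worker returns its initial record through the queue instead of
    # both sides reading and writing the shared database. If nova-volume
    # were split out behind a REST service, this call would become a POST...
    volume = rpc.call(context,
                      FLAGS.scheduler_topic,
                      {"method": "create_volume",
                       "args": {"topic": FLAGS.volume_topic,
                                "volume": options}})

    def poll_status(volume):
        # ...and this volume_get would become a GET. Poll until the worker
        # reports that the volume has left the 'creating' state.
        topic = db.queue_get_for(context, FLAGS.volume_topic, volume['host'])
        while volume['status'] == 'creating':
            eventlet.sleep(1)
            volume = rpc.call(context, topic,
                              {"method": "volume_get",
                               "args": {"volume_id": volume['id']}})
            db.volume_update(context, volume['id'], volume)

    eventlet.spawn_n(poll_status, volume)
    return volume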

Outstanding issues for the database split:
 * we have to use something like remote_id to refer to the volume when making requests to nova-volume, because the id in our local database will not match. Ultimately we've discussed switching to UUIDs, so it may be good to make that change at the same time.
 * if the state of a volume changes after the initial poll has finished, we will never know about it. We ultimately need a way for the API to be notified when the volume status changes so that we can update our local data, or we need a background worker that polls the service for changes at a regular interval (a rough sketch of such a worker follows this list).
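
For the second point, a rough sketch of what such a background worker could look like, assuming a hypothetical volume_get_all helper and a remote_id column (neither exists in this branch), and reusing the utils.LoopingCall pattern the services already use for report_state:

from nova import flags
from nova import rpc
from nova import utils

FLAGS = flags.FLAGS


def start_volume_sync(context, db, interval=30):
    """Periodically re-poll nova-volume for volumes in transient states."""

    def sync_volumes():
        for volume in db.volume_get_all(context):
            if volume['status'] not in ('creating', 'deleting'):
                continue
            topic = db.queue_get_for(context, FLAGS.volume_topic,
                                     volume['host'])
            # remote_id is the id known to nova-volume, which may differ
            # from the local zone id until the switch to UUIDs lands.
            remote = rpc.call(context, topic,
                              {"method": "volume_get",
                               "args": {"volume_id": volume['remote_id']}})
            db.volume_update(context, volume['id'],
                             {'status': remote['status']})

    pulse = utils.LoopingCall(sync_volumes)
    pulse.start(interval=interval, now=False)
    return pulse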

I'm open to feedback about this approach. I tried a few other versions and this seems like the simplest change set to get what we want. If this looks good, I will modify the other volume commands to work the same way and propose a similar set of changes for compute.

Revision history for this message
Vish Ishaya (vishvananda) wrote :

Accidentally pushed an old version. This is the right one.

lp:~vishvananda/nova/no-db-messaging updated
1078. By Eldar Nugaev

Improved error notification in network create

1079. By Eldar Nugaev

Added network_info into refresh_security_group_rules
That fixes https://bugs.launchpad.net/nova/+bug/773308

1080. By Mark Washenberger

Convert instance_type_ids in the instances table from strings to integers to enable joins with instance_types. This in particular fixes a problem when using postgresql.

Revision history for this message
Dan Prince (dan-prince) wrote :

Hey Vish,

I just smoke stacked this and got some errors. It looks like the changes to models.py aren't quite right.

I'm getting the following in nova-api.log:

2011-05-17 17:17:18,574 ERROR nova.api.openstack [-] Caught error: 'AuthToken' object has no attribute 'server_management_url'
(nova.api.openstack): TRACE: Traceback (most recent call last):
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/nova/api/openstack/__init__.py", line 59, in __call__
(nova.api.openstack): TRACE: return req.get_response(self.application)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/request.py", line 919, in get_response
(nova.api.openstack): TRACE: application, catch_exc_info=False)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/request.py", line 887, in call_application
(nova.api.openstack): TRACE: app_iter = application(self.environ, start_response)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/dec.py", line 147, in __call__
(nova.api.openstack): TRACE: resp = self.call_func(req, *args, **self.kwargs)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/dec.py", line 208, in call_func
(nova.api.openstack): TRACE: return self.func(req, *args, **kwargs)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/nova/api/openstack/auth.py", line 54, in __call__
(nova.api.openstack): TRACE: return self.authenticate(req)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/nova/api/openstack/auth.py", line 107, in authenticate
(nova.api.openstack): TRACE: token.server_management_url
(nova.api.openstack): TRACE: AttributeError: 'AuthToken' object has no attribute 'server_management_url'
--

Running a simple 'nova list' command (using the OSAPI) should allow you to reproduce this.

review: Needs Fixing
Revision history for this message
Vish Ishaya (vishvananda) wrote :

Thanks Dan, I'll look at this. I also had some feedback from termie offline, and I'm moving where the polling happens.

lp:~vishvananda/nova/no-db-messaging updated
1081. By Josh Kearney

Added missing metadata join to instance_get calls.

1082. By Matt Dietz

Fixes improper attribute naming around instance types that broke Resizes.

1083. By termie

Docstring cleanup and formatting (nova/network dir). Minor style fixes as well.

1084. By Vish Ishaya

Fixes the naming of the server_management_url in auth and tests.

1085. By Josh Kearney

Added missing xenhost plugin. This was causing warnings to pop up in the compute logs during periodic_task runs. It must not have been bzr add'd when this code was merged.

1086. By Matt Dietz

Implements a basic mechanism for pushing notifications out to interested parties. The rationale for implementing notifications this way is that the responsibility for them shouldn't fall to Nova. As such, we simply will be pushing messages to a queue where another worker entirely can be written to push messages around to subscribers.

1087. By Johannes Erdfelt

Simple change to sort the list of controllers/methods before printing to make it easier to read

1088. By termie

add support to rpc for multicall

1089. By termie

make the test more explicit

1090. By termie

add commented out unworking code for yield-based returns

1091. By Chris Behrens

Add a connection pool for rpc cast/call
Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each.

1092. By Chris Behrens

pep8 and comment fixes

1093. By Chris Behrens

convert fanout_cast to ConnectionPool

1094. By Chris Behrens

fakerabbit's declare_consumer should support more than one consumer. Also: make fakerabbit Backend.consume be an iterator like it should be.

1095. By Vish Ishaya

fix consumers to actually be deleted and clean up cloud test

1096. By Chris Behrens

catch greenlet.GreenletExit when shutting service down

1097. By Chris Behrens

Always create Service consumers no matter if report_interval is 0
Fix tests to handle how Service loads Consumers now

1098. By Chris Behrens

Add rpc_conn_pool_size flag for the new connection pool

1099. By termie

bring back commits lost in merge

1100. By termie

almost everything working with fake_rabbit

1101. By termie

don't need to use a separate connection

1102. By Vish Ishaya

lots of fixes for rpc and extra imports

1103. By termie

make sure that using multicall on a call with a single result still functions

1104. By termie

cleanup the code for merging

1105. By Vish Ishaya

update manager to use multicall

1106. By Vish Ishaya

fix conversion of models to dicts

1107. By Vish Ishaya

change volume_api to have delayed_create

1108. By Vish Ishaya

make scheduler use multicall

1109. By Vish Ishaya

remove the unnecessary try except in manager

1110. By Vish Ishaya

pep8

Revision history for this message
Vish Ishaya (vishvananda) wrote :

Reproposing this with a prereq branch.

Unmerged revisions

1123. By Vish Ishaya

merged trunk

1122. By Vish Ishaya

remove merge error calling failing test

1121. By Vish Ishaya

fix snapshot test

1120. By Vish Ishaya

return not yield in scheduler shortcut

1119. By Vish Ishaya

make sure to handle VolumeIsBusy

1118. By Vish Ishaya

merged trunk and removed conflicts

1117. By Vish Ishaya

lost some changes from rpc branch, bring them in manually

1116. By Vish Ishaya

use strtime for passing datetimes back and forth through the queue

1115. By Vish Ishaya

fix tests

1114. By Vish Ishaya

keep the database on the receiving end as well

Preview Diff

1=== modified file 'nova/db/sqlalchemy/models.py'
2--- nova/db/sqlalchemy/models.py 2011-05-17 20:50:12 +0000
3+++ nova/db/sqlalchemy/models.py 2011-05-20 01:34:25 +0000
4@@ -77,17 +77,25 @@
5 return getattr(self, key, default)
6
7 def __iter__(self):
8- self._i = iter(object_mapper(self).columns)
9+ # NOTE(vish): include name property in the iterator
10+ columns = dict(object_mapper(self).columns).keys()
11+ name = self.get('name')
12+ if name:
13+ columns.append('name')
14+ self._i = iter(columns)
15 return self
16
17 def next(self):
18- n = self._i.next().name
19+ n = self._i.next()
20 return n, getattr(self, n)
21
22 def update(self, values):
23 """Make the model object behave like a dict"""
24- for k, v in values.iteritems():
25- setattr(self, k, v)
26+ columns = dict(object_mapper(self).columns).keys()
27+ for key, value in values.iteritems():
28+ # NOTE(vish): don't update the 'name' property
29+ if key in columns:
30+ setattr(self, key, value)
31
32 def iteritems(self):
33 """Make the model object behave like a dict.
34
35=== modified file 'nova/fakerabbit.py'
36--- nova/fakerabbit.py 2011-02-22 23:05:48 +0000
37+++ nova/fakerabbit.py 2011-05-20 01:34:25 +0000
38@@ -31,6 +31,7 @@
39
40 EXCHANGES = {}
41 QUEUES = {}
42+CONSUMERS = {}
43
44
45 class Message(base.BaseMessage):
46@@ -96,17 +97,29 @@
47 ' key %(routing_key)s') % locals())
48 EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
49
50- def declare_consumer(self, queue, callback, *args, **kwargs):
51- self.current_queue = queue
52- self.current_callback = callback
53+ def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
54+ global CONSUMERS
55+ LOG.debug("Adding consumer %s", consumer_tag)
56+ CONSUMERS[consumer_tag] = (queue, callback)
57+
58+ def cancel(self, consumer_tag):
59+ global CONSUMERS
60+ LOG.debug("Removing consumer %s", consumer_tag)
61+ del CONSUMERS[consumer_tag]
62
63 def consume(self, limit=None):
64+ global CONSUMERS
65+ num = 0
66 while True:
67- item = self.get(self.current_queue)
68- if item:
69- self.current_callback(item)
70- raise StopIteration()
71- greenthread.sleep(0)
72+ for (queue, callback) in CONSUMERS.itervalues():
73+ item = self.get(queue)
74+ if item:
75+ callback(item)
76+ num += 1
77+ yield
78+ if limit and num == limit:
79+ raise StopIteration()
80+ greenthread.sleep(0.1)
81
82 def get(self, queue, no_ack=False):
83 global QUEUES
84@@ -134,5 +147,7 @@
85 def reset_all():
86 global EXCHANGES
87 global QUEUES
88+ global CONSUMERS
89 EXCHANGES = {}
90 QUEUES = {}
91+ CONSUMERS = {}
92
93=== modified file 'nova/rpc.py'
94--- nova/rpc.py 2011-04-20 19:08:22 +0000
95+++ nova/rpc.py 2011-05-20 01:34:25 +0000
96@@ -33,7 +33,9 @@
97 from carrot import connection as carrot_connection
98 from carrot import messaging
99 from eventlet import greenpool
100-from eventlet import greenthread
101+from eventlet import pools
102+from eventlet import queue
103+import greenlet
104
105 from nova import context
106 from nova import exception
107@@ -47,7 +49,10 @@
108
109
110 FLAGS = flags.FLAGS
111-flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
112+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
113+ 'Size of RPC thread pool')
114+flags.DEFINE_integer('rpc_conn_pool_size', 30,
115+ 'Size of RPC connection pool')
116
117
118 class Connection(carrot_connection.BrokerConnection):
119@@ -90,6 +95,17 @@
120 return cls.instance()
121
122
123+class Pool(pools.Pool):
124+ """Class that implements a Pool of Connections."""
125+
126+ def create(self):
127+ LOG.debug('Creating new connection')
128+ return Connection.instance(new=True)
129+
130+
131+ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size)
132+
133+
134 class Consumer(messaging.Consumer):
135 """Consumer base class.
136
137@@ -131,7 +147,9 @@
138 self.connection = Connection.recreate()
139 self.backend = self.connection.create_backend()
140 self.declare()
141- super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
142+ return super(Consumer, self).fetch(no_ack,
143+ auto_ack,
144+ enable_callbacks)
145 if self.failed_connection:
146 LOG.error(_('Reconnected to queue'))
147 self.failed_connection = False
148@@ -159,13 +177,13 @@
149 self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
150 super(AdapterConsumer, self).__init__(connection=connection,
151 topic=topic)
152-
153- def receive(self, *args, **kwargs):
154- self.pool.spawn_n(self._receive, *args, **kwargs)
155-
156- @exception.wrap_exception
157- def _receive(self, message_data, message):
158- """Magically looks for a method on the proxy object and calls it.
159+ self.register_callback(self.process_data)
160+
161+ def process_data(self, message_data, message):
162+ """Consumer callback to call a method on a proxy object.
163+
164+ Parses the message for validity and fires off a thread to call the
165+ proxy object method.
166
167 Message data should be a dictionary with two keys:
168 method: string representing the method to call
169@@ -175,8 +193,8 @@
170
171 """
172 LOG.debug(_('received %s') % message_data)
173- msg_id = message_data.pop('_msg_id', None)
174-
175+ # This will be popped off in _unpack_context
176+ msg_id = message_data.get('_msg_id', None)
177 ctxt = _unpack_context(message_data)
178
179 method = message_data.get('method')
180@@ -190,6 +208,13 @@
181 LOG.warn(_('no method for message: %s') % message_data)
182 msg_reply(msg_id, _('No method for message: %s') % message_data)
183 return
184+ self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
185+
186+ @exception.wrap_exception
187+ def _process_data(self, msg_id, ctxt, method, args):
188+ """Thread that maigcally looks for a method on the proxy
189+ object and calls it.
190+ """
191
192 node_func = getattr(self.proxy, str(method))
193 node_args = dict((str(k), v) for k, v in args.iteritems())
194@@ -197,7 +222,15 @@
195 try:
196 rval = node_func(context=ctxt, **node_args)
197 if msg_id:
198- msg_reply(msg_id, rval, None)
199+ # Check if the result was a generator
200+ if hasattr(rval, 'send'):
201+ for x in rval:
202+ msg_reply(msg_id, x, None)
203+ else:
204+ msg_reply(msg_id, rval, None)
205+
206+ # This final None tells multicall that it is done.
207+ msg_reply(msg_id, None, None)
208 except Exception as e:
209 logging.exception('Exception during message handling')
210 if msg_id:
211@@ -205,11 +238,6 @@
212 return
213
214
215-class Publisher(messaging.Publisher):
216- """Publisher base class."""
217- pass
218-
219-
220 class TopicAdapterConsumer(AdapterConsumer):
221 """Consumes messages on a specific topic."""
222
223@@ -242,6 +270,59 @@
224 topic=topic, proxy=proxy)
225
226
227+class ConsumerSet(object):
228+ """Groups consumers to listen on together on a single connection."""
229+
230+ def __init__(self, conn, consumer_list):
231+ self.consumer_list = set(consumer_list)
232+ self.consumer_set = None
233+ self.enabled = True
234+ self.init(conn)
235+
236+ def init(self, conn):
237+ if not conn:
238+ conn = Connection.instance(new=True)
239+ if self.consumer_set:
240+ self.consumer_set.close()
241+ self.consumer_set = messaging.ConsumerSet(conn)
242+ for consumer in self.consumer_list:
243+ consumer.connection = conn
244+ # consumer.backend is set for us
245+ self.consumer_set.add_consumer(consumer)
246+
247+ def reconnect(self):
248+ self.init(None)
249+
250+ def wait(self, limit=None):
251+ running = True
252+ while running:
253+ it = self.consumer_set.iterconsume(limit=limit)
254+ if not it:
255+ break
256+ while True:
257+ try:
258+ it.next()
259+ except StopIteration:
260+ return
261+ except greenlet.GreenletExit:
262+ running = False
263+ break
264+ except Exception as e:
265+ LOG.error(_("Received exception %s " % type(e) + \
266+ "while processing consumer"))
267+ self.reconnect()
268+ # Break to outer loop
269+ break
270+
271+ def close(self):
272+ self.consumer_set.close()
273+
274+
275+class Publisher(messaging.Publisher):
276+ """Publisher base class."""
277+ pass
278+
279+
280 class TopicPublisher(Publisher):
281 """Publishes messages on a specific topic."""
282
283@@ -306,16 +387,18 @@
284 LOG.error(_("Returning exception %s to caller"), message)
285 LOG.error(tb)
286 failure = (failure[0].__name__, str(failure[1]), tb)
287- conn = Connection.instance()
288- publisher = DirectPublisher(connection=conn, msg_id=msg_id)
289- try:
290- publisher.send({'result': reply, 'failure': failure})
291- except TypeError:
292- publisher.send(
293- {'result': dict((k, repr(v))
294- for k, v in reply.__dict__.iteritems()),
295- 'failure': failure})
296- publisher.close()
297+
298+ with ConnectionPool.item() as conn:
299+ publisher = DirectPublisher(connection=conn, msg_id=msg_id)
300+ try:
301+ publisher.send({'result': reply, 'failure': failure})
302+ except TypeError:
303+ publisher.send(
304+ {'result': dict((k, repr(v))
305+ for k, v in reply.__dict__.iteritems()),
306+ 'failure': failure})
307+
308+ publisher.close()
309
310
311 class RemoteError(exception.Error):
312@@ -347,8 +430,9 @@
313 if key.startswith('_context_'):
314 value = msg.pop(key)
315 context_dict[key[9:]] = value
316+ context_dict['msg_id'] = msg.pop('_msg_id', None)
317 LOG.debug(_('unpacked context: %s'), context_dict)
318- return context.RequestContext.from_dict(context_dict)
319+ return RpcContext.from_dict(context_dict)
320
321
322 def _pack_context(msg, context):
323@@ -360,70 +444,110 @@
324 for args at some point.
325
326 """
327- context = dict([('_context_%s' % key, value)
328- for (key, value) in context.to_dict().iteritems()])
329- msg.update(context)
330-
331-
332-def call(context, topic, msg):
333- """Sends a message on a topic and wait for a response."""
334+ context_d = dict([('_context_%s' % key, value)
335+ for (key, value) in context.to_dict().iteritems()])
336+ msg.update(context_d)
337+
338+
339+class RpcContext(context.RequestContext):
340+ def __init__(self, *args, **kwargs):
341+ msg_id = kwargs.pop('msg_id', None)
342+ self.msg_id = msg_id
343+ super(RpcContext, self).__init__(*args, **kwargs)
344+
345+ def reply(self, *args, **kwargs):
346+ msg_reply(self.msg_id, *args, **kwargs)
347+
348+
349+def multicall(context, topic, msg):
350+ """Make a call that returns multiple times."""
351 LOG.debug(_('Making asynchronous call on %s ...'), topic)
352 msg_id = uuid.uuid4().hex
353 msg.update({'_msg_id': msg_id})
354 LOG.debug(_('MSG_ID is %s') % (msg_id))
355 _pack_context(msg, context)
356
357- class WaitMessage(object):
358- def __call__(self, data, message):
359- """Acks message and sets result."""
360- message.ack()
361- if data['failure']:
362- self.result = RemoteError(*data['failure'])
363- else:
364- self.result = data['result']
365-
366- wait_msg = WaitMessage()
367- conn = Connection.instance()
368- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
369+ con_conn = ConnectionPool.get()
370+ consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
371+ wait_msg = MulticallWaiter(consumer)
372 consumer.register_callback(wait_msg)
373
374- conn = Connection.instance()
375- publisher = TopicPublisher(connection=conn, topic=topic)
376+ publisher = TopicPublisher(connection=con_conn, topic=topic)
377 publisher.send(msg)
378 publisher.close()
379
380- try:
381- consumer.wait(limit=1)
382- except StopIteration:
383- pass
384- consumer.close()
385- # NOTE(termie): this is a little bit of a change from the original
386- # non-eventlet code where returning a Failure
387- # instance from a deferred call is very similar to
388- # raising an exception
389- if isinstance(wait_msg.result, Exception):
390- raise wait_msg.result
391- return wait_msg.result
392+ return wait_msg
393+
394+
395+class MulticallWaiter(object):
396+ def __init__(self, consumer):
397+ self._consumer = consumer
398+ self._results = queue.Queue()
399+ self._closed = False
400+
401+ def close(self):
402+ self._closed = True
403+ self._consumer.close()
404+ ConnectionPool.put(self._consumer.connection)
405+
406+ def __call__(self, data, message):
407+ """Acks message and sets result."""
408+ message.ack()
409+ if data['failure']:
410+ self._results.put(RemoteError(*data['failure']))
411+ else:
412+ self._results.put(data['result'])
413+
414+ def __iter__(self):
415+ return self.wait()
416+
417+ def wait(self):
418+ while True:
419+ rv = None
420+ while rv is None and not self._closed:
421+ try:
422+ rv = self._consumer.fetch(enable_callbacks=True)
423+ except Exception:
424+ self.close()
425+ raise
426+ time.sleep(0.01)
427+
428+ result = self._results.get()
429+ if isinstance(result, Exception):
430+ self.close()
431+ raise result
432+ if result == None:
433+ self.close()
434+ raise StopIteration
435+ yield result
436+
437+
438+def call(context, topic, msg):
439+ """Sends a message on a topic and wait for a response."""
440+ rv = multicall(context, topic, msg)
441+ for x in rv:
442+ rv.close()
443+ return x
444
445
446 def cast(context, topic, msg):
447 """Sends a message on a topic without waiting for a response."""
448 LOG.debug(_('Making asynchronous cast on %s...'), topic)
449 _pack_context(msg, context)
450- conn = Connection.instance()
451- publisher = TopicPublisher(connection=conn, topic=topic)
452- publisher.send(msg)
453- publisher.close()
454+ with ConnectionPool.item() as conn:
455+ publisher = TopicPublisher(connection=conn, topic=topic)
456+ publisher.send(msg)
457+ publisher.close()
458
459
460 def fanout_cast(context, topic, msg):
461 """Sends a message on a fanout exchange without waiting for a response."""
462 LOG.debug(_('Making asynchronous fanout cast...'))
463 _pack_context(msg, context)
464- conn = Connection.instance()
465- publisher = FanoutPublisher(topic, connection=conn)
466- publisher.send(msg)
467- publisher.close()
468+ with ConnectionPool.item() as conn:
469+ publisher = FanoutPublisher(topic, connection=conn)
470+ publisher.send(msg)
471+ publisher.close()
472
473
474 def generic_response(message_data, message):
475@@ -459,6 +583,7 @@
476
477 if wait:
478 consumer.wait()
479+ consumer.close()
480
481
482 if __name__ == '__main__':
483
484=== modified file 'nova/scheduler/manager.py'
485--- nova/scheduler/manager.py 2011-05-05 14:35:44 +0000
486+++ nova/scheduler/manager.py 2011-05-20 01:34:25 +0000
487@@ -83,11 +83,13 @@
488 except AttributeError:
489 host = self.driver.schedule(elevated, topic, *args, **kwargs)
490
491- rpc.cast(context,
492- db.queue_get_for(context, topic, host),
493- {"method": method,
494- "args": kwargs})
495- LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals())
496+ LOG.debug(_("Multicall %(topic)s %(host)s for %(method)s") % locals())
497+ rvs = rpc.multicall(context,
498+ db.queue_get_for(context, topic, host),
499+ {"method": method,
500+ "args": kwargs})
501+ for rv in rvs:
502+ yield rv
503
504 # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
505 # Based on bexar design summit discussion,
506
507=== modified file 'nova/service.py'
508--- nova/service.py 2011-04-20 19:08:22 +0000
509+++ nova/service.py 2011-05-20 01:34:25 +0000
510@@ -19,14 +19,11 @@
511
512 """Generic Node baseclass for all workers that run on hosts."""
513
514+import greenlet
515 import inspect
516 import os
517-import sys
518-import time
519
520-from eventlet import event
521 from eventlet import greenthread
522-from eventlet import greenpool
523
524 from nova import context
525 from nova import db
526@@ -91,27 +88,38 @@
527 if 'nova-compute' == self.binary:
528 self.manager.update_available_resource(ctxt)
529
530- conn1 = rpc.Connection.instance(new=True)
531- conn2 = rpc.Connection.instance(new=True)
532- conn3 = rpc.Connection.instance(new=True)
533+ self.conn = rpc.Connection.instance(new=True)
534+ logging.debug("Creating Consumer connection for Service %s" %
535+ self.topic)
536+
537+ # Share this same connection for these Consumers
538+ consumer_all = rpc.TopicAdapterConsumer(
539+ connection=self.conn,
540+ topic=self.topic,
541+ proxy=self)
542+ consumer_node = rpc.TopicAdapterConsumer(
543+ connection=self.conn,
544+ topic='%s.%s' % (self.topic, self.host),
545+ proxy=self)
546+ fanout = rpc.FanoutAdapterConsumer(
547+ connection=self.conn,
548+ topic=self.topic,
549+ proxy=self)
550+
551+ cset = rpc.ConsumerSet(self.conn, [consumer_all,
552+ consumer_node,
553+ fanout])
554+
555+ # Wait forever, processing these consumers
556+ def _wait():
557+ try:
558+ cset.wait()
559+ finally:
560+ cset.close()
561+
562+ self.csetthread = greenthread.spawn(_wait)
563+
564 if self.report_interval:
565- consumer_all = rpc.TopicAdapterConsumer(
566- connection=conn1,
567- topic=self.topic,
568- proxy=self)
569- consumer_node = rpc.TopicAdapterConsumer(
570- connection=conn2,
571- topic='%s.%s' % (self.topic, self.host),
572- proxy=self)
573- fanout = rpc.FanoutAdapterConsumer(
574- connection=conn3,
575- topic=self.topic,
576- proxy=self)
577-
578- self.timers.append(consumer_all.attach_to_eventlet())
579- self.timers.append(consumer_node.attach_to_eventlet())
580- self.timers.append(fanout.attach_to_eventlet())
581-
582 pulse = utils.LoopingCall(self.report_state)
583 pulse.start(interval=self.report_interval, now=False)
584 self.timers.append(pulse)
585@@ -167,7 +175,13 @@
586
587 def kill(self):
588 """Destroy the service object in the datastore."""
589+ self.csetthread.kill()
590+ try:
591+ self.csetthread.wait()
592+ except greenlet.GreenletExit:
593+ pass
594 self.stop()
595+ rpc.ConnectionPool.put(self.conn)
596 try:
597 db.service_destroy(context.get_admin_context(), self.service_id)
598 except exception.NotFound:
599
600=== modified file 'nova/test.py'
601--- nova/test.py 2011-04-20 19:08:22 +0000
602+++ nova/test.py 2011-05-20 01:34:25 +0000
603@@ -31,17 +31,15 @@
604 import unittest
605
606 import mox
607-import shutil
608 import stubout
609 from eventlet import greenthread
610
611-from nova import context
612-from nova import db
613 from nova import fakerabbit
614 from nova import flags
615 from nova import rpc
616 from nova import service
617 from nova import wsgi
618+from nova.virt import fake
619
620
621 FLAGS = flags.FLAGS
622@@ -85,6 +83,7 @@
623 self._monkey_patch_attach()
624 self._monkey_patch_wsgi()
625 self._original_flags = FLAGS.FlagValuesDict()
626+ rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
627
628 def tearDown(self):
629 """Runs after each test method to tear down test environment."""
630@@ -99,6 +98,10 @@
631 if FLAGS.fake_rabbit:
632 fakerabbit.reset_all()
633
634+ if FLAGS.connection_type == 'fake':
635+ if hasattr(fake.FakeConnection, '_instance'):
636+ del fake.FakeConnection._instance
637+
638 # Reset any overriden flags
639 self.reset_flags()
640
641
642=== modified file 'nova/tests/integrated/integrated_helpers.py'
643--- nova/tests/integrated/integrated_helpers.py 2011-03-30 01:13:04 +0000
644+++ nova/tests/integrated/integrated_helpers.py 2011-05-20 01:34:25 +0000
645@@ -154,10 +154,7 @@
646 # set up services
647 self.start_service('compute')
648 self.start_service('volume')
649- # NOTE(justinsb): There's a bug here which is eluding me...
650- # If we start the network_service, all is good, but then subsequent
651- # tests fail: CloudTestCase.test_ajax_console in particular.
652- #self.start_service('network')
653+ self.start_service('network')
654 self.start_service('scheduler')
655
656 self.auth_url = self._start_api_service()
657
658=== modified file 'nova/tests/test_cloud.py'
659--- nova/tests/test_cloud.py 2011-05-16 20:30:40 +0000
660+++ nova/tests/test_cloud.py 2011-05-20 01:34:25 +0000
661@@ -17,13 +17,8 @@
662 # under the License.
663
664 from base64 import b64decode
665-import json
666 from M2Crypto import BIO
667 from M2Crypto import RSA
668-import os
669-import shutil
670-import tempfile
671-import time
672
673 from eventlet import greenthread
674
675@@ -33,12 +28,10 @@
676 from nova import flags
677 from nova import log as logging
678 from nova import rpc
679-from nova import service
680 from nova import test
681 from nova import utils
682 from nova import exception
683 from nova.auth import manager
684-from nova.compute import power_state
685 from nova.api.ec2 import cloud
686 from nova.api.ec2 import ec2utils
687 from nova.image import local
688@@ -79,14 +72,21 @@
689 self.stubs.Set(local.LocalImageService, 'show', fake_show)
690 self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
691
692+ # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
693+ rpc_cast = rpc.cast
694+
695+ def finish_cast(*args, **kwargs):
696+ rpc_cast(*args, **kwargs)
697+ greenthread.sleep(0.2)
698+
699+ self.stubs.Set(rpc, 'cast', finish_cast)
700+
701 def tearDown(self):
702 network_ref = db.project_get_network(self.context,
703 self.project.id)
704 db.network_disassociate(self.context, network_ref['id'])
705 self.manager.delete_project(self.project)
706 self.manager.delete_user(self.user)
707- self.compute.kill()
708- self.network.kill()
709 super(CloudTestCase, self).tearDown()
710
711 def _create_key(self, name):
712@@ -113,7 +113,6 @@
713 self.cloud.describe_addresses(self.context)
714 self.cloud.release_address(self.context,
715 public_ip=address)
716- greenthread.sleep(0.3)
717 db.floating_ip_destroy(self.context, address)
718
719 def test_associate_disassociate_address(self):
720@@ -129,12 +128,10 @@
721 self.cloud.associate_address(self.context,
722 instance_id=ec2_id,
723 public_ip=address)
724- greenthread.sleep(0.3)
725 self.cloud.disassociate_address(self.context,
726 public_ip=address)
727 self.cloud.release_address(self.context,
728 public_ip=address)
729- greenthread.sleep(0.3)
730 self.network.deallocate_fixed_ip(self.context, fixed)
731 db.instance_destroy(self.context, inst['id'])
732 db.floating_ip_destroy(self.context, address)
733@@ -306,31 +303,25 @@
734 'instance_type': instance_type,
735 'max_count': max_count}
736 rv = self.cloud.run_instances(self.context, **kwargs)
737- greenthread.sleep(0.3)
738 instance_id = rv['instancesSet'][0]['instanceId']
739 output = self.cloud.get_console_output(context=self.context,
740 instance_id=[instance_id])
741 self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
742 # TODO(soren): We need this until we can stop polling in the rpc code
743 # for unit tests.
744- greenthread.sleep(0.3)
745 rv = self.cloud.terminate_instances(self.context, [instance_id])
746- greenthread.sleep(0.3)
747
748 def test_ajax_console(self):
749 kwargs = {'image_id': 'ami-1'}
750 rv = self.cloud.run_instances(self.context, **kwargs)
751 instance_id = rv['instancesSet'][0]['instanceId']
752- greenthread.sleep(0.3)
753 output = self.cloud.get_ajax_console(context=self.context,
754 instance_id=[instance_id])
755 self.assertEquals(output['url'],
756 '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
757 # TODO(soren): We need this until we can stop polling in the rpc code
758 # for unit tests.
759- greenthread.sleep(0.3)
760 rv = self.cloud.terminate_instances(self.context, [instance_id])
761- greenthread.sleep(0.3)
762
763 def test_key_generation(self):
764 result = self._create_key('test')
765
766=== modified file 'nova/tests/test_rpc.py'
767--- nova/tests/test_rpc.py 2011-02-23 22:41:11 +0000
768+++ nova/tests/test_rpc.py 2011-05-20 01:34:25 +0000
769@@ -49,6 +49,59 @@
770 "args": {"value": value}})
771 self.assertEqual(value, result)
772
773+ def test_call_succeed_despite_multiple_returns(self):
774+ """Get a value through rpc call"""
775+ value = 42
776+ result = rpc.call(self.context, 'test', {"method": "echo_three_times",
777+ "args": {"value": value}})
778+ self.assertEqual(value, result)
779+
780+ def test_call_succeed_despite_multiple_returns_yield(self):
781+ """Get a value through rpc call"""
782+ value = 42
783+ result = rpc.call(self.context, 'test',
784+ {"method": "echo_three_times_yield",
785+ "args": {"value": value}})
786+ self.assertEqual(value, result)
787+
788+ def test_multicall_succeed_once(self):
789+ """Get a value through rpc call"""
790+ value = 42
791+ result = rpc.multicall(self.context,
792+ 'test',
793+ {"method": "echo",
794+ "args": {"value": value}})
795+ i = 0
796+ for x in result:
797+ if i > 0:
798+ self.fail('should only receive one response')
799+ self.assertEqual(value + i, x)
800+ i += 1
801+
802+ def test_multicall_succeed_three_times(self):
803+ """Get a value through rpc call"""
804+ value = 42
805+ result = rpc.multicall(self.context,
806+ 'test',
807+ {"method": "echo_three_times",
808+ "args": {"value": value}})
809+ i = 0
810+ for x in result:
811+ self.assertEqual(value + i, x)
812+ i += 1
813+
814+ def test_multicall_succeed_three_times_yield(self):
815+ """Get a value through rpc call"""
816+ value = 42
817+ result = rpc.multicall(self.context,
818+ 'test',
819+ {"method": "echo_three_times_yield",
820+ "args": {"value": value}})
821+ i = 0
822+ for x in result:
823+ self.assertEqual(value + i, x)
824+ i += 1
825+
826 def test_context_passed(self):
827 """Makes sure a context is passed through rpc call"""
828 value = 42
829@@ -127,6 +180,18 @@
830 return context.to_dict()
831
832 @staticmethod
833+ def echo_three_times(context, value):
834+ context.reply(value)
835+ context.reply(value + 1)
836+ context.reply(value + 2)
837+
838+ @staticmethod
839+ def echo_three_times_yield(context, value):
840+ yield value
841+ yield value + 1
842+ yield value + 2
843+
844+ @staticmethod
845 def fail(context, value):
846 """Raises an exception with the value sent in"""
847 raise Exception(value)
848
849=== modified file 'nova/tests/test_service.py'
850--- nova/tests/test_service.py 2011-03-17 13:35:00 +0000
851+++ nova/tests/test_service.py 2011-05-20 01:34:25 +0000
852@@ -106,7 +106,10 @@
853
854 # NOTE(vish): Create was moved out of mox replay to make sure that
855 # the looping calls are created in StartService.
856- app = service.Service.create(host=host, binary=binary)
857+ app = service.Service.create(host=host, binary=binary, topic=topic)
858+
859+ self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
860+ service.rpc.Connection.instance(new=mox.IgnoreArg())
861
862 self.mox.StubOutWithMock(rpc,
863 'TopicAdapterConsumer',
864@@ -114,6 +117,11 @@
865 self.mox.StubOutWithMock(rpc,
866 'FanoutAdapterConsumer',
867 use_mock_anything=True)
868+
869+ self.mox.StubOutWithMock(rpc,
870+ 'ConsumerSet',
871+ use_mock_anything=True)
872+
873 rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
874 topic=topic,
875 proxy=mox.IsA(service.Service)).AndReturn(
876@@ -129,9 +137,13 @@
877 proxy=mox.IsA(service.Service)).AndReturn(
878 rpc.FanoutAdapterConsumer)
879
880- rpc.TopicAdapterConsumer.attach_to_eventlet()
881- rpc.TopicAdapterConsumer.attach_to_eventlet()
882- rpc.FanoutAdapterConsumer.attach_to_eventlet()
883+ def wait_func(self, limit=None):
884+ return None
885+
886+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
887+ {'wait': wait_func})
888+ rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
889+ wait_func(mox.IgnoreArg())
890
891 service_create = {'host': host,
892 'binary': binary,
893@@ -287,8 +299,41 @@
894 # Creating mocks
895 self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
896 service.rpc.Connection.instance(new=mox.IgnoreArg())
897- service.rpc.Connection.instance(new=mox.IgnoreArg())
898- service.rpc.Connection.instance(new=mox.IgnoreArg())
899+
900+ self.mox.StubOutWithMock(rpc,
901+ 'TopicAdapterConsumer',
902+ use_mock_anything=True)
903+ self.mox.StubOutWithMock(rpc,
904+ 'FanoutAdapterConsumer',
905+ use_mock_anything=True)
906+
907+ self.mox.StubOutWithMock(rpc,
908+ 'ConsumerSet',
909+ use_mock_anything=True)
910+
911+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
912+ topic=topic,
913+ proxy=mox.IsA(service.Service)).AndReturn(
914+ rpc.TopicAdapterConsumer)
915+
916+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
917+ topic='%s.%s' % (topic, host),
918+ proxy=mox.IsA(service.Service)).AndReturn(
919+ rpc.TopicAdapterConsumer)
920+
921+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
922+ topic=topic,
923+ proxy=mox.IsA(service.Service)).AndReturn(
924+ rpc.FanoutAdapterConsumer)
925+
926+ def wait_func(self, limit=None):
927+ return None
928+
929+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
930+ {'wait': wait_func})
931+ rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
932+ wait_func(mox.IgnoreArg())
933+
934 self.mox.StubOutWithMock(serv.manager.driver,
935 'update_available_resource')
936 serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
937
938=== modified file 'nova/volume/api.py'
939--- nova/volume/api.py 2011-03-31 08:39:00 +0000
940+++ nova/volume/api.py 2011-05-20 01:34:25 +0000
941@@ -20,14 +20,14 @@
942 Handles all requests relating to volumes.
943 """
944
945-import datetime
946+import eventlet
947
948-from nova import db
949 from nova import exception
950 from nova import flags
951 from nova import log as logging
952 from nova import quota
953 from nova import rpc
954+from nova import utils
955 from nova.db import base
956
957 FLAGS = flags.FLAGS
958@@ -57,26 +57,60 @@
959 'display_name': name,
960 'display_description': description}
961
962- volume = self.db.volume_create(context, options)
963- rpc.cast(context,
964- FLAGS.scheduler_topic,
965- {"method": "create_volume",
966- "args": {"topic": FLAGS.volume_topic,
967- "volume_id": volume['id']}})
968- return volume
969+ volume_ref = self.db.volume_create(context, options)
970+ volume_ref = utils.to_primitive(dict(volume_ref))
971+
972+ def delayed_create(volume_ref):
973+ vid = volume_ref['id']
974+ try:
975+ rvs = rpc.multicall(context,
976+ FLAGS.scheduler_topic,
977+ {"method": "create_volume",
978+ "args": {"topic": FLAGS.volume_topic,
979+ "volume_ref": volume_ref}})
980+ for volume_ref in rvs:
981+ self.db.volume_update(context, vid, volume_ref)
982+ volume_ref['launched_at'] = utils.utcnow()
983+ self.db.volume_update(context, vid, volume_ref)
984+
985+ except rpc.RemoteError:
986+ self.db.volume_update(context, vid, {'status': 'error'})
987+
988+ eventlet.spawn_n(delayed_create, volume_ref)
989+ return volume_ref
990
991 def delete(self, context, volume_id):
992- volume = self.get(context, volume_id)
993- if volume['status'] != "available":
994+ volume_ref = self.get(context, volume_id)
995+ if volume_ref['status'] != "available":
996 raise exception.ApiError(_("Volume status must be available"))
997- now = datetime.datetime.utcnow()
998- self.db.volume_update(context, volume_id, {'status': 'deleting',
999- 'terminated_at': now})
1000- host = volume['host']
1001- rpc.cast(context,
1002- self.db.queue_get_for(context, FLAGS.volume_topic, host),
1003- {"method": "delete_volume",
1004- "args": {"volume_id": volume_id}})
1005+ if volume_ref['attach_status'] == "attached":
1006+ raise exception.Error(_("Volume is still attached"))
1007+
1008+ volume_ref['status'] = 'deleting'
1009+ volume_ref['terminated_at'] = utils.utcnow()
1010+ self.db.volume_update(context, volume_ref['id'], volume_ref)
1011+ volume_ref = utils.to_primitive(dict(volume_ref))
1012+
1013+ def delayed_delete(volume_ref):
1014+ vid = volume_ref['id']
1015+ try:
1016+ topic = self.db.queue_get_for(context,
1017+ FLAGS.volume_topic,
1018+ volume_ref['host'])
1019+ rvs = rpc.multicall(context,
1020+ topic,
1021+ {"method": "delete_volume",
1022+ "args": {"volume_ref": volume_ref}})
1023+ for volume_ref in rvs:
1024+ self.db.volume_update(context, vid, volume_ref)
1025+
1026+ self.db.volume_destroy(context, vid)
1027+
1028+ except rpc.RemoteError:
1029+ self.db.volume_update(context, vid, {'status': 'err_delete'})
1030+
1031+ eventlet.spawn_n(delayed_delete, volume_ref)
1032+ return True
1033
1034 def update(self, context, volume_id, fields):
1035 self.db.volume_update(context, volume_id, fields)
1036
1037=== modified file 'nova/volume/manager.py'
1038--- nova/volume/manager.py 2011-03-17 13:35:00 +0000
1039+++ nova/volume/manager.py 2011-05-20 01:34:25 +0000
1040@@ -90,67 +90,40 @@
1041 else:
1042 LOG.info(_("volume %s: skipping export"), volume['name'])
1043
1044- def create_volume(self, context, volume_id):
1045+ def create_volume(self, context, volume_ref):
1046 """Creates and exports the volume."""
1047- context = context.elevated()
1048- volume_ref = self.db.volume_get(context, volume_id)
1049 LOG.info(_("volume %s: creating"), volume_ref['name'])
1050
1051- self.db.volume_update(context,
1052- volume_id,
1053- {'host': self.host})
1054- # NOTE(vish): so we don't have to get volume from db again
1055- # before passing it to the driver.
1056 volume_ref['host'] = self.host
1057-
1058- try:
1059- vol_name = volume_ref['name']
1060- vol_size = volume_ref['size']
1061- LOG.debug(_("volume %(vol_name)s: creating lv of"
1062- " size %(vol_size)sG") % locals())
1063- model_update = self.driver.create_volume(volume_ref)
1064- if model_update:
1065- self.db.volume_update(context, volume_ref['id'], model_update)
1066-
1067- LOG.debug(_("volume %s: creating export"), volume_ref['name'])
1068- model_update = self.driver.create_export(context, volume_ref)
1069- if model_update:
1070- self.db.volume_update(context, volume_ref['id'], model_update)
1071- except Exception:
1072- self.db.volume_update(context,
1073- volume_ref['id'], {'status': 'error'})
1074- raise
1075-
1076- now = datetime.datetime.utcnow()
1077- self.db.volume_update(context,
1078- volume_ref['id'], {'status': 'available',
1079- 'launched_at': now})
1080+ yield volume_ref
1081+
1082+ vol_name = volume_ref['name']
1083+ vol_size = volume_ref['size']
1084+ LOG.debug(_("volume %(vol_name)s: creating lv of"
1085+ " size %(vol_size)sG") % locals())
1086+ model_update = self.driver.create_volume(volume_ref)
1087+ if model_update:
1088+ volume_ref.update(model_update)
1089+ yield volume_ref
1090+
1091+ LOG.debug(_("volume %s: creating export"), volume_ref['name'])
1092+ model_update = self.driver.create_export(context, volume_ref)
1093+ if model_update:
1094+ volume_ref.update(model_update)
1095+ yield volume_ref
1096+
1097 LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
1098- return volume_id
1099+ volume_ref['status'] = 'available'
1100+ yield volume_ref
1101
1102- def delete_volume(self, context, volume_id):
1103+ def delete_volume(self, context, volume_ref):
1104 """Deletes and unexports volume."""
1105- context = context.elevated()
1106- volume_ref = self.db.volume_get(context, volume_id)
1107- if volume_ref['attach_status'] == "attached":
1108- raise exception.Error(_("Volume is still attached"))
1109- if volume_ref['host'] != self.host:
1110- raise exception.Error(_("Volume is not local to this node"))
1111-
1112- try:
1113- LOG.debug(_("volume %s: removing export"), volume_ref['name'])
1114- self.driver.remove_export(context, volume_ref)
1115- LOG.debug(_("volume %s: deleting"), volume_ref['name'])
1116- self.driver.delete_volume(volume_ref)
1117- except Exception:
1118- self.db.volume_update(context,
1119- volume_ref['id'],
1120- {'status': 'error_deleting'})
1121- raise
1122-
1123- self.db.volume_destroy(context, volume_id)
1124+ LOG.debug(_("volume %s: removing export"), volume_ref['name'])
1125+ self.driver.remove_export(context, volume_ref)
1126+ LOG.debug(_("volume %s: deleting"), volume_ref['name'])
1127+ self.driver.delete_volume(volume_ref)
1128 LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
1129- return True
1130+ yield volume_ref
1131
1132 def setup_compute_volume(self, context, volume_id):
1133 """Setup remote volume on compute host.