Merge lp:~vishvananda/nova/no-db-messaging into lp:~hudson-openstack/nova/trunk

Proposed by Vish Ishaya
Status: Rejected
Rejected by: Vish Ishaya
Proposed branch: lp:~vishvananda/nova/no-db-messaging
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 1133 lines (+486/-214)
12 files modified
nova/db/sqlalchemy/models.py (+12/-4)
nova/fakerabbit.py (+23/-8)
nova/rpc.py (+195/-70)
nova/scheduler/manager.py (+7/-5)
nova/service.py (+38/-24)
nova/test.py (+6/-3)
nova/tests/integrated/integrated_helpers.py (+1/-4)
nova/tests/test_cloud.py (+9/-18)
nova/tests/test_rpc.py (+65/-0)
nova/tests/test_service.py (+51/-6)
nova/volume/api.py (+53/-19)
nova/volume/manager.py (+26/-53)
To merge this branch: bzr merge lp:~vishvananda/nova/no-db-messaging
Reviewer Review Type Date Requested Status
Dan Prince (community) Needs Fixing
Review via email: mp+61189@code.launchpad.net

Description of the change

This is an initial proposal for feedback. This branch is a first step toward this blueprint:
https://blueprints.launchpad.net/nova/+spec/no-db-messaging
which will allow for the implementation of this blueprint:
https://blueprints.launchpad.net/nova/+spec/separate-code-for-services
and will ultimately make it easy to replace our various services with external projects.

This prototype changes volume_create to pass data through the queue instead of writing information to the database and reading it back on the other end. It attempts to make minimal changes. It includes:
 * a small change to the model code to allow conversion into dicts
 * the scheduler uses call instead of cast
 * volume.manager provides a volume_get to query the state of a volume
 * volume.api creates the volume with a call and then spawns a greenthread to poll its status (see the sketch below)
 * volume.manager returns the initial data immediately and then spawns a greenthread to do the remaining work

This will allow us to communicate with an external REST API very easily, by simply turning the two rpc calls into a POST and a GET. It also allows us to separate out the database for volumes, so the volume worker doesn't need to write to the shared zone database.
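To make the call-and-poll pattern concrete, below is a minimal sketch of the flow described above. It is not the code in this branch: the rpc_call/db_update helpers and the 'volume_get' method name are assumptions used only for illustration.

    import eventlet


    def create_volume(rpc_call, db_update, volume_ref, topic='volume'):
        """Create a volume via rpc, then poll the worker until it settles.

        rpc_call(topic, msg) is assumed to send a message and return the
        worker's reply; db_update(vid, values) persists values locally.
        """
        # Ask the worker to create the volume; it replies immediately with
        # the initial data instead of writing to our database.
        initial = rpc_call(topic, {'method': 'create_volume',
                                   'args': {'volume_ref': volume_ref}})
        db_update(initial['id'], initial)

        # Poll the worker for status in the background so this call can
        # return right away.
        def _poll(vid):
            while True:
                current = rpc_call(topic, {'method': 'volume_get',
                                           'args': {'volume_id': vid}})
                db_update(vid, current)
                if current.get('status') in ('available', 'error'):
                    return
                eventlet.sleep(1)

        eventlet.spawn_n(_poll, initial['id'])
        return initial

Swapping rpc_call for an HTTP client turns the create call into a POST and the status poll into a GET against an external volume service, which is the REST mapping mentioned above.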

Outstanding issues for the database split:
 * We have to use something like remote_id to refer to the volume when making requests to nova-volume, because the id in our local database will not match. Ultimately we've discussed switching to UUIDs, so it may be good to make that change at the same time.
 * If the state of a volume changes after the initial poll has finished, we will never know about it. We ultimately need a way for the api to be notified when the volume status changes so that we can update our local data, or we need a background worker that periodically polls the service for changes (a sketch of such a poller follows).
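One possible shape for that background worker is sketched below. This is an assumption, not code from this branch: the 'list_volumes' method does not exist yet, and rpc_call/db_update are the same illustrative stand-ins as in the earlier sketch.

    import eventlet


    def sync_volume_status(rpc_call, db_update, topic='volume', interval=30):
        """Periodically pull volume status from the worker into the local db."""
        while True:
            # 'list_volumes' is a hypothetical method the worker would need
            # to expose for this to work.
            for vol in rpc_call(topic, {'method': 'list_volumes', 'args': {}}):
                db_update(vol['id'], {'status': vol['status']})
            eventlet.sleep(interval)

A worker like this could be spawned from the api service at start-up (e.g. with eventlet.spawn_n), which would keep the local copy roughly in sync even when no request is actively polling.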

I'm open to feedback about this approach. I tried a few other versions and this seems like the simplest change set to get what we want. If this looks good, I will modify the other volume commands to work the same way and propose a similar set of changes for compute.

Revision history for this message
Vish Ishaya (vishvananda) wrote :

Accidentally pushed an old version. This is the right one.

lp:~vishvananda/nova/no-db-messaging updated
1078. By Eldar Nugaev

Improved error notification in network create

1079. By Eldar Nugaev

Added network_info into refresh_security_group_rules
That fixes https://bugs.launchpad.net/nova/+bug/773308

1080. By Mark Washenberger

Convert instance_type_ids in the instances table from strings to integers to enable joins with instance_types. This in particular fixes a problem when using postgresql.

Revision history for this message
Dan Prince (dan-prince) wrote :

Hey Vish,

I just smoke stacked this and got some errors. It looks like the changes to models.py aren't quite right.

I'm getting the following in nova-api.log:

2011-05-17 17:17:18,574 ERROR nova.api.openstack [-] Caught error: 'AuthToken' object has no attribute 'server_management_url'
(nova.api.openstack): TRACE: Traceback (most recent call last):
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/nova/api/openstack/__init__.py", line 59, in __call__
(nova.api.openstack): TRACE: return req.get_response(self.application)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/request.py", line 919, in get_response
(nova.api.openstack): TRACE: application, catch_exc_info=False)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/request.py", line 887, in call_application
(nova.api.openstack): TRACE: app_iter = application(self.environ, start_response)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/dec.py", line 147, in __call__
(nova.api.openstack): TRACE: resp = self.call_func(req, *args, **self.kwargs)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/webob/dec.py", line 208, in call_func
(nova.api.openstack): TRACE: return self.func(req, *args, **kwargs)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/nova/api/openstack/auth.py", line 54, in __call__
(nova.api.openstack): TRACE: return self.authenticate(req)
(nova.api.openstack): TRACE: File "/usr/lib/pymodules/python2.6/nova/api/openstack/auth.py", line 107, in authenticate
(nova.api.openstack): TRACE: token.server_management_url
(nova.api.openstack): TRACE: AttributeError: 'AuthToken' object has no attribute 'server_management_url'
--

Running a simple 'nova list' command (using the OSAPI) should allow you to reproduce this.

review: Needs Fixing
Revision history for this message
Vish Ishaya (vishvananda) wrote :

Thanks Dan, I'll look at this. I also had some feedback from termie offline, and I'm moving where the polling happens.

lp:~vishvananda/nova/no-db-messaging updated
1081. By Josh Kearney

Added missing metadata join to instance_get calls.

1082. By Matt Dietz

Fixes improper attribute naming around instance types that broke Resizes.

1083. By termie

Docstring cleanup and formatting (nova/network dir). Minor style fixes as well.

1084. By Vish Ishaya

Fixes the naming of the server_management_url in auth and tests.

1085. By Josh Kearney

Added missing xenhost plugin. This was causing warnings to pop up in the compute logs during periodic_task runs. It must have not been bzr add'd when this code was merged.

1086. By Matt Dietz

Implements a basic mechanism for pushing notifications out to interested parties. The rationale for implementing notifications this way is that the responsibility for them shouldn't fall to Nova. As such, we simply will be pushing messages to a queue where another worker entirely can be written to push messages around to subscribers.

1087. By Johannes Erdfelt

Simple change to sort the list of controllers/methods before printing to make it easier to read

1088. By termie

add support to rpc for multicall

1089. By termie

make the test more explicit

1090. By termie

add commented out unworking code for yield-based returns

1091. By Chris Behrens

Add a connection pool for rpc cast/call
Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each.

1092. By Chris Behrens

pep8 and comment fixes

1093. By Chris Behrens

convert fanout_cast to ConnectionPool

1094. By Chris Behrens

fakerabbit's declare_consumer should support more than 1 consumer. also: make fakerabbit Backend.consume be an iterator like it should be..

1095. By Vish Ishaya

fix consumers to actually be deleted and clean up cloud test

1096. By Chris Behrens

catch greenlet.GreenletExit when shutting service down

1097. By Chris Behrens

Always create Service consumers no matter if report_interval is 0
Fix tests to handle how Service loads Consumers now

1098. By Chris Behrens

Add rpc_conn_pool_size flag for the new connection pool

1099. By termie

bring back commits lost in merge

1100. By termie

almost everything working with fake_rabbit

1101. By termie

don't need to use a separate connection

1102. By Vish Ishaya

lots of fixes for rpc and extra imports

1103. By termie

make sure that using multicall on a call with a single result still functions

1104. By termie

cleanup the code for merging

1105. By Vish Ishaya

update manager to use multicall

1106. By Vish Ishaya

fix conversion of models to dicts

1107. By Vish Ishaya

change volume_api to have delayed_create

1108. By Vish Ishaya

make scheduler use multicall

1109. By Vish Ishaya

remove the unnecessary try except in manager

1110. By Vish Ishaya

pep8

Revision history for this message
Vish Ishaya (vishvananda) wrote :

Reproposing this with a prerequisite branch.

Unmerged revisions

1123. By Vish Ishaya

merged trunk

1122. By Vish Ishaya

remove merge error calling failing test

1121. By Vish Ishaya

fix snapshot test

1120. By Vish Ishaya

return not yield in scheduler shortcut

1119. By Vish Ishaya

make sure to handle VolumeIsBusy

1118. By Vish Ishaya

merged trunk and removed conflicts

1117. By Vish Ishaya

lost some changes from rpc branch, bring them in manually

1116. By Vish Ishaya

use strtime for passing datetimes back and forth through the queue

1115. By Vish Ishaya

fix tests

1114. By Vish Ishaya

keep the database on the receiving end as well

Preview Diff

=== modified file 'nova/db/sqlalchemy/models.py'
--- nova/db/sqlalchemy/models.py 2011-05-17 20:50:12 +0000
+++ nova/db/sqlalchemy/models.py 2011-05-20 01:34:25 +0000
@@ -77,17 +77,25 @@
77 return getattr(self, key, default)77 return getattr(self, key, default)
7878
79 def __iter__(self):79 def __iter__(self):
80 self._i = iter(object_mapper(self).columns)80 # NOTE(vish): include name property in the iterator
81 columns = dict(object_mapper(self).columns).keys()
82 name = self.get('name')
83 if name:
84 columns.append('name')
85 self._i = iter(columns)
81 return self86 return self
8287
83 def next(self):88 def next(self):
84 n = self._i.next().name89 n = self._i.next()
85 return n, getattr(self, n)90 return n, getattr(self, n)
8691
87 def update(self, values):92 def update(self, values):
88 """Make the model object behave like a dict"""93 """Make the model object behave like a dict"""
89 for k, v in values.iteritems():94 columns = dict(object_mapper(self).columns).keys()
90 setattr(self, k, v)95 for key, value in values.iteritems():
96 # NOTE(vish): don't update the 'name' property
97 if key in columns:
98 setattr(self, key, value)
9199
92 def iteritems(self):100 def iteritems(self):
93 """Make the model object behave like a dict.101 """Make the model object behave like a dict.
94102
=== modified file 'nova/fakerabbit.py'
--- nova/fakerabbit.py 2011-02-22 23:05:48 +0000
+++ nova/fakerabbit.py 2011-05-20 01:34:25 +0000
@@ -31,6 +31,7 @@
3131
32EXCHANGES = {}32EXCHANGES = {}
33QUEUES = {}33QUEUES = {}
34CONSUMERS = {}
3435
3536
36class Message(base.BaseMessage):37class Message(base.BaseMessage):
@@ -96,17 +97,29 @@
96 ' key %(routing_key)s') % locals())97 ' key %(routing_key)s') % locals())
97 EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)98 EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
9899
99 def declare_consumer(self, queue, callback, *args, **kwargs):100 def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
100 self.current_queue = queue101 global CONSUMERS
101 self.current_callback = callback102 LOG.debug("Adding consumer %s", consumer_tag)
103 CONSUMERS[consumer_tag] = (queue, callback)
104
105 def cancel(self, consumer_tag):
106 global CONSUMERS
107 LOG.debug("Removing consumer %s", consumer_tag)
108 del CONSUMERS[consumer_tag]
102109
103 def consume(self, limit=None):110 def consume(self, limit=None):
111 global CONSUMERS
112 num = 0
104 while True:113 while True:
105 item = self.get(self.current_queue)114 for (queue, callback) in CONSUMERS.itervalues():
106 if item:115 item = self.get(queue)
107 self.current_callback(item)116 if item:
108 raise StopIteration()117 callback(item)
109 greenthread.sleep(0)118 num += 1
119 yield
120 if limit and num == limit:
121 raise StopIteration()
122 greenthread.sleep(0.1)
110123
111 def get(self, queue, no_ack=False):124 def get(self, queue, no_ack=False):
112 global QUEUES125 global QUEUES
@@ -134,5 +147,7 @@
134def reset_all():147def reset_all():
135 global EXCHANGES148 global EXCHANGES
136 global QUEUES149 global QUEUES
150 global CONSUMERS
137 EXCHANGES = {}151 EXCHANGES = {}
138 QUEUES = {}152 QUEUES = {}
153 CONSUMERS = {}
139154
=== modified file 'nova/rpc.py'
--- nova/rpc.py 2011-04-20 19:08:22 +0000
+++ nova/rpc.py 2011-05-20 01:34:25 +0000
@@ -33,7 +33,9 @@
33from carrot import connection as carrot_connection33from carrot import connection as carrot_connection
34from carrot import messaging34from carrot import messaging
35from eventlet import greenpool35from eventlet import greenpool
36from eventlet import greenthread36from eventlet import pools
37from eventlet import queue
38import greenlet
3739
38from nova import context40from nova import context
39from nova import exception41from nova import exception
@@ -47,7 +49,10 @@
4749
4850
49FLAGS = flags.FLAGS51FLAGS = flags.FLAGS
50flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')52flags.DEFINE_integer('rpc_thread_pool_size', 1024,
53 'Size of RPC thread pool')
54flags.DEFINE_integer('rpc_conn_pool_size', 30,
55 'Size of RPC connection pool')
5156
5257
53class Connection(carrot_connection.BrokerConnection):58class Connection(carrot_connection.BrokerConnection):
@@ -90,6 +95,17 @@
90 return cls.instance()95 return cls.instance()
9196
9297
98class Pool(pools.Pool):
99 """Class that implements a Pool of Connections."""
100
101 def create(self):
102 LOG.debug('Creating new connection')
103 return Connection.instance(new=True)
104
105
106ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size)
107
108
93class Consumer(messaging.Consumer):109class Consumer(messaging.Consumer):
94 """Consumer base class.110 """Consumer base class.
95111
@@ -131,7 +147,9 @@
131 self.connection = Connection.recreate()147 self.connection = Connection.recreate()
132 self.backend = self.connection.create_backend()148 self.backend = self.connection.create_backend()
133 self.declare()149 self.declare()
134 super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)150 return super(Consumer, self).fetch(no_ack,
151 auto_ack,
152 enable_callbacks)
135 if self.failed_connection:153 if self.failed_connection:
136 LOG.error(_('Reconnected to queue'))154 LOG.error(_('Reconnected to queue'))
137 self.failed_connection = False155 self.failed_connection = False
@@ -159,13 +177,13 @@
159 self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)177 self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
160 super(AdapterConsumer, self).__init__(connection=connection,178 super(AdapterConsumer, self).__init__(connection=connection,
161 topic=topic)179 topic=topic)
162180 self.register_callback(self.process_data)
163 def receive(self, *args, **kwargs):181
164 self.pool.spawn_n(self._receive, *args, **kwargs)182 def process_data(self, message_data, message):
165183 """Consumer callback to call a method on a proxy object.
166 @exception.wrap_exception184
167 def _receive(self, message_data, message):185 Parses the message for validity and fires off a thread to call the
168 """Magically looks for a method on the proxy object and calls it.186 proxy object method.
169187
170 Message data should be a dictionary with two keys:188 Message data should be a dictionary with two keys:
171 method: string representing the method to call189 method: string representing the method to call
@@ -175,8 +193,8 @@
175193
176 """194 """
177 LOG.debug(_('received %s') % message_data)195 LOG.debug(_('received %s') % message_data)
178 msg_id = message_data.pop('_msg_id', None)196 # This will be popped off in _unpack_context
179197 msg_id = message_data.get('_msg_id', None)
180 ctxt = _unpack_context(message_data)198 ctxt = _unpack_context(message_data)
181199
182 method = message_data.get('method')200 method = message_data.get('method')
@@ -190,6 +208,13 @@
190 LOG.warn(_('no method for message: %s') % message_data)208 LOG.warn(_('no method for message: %s') % message_data)
191 msg_reply(msg_id, _('No method for message: %s') % message_data)209 msg_reply(msg_id, _('No method for message: %s') % message_data)
192 return210 return
211 self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
212
213 @exception.wrap_exception
214 def _process_data(self, msg_id, ctxt, method, args):
215 """Thread that maigcally looks for a method on the proxy
216 object and calls it.
217 """
193218
194 node_func = getattr(self.proxy, str(method))219 node_func = getattr(self.proxy, str(method))
195 node_args = dict((str(k), v) for k, v in args.iteritems())220 node_args = dict((str(k), v) for k, v in args.iteritems())
@@ -197,7 +222,15 @@
197 try:222 try:
198 rval = node_func(context=ctxt, **node_args)223 rval = node_func(context=ctxt, **node_args)
199 if msg_id:224 if msg_id:
200 msg_reply(msg_id, rval, None)225 # Check if the result was a generator
226 if hasattr(rval, 'send'):
227 for x in rval:
228 msg_reply(msg_id, x, None)
229 else:
230 msg_reply(msg_id, rval, None)
231
232 # This final None tells multicall that it is done.
233 msg_reply(msg_id, None, None)
201 except Exception as e:234 except Exception as e:
202 logging.exception('Exception during message handling')235 logging.exception('Exception during message handling')
203 if msg_id:236 if msg_id:
@@ -205,11 +238,6 @@
205 return238 return
206239
207240
208class Publisher(messaging.Publisher):
209 """Publisher base class."""
210 pass
211
212
213class TopicAdapterConsumer(AdapterConsumer):241class TopicAdapterConsumer(AdapterConsumer):
214 """Consumes messages on a specific topic."""242 """Consumes messages on a specific topic."""
215243
@@ -242,6 +270,59 @@
242 topic=topic, proxy=proxy)270 topic=topic, proxy=proxy)
243271
244272
273class ConsumerSet(object):
274 """Groups consumers to listen on together on a single connection."""
275
276 def __init__(self, conn, consumer_list):
277 self.consumer_list = set(consumer_list)
278 self.consumer_set = None
279 self.enabled = True
280 self.init(conn)
281
282 def init(self, conn):
283 if not conn:
284 conn = Connection.instance(new=True)
285 if self.consumer_set:
286 self.consumer_set.close()
287 self.consumer_set = messaging.ConsumerSet(conn)
288 for consumer in self.consumer_list:
289 consumer.connection = conn
290 # consumer.backend is set for us
291 self.consumer_set.add_consumer(consumer)
292
293 def reconnect(self):
294 self.init(None)
295
296 def wait(self, limit=None):
297 running = True
298 while running:
299 it = self.consumer_set.iterconsume(limit=limit)
300 if not it:
301 break
302 while True:
303 try:
304 it.next()
305 except StopIteration:
306 return
307 except greenlet.GreenletExit:
308 running = False
309 break
310 except Exception as e:
311 LOG.error(_("Received exception %s " % type(e) + \
312 "while processing consumer"))
313 self.reconnect()
314 # Break to outer loop
315 break
316
317 def close(self):
318 self.consumer_set.close()
319
320
321class Publisher(messaging.Publisher):
322 """Publisher base class."""
323 pass
324
325
245class TopicPublisher(Publisher):326class TopicPublisher(Publisher):
246 """Publishes messages on a specific topic."""327 """Publishes messages on a specific topic."""
247328
@@ -306,16 +387,18 @@
306 LOG.error(_("Returning exception %s to caller"), message)387 LOG.error(_("Returning exception %s to caller"), message)
307 LOG.error(tb)388 LOG.error(tb)
308 failure = (failure[0].__name__, str(failure[1]), tb)389 failure = (failure[0].__name__, str(failure[1]), tb)
309 conn = Connection.instance()390
310 publisher = DirectPublisher(connection=conn, msg_id=msg_id)391 with ConnectionPool.item() as conn:
311 try:392 publisher = DirectPublisher(connection=conn, msg_id=msg_id)
312 publisher.send({'result': reply, 'failure': failure})393 try:
313 except TypeError:394 publisher.send({'result': reply, 'failure': failure})
314 publisher.send(395 except TypeError:
315 {'result': dict((k, repr(v))396 publisher.send(
316 for k, v in reply.__dict__.iteritems()),397 {'result': dict((k, repr(v))
317 'failure': failure})398 for k, v in reply.__dict__.iteritems()),
318 publisher.close()399 'failure': failure})
400
401 publisher.close()
319402
320403
321class RemoteError(exception.Error):404class RemoteError(exception.Error):
@@ -347,8 +430,9 @@
347 if key.startswith('_context_'):430 if key.startswith('_context_'):
348 value = msg.pop(key)431 value = msg.pop(key)
349 context_dict[key[9:]] = value432 context_dict[key[9:]] = value
433 context_dict['msg_id'] = msg.pop('_msg_id', None)
350 LOG.debug(_('unpacked context: %s'), context_dict)434 LOG.debug(_('unpacked context: %s'), context_dict)
351 return context.RequestContext.from_dict(context_dict)435 return RpcContext.from_dict(context_dict)
352436
353437
354def _pack_context(msg, context):438def _pack_context(msg, context):
@@ -360,70 +444,110 @@
360 for args at some point.444 for args at some point.
361445
362 """446 """
363 context = dict([('_context_%s' % key, value)447 context_d = dict([('_context_%s' % key, value)
364 for (key, value) in context.to_dict().iteritems()])448 for (key, value) in context.to_dict().iteritems()])
365 msg.update(context)449 msg.update(context_d)
366450
367451
368def call(context, topic, msg):452class RpcContext(context.RequestContext):
369 """Sends a message on a topic and wait for a response."""453 def __init__(self, *args, **kwargs):
454 msg_id = kwargs.pop('msg_id', None)
455 self.msg_id = msg_id
456 super(RpcContext, self).__init__(*args, **kwargs)
457
458 def reply(self, *args, **kwargs):
459 msg_reply(self.msg_id, *args, **kwargs)
460
461
462def multicall(context, topic, msg):
463 """Make a call that returns multiple times."""
370 LOG.debug(_('Making asynchronous call on %s ...'), topic)464 LOG.debug(_('Making asynchronous call on %s ...'), topic)
371 msg_id = uuid.uuid4().hex465 msg_id = uuid.uuid4().hex
372 msg.update({'_msg_id': msg_id})466 msg.update({'_msg_id': msg_id})
373 LOG.debug(_('MSG_ID is %s') % (msg_id))467 LOG.debug(_('MSG_ID is %s') % (msg_id))
374 _pack_context(msg, context)468 _pack_context(msg, context)
375469
376 class WaitMessage(object):470 con_conn = ConnectionPool.get()
377 def __call__(self, data, message):471 consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
378 """Acks message and sets result."""472 wait_msg = MulticallWaiter(consumer)
379 message.ack()
380 if data['failure']:
381 self.result = RemoteError(*data['failure'])
382 else:
383 self.result = data['result']
384
385 wait_msg = WaitMessage()
386 conn = Connection.instance()
387 consumer = DirectConsumer(connection=conn, msg_id=msg_id)
388 consumer.register_callback(wait_msg)473 consumer.register_callback(wait_msg)
389474
390 conn = Connection.instance()475 publisher = TopicPublisher(connection=con_conn, topic=topic)
391 publisher = TopicPublisher(connection=conn, topic=topic)
392 publisher.send(msg)476 publisher.send(msg)
393 publisher.close()477 publisher.close()
394478
395 try:479 return wait_msg
396 consumer.wait(limit=1)480
397 except StopIteration:481
398 pass482class MulticallWaiter(object):
399 consumer.close()483 def __init__(self, consumer):
400 # NOTE(termie): this is a little bit of a change from the original484 self._consumer = consumer
401 # non-eventlet code where returning a Failure485 self._results = queue.Queue()
402 # instance from a deferred call is very similar to486 self._closed = False
403 # raising an exception487
404 if isinstance(wait_msg.result, Exception):488 def close(self):
405 raise wait_msg.result489 self._closed = True
406 return wait_msg.result490 self._consumer.close()
491 ConnectionPool.put(self._consumer.connection)
492
493 def __call__(self, data, message):
494 """Acks message and sets result."""
495 message.ack()
496 if data['failure']:
497 self._results.put(RemoteError(*data['failure']))
498 else:
499 self._results.put(data['result'])
500
501 def __iter__(self):
502 return self.wait()
503
504 def wait(self):
505 while True:
506 rv = None
507 while rv is None and not self._closed:
508 try:
509 rv = self._consumer.fetch(enable_callbacks=True)
510 except Exception:
511 self.close()
512 raise
513 time.sleep(0.01)
514
515 result = self._results.get()
516 if isinstance(result, Exception):
517 self.close()
518 raise result
519 if result == None:
520 self.close()
521 raise StopIteration
522 yield result
523
524
525def call(context, topic, msg):
526 """Sends a message on a topic and wait for a response."""
527 rv = multicall(context, topic, msg)
528 for x in rv:
529 rv.close()
530 return x
407531
408532
409def cast(context, topic, msg):533def cast(context, topic, msg):
410 """Sends a message on a topic without waiting for a response."""534 """Sends a message on a topic without waiting for a response."""
411 LOG.debug(_('Making asynchronous cast on %s...'), topic)535 LOG.debug(_('Making asynchronous cast on %s...'), topic)
412 _pack_context(msg, context)536 _pack_context(msg, context)
413 conn = Connection.instance()537 with ConnectionPool.item() as conn:
414 publisher = TopicPublisher(connection=conn, topic=topic)538 publisher = TopicPublisher(connection=conn, topic=topic)
415 publisher.send(msg)539 publisher.send(msg)
416 publisher.close()540 publisher.close()
417541
418542
419def fanout_cast(context, topic, msg):543def fanout_cast(context, topic, msg):
420 """Sends a message on a fanout exchange without waiting for a response."""544 """Sends a message on a fanout exchange without waiting for a response."""
421 LOG.debug(_('Making asynchronous fanout cast...'))545 LOG.debug(_('Making asynchronous fanout cast...'))
422 _pack_context(msg, context)546 _pack_context(msg, context)
423 conn = Connection.instance()547 with ConnectionPool.item() as conn:
424 publisher = FanoutPublisher(topic, connection=conn)548 publisher = FanoutPublisher(topic, connection=conn)
425 publisher.send(msg)549 publisher.send(msg)
426 publisher.close()550 publisher.close()
427551
428552
429def generic_response(message_data, message):553def generic_response(message_data, message):
@@ -459,6 +583,7 @@
459583
460 if wait:584 if wait:
461 consumer.wait()585 consumer.wait()
586 consumer.close()
462587
463588
464if __name__ == '__main__':589if __name__ == '__main__':
465590
=== modified file 'nova/scheduler/manager.py'
--- nova/scheduler/manager.py 2011-05-05 14:35:44 +0000
+++ nova/scheduler/manager.py 2011-05-20 01:34:25 +0000
@@ -83,11 +83,13 @@
83 except AttributeError:83 except AttributeError:
84 host = self.driver.schedule(elevated, topic, *args, **kwargs)84 host = self.driver.schedule(elevated, topic, *args, **kwargs)
8585
86 rpc.cast(context,86 LOG.debug(_("Multicall %(topic)s %(host)s for %(method)s") % locals())
87 db.queue_get_for(context, topic, host),87 rvs = rpc.multicall(context,
88 {"method": method,88 db.queue_get_for(context, topic, host),
89 "args": kwargs})89 {"method": method,
90 LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals())90 "args": kwargs})
91 for rv in rvs:
92 yield rv
9193
92 # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.94 # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
93 # Based on bexar design summit discussion,95 # Based on bexar design summit discussion,
9496
=== modified file 'nova/service.py'
--- nova/service.py 2011-04-20 19:08:22 +0000
+++ nova/service.py 2011-05-20 01:34:25 +0000
@@ -19,14 +19,11 @@
1919
20"""Generic Node baseclass for all workers that run on hosts."""20"""Generic Node baseclass for all workers that run on hosts."""
2121
22import greenlet
22import inspect23import inspect
23import os24import os
24import sys
25import time
2625
27from eventlet import event
28from eventlet import greenthread26from eventlet import greenthread
29from eventlet import greenpool
3027
31from nova import context28from nova import context
32from nova import db29from nova import db
@@ -91,27 +88,38 @@
91 if 'nova-compute' == self.binary:88 if 'nova-compute' == self.binary:
92 self.manager.update_available_resource(ctxt)89 self.manager.update_available_resource(ctxt)
9390
94 conn1 = rpc.Connection.instance(new=True)91 self.conn = rpc.Connection.instance(new=True)
95 conn2 = rpc.Connection.instance(new=True)92 logging.debug("Creating Consumer connection for Service %s" %
96 conn3 = rpc.Connection.instance(new=True)93 self.topic)
94
95 # Share this same connection for these Consumers
96 consumer_all = rpc.TopicAdapterConsumer(
97 connection=self.conn,
98 topic=self.topic,
99 proxy=self)
100 consumer_node = rpc.TopicAdapterConsumer(
101 connection=self.conn,
102 topic='%s.%s' % (self.topic, self.host),
103 proxy=self)
104 fanout = rpc.FanoutAdapterConsumer(
105 connection=self.conn,
106 topic=self.topic,
107 proxy=self)
108
109 cset = rpc.ConsumerSet(self.conn, [consumer_all,
110 consumer_node,
111 fanout])
112
113 # Wait forever, processing these consumers
114 def _wait():
115 try:
116 cset.wait()
117 finally:
118 cset.close()
119
120 self.csetthread = greenthread.spawn(_wait)
121
97 if self.report_interval:122 if self.report_interval:
98 consumer_all = rpc.TopicAdapterConsumer(
99 connection=conn1,
100 topic=self.topic,
101 proxy=self)
102 consumer_node = rpc.TopicAdapterConsumer(
103 connection=conn2,
104 topic='%s.%s' % (self.topic, self.host),
105 proxy=self)
106 fanout = rpc.FanoutAdapterConsumer(
107 connection=conn3,
108 topic=self.topic,
109 proxy=self)
110
111 self.timers.append(consumer_all.attach_to_eventlet())
112 self.timers.append(consumer_node.attach_to_eventlet())
113 self.timers.append(fanout.attach_to_eventlet())
114
115 pulse = utils.LoopingCall(self.report_state)123 pulse = utils.LoopingCall(self.report_state)
116 pulse.start(interval=self.report_interval, now=False)124 pulse.start(interval=self.report_interval, now=False)
117 self.timers.append(pulse)125 self.timers.append(pulse)
@@ -167,7 +175,13 @@
167175
168 def kill(self):176 def kill(self):
169 """Destroy the service object in the datastore."""177 """Destroy the service object in the datastore."""
178 self.csetthread.kill()
179 try:
180 self.csetthread.wait()
181 except greenlet.GreenletExit:
182 pass
170 self.stop()183 self.stop()
184 rpc.ConnectionPool.put(self.conn)
171 try:185 try:
172 db.service_destroy(context.get_admin_context(), self.service_id)186 db.service_destroy(context.get_admin_context(), self.service_id)
173 except exception.NotFound:187 except exception.NotFound:
174188
=== modified file 'nova/test.py'
--- nova/test.py 2011-04-20 19:08:22 +0000
+++ nova/test.py 2011-05-20 01:34:25 +0000
@@ -31,17 +31,15 @@
31import unittest31import unittest
3232
33import mox33import mox
34import shutil
35import stubout34import stubout
36from eventlet import greenthread35from eventlet import greenthread
3736
38from nova import context
39from nova import db
40from nova import fakerabbit37from nova import fakerabbit
41from nova import flags38from nova import flags
42from nova import rpc39from nova import rpc
43from nova import service40from nova import service
44from nova import wsgi41from nova import wsgi
42from nova.virt import fake
4543
4644
47FLAGS = flags.FLAGS45FLAGS = flags.FLAGS
@@ -85,6 +83,7 @@
85 self._monkey_patch_attach()83 self._monkey_patch_attach()
86 self._monkey_patch_wsgi()84 self._monkey_patch_wsgi()
87 self._original_flags = FLAGS.FlagValuesDict()85 self._original_flags = FLAGS.FlagValuesDict()
86 rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
8887
89 def tearDown(self):88 def tearDown(self):
90 """Runs after each test method to tear down test environment."""89 """Runs after each test method to tear down test environment."""
@@ -99,6 +98,10 @@
99 if FLAGS.fake_rabbit:98 if FLAGS.fake_rabbit:
100 fakerabbit.reset_all()99 fakerabbit.reset_all()
101100
101 if FLAGS.connection_type == 'fake':
102 if hasattr(fake.FakeConnection, '_instance'):
103 del fake.FakeConnection._instance
104
102 # Reset any overriden flags105 # Reset any overriden flags
103 self.reset_flags()106 self.reset_flags()
104107
105108
=== modified file 'nova/tests/integrated/integrated_helpers.py'
--- nova/tests/integrated/integrated_helpers.py 2011-03-30 01:13:04 +0000
+++ nova/tests/integrated/integrated_helpers.py 2011-05-20 01:34:25 +0000
@@ -154,10 +154,7 @@
154 # set up services154 # set up services
155 self.start_service('compute')155 self.start_service('compute')
156 self.start_service('volume')156 self.start_service('volume')
157 # NOTE(justinsb): There's a bug here which is eluding me...157 self.start_service('network')
158 # If we start the network_service, all is good, but then subsequent
159 # tests fail: CloudTestCase.test_ajax_console in particular.
160 #self.start_service('network')
161 self.start_service('scheduler')158 self.start_service('scheduler')
162159
163 self.auth_url = self._start_api_service()160 self.auth_url = self._start_api_service()
164161
=== modified file 'nova/tests/test_cloud.py'
--- nova/tests/test_cloud.py 2011-05-16 20:30:40 +0000
+++ nova/tests/test_cloud.py 2011-05-20 01:34:25 +0000
@@ -17,13 +17,8 @@
17# under the License.17# under the License.
1818
19from base64 import b64decode19from base64 import b64decode
20import json
21from M2Crypto import BIO20from M2Crypto import BIO
22from M2Crypto import RSA21from M2Crypto import RSA
23import os
24import shutil
25import tempfile
26import time
2722
28from eventlet import greenthread23from eventlet import greenthread
2924
@@ -33,12 +28,10 @@
33from nova import flags28from nova import flags
34from nova import log as logging29from nova import log as logging
35from nova import rpc30from nova import rpc
36from nova import service
37from nova import test31from nova import test
38from nova import utils32from nova import utils
39from nova import exception33from nova import exception
40from nova.auth import manager34from nova.auth import manager
41from nova.compute import power_state
42from nova.api.ec2 import cloud35from nova.api.ec2 import cloud
43from nova.api.ec2 import ec2utils36from nova.api.ec2 import ec2utils
44from nova.image import local37from nova.image import local
@@ -79,14 +72,21 @@
79 self.stubs.Set(local.LocalImageService, 'show', fake_show)72 self.stubs.Set(local.LocalImageService, 'show', fake_show)
80 self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)73 self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
8174
75 # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
76 rpc_cast = rpc.cast
77
78 def finish_cast(*args, **kwargs):
79 rpc_cast(*args, **kwargs)
80 greenthread.sleep(0.2)
81
82 self.stubs.Set(rpc, 'cast', finish_cast)
83
82 def tearDown(self):84 def tearDown(self):
83 network_ref = db.project_get_network(self.context,85 network_ref = db.project_get_network(self.context,
84 self.project.id)86 self.project.id)
85 db.network_disassociate(self.context, network_ref['id'])87 db.network_disassociate(self.context, network_ref['id'])
86 self.manager.delete_project(self.project)88 self.manager.delete_project(self.project)
87 self.manager.delete_user(self.user)89 self.manager.delete_user(self.user)
88 self.compute.kill()
89 self.network.kill()
90 super(CloudTestCase, self).tearDown()90 super(CloudTestCase, self).tearDown()
9191
92 def _create_key(self, name):92 def _create_key(self, name):
@@ -113,7 +113,6 @@
113 self.cloud.describe_addresses(self.context)113 self.cloud.describe_addresses(self.context)
114 self.cloud.release_address(self.context,114 self.cloud.release_address(self.context,
115 public_ip=address)115 public_ip=address)
116 greenthread.sleep(0.3)
117 db.floating_ip_destroy(self.context, address)116 db.floating_ip_destroy(self.context, address)
118117
119 def test_associate_disassociate_address(self):118 def test_associate_disassociate_address(self):
@@ -129,12 +128,10 @@
129 self.cloud.associate_address(self.context,128 self.cloud.associate_address(self.context,
130 instance_id=ec2_id,129 instance_id=ec2_id,
131 public_ip=address)130 public_ip=address)
132 greenthread.sleep(0.3)
133 self.cloud.disassociate_address(self.context,131 self.cloud.disassociate_address(self.context,
134 public_ip=address)132 public_ip=address)
135 self.cloud.release_address(self.context,133 self.cloud.release_address(self.context,
136 public_ip=address)134 public_ip=address)
137 greenthread.sleep(0.3)
138 self.network.deallocate_fixed_ip(self.context, fixed)135 self.network.deallocate_fixed_ip(self.context, fixed)
139 db.instance_destroy(self.context, inst['id'])136 db.instance_destroy(self.context, inst['id'])
140 db.floating_ip_destroy(self.context, address)137 db.floating_ip_destroy(self.context, address)
@@ -306,31 +303,25 @@
306 'instance_type': instance_type,303 'instance_type': instance_type,
307 'max_count': max_count}304 'max_count': max_count}
308 rv = self.cloud.run_instances(self.context, **kwargs)305 rv = self.cloud.run_instances(self.context, **kwargs)
309 greenthread.sleep(0.3)
310 instance_id = rv['instancesSet'][0]['instanceId']306 instance_id = rv['instancesSet'][0]['instanceId']
311 output = self.cloud.get_console_output(context=self.context,307 output = self.cloud.get_console_output(context=self.context,
312 instance_id=[instance_id])308 instance_id=[instance_id])
313 self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')309 self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
314 # TODO(soren): We need this until we can stop polling in the rpc code310 # TODO(soren): We need this until we can stop polling in the rpc code
315 # for unit tests.311 # for unit tests.
316 greenthread.sleep(0.3)
317 rv = self.cloud.terminate_instances(self.context, [instance_id])312 rv = self.cloud.terminate_instances(self.context, [instance_id])
318 greenthread.sleep(0.3)
319313
320 def test_ajax_console(self):314 def test_ajax_console(self):
321 kwargs = {'image_id': 'ami-1'}315 kwargs = {'image_id': 'ami-1'}
322 rv = self.cloud.run_instances(self.context, **kwargs)316 rv = self.cloud.run_instances(self.context, **kwargs)
323 instance_id = rv['instancesSet'][0]['instanceId']317 instance_id = rv['instancesSet'][0]['instanceId']
324 greenthread.sleep(0.3)
325 output = self.cloud.get_ajax_console(context=self.context,318 output = self.cloud.get_ajax_console(context=self.context,
326 instance_id=[instance_id])319 instance_id=[instance_id])
327 self.assertEquals(output['url'],320 self.assertEquals(output['url'],
328 '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)321 '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
329 # TODO(soren): We need this until we can stop polling in the rpc code322 # TODO(soren): We need this until we can stop polling in the rpc code
330 # for unit tests.323 # for unit tests.
331 greenthread.sleep(0.3)
332 rv = self.cloud.terminate_instances(self.context, [instance_id])324 rv = self.cloud.terminate_instances(self.context, [instance_id])
333 greenthread.sleep(0.3)
334325
335 def test_key_generation(self):326 def test_key_generation(self):
336 result = self._create_key('test')327 result = self._create_key('test')
337328
=== modified file 'nova/tests/test_rpc.py'
--- nova/tests/test_rpc.py 2011-02-23 22:41:11 +0000
+++ nova/tests/test_rpc.py 2011-05-20 01:34:25 +0000
@@ -49,6 +49,59 @@
49 "args": {"value": value}})49 "args": {"value": value}})
50 self.assertEqual(value, result)50 self.assertEqual(value, result)
5151
52 def test_call_succeed_despite_multiple_returns(self):
53 """Get a value through rpc call"""
54 value = 42
55 result = rpc.call(self.context, 'test', {"method": "echo_three_times",
56 "args": {"value": value}})
57 self.assertEqual(value, result)
58
59 def test_call_succeed_despite_multiple_returns_yield(self):
60 """Get a value through rpc call"""
61 value = 42
62 result = rpc.call(self.context, 'test',
63 {"method": "echo_three_times_yield",
64 "args": {"value": value}})
65 self.assertEqual(value, result)
66
67 def test_multicall_succeed_once(self):
68 """Get a value through rpc call"""
69 value = 42
70 result = rpc.multicall(self.context,
71 'test',
72 {"method": "echo",
73 "args": {"value": value}})
74 i = 0
75 for x in result:
76 if i > 0:
77 self.fail('should only receive one response')
78 self.assertEqual(value + i, x)
79 i += 1
80
81 def test_multicall_succeed_three_times(self):
82 """Get a value through rpc call"""
83 value = 42
84 result = rpc.multicall(self.context,
85 'test',
86 {"method": "echo_three_times",
87 "args": {"value": value}})
88 i = 0
89 for x in result:
90 self.assertEqual(value + i, x)
91 i += 1
92
93 def test_multicall_succeed_three_times_yield(self):
94 """Get a value through rpc call"""
95 value = 42
96 result = rpc.multicall(self.context,
97 'test',
98 {"method": "echo_three_times_yield",
99 "args": {"value": value}})
100 i = 0
101 for x in result:
102 self.assertEqual(value + i, x)
103 i += 1
104
52 def test_context_passed(self):105 def test_context_passed(self):
53 """Makes sure a context is passed through rpc call"""106 """Makes sure a context is passed through rpc call"""
54 value = 42107 value = 42
@@ -127,6 +180,18 @@
127 return context.to_dict()180 return context.to_dict()
128181
129 @staticmethod182 @staticmethod
183 def echo_three_times(context, value):
184 context.reply(value)
185 context.reply(value + 1)
186 context.reply(value + 2)
187
188 @staticmethod
189 def echo_three_times_yield(context, value):
190 yield value
191 yield value + 1
192 yield value + 2
193
194 @staticmethod
130 def fail(context, value):195 def fail(context, value):
131 """Raises an exception with the value sent in"""196 """Raises an exception with the value sent in"""
132 raise Exception(value)197 raise Exception(value)
133198
=== modified file 'nova/tests/test_service.py'
--- nova/tests/test_service.py 2011-03-17 13:35:00 +0000
+++ nova/tests/test_service.py 2011-05-20 01:34:25 +0000
@@ -106,7 +106,10 @@
106106
107 # NOTE(vish): Create was moved out of mox replay to make sure that107 # NOTE(vish): Create was moved out of mox replay to make sure that
108 # the looping calls are created in StartService.108 # the looping calls are created in StartService.
109 app = service.Service.create(host=host, binary=binary)109 app = service.Service.create(host=host, binary=binary, topic=topic)
110
111 self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
112 service.rpc.Connection.instance(new=mox.IgnoreArg())
110113
111 self.mox.StubOutWithMock(rpc,114 self.mox.StubOutWithMock(rpc,
112 'TopicAdapterConsumer',115 'TopicAdapterConsumer',
@@ -114,6 +117,11 @@
114 self.mox.StubOutWithMock(rpc,117 self.mox.StubOutWithMock(rpc,
115 'FanoutAdapterConsumer',118 'FanoutAdapterConsumer',
116 use_mock_anything=True)119 use_mock_anything=True)
120
121 self.mox.StubOutWithMock(rpc,
122 'ConsumerSet',
123 use_mock_anything=True)
124
117 rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),125 rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
118 topic=topic,126 topic=topic,
119 proxy=mox.IsA(service.Service)).AndReturn(127 proxy=mox.IsA(service.Service)).AndReturn(
@@ -129,9 +137,13 @@
129 proxy=mox.IsA(service.Service)).AndReturn(137 proxy=mox.IsA(service.Service)).AndReturn(
130 rpc.FanoutAdapterConsumer)138 rpc.FanoutAdapterConsumer)
131139
132 rpc.TopicAdapterConsumer.attach_to_eventlet()140 def wait_func(self, limit=None):
133 rpc.TopicAdapterConsumer.attach_to_eventlet()141 return None
134 rpc.FanoutAdapterConsumer.attach_to_eventlet()142
143 mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
144 {'wait': wait_func})
145 rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
146 wait_func(mox.IgnoreArg())
135147
136 service_create = {'host': host,148 service_create = {'host': host,
137 'binary': binary,149 'binary': binary,
@@ -287,8 +299,41 @@
287 # Creating mocks299 # Creating mocks
288 self.mox.StubOutWithMock(service.rpc.Connection, 'instance')300 self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
289 service.rpc.Connection.instance(new=mox.IgnoreArg())301 service.rpc.Connection.instance(new=mox.IgnoreArg())
290 service.rpc.Connection.instance(new=mox.IgnoreArg())302
291 service.rpc.Connection.instance(new=mox.IgnoreArg())303 self.mox.StubOutWithMock(rpc,
304 'TopicAdapterConsumer',
305 use_mock_anything=True)
306 self.mox.StubOutWithMock(rpc,
307 'FanoutAdapterConsumer',
308 use_mock_anything=True)
309
310 self.mox.StubOutWithMock(rpc,
311 'ConsumerSet',
312 use_mock_anything=True)
313
314 rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
315 topic=topic,
316 proxy=mox.IsA(service.Service)).AndReturn(
317 rpc.TopicAdapterConsumer)
318
319 rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
320 topic='%s.%s' % (topic, host),
321 proxy=mox.IsA(service.Service)).AndReturn(
322 rpc.TopicAdapterConsumer)
323
324 rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
325 topic=topic,
326 proxy=mox.IsA(service.Service)).AndReturn(
327 rpc.FanoutAdapterConsumer)
328
329 def wait_func(self, limit=None):
330 return None
331
332 mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
333 {'wait': wait_func})
334 rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
335 wait_func(mox.IgnoreArg())
336
292 self.mox.StubOutWithMock(serv.manager.driver,337 self.mox.StubOutWithMock(serv.manager.driver,
293 'update_available_resource')338 'update_available_resource')
294 serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)339 serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
295340
=== modified file 'nova/volume/api.py'
--- nova/volume/api.py 2011-03-31 08:39:00 +0000
+++ nova/volume/api.py 2011-05-20 01:34:25 +0000
@@ -20,14 +20,14 @@
20Handles all requests relating to volumes.20Handles all requests relating to volumes.
21"""21"""
2222
23import datetime23import eventlet
2424
25from nova import db
26from nova import exception25from nova import exception
27from nova import flags26from nova import flags
28from nova import log as logging27from nova import log as logging
29from nova import quota28from nova import quota
30from nova import rpc29from nova import rpc
30from nova import utils
31from nova.db import base31from nova.db import base
3232
33FLAGS = flags.FLAGS33FLAGS = flags.FLAGS
@@ -57,26 +57,60 @@
57 'display_name': name,57 'display_name': name,
58 'display_description': description}58 'display_description': description}
5959
60 volume = self.db.volume_create(context, options)60 volume_ref = self.db.volume_create(context, options)
61 rpc.cast(context,61 volume_ref = utils.to_primitive(dict(volume_ref))
62 FLAGS.scheduler_topic,62
63 {"method": "create_volume",63 def delayed_create(volume_ref):
64 "args": {"topic": FLAGS.volume_topic,64 vid = volume_ref['id']
65 "volume_id": volume['id']}})65 try:
66 return volume66 rvs = rpc.multicall(context,
67 FLAGS.scheduler_topic,
68 {"method": "create_volume",
69 "args": {"topic": FLAGS.volume_topic,
70 "volume_ref": volume_ref}})
71 for volume_ref in rvs:
72 self.db.volume_update(context, vid, volume_ref)
73 volume_ref['launched_at'] = utils.utcnow()
74 self.db.volume_update(context, vid, volume_ref)
75
76 except rpc.RemoteError:
77 self.db.volume_update(context, vid, {'status': 'error'})
78
79 eventlet.spawn_n(delayed_create, volume_ref)
80 return volume_ref
6781
68 def delete(self, context, volume_id):82 def delete(self, context, volume_id):
69 volume = self.get(context, volume_id)83 volume_ref = self.get(context, volume_id)
70 if volume['status'] != "available":84 if volume_ref['status'] != "available":
71 raise exception.ApiError(_("Volume status must be available"))85 raise exception.ApiError(_("Volume status must be available"))
72 now = datetime.datetime.utcnow()86 if volume_ref['attach_status'] == "attached":
73 self.db.volume_update(context, volume_id, {'status': 'deleting',87 raise exception.Error(_("Volume is still attached"))
74 'terminated_at': now})88
75 host = volume['host']89 volume_ref['status'] = 'deleting'
76 rpc.cast(context,90 volume_ref['terminated_at'] = utils.utcnow()
77 self.db.queue_get_for(context, FLAGS.volume_topic, host),91 self.db.volume_update(context, volume_ref['id'], volume_ref)
78 {"method": "delete_volume",92 volume_ref = utils.to_primitive(dict(volume_ref))
79 "args": {"volume_id": volume_id}})93
94 def delayed_delete(volume_ref):
95 vid = volume_ref['id']
96 try:
97 topic = self.db.queue_get_for(context,
98 FLAGS.volume_topic,
99 volume_ref['host'])
100 rvs = rpc.multicall(context,
101 topic,
102 {"method": "delete_volume",
103 "args": {"volume_ref": volume_ref}})
104 for volume_ref in rvs:
105 self.db.volume_update(context, vid, volume_ref)
106
107 self.db.volume_destroy(context, vid)
108
109 except rpc.RemoteError:
110 self.db.volume_update(context, vid, {'status': 'err_delete'})
111
112 eventlet.spawn_n(delayed_delete, volume_ref)
113 return True
80114
81 def update(self, context, volume_id, fields):115 def update(self, context, volume_id, fields):
82 self.db.volume_update(context, volume_id, fields)116 self.db.volume_update(context, volume_id, fields)
83117
=== modified file 'nova/volume/manager.py'
--- nova/volume/manager.py 2011-03-17 13:35:00 +0000
+++ nova/volume/manager.py 2011-05-20 01:34:25 +0000
@@ -90,67 +90,40 @@
90 else:90 else:
91 LOG.info(_("volume %s: skipping export"), volume['name'])91 LOG.info(_("volume %s: skipping export"), volume['name'])
9292
93 def create_volume(self, context, volume_id):93 def create_volume(self, context, volume_ref):
94 """Creates and exports the volume."""94 """Creates and exports the volume."""
95 context = context.elevated()
96 volume_ref = self.db.volume_get(context, volume_id)
97 LOG.info(_("volume %s: creating"), volume_ref['name'])95 LOG.info(_("volume %s: creating"), volume_ref['name'])
9896
99 self.db.volume_update(context,
100 volume_id,
101 {'host': self.host})
102 # NOTE(vish): so we don't have to get volume from db again
103 # before passing it to the driver.
104 volume_ref['host'] = self.host97 volume_ref['host'] = self.host
10598 yield volume_ref
106 try:99
107 vol_name = volume_ref['name']100 vol_name = volume_ref['name']
108 vol_size = volume_ref['size']101 vol_size = volume_ref['size']
109 LOG.debug(_("volume %(vol_name)s: creating lv of"102 LOG.debug(_("volume %(vol_name)s: creating lv of"
110 " size %(vol_size)sG") % locals())103 " size %(vol_size)sG") % locals())
111 model_update = self.driver.create_volume(volume_ref)104 model_update = self.driver.create_volume(volume_ref)
112 if model_update:105 if model_update:
113 self.db.volume_update(context, volume_ref['id'], model_update)106 volume_ref.update(model_update)
114107 yield volume_ref
115 LOG.debug(_("volume %s: creating export"), volume_ref['name'])108
116 model_update = self.driver.create_export(context, volume_ref)109 LOG.debug(_("volume %s: creating export"), volume_ref['name'])
117 if model_update:110 model_update = self.driver.create_export(context, volume_ref)
118 self.db.volume_update(context, volume_ref['id'], model_update)111 if model_update:
119 except Exception:112 volume_ref.update(model_update)
120 self.db.volume_update(context,113 yield volume_ref
121 volume_ref['id'], {'status': 'error'})114
122 raise
123
124 now = datetime.datetime.utcnow()
125 self.db.volume_update(context,
126 volume_ref['id'], {'status': 'available',
127 'launched_at': now})
128 LOG.debug(_("volume %s: created successfully"), volume_ref['name'])115 LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
129 return volume_id116 volume_ref['status'] = 'available'
117 yield volume_ref
130118
131 def delete_volume(self, context, volume_id):119 def delete_volume(self, context, volume_ref):
132 """Deletes and unexports volume."""120 """Deletes and unexports volume."""
133 context = context.elevated()121 LOG.debug(_("volume %s: removing export"), volume_ref['name'])
134 volume_ref = self.db.volume_get(context, volume_id)122 self.driver.remove_export(context, volume_ref)
135 if volume_ref['attach_status'] == "attached":123 LOG.debug(_("volume %s: deleting"), volume_ref['name'])
136 raise exception.Error(_("Volume is still attached"))124 self.driver.delete_volume(volume_ref)
137 if volume_ref['host'] != self.host:
138 raise exception.Error(_("Volume is not local to this node"))
139
140 try:
141 LOG.debug(_("volume %s: removing export"), volume_ref['name'])
142 self.driver.remove_export(context, volume_ref)
143 LOG.debug(_("volume %s: deleting"), volume_ref['name'])
144 self.driver.delete_volume(volume_ref)
145 except Exception:
146 self.db.volume_update(context,
147 volume_ref['id'],
148 {'status': 'error_deleting'})
149 raise
150
151 self.db.volume_destroy(context, volume_id)
152 LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])125 LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
153 return True126 yield volume_ref
154127
155 def setup_compute_volume(self, context, volume_id):128 def setup_compute_volume(self, context, volume_id):
156 """Setup remote volume on compute host.129 """Setup remote volume on compute host.