Merge lp:~vishvananda/nova/no-db-messaging into lp:~hudson-openstack/nova/trunk
Status: Rejected
Rejected by: Vish Ishaya
Proposed branch: lp:~vishvananda/nova/no-db-messaging
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 1133 lines (+486/-214), 12 files modified
  nova/db/sqlalchemy/models.py (+12/-4)
  nova/fakerabbit.py (+23/-8)
  nova/rpc.py (+195/-70)
  nova/scheduler/manager.py (+7/-5)
  nova/service.py (+38/-24)
  nova/test.py (+6/-3)
  nova/tests/integrated/integrated_helpers.py (+1/-4)
  nova/tests/test_cloud.py (+9/-18)
  nova/tests/test_rpc.py (+65/-0)
  nova/tests/test_service.py (+51/-6)
  nova/volume/api.py (+53/-19)
  nova/volume/manager.py (+26/-53)
To merge this branch: bzr merge lp:~vishvananda/nova/no-db-messaging
Related bugs: (none)
Related blueprints: No DB Messaging (High)
Reviewer: Dan Prince (community)
Review status: Needs Fixing
Review via email: mp+61189@code.launchpad.net
Commit message
Description of the change
This is an initial proposal for feedback. This branch is an attempt to start on this blueprint:
https:/
which will allow for the implementation of this blueprint:
https:/
And will ultimately make it easy to replace our various services with external projects.
This prototype changes volume_create to pass data through the queue instead of writing information to the database and reading it on the other end. It attempts to make minimal changes. It includes:
* a small change to model code to allow for conversion into dicts
* scheduler uses call instead of cast
* volume.manager provides a volume_get to query the state of a volume
* volume.api creates the volume with a call and then spawns a greenthread to poll status
* volume.manager returns initial data immediately and then spawns a greenthread to do further work
This will allow us to communicate with an external REST API very easily, by simply turning the two rpc calls into POST and GET. It also allows us to separate out the database for volumes, so the volume worker doesn't need to write to the shared zone database.
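To make the delayed-create flow concrete, here is a condensed sketch of the api side, paraphrased from the nova/volume/api.py change in the preview diff below (not verbatim; it assumes the branch's rpc.multicall and the usual nova FLAGS/db plumbing):

    import eventlet

    from nova import flags
    from nova import rpc
    from nova import utils

    FLAGS = flags.FLAGS

    def create(self, context, size, name, description):
        """volume.api side: return initial data now, update in background."""
        options = {'size': size,
                   'status': 'creating',
                   'display_name': name,
                   'display_description': description}
        volume_ref = self.db.volume_create(context, options)
        volume_ref = utils.to_primitive(dict(volume_ref))

        def delayed_create(volume_ref):
            vid = volume_ref['id']
            try:
                # Each value the remote create_volume yields arrives here as
                # one result; persist every status update locally.
                rvs = rpc.multicall(context, FLAGS.scheduler_topic,
                                    {"method": "create_volume",
                                     "args": {"topic": FLAGS.volume_topic,
                                              "volume_ref": volume_ref}})
                for update in rvs:
                    self.db.volume_update(context, vid, update)
            except rpc.RemoteError:
                self.db.volume_update(context, vid, {'status': 'error'})

        eventlet.spawn_n(delayed_create, volume_ref)
        return volume_ref  # caller sees the 'creating' record immediately

On the worker side, volume.manager's create_volume becomes a generator that yields volume_ref after each step (host assignment, LV creation, export), which is what drives the successive volume_update calls above.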
Outstanding Issues for database split:
* we have to use something like remote_id to refer to the volume when making requests to nova-volume, because the id in our local database will not match. Ultimately we've discussed switching to UUIDs, so it may be good to make that change at the same time.
* if the state of a volume changes after the initial poll has finished, we will never know about it. We ultimately need a way for the api to be notified when the volume status changes so that we can update our local data, or we need a background worker that polls the service for changes at a regular interval (see the sketch after this list).
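For the second issue, one possible shape for that background worker is a greenthread that periodically calls the volume_get method the manager now exposes and syncs the local record. This is only a sketch of the idea; the argument names, interval, and terminal states are assumptions, not code from this branch:

    import eventlet

    from nova import rpc

    def poll_volume_status(context, db, topic, volume_id, interval=10):
        """Hypothetical poller: sync local volume state from the worker."""
        while True:
            remote = rpc.call(context, topic,
                              {"method": "volume_get",
                               "args": {"volume_id": volume_id}})
            if remote:
                db.volume_update(context, volume_id, remote)
                if remote.get('status') in ('available', 'error'):
                    break  # assumed terminal states; stop polling
            eventlet.sleep(interval)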
I'm open to feedback about this approach. I tried a few other versions and this seems like the simplest change set to get what we want. If this looks good, I will modify the other volume commands to work the same way and propose a similar set of changes for compute.
Vish Ishaya (vishvananda) wrote:
- 1078. By Eldar Nugaev: Improved error notification in network create
- 1079. By Eldar Nugaev: Added network_info into refresh_security_group_rules. That fixes https://bugs.launchpad.net/nova/+bug/773308
- 1080. By Mark Washenberger: Convert instance_type_ids in the instances table from strings to integers to enable joins with instance_types. This in particular fixes a problem when using postgresql.
Dan Prince (dan-prince) wrote:
Hey Vish,
I just smoke stacked this and got some errors. It looks like the changes to models.py aren't quite right.
I'm getting the following in nova-api.log:
2011-05-17 17:17:18,574 ERROR nova.api.openstack [-] Caught error: 'AuthToken' object has no attribute 'server_management_url'
[nova.api.openstack traceback truncated]
--
Running a simple 'nova list' command (using the OSAPI) should allow you to reproduce this.
Vish Ishaya (vishvananda) wrote:
Thanks Dan, I'll look at this. Also had some feedback from termie offline, and I'm moving where the polling is happening.
- 1081. By Josh Kearney: Added missing metadata join to instance_get calls.
- 1082. By Matt Dietz: Fixes improper attribute naming around instance types that broke Resizes.
- 1083. By termie: Docstring cleanup and formatting (nova/network dir). Minor style fixes as well.
- 1084. By Vish Ishaya: Fixes the naming of the server_management_url in auth and tests.
- 1085. By Josh Kearney: Added missing xenhost plugin. This was causing warnings to pop up in the compute logs during periodic_task runs. It must not have been bzr add'd when this code was merged.
- 1086. By Matt Dietz: Implements a basic mechanism for pushing notifications out to interested parties. The rationale for implementing notifications this way is that the responsibility for them shouldn't fall to Nova. As such, we simply push messages to a queue, where an entirely separate worker can be written to push messages around to subscribers.
- 1087. By Johannes Erdfelt: Simple change to sort the list of controllers/methods before printing to make it easier to read
- 1088. By termie: add support to rpc for multicall
- 1089. By termie: make the test more explicit
- 1090. By termie: add commented-out, non-working code for yield-based returns
- 1091. By Chris Behrens: Add a connection pool for rpc cast/call. Use the same rabbit connection for all topic listening, and wait to be notified instead of doing a 0.1-second poll for each.
- 1092. By Chris Behrens: pep8 and comment fixes
- 1093. By Chris Behrens: convert fanout_cast to ConnectionPool
- 1094. By Chris Behrens: fakerabbit's declare_consumer should support more than 1 consumer. Also: make fakerabbit Backend.consume be an iterator like it should be.
- 1095. By Vish Ishaya: fix consumers to actually be deleted and clean up cloud test
- 1096. By Chris Behrens: catch greenlet.GreenletExit when shutting service down
- 1097. By Chris Behrens: Always create Service consumers even if report_interval is 0. Fix tests to handle how Service loads Consumers now.
- 1098. By Chris Behrens: Add rpc_conn_pool_size flag for the new connection pool
- 1099. By termie: bring back commits lost in merge
- 1100. By termie: almost everything working with fake_rabbit
- 1101. By termie: don't need to use a separate connection
- 1102. By Vish Ishaya: lots of fixes for rpc and extra imports
- 1103. By termie: make sure that using multicall on a call with a single result still functions
- 1104. By termie: cleanup the code for merging
- 1105. By Vish Ishaya: update manager to use multicall
- 1106. By Vish Ishaya: fix conversion of models to dicts
- 1107. By Vish Ishaya: change volume_api to have delayed_create
- 1108. By Vish Ishaya: make scheduler use multicall
- 1109. By Vish Ishaya: remove the unnecessary try/except in manager
- 1110. By Vish Ishaya: pep8
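The rpc work in revisions 1088-1105 adds a small caller-facing API. A minimal usage sketch, modeled on the tests added in nova/tests/test_rpc.py below (each remote yield or context.reply arrives as one iteration, and call returns just the first result; the 'test' topic assumes a test service is listening):

    from nova import context
    from nova import rpc

    ctxt = context.get_admin_context()

    # multicall: iterate over every value the remote method yields.
    for value in rpc.multicall(ctxt, 'test',
                               {"method": "echo_three_times_yield",
                                "args": {"value": 42}}):
        print value  # 42, then 43, then 44

    # call: same wire protocol, but only the first result is returned.
    print rpc.call(ctxt, 'test', {"method": "echo", "args": {"value": 42}})

Under the hood both paths check out a connection from the new ConnectionPool (sized by the rpc_conn_pool_size flag, default 30) instead of opening a fresh rabbit connection per message.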
Vish Ishaya (vishvananda) wrote:
Reproposing this with a prereq branch.
Unmerged revisions
- 1123. By Vish Ishaya: merged trunk
- 1122. By Vish Ishaya: remove merge error calling failing test
- 1121. By Vish Ishaya: fix snapshot test
- 1120. By Vish Ishaya: return not yield in scheduler shortcut
- 1119. By Vish Ishaya: make sure to handle VolumeIsBusy
- 1118. By Vish Ishaya: merged trunk and removed conflicts
- 1117. By Vish Ishaya: lost some changes from rpc branch, bring them in manually
- 1116. By Vish Ishaya: use strtime for passing datetimes back and forth through the queue
- 1115. By Vish Ishaya: fix tests
- 1114. By Vish Ishaya: keep the database on the receiving end as well
Preview Diff
1 | === modified file 'nova/db/sqlalchemy/models.py' |
2 | --- nova/db/sqlalchemy/models.py 2011-05-17 20:50:12 +0000 |
3 | +++ nova/db/sqlalchemy/models.py 2011-05-20 01:34:25 +0000 |
4 | @@ -77,17 +77,25 @@ |
5 | return getattr(self, key, default) |
6 | |
7 | def __iter__(self): |
8 | - self._i = iter(object_mapper(self).columns) |
9 | + # NOTE(vish): include name property in the iterator |
10 | + columns = dict(object_mapper(self).columns).keys() |
11 | + name = self.get('name') |
12 | + if name: |
13 | + columns.append('name') |
14 | + self._i = iter(columns) |
15 | return self |
16 | |
17 | def next(self): |
18 | - n = self._i.next().name |
19 | + n = self._i.next() |
20 | return n, getattr(self, n) |
21 | |
22 | def update(self, values): |
23 | """Make the model object behave like a dict""" |
24 | - for k, v in values.iteritems(): |
25 | - setattr(self, k, v) |
26 | + columns = dict(object_mapper(self).columns).keys() |
27 | + for key, value in values.iteritems(): |
28 | + # NOTE(vish): don't update the 'name' property |
29 | + if key in columns: |
30 | + setattr(self, key, value) |
31 | |
32 | def iteritems(self): |
33 | """Make the model object behave like a dict. |
34 | |
35 | === modified file 'nova/fakerabbit.py' |
36 | --- nova/fakerabbit.py 2011-02-22 23:05:48 +0000 |
37 | +++ nova/fakerabbit.py 2011-05-20 01:34:25 +0000 |
38 | @@ -31,6 +31,7 @@ |
39 | |
40 | EXCHANGES = {} |
41 | QUEUES = {} |
42 | +CONSUMERS = {} |
43 | |
44 | |
45 | class Message(base.BaseMessage): |
46 | @@ -96,17 +97,29 @@ |
47 | ' key %(routing_key)s') % locals()) |
48 | EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) |
49 | |
50 | - def declare_consumer(self, queue, callback, *args, **kwargs): |
51 | - self.current_queue = queue |
52 | - self.current_callback = callback |
53 | + def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): |
54 | + global CONSUMERS |
55 | + LOG.debug("Adding consumer %s", consumer_tag) |
56 | + CONSUMERS[consumer_tag] = (queue, callback) |
57 | + |
58 | + def cancel(self, consumer_tag): |
59 | + global CONSUMERS |
60 | + LOG.debug("Removing consumer %s", consumer_tag) |
61 | + del CONSUMERS[consumer_tag] |
62 | |
63 | def consume(self, limit=None): |
64 | + global CONSUMERS |
65 | + num = 0 |
66 | while True: |
67 | - item = self.get(self.current_queue) |
68 | - if item: |
69 | - self.current_callback(item) |
70 | - raise StopIteration() |
71 | - greenthread.sleep(0) |
72 | + for (queue, callback) in CONSUMERS.itervalues(): |
73 | + item = self.get(queue) |
74 | + if item: |
75 | + callback(item) |
76 | + num += 1 |
77 | + yield |
78 | + if limit and num == limit: |
79 | + raise StopIteration() |
80 | + greenthread.sleep(0.1) |
81 | |
82 | def get(self, queue, no_ack=False): |
83 | global QUEUES |
84 | @@ -134,5 +147,7 @@ |
85 | def reset_all(): |
86 | global EXCHANGES |
87 | global QUEUES |
88 | + global CONSUMERS |
89 | EXCHANGES = {} |
90 | QUEUES = {} |
91 | + CONSUMERS = {} |
92 | |
93 | === modified file 'nova/rpc.py' |
94 | --- nova/rpc.py 2011-04-20 19:08:22 +0000 |
95 | +++ nova/rpc.py 2011-05-20 01:34:25 +0000 |
96 | @@ -33,7 +33,9 @@ |
97 | from carrot import connection as carrot_connection |
98 | from carrot import messaging |
99 | from eventlet import greenpool |
100 | -from eventlet import greenthread |
101 | +from eventlet import pools |
102 | +from eventlet import queue |
103 | +import greenlet |
104 | |
105 | from nova import context |
106 | from nova import exception |
107 | @@ -47,7 +49,10 @@ |
108 | |
109 | |
110 | FLAGS = flags.FLAGS |
111 | -flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool') |
112 | +flags.DEFINE_integer('rpc_thread_pool_size', 1024, |
113 | + 'Size of RPC thread pool') |
114 | +flags.DEFINE_integer('rpc_conn_pool_size', 30, |
115 | + 'Size of RPC connection pool') |
116 | |
117 | |
118 | class Connection(carrot_connection.BrokerConnection): |
119 | @@ -90,6 +95,17 @@ |
120 | return cls.instance() |
121 | |
122 | |
123 | +class Pool(pools.Pool): |
124 | + """Class that implements a Pool of Connections.""" |
125 | + |
126 | + def create(self): |
127 | + LOG.debug('Creating new connection') |
128 | + return Connection.instance(new=True) |
129 | + |
130 | + |
131 | +ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size) |
132 | + |
133 | + |
134 | class Consumer(messaging.Consumer): |
135 | """Consumer base class. |
136 | |
137 | @@ -131,7 +147,9 @@ |
138 | self.connection = Connection.recreate() |
139 | self.backend = self.connection.create_backend() |
140 | self.declare() |
141 | - super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) |
142 | + return super(Consumer, self).fetch(no_ack, |
143 | + auto_ack, |
144 | + enable_callbacks) |
145 | if self.failed_connection: |
146 | LOG.error(_('Reconnected to queue')) |
147 | self.failed_connection = False |
148 | @@ -159,13 +177,13 @@ |
149 | self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) |
150 | super(AdapterConsumer, self).__init__(connection=connection, |
151 | topic=topic) |
152 | - |
153 | - def receive(self, *args, **kwargs): |
154 | - self.pool.spawn_n(self._receive, *args, **kwargs) |
155 | - |
156 | - @exception.wrap_exception |
157 | - def _receive(self, message_data, message): |
158 | - """Magically looks for a method on the proxy object and calls it. |
159 | + self.register_callback(self.process_data) |
160 | + |
161 | + def process_data(self, message_data, message): |
162 | + """Consumer callback to call a method on a proxy object. |
163 | + |
164 | + Parses the message for validity and fires off a thread to call the |
165 | + proxy object method. |
166 | |
167 | Message data should be a dictionary with two keys: |
168 | method: string representing the method to call |
169 | @@ -175,8 +193,8 @@ |
170 | |
171 | """ |
172 | LOG.debug(_('received %s') % message_data) |
173 | - msg_id = message_data.pop('_msg_id', None) |
174 | - |
175 | + # This will be popped off in _unpack_context |
176 | + msg_id = message_data.get('_msg_id', None) |
177 | ctxt = _unpack_context(message_data) |
178 | |
179 | method = message_data.get('method') |
180 | @@ -190,6 +208,13 @@ |
181 | LOG.warn(_('no method for message: %s') % message_data) |
182 | msg_reply(msg_id, _('No method for message: %s') % message_data) |
183 | return |
184 | + self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args) |
185 | + |
186 | + @exception.wrap_exception |
187 | + def _process_data(self, msg_id, ctxt, method, args): |
188 | + """Thread that maigcally looks for a method on the proxy |
189 | + object and calls it. |
190 | + """ |
191 | |
192 | node_func = getattr(self.proxy, str(method)) |
193 | node_args = dict((str(k), v) for k, v in args.iteritems()) |
194 | @@ -197,7 +222,15 @@ |
195 | try: |
196 | rval = node_func(context=ctxt, **node_args) |
197 | if msg_id: |
198 | - msg_reply(msg_id, rval, None) |
199 | + # Check if the result was a generator |
200 | + if hasattr(rval, 'send'): |
201 | + for x in rval: |
202 | + msg_reply(msg_id, x, None) |
203 | + else: |
204 | + msg_reply(msg_id, rval, None) |
205 | + |
206 | + # This final None tells multicall that it is done. |
207 | + msg_reply(msg_id, None, None) |
208 | except Exception as e: |
209 | logging.exception('Exception during message handling') |
210 | if msg_id: |
211 | @@ -205,11 +238,6 @@ |
212 | return |
213 | |
214 | |
215 | -class Publisher(messaging.Publisher): |
216 | - """Publisher base class.""" |
217 | - pass |
218 | - |
219 | - |
220 | class TopicAdapterConsumer(AdapterConsumer): |
221 | """Consumes messages on a specific topic.""" |
222 | |
223 | @@ -242,6 +270,59 @@ |
224 | topic=topic, proxy=proxy) |
225 | |
226 | |
227 | +class ConsumerSet(object): |
228 | + """Groups consumers to listen on together on a single connection.""" |
229 | + |
230 | + def __init__(self, conn, consumer_list): |
231 | + self.consumer_list = set(consumer_list) |
232 | + self.consumer_set = None |
233 | + self.enabled = True |
234 | + self.init(conn) |
235 | + |
236 | + def init(self, conn): |
237 | + if not conn: |
238 | + conn = Connection.instance(new=True) |
239 | + if self.consumer_set: |
240 | + self.consumer_set.close() |
241 | + self.consumer_set = messaging.ConsumerSet(conn) |
242 | + for consumer in self.consumer_list: |
243 | + consumer.connection = conn |
244 | + # consumer.backend is set for us |
245 | + self.consumer_set.add_consumer(consumer) |
246 | + |
247 | + def reconnect(self): |
248 | + self.init(None) |
249 | + |
250 | + def wait(self, limit=None): |
251 | + running = True |
252 | + while running: |
253 | + it = self.consumer_set.iterconsume(limit=limit) |
254 | + if not it: |
255 | + break |
256 | + while True: |
257 | + try: |
258 | + it.next() |
259 | + except StopIteration: |
260 | + return |
261 | + except greenlet.GreenletExit: |
262 | + running = False |
263 | + break |
264 | + except Exception as e: |
265 | + LOG.error(_("Received exception %s " % type(e) + \ |
266 | + "while processing consumer")) |
267 | + self.reconnect() |
268 | + # Break to outer loop |
269 | + break |
270 | + |
271 | + def close(self): |
272 | + self.consumer_set.close() |
273 | + |
274 | + |
275 | +class Publisher(messaging.Publisher): |
276 | + """Publisher base class.""" |
277 | + pass |
278 | + |
279 | + |
280 | class TopicPublisher(Publisher): |
281 | """Publishes messages on a specific topic.""" |
282 | |
283 | @@ -306,16 +387,18 @@ |
284 | LOG.error(_("Returning exception %s to caller"), message) |
285 | LOG.error(tb) |
286 | failure = (failure[0].__name__, str(failure[1]), tb) |
287 | - conn = Connection.instance() |
288 | - publisher = DirectPublisher(connection=conn, msg_id=msg_id) |
289 | - try: |
290 | - publisher.send({'result': reply, 'failure': failure}) |
291 | - except TypeError: |
292 | - publisher.send( |
293 | - {'result': dict((k, repr(v)) |
294 | - for k, v in reply.__dict__.iteritems()), |
295 | - 'failure': failure}) |
296 | - publisher.close() |
297 | + |
298 | + with ConnectionPool.item() as conn: |
299 | + publisher = DirectPublisher(connection=conn, msg_id=msg_id) |
300 | + try: |
301 | + publisher.send({'result': reply, 'failure': failure}) |
302 | + except TypeError: |
303 | + publisher.send( |
304 | + {'result': dict((k, repr(v)) |
305 | + for k, v in reply.__dict__.iteritems()), |
306 | + 'failure': failure}) |
307 | + |
308 | + publisher.close() |
309 | |
310 | |
311 | class RemoteError(exception.Error): |
312 | @@ -347,8 +430,9 @@ |
313 | if key.startswith('_context_'): |
314 | value = msg.pop(key) |
315 | context_dict[key[9:]] = value |
316 | + context_dict['msg_id'] = msg.pop('_msg_id', None) |
317 | LOG.debug(_('unpacked context: %s'), context_dict) |
318 | - return context.RequestContext.from_dict(context_dict) |
319 | + return RpcContext.from_dict(context_dict) |
320 | |
321 | |
322 | def _pack_context(msg, context): |
323 | @@ -360,70 +444,110 @@ |
324 | for args at some point. |
325 | |
326 | """ |
327 | - context = dict([('_context_%s' % key, value) |
328 | - for (key, value) in context.to_dict().iteritems()]) |
329 | - msg.update(context) |
330 | - |
331 | - |
332 | -def call(context, topic, msg): |
333 | - """Sends a message on a topic and wait for a response.""" |
334 | + context_d = dict([('_context_%s' % key, value) |
335 | + for (key, value) in context.to_dict().iteritems()]) |
336 | + msg.update(context_d) |
337 | + |
338 | + |
339 | +class RpcContext(context.RequestContext): |
340 | + def __init__(self, *args, **kwargs): |
341 | + msg_id = kwargs.pop('msg_id', None) |
342 | + self.msg_id = msg_id |
343 | + super(RpcContext, self).__init__(*args, **kwargs) |
344 | + |
345 | + def reply(self, *args, **kwargs): |
346 | + msg_reply(self.msg_id, *args, **kwargs) |
347 | + |
348 | + |
349 | +def multicall(context, topic, msg): |
350 | + """Make a call that returns multiple times.""" |
351 | LOG.debug(_('Making asynchronous call on %s ...'), topic) |
352 | msg_id = uuid.uuid4().hex |
353 | msg.update({'_msg_id': msg_id}) |
354 | LOG.debug(_('MSG_ID is %s') % (msg_id)) |
355 | _pack_context(msg, context) |
356 | |
357 | - class WaitMessage(object): |
358 | - def __call__(self, data, message): |
359 | - """Acks message and sets result.""" |
360 | - message.ack() |
361 | - if data['failure']: |
362 | - self.result = RemoteError(*data['failure']) |
363 | - else: |
364 | - self.result = data['result'] |
365 | - |
366 | - wait_msg = WaitMessage() |
367 | - conn = Connection.instance() |
368 | - consumer = DirectConsumer(connection=conn, msg_id=msg_id) |
369 | + con_conn = ConnectionPool.get() |
370 | + consumer = DirectConsumer(connection=con_conn, msg_id=msg_id) |
371 | + wait_msg = MulticallWaiter(consumer) |
372 | consumer.register_callback(wait_msg) |
373 | |
374 | - conn = Connection.instance() |
375 | - publisher = TopicPublisher(connection=conn, topic=topic) |
376 | + publisher = TopicPublisher(connection=con_conn, topic=topic) |
377 | publisher.send(msg) |
378 | publisher.close() |
379 | |
380 | - try: |
381 | - consumer.wait(limit=1) |
382 | - except StopIteration: |
383 | - pass |
384 | - consumer.close() |
385 | - # NOTE(termie): this is a little bit of a change from the original |
386 | - # non-eventlet code where returning a Failure |
387 | - # instance from a deferred call is very similar to |
388 | - # raising an exception |
389 | - if isinstance(wait_msg.result, Exception): |
390 | - raise wait_msg.result |
391 | - return wait_msg.result |
392 | + return wait_msg |
393 | + |
394 | + |
395 | +class MulticallWaiter(object): |
396 | + def __init__(self, consumer): |
397 | + self._consumer = consumer |
398 | + self._results = queue.Queue() |
399 | + self._closed = False |
400 | + |
401 | + def close(self): |
402 | + self._closed = True |
403 | + self._consumer.close() |
404 | + ConnectionPool.put(self._consumer.connection) |
405 | + |
406 | + def __call__(self, data, message): |
407 | + """Acks message and sets result.""" |
408 | + message.ack() |
409 | + if data['failure']: |
410 | + self._results.put(RemoteError(*data['failure'])) |
411 | + else: |
412 | + self._results.put(data['result']) |
413 | + |
414 | + def __iter__(self): |
415 | + return self.wait() |
416 | + |
417 | + def wait(self): |
418 | + while True: |
419 | + rv = None |
420 | + while rv is None and not self._closed: |
421 | + try: |
422 | + rv = self._consumer.fetch(enable_callbacks=True) |
423 | + except Exception: |
424 | + self.close() |
425 | + raise |
426 | + time.sleep(0.01) |
427 | + |
428 | + result = self._results.get() |
429 | + if isinstance(result, Exception): |
430 | + self.close() |
431 | + raise result |
432 | + if result == None: |
433 | + self.close() |
434 | + raise StopIteration |
435 | + yield result |
436 | + |
437 | + |
438 | +def call(context, topic, msg): |
439 | + """Sends a message on a topic and wait for a response.""" |
440 | + rv = multicall(context, topic, msg) |
441 | + for x in rv: |
442 | + rv.close() |
443 | + return x |
444 | |
445 | |
446 | def cast(context, topic, msg): |
447 | """Sends a message on a topic without waiting for a response.""" |
448 | LOG.debug(_('Making asynchronous cast on %s...'), topic) |
449 | _pack_context(msg, context) |
450 | - conn = Connection.instance() |
451 | - publisher = TopicPublisher(connection=conn, topic=topic) |
452 | - publisher.send(msg) |
453 | - publisher.close() |
454 | + with ConnectionPool.item() as conn: |
455 | + publisher = TopicPublisher(connection=conn, topic=topic) |
456 | + publisher.send(msg) |
457 | + publisher.close() |
458 | |
459 | |
460 | def fanout_cast(context, topic, msg): |
461 | """Sends a message on a fanout exchange without waiting for a response.""" |
462 | LOG.debug(_('Making asynchronous fanout cast...')) |
463 | _pack_context(msg, context) |
464 | - conn = Connection.instance() |
465 | - publisher = FanoutPublisher(topic, connection=conn) |
466 | - publisher.send(msg) |
467 | - publisher.close() |
468 | + with ConnectionPool.item() as conn: |
469 | + publisher = FanoutPublisher(topic, connection=conn) |
470 | + publisher.send(msg) |
471 | + publisher.close() |
472 | |
473 | |
474 | def generic_response(message_data, message): |
475 | @@ -459,6 +583,7 @@ |
476 | |
477 | if wait: |
478 | consumer.wait() |
479 | + consumer.close() |
480 | |
481 | |
482 | if __name__ == '__main__': |
483 | |
484 | === modified file 'nova/scheduler/manager.py' |
485 | --- nova/scheduler/manager.py 2011-05-05 14:35:44 +0000 |
486 | +++ nova/scheduler/manager.py 2011-05-20 01:34:25 +0000 |
487 | @@ -83,11 +83,13 @@ |
488 | except AttributeError: |
489 | host = self.driver.schedule(elevated, topic, *args, **kwargs) |
490 | |
491 | - rpc.cast(context, |
492 | - db.queue_get_for(context, topic, host), |
493 | - {"method": method, |
494 | - "args": kwargs}) |
495 | - LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals()) |
496 | + LOG.debug(_("Multicall %(topic)s %(host)s for %(method)s") % locals()) |
497 | + rvs = rpc.multicall(context, |
498 | + db.queue_get_for(context, topic, host), |
499 | + {"method": method, |
500 | + "args": kwargs}) |
501 | + for rv in rvs: |
502 | + yield rv |
503 | |
504 | # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin. |
505 | # Based on bexar design summit discussion, |
506 | |
507 | === modified file 'nova/service.py' |
508 | --- nova/service.py 2011-04-20 19:08:22 +0000 |
509 | +++ nova/service.py 2011-05-20 01:34:25 +0000 |
510 | @@ -19,14 +19,11 @@ |
511 | |
512 | """Generic Node baseclass for all workers that run on hosts.""" |
513 | |
514 | +import greenlet |
515 | import inspect |
516 | import os |
517 | -import sys |
518 | -import time |
519 | |
520 | -from eventlet import event |
521 | from eventlet import greenthread |
522 | -from eventlet import greenpool |
523 | |
524 | from nova import context |
525 | from nova import db |
526 | @@ -91,27 +88,38 @@ |
527 | if 'nova-compute' == self.binary: |
528 | self.manager.update_available_resource(ctxt) |
529 | |
530 | - conn1 = rpc.Connection.instance(new=True) |
531 | - conn2 = rpc.Connection.instance(new=True) |
532 | - conn3 = rpc.Connection.instance(new=True) |
533 | + self.conn = rpc.Connection.instance(new=True) |
534 | + logging.debug("Creating Consumer connection for Service %s" % |
535 | + self.topic) |
536 | + |
537 | + # Share this same connection for these Consumers |
538 | + consumer_all = rpc.TopicAdapterConsumer( |
539 | + connection=self.conn, |
540 | + topic=self.topic, |
541 | + proxy=self) |
542 | + consumer_node = rpc.TopicAdapterConsumer( |
543 | + connection=self.conn, |
544 | + topic='%s.%s' % (self.topic, self.host), |
545 | + proxy=self) |
546 | + fanout = rpc.FanoutAdapterConsumer( |
547 | + connection=self.conn, |
548 | + topic=self.topic, |
549 | + proxy=self) |
550 | + |
551 | + cset = rpc.ConsumerSet(self.conn, [consumer_all, |
552 | + consumer_node, |
553 | + fanout]) |
554 | + |
555 | + # Wait forever, processing these consumers |
556 | + def _wait(): |
557 | + try: |
558 | + cset.wait() |
559 | + finally: |
560 | + cset.close() |
561 | + |
562 | + self.csetthread = greenthread.spawn(_wait) |
563 | + |
564 | if self.report_interval: |
565 | - consumer_all = rpc.TopicAdapterConsumer( |
566 | - connection=conn1, |
567 | - topic=self.topic, |
568 | - proxy=self) |
569 | - consumer_node = rpc.TopicAdapterConsumer( |
570 | - connection=conn2, |
571 | - topic='%s.%s' % (self.topic, self.host), |
572 | - proxy=self) |
573 | - fanout = rpc.FanoutAdapterConsumer( |
574 | - connection=conn3, |
575 | - topic=self.topic, |
576 | - proxy=self) |
577 | - |
578 | - self.timers.append(consumer_all.attach_to_eventlet()) |
579 | - self.timers.append(consumer_node.attach_to_eventlet()) |
580 | - self.timers.append(fanout.attach_to_eventlet()) |
581 | - |
582 | pulse = utils.LoopingCall(self.report_state) |
583 | pulse.start(interval=self.report_interval, now=False) |
584 | self.timers.append(pulse) |
585 | @@ -167,7 +175,13 @@ |
586 | |
587 | def kill(self): |
588 | """Destroy the service object in the datastore.""" |
589 | + self.csetthread.kill() |
590 | + try: |
591 | + self.csetthread.wait() |
592 | + except greenlet.GreenletExit: |
593 | + pass |
594 | self.stop() |
595 | + rpc.ConnectionPool.put(self.conn) |
596 | try: |
597 | db.service_destroy(context.get_admin_context(), self.service_id) |
598 | except exception.NotFound: |
599 | |
600 | === modified file 'nova/test.py' |
601 | --- nova/test.py 2011-04-20 19:08:22 +0000 |
602 | +++ nova/test.py 2011-05-20 01:34:25 +0000 |
603 | @@ -31,17 +31,15 @@ |
604 | import unittest |
605 | |
606 | import mox |
607 | -import shutil |
608 | import stubout |
609 | from eventlet import greenthread |
610 | |
611 | -from nova import context |
612 | -from nova import db |
613 | from nova import fakerabbit |
614 | from nova import flags |
615 | from nova import rpc |
616 | from nova import service |
617 | from nova import wsgi |
618 | +from nova.virt import fake |
619 | |
620 | |
621 | FLAGS = flags.FLAGS |
622 | @@ -85,6 +83,7 @@ |
623 | self._monkey_patch_attach() |
624 | self._monkey_patch_wsgi() |
625 | self._original_flags = FLAGS.FlagValuesDict() |
626 | + rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size) |
627 | |
628 | def tearDown(self): |
629 | """Runs after each test method to tear down test environment.""" |
630 | @@ -99,6 +98,10 @@ |
631 | if FLAGS.fake_rabbit: |
632 | fakerabbit.reset_all() |
633 | |
634 | + if FLAGS.connection_type == 'fake': |
635 | + if hasattr(fake.FakeConnection, '_instance'): |
636 | + del fake.FakeConnection._instance |
637 | + |
638 | # Reset any overriden flags |
639 | self.reset_flags() |
640 | |
641 | |
642 | === modified file 'nova/tests/integrated/integrated_helpers.py' |
643 | --- nova/tests/integrated/integrated_helpers.py 2011-03-30 01:13:04 +0000 |
644 | +++ nova/tests/integrated/integrated_helpers.py 2011-05-20 01:34:25 +0000 |
645 | @@ -154,10 +154,7 @@ |
646 | # set up services |
647 | self.start_service('compute') |
648 | self.start_service('volume') |
649 | - # NOTE(justinsb): There's a bug here which is eluding me... |
650 | - # If we start the network_service, all is good, but then subsequent |
651 | - # tests fail: CloudTestCase.test_ajax_console in particular. |
652 | - #self.start_service('network') |
653 | + self.start_service('network') |
654 | self.start_service('scheduler') |
655 | |
656 | self.auth_url = self._start_api_service() |
657 | |
658 | === modified file 'nova/tests/test_cloud.py' |
659 | --- nova/tests/test_cloud.py 2011-05-16 20:30:40 +0000 |
660 | +++ nova/tests/test_cloud.py 2011-05-20 01:34:25 +0000 |
661 | @@ -17,13 +17,8 @@ |
662 | # under the License. |
663 | |
664 | from base64 import b64decode |
665 | -import json |
666 | from M2Crypto import BIO |
667 | from M2Crypto import RSA |
668 | -import os |
669 | -import shutil |
670 | -import tempfile |
671 | -import time |
672 | |
673 | from eventlet import greenthread |
674 | |
675 | @@ -33,12 +28,10 @@ |
676 | from nova import flags |
677 | from nova import log as logging |
678 | from nova import rpc |
679 | -from nova import service |
680 | from nova import test |
681 | from nova import utils |
682 | from nova import exception |
683 | from nova.auth import manager |
684 | -from nova.compute import power_state |
685 | from nova.api.ec2 import cloud |
686 | from nova.api.ec2 import ec2utils |
687 | from nova.image import local |
688 | @@ -79,14 +72,21 @@ |
689 | self.stubs.Set(local.LocalImageService, 'show', fake_show) |
690 | self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show) |
691 | |
692 | + # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish |
693 | + rpc_cast = rpc.cast |
694 | + |
695 | + def finish_cast(*args, **kwargs): |
696 | + rpc_cast(*args, **kwargs) |
697 | + greenthread.sleep(0.2) |
698 | + |
699 | + self.stubs.Set(rpc, 'cast', finish_cast) |
700 | + |
701 | def tearDown(self): |
702 | network_ref = db.project_get_network(self.context, |
703 | self.project.id) |
704 | db.network_disassociate(self.context, network_ref['id']) |
705 | self.manager.delete_project(self.project) |
706 | self.manager.delete_user(self.user) |
707 | - self.compute.kill() |
708 | - self.network.kill() |
709 | super(CloudTestCase, self).tearDown() |
710 | |
711 | def _create_key(self, name): |
712 | @@ -113,7 +113,6 @@ |
713 | self.cloud.describe_addresses(self.context) |
714 | self.cloud.release_address(self.context, |
715 | public_ip=address) |
716 | - greenthread.sleep(0.3) |
717 | db.floating_ip_destroy(self.context, address) |
718 | |
719 | def test_associate_disassociate_address(self): |
720 | @@ -129,12 +128,10 @@ |
721 | self.cloud.associate_address(self.context, |
722 | instance_id=ec2_id, |
723 | public_ip=address) |
724 | - greenthread.sleep(0.3) |
725 | self.cloud.disassociate_address(self.context, |
726 | public_ip=address) |
727 | self.cloud.release_address(self.context, |
728 | public_ip=address) |
729 | - greenthread.sleep(0.3) |
730 | self.network.deallocate_fixed_ip(self.context, fixed) |
731 | db.instance_destroy(self.context, inst['id']) |
732 | db.floating_ip_destroy(self.context, address) |
733 | @@ -306,31 +303,25 @@ |
734 | 'instance_type': instance_type, |
735 | 'max_count': max_count} |
736 | rv = self.cloud.run_instances(self.context, **kwargs) |
737 | - greenthread.sleep(0.3) |
738 | instance_id = rv['instancesSet'][0]['instanceId'] |
739 | output = self.cloud.get_console_output(context=self.context, |
740 | instance_id=[instance_id]) |
741 | self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT') |
742 | # TODO(soren): We need this until we can stop polling in the rpc code |
743 | # for unit tests. |
744 | - greenthread.sleep(0.3) |
745 | rv = self.cloud.terminate_instances(self.context, [instance_id]) |
746 | - greenthread.sleep(0.3) |
747 | |
748 | def test_ajax_console(self): |
749 | kwargs = {'image_id': 'ami-1'} |
750 | rv = self.cloud.run_instances(self.context, **kwargs) |
751 | instance_id = rv['instancesSet'][0]['instanceId'] |
752 | - greenthread.sleep(0.3) |
753 | output = self.cloud.get_ajax_console(context=self.context, |
754 | instance_id=[instance_id]) |
755 | self.assertEquals(output['url'], |
756 | '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url) |
757 | # TODO(soren): We need this until we can stop polling in the rpc code |
758 | # for unit tests. |
759 | - greenthread.sleep(0.3) |
760 | rv = self.cloud.terminate_instances(self.context, [instance_id]) |
761 | - greenthread.sleep(0.3) |
762 | |
763 | def test_key_generation(self): |
764 | result = self._create_key('test') |
765 | |
766 | === modified file 'nova/tests/test_rpc.py' |
767 | --- nova/tests/test_rpc.py 2011-02-23 22:41:11 +0000 |
768 | +++ nova/tests/test_rpc.py 2011-05-20 01:34:25 +0000 |
769 | @@ -49,6 +49,59 @@ |
770 | "args": {"value": value}}) |
771 | self.assertEqual(value, result) |
772 | |
773 | + def test_call_succeed_despite_multiple_returns(self): |
774 | + """Get a value through rpc call""" |
775 | + value = 42 |
776 | + result = rpc.call(self.context, 'test', {"method": "echo_three_times", |
777 | + "args": {"value": value}}) |
778 | + self.assertEqual(value, result) |
779 | + |
780 | + def test_call_succeed_despite_multiple_returns_yield(self): |
781 | + """Get a value through rpc call""" |
782 | + value = 42 |
783 | + result = rpc.call(self.context, 'test', |
784 | + {"method": "echo_three_times_yield", |
785 | + "args": {"value": value}}) |
786 | + self.assertEqual(value, result) |
787 | + |
788 | + def test_multicall_succeed_once(self): |
789 | + """Get a value through rpc call""" |
790 | + value = 42 |
791 | + result = rpc.multicall(self.context, |
792 | + 'test', |
793 | + {"method": "echo", |
794 | + "args": {"value": value}}) |
795 | + i = 0 |
796 | + for x in result: |
797 | + if i > 0: |
798 | + self.fail('should only receive one response') |
799 | + self.assertEqual(value + i, x) |
800 | + i += 1 |
801 | + |
802 | + def test_multicall_succeed_three_times(self): |
803 | + """Get a value through rpc call""" |
804 | + value = 42 |
805 | + result = rpc.multicall(self.context, |
806 | + 'test', |
807 | + {"method": "echo_three_times", |
808 | + "args": {"value": value}}) |
809 | + i = 0 |
810 | + for x in result: |
811 | + self.assertEqual(value + i, x) |
812 | + i += 1 |
813 | + |
814 | + def test_multicall_succeed_three_times_yield(self): |
815 | + """Get a value through rpc call""" |
816 | + value = 42 |
817 | + result = rpc.multicall(self.context, |
818 | + 'test', |
819 | + {"method": "echo_three_times_yield", |
820 | + "args": {"value": value}}) |
821 | + i = 0 |
822 | + for x in result: |
823 | + self.assertEqual(value + i, x) |
824 | + i += 1 |
825 | + |
826 | def test_context_passed(self): |
827 | """Makes sure a context is passed through rpc call""" |
828 | value = 42 |
829 | @@ -127,6 +180,18 @@ |
830 | return context.to_dict() |
831 | |
832 | @staticmethod |
833 | + def echo_three_times(context, value): |
834 | + context.reply(value) |
835 | + context.reply(value + 1) |
836 | + context.reply(value + 2) |
837 | + |
838 | + @staticmethod |
839 | + def echo_three_times_yield(context, value): |
840 | + yield value |
841 | + yield value + 1 |
842 | + yield value + 2 |
843 | + |
844 | + @staticmethod |
845 | def fail(context, value): |
846 | """Raises an exception with the value sent in""" |
847 | raise Exception(value) |
848 | |
849 | === modified file 'nova/tests/test_service.py' |
850 | --- nova/tests/test_service.py 2011-03-17 13:35:00 +0000 |
851 | +++ nova/tests/test_service.py 2011-05-20 01:34:25 +0000 |
852 | @@ -106,7 +106,10 @@ |
853 | |
854 | # NOTE(vish): Create was moved out of mox replay to make sure that |
855 | # the looping calls are created in StartService. |
856 | - app = service.Service.create(host=host, binary=binary) |
857 | + app = service.Service.create(host=host, binary=binary, topic=topic) |
858 | + |
859 | + self.mox.StubOutWithMock(service.rpc.Connection, 'instance') |
860 | + service.rpc.Connection.instance(new=mox.IgnoreArg()) |
861 | |
862 | self.mox.StubOutWithMock(rpc, |
863 | 'TopicAdapterConsumer', |
864 | @@ -114,6 +117,11 @@ |
865 | self.mox.StubOutWithMock(rpc, |
866 | 'FanoutAdapterConsumer', |
867 | use_mock_anything=True) |
868 | + |
869 | + self.mox.StubOutWithMock(rpc, |
870 | + 'ConsumerSet', |
871 | + use_mock_anything=True) |
872 | + |
873 | rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), |
874 | topic=topic, |
875 | proxy=mox.IsA(service.Service)).AndReturn( |
876 | @@ -129,9 +137,13 @@ |
877 | proxy=mox.IsA(service.Service)).AndReturn( |
878 | rpc.FanoutAdapterConsumer) |
879 | |
880 | - rpc.TopicAdapterConsumer.attach_to_eventlet() |
881 | - rpc.TopicAdapterConsumer.attach_to_eventlet() |
882 | - rpc.FanoutAdapterConsumer.attach_to_eventlet() |
883 | + def wait_func(self, limit=None): |
884 | + return None |
885 | + |
886 | + mock_cset = self.mox.CreateMock(rpc.ConsumerSet, |
887 | + {'wait': wait_func}) |
888 | + rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset) |
889 | + wait_func(mox.IgnoreArg()) |
890 | |
891 | service_create = {'host': host, |
892 | 'binary': binary, |
893 | @@ -287,8 +299,41 @@ |
894 | # Creating mocks |
895 | self.mox.StubOutWithMock(service.rpc.Connection, 'instance') |
896 | service.rpc.Connection.instance(new=mox.IgnoreArg()) |
897 | - service.rpc.Connection.instance(new=mox.IgnoreArg()) |
898 | - service.rpc.Connection.instance(new=mox.IgnoreArg()) |
899 | + |
900 | + self.mox.StubOutWithMock(rpc, |
901 | + 'TopicAdapterConsumer', |
902 | + use_mock_anything=True) |
903 | + self.mox.StubOutWithMock(rpc, |
904 | + 'FanoutAdapterConsumer', |
905 | + use_mock_anything=True) |
906 | + |
907 | + self.mox.StubOutWithMock(rpc, |
908 | + 'ConsumerSet', |
909 | + use_mock_anything=True) |
910 | + |
911 | + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), |
912 | + topic=topic, |
913 | + proxy=mox.IsA(service.Service)).AndReturn( |
914 | + rpc.TopicAdapterConsumer) |
915 | + |
916 | + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), |
917 | + topic='%s.%s' % (topic, host), |
918 | + proxy=mox.IsA(service.Service)).AndReturn( |
919 | + rpc.TopicAdapterConsumer) |
920 | + |
921 | + rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(), |
922 | + topic=topic, |
923 | + proxy=mox.IsA(service.Service)).AndReturn( |
924 | + rpc.FanoutAdapterConsumer) |
925 | + |
926 | + def wait_func(self, limit=None): |
927 | + return None |
928 | + |
929 | + mock_cset = self.mox.CreateMock(rpc.ConsumerSet, |
930 | + {'wait': wait_func}) |
931 | + rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset) |
932 | + wait_func(mox.IgnoreArg()) |
933 | + |
934 | self.mox.StubOutWithMock(serv.manager.driver, |
935 | 'update_available_resource') |
936 | serv.manager.driver.update_available_resource(mox.IgnoreArg(), host) |
937 | |
938 | === modified file 'nova/volume/api.py' |
939 | --- nova/volume/api.py 2011-03-31 08:39:00 +0000 |
940 | +++ nova/volume/api.py 2011-05-20 01:34:25 +0000 |
941 | @@ -20,14 +20,14 @@ |
942 | Handles all requests relating to volumes. |
943 | """ |
944 | |
945 | -import datetime |
946 | +import eventlet |
947 | |
948 | -from nova import db |
949 | from nova import exception |
950 | from nova import flags |
951 | from nova import log as logging |
952 | from nova import quota |
953 | from nova import rpc |
954 | +from nova import utils |
955 | from nova.db import base |
956 | |
957 | FLAGS = flags.FLAGS |
958 | @@ -57,26 +57,60 @@ |
959 | 'display_name': name, |
960 | 'display_description': description} |
961 | |
962 | - volume = self.db.volume_create(context, options) |
963 | - rpc.cast(context, |
964 | - FLAGS.scheduler_topic, |
965 | - {"method": "create_volume", |
966 | - "args": {"topic": FLAGS.volume_topic, |
967 | - "volume_id": volume['id']}}) |
968 | - return volume |
969 | + volume_ref = self.db.volume_create(context, options) |
970 | + volume_ref = utils.to_primitive(dict(volume_ref)) |
971 | + |
972 | + def delayed_create(volume_ref): |
973 | + vid = volume_ref['id'] |
974 | + try: |
975 | + rvs = rpc.multicall(context, |
976 | + FLAGS.scheduler_topic, |
977 | + {"method": "create_volume", |
978 | + "args": {"topic": FLAGS.volume_topic, |
979 | + "volume_ref": volume_ref}}) |
980 | + for volume_ref in rvs: |
981 | + self.db.volume_update(context, vid, volume_ref) |
982 | + volume_ref['launched_at'] = utils.utcnow() |
983 | + self.db.volume_update(context, vid, volume_ref) |
984 | + |
985 | + except rpc.RemoteError: |
986 | + self.db.volume_update(context, vid, {'status': 'error'}) |
987 | + |
988 | + eventlet.spawn_n(delayed_create, volume_ref) |
989 | + return volume_ref |
990 | |
991 | def delete(self, context, volume_id): |
992 | - volume = self.get(context, volume_id) |
993 | - if volume['status'] != "available": |
994 | + volume_ref = self.get(context, volume_id) |
995 | + if volume_ref['status'] != "available": |
996 | raise exception.ApiError(_("Volume status must be available")) |
997 | - now = datetime.datetime.utcnow() |
998 | - self.db.volume_update(context, volume_id, {'status': 'deleting', |
999 | - 'terminated_at': now}) |
1000 | - host = volume['host'] |
1001 | - rpc.cast(context, |
1002 | - self.db.queue_get_for(context, FLAGS.volume_topic, host), |
1003 | - {"method": "delete_volume", |
1004 | - "args": {"volume_id": volume_id}}) |
1005 | + if volume_ref['attach_status'] == "attached": |
1006 | + raise exception.Error(_("Volume is still attached")) |
1007 | + |
1008 | + volume_ref['status'] = 'deleting' |
1009 | + volume_ref['terminated_at'] = utils.utcnow() |
1010 | + self.db.volume_update(context, volume_ref['id'], volume_ref) |
1011 | + volume_ref = utils.to_primitive(dict(volume_ref)) |
1012 | + |
1013 | + def delayed_delete(volume_ref): |
1014 | + vid = volume_ref['id'] |
1015 | + try: |
1016 | + topic = self.db.queue_get_for(context, |
1017 | + FLAGS.volume_topic, |
1018 | + volume_ref['host']) |
1019 | + rvs = rpc.multicall(context, |
1020 | + topic, |
1021 | + {"method": "delete_volume", |
1022 | + "args": {"volume_ref": volume_ref}}) |
1023 | + for volume_ref in rvs: |
1024 | + self.db.volume_update(context, vid, volume_ref) |
1025 | + |
1026 | + self.db.volume_destroy(context, vid) |
1027 | + |
1028 | + except rpc.RemoteError: |
1029 | + self.db.volume_update(context, vid, {'status': 'err_delete'}) |
1030 | + |
1031 | + eventlet.spawn_n(delayed_delete, volume_ref) |
1032 | + return True |
1033 | |
1034 | def update(self, context, volume_id, fields): |
1035 | self.db.volume_update(context, volume_id, fields) |
1036 | |
1037 | === modified file 'nova/volume/manager.py' |
1038 | --- nova/volume/manager.py 2011-03-17 13:35:00 +0000 |
1039 | +++ nova/volume/manager.py 2011-05-20 01:34:25 +0000 |
1040 | @@ -90,67 +90,40 @@ |
1041 | else: |
1042 | LOG.info(_("volume %s: skipping export"), volume['name']) |
1043 | |
1044 | - def create_volume(self, context, volume_id): |
1045 | + def create_volume(self, context, volume_ref): |
1046 | """Creates and exports the volume.""" |
1047 | - context = context.elevated() |
1048 | - volume_ref = self.db.volume_get(context, volume_id) |
1049 | LOG.info(_("volume %s: creating"), volume_ref['name']) |
1050 | |
1051 | - self.db.volume_update(context, |
1052 | - volume_id, |
1053 | - {'host': self.host}) |
1054 | - # NOTE(vish): so we don't have to get volume from db again |
1055 | - # before passing it to the driver. |
1056 | volume_ref['host'] = self.host |
1057 | - |
1058 | - try: |
1059 | - vol_name = volume_ref['name'] |
1060 | - vol_size = volume_ref['size'] |
1061 | - LOG.debug(_("volume %(vol_name)s: creating lv of" |
1062 | - " size %(vol_size)sG") % locals()) |
1063 | - model_update = self.driver.create_volume(volume_ref) |
1064 | - if model_update: |
1065 | - self.db.volume_update(context, volume_ref['id'], model_update) |
1066 | - |
1067 | - LOG.debug(_("volume %s: creating export"), volume_ref['name']) |
1068 | - model_update = self.driver.create_export(context, volume_ref) |
1069 | - if model_update: |
1070 | - self.db.volume_update(context, volume_ref['id'], model_update) |
1071 | - except Exception: |
1072 | - self.db.volume_update(context, |
1073 | - volume_ref['id'], {'status': 'error'}) |
1074 | - raise |
1075 | - |
1076 | - now = datetime.datetime.utcnow() |
1077 | - self.db.volume_update(context, |
1078 | - volume_ref['id'], {'status': 'available', |
1079 | - 'launched_at': now}) |
1080 | + yield volume_ref |
1081 | + |
1082 | + vol_name = volume_ref['name'] |
1083 | + vol_size = volume_ref['size'] |
1084 | + LOG.debug(_("volume %(vol_name)s: creating lv of" |
1085 | + " size %(vol_size)sG") % locals()) |
1086 | + model_update = self.driver.create_volume(volume_ref) |
1087 | + if model_update: |
1088 | + volume_ref.update(model_update) |
1089 | + yield volume_ref |
1090 | + |
1091 | + LOG.debug(_("volume %s: creating export"), volume_ref['name']) |
1092 | + model_update = self.driver.create_export(context, volume_ref) |
1093 | + if model_update: |
1094 | + volume_ref.update(model_update) |
1095 | + yield volume_ref |
1096 | + |
1097 | LOG.debug(_("volume %s: created successfully"), volume_ref['name']) |
1098 | - return volume_id |
1099 | + volume_ref['status'] = 'available' |
1100 | + yield volume_ref |
1101 | |
1102 | - def delete_volume(self, context, volume_id): |
1103 | + def delete_volume(self, context, volume_ref): |
1104 | """Deletes and unexports volume.""" |
1105 | - context = context.elevated() |
1106 | - volume_ref = self.db.volume_get(context, volume_id) |
1107 | - if volume_ref['attach_status'] == "attached": |
1108 | - raise exception.Error(_("Volume is still attached")) |
1109 | - if volume_ref['host'] != self.host: |
1110 | - raise exception.Error(_("Volume is not local to this node")) |
1111 | - |
1112 | - try: |
1113 | - LOG.debug(_("volume %s: removing export"), volume_ref['name']) |
1114 | - self.driver.remove_export(context, volume_ref) |
1115 | - LOG.debug(_("volume %s: deleting"), volume_ref['name']) |
1116 | - self.driver.delete_volume(volume_ref) |
1117 | - except Exception: |
1118 | - self.db.volume_update(context, |
1119 | - volume_ref['id'], |
1120 | - {'status': 'error_deleting'}) |
1121 | - raise |
1122 | - |
1123 | - self.db.volume_destroy(context, volume_id) |
1124 | + LOG.debug(_("volume %s: removing export"), volume_ref['name']) |
1125 | + self.driver.remove_export(context, volume_ref) |
1126 | + LOG.debug(_("volume %s: deleting"), volume_ref['name']) |
1127 | + self.driver.delete_volume(volume_ref) |
1128 | LOG.debug(_("volume %s: deleted successfully"), volume_ref['name']) |
1129 | - return True |
1130 | + yield volume_ref |
1131 | |
1132 | def setup_compute_volume(self, context, volume_id): |
1133 | """Setup remote volume on compute host. |
Accidentally pushed an old version. This is the right one.