Merge lp:~vishvananda/nova/no-db-messaging into lp:~hudson-openstack/nova/trunk
Status: | Rejected |
---|---|
Rejected by: | Vish Ishaya |
Proposed branch: | lp:~vishvananda/nova/no-db-messaging |
Merge into: | lp:~hudson-openstack/nova/trunk |
Diff against target: | 1133 lines (+486/-214), 12 files modified |
nova/db/sqlalchemy/models.py (+12/-4)
nova/fakerabbit.py (+23/-8)
nova/rpc.py (+195/-70)
nova/scheduler/manager.py (+7/-5)
nova/service.py (+38/-24)
nova/test.py (+6/-3)
nova/tests/integrated/integrated_helpers.py (+1/-4)
nova/tests/test_cloud.py (+9/-18)
nova/tests/test_rpc.py (+65/-0)
nova/tests/test_service.py (+51/-6)
nova/volume/api.py (+53/-19)
nova/volume/manager.py (+26/-53)
To merge this branch: | bzr merge lp:~vishvananda/nova/no-db-messaging |
Related bugs: | |
Related blueprints: | No DB Messaging (High) |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Dan Prince (community) | Needs Fixing | ||
Review via email: mp+61189@code.launchpad.net |
Commit message
Description of the change
This is an initial proposal for feedback. This branch is an attempt to start on this blueprint:
https:/
which will allow for the implementation of this blueprint:
https:/
And will ultimately make it easy to replace our various services with external projects.
This prototype changes volume_create to pass data through the queue instead of writing information to the database and reading it on the other end. It attempts to make minimal changes. It includes:
* a small change to model code to allow for conversion into dicts
* scheduler uses call instead of cast
* volume.manager provides a volume_get to query the state of a volume
* volume.api creates the volume with a call and then spawns a greenthread to poll status
* volume.manager returns initial data immediately and then spawns a greenthread to do further work
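The last two bullets describe a delayed-create handshake: the create call returns the initial record at once, the manager finishes the work in a greenthread, and the api side polls until the status settles. A minimal sketch of that flow, with plain threads standing in for eventlet greenthreads (every name below is hypothetical, not nova's actual API):

```python
import threading
import time


class FakeVolumeManager:
    """Toy stand-in for nova-volume's manager side."""

    def __init__(self):
        self._volumes = {}

    def create_volume(self, vol_id, size):
        # Return initial data immediately, then finish the work in a
        # background thread so the caller is never blocked on provisioning.
        volume = {'id': vol_id, 'size': size, 'status': 'creating'}
        self._volumes[vol_id] = volume
        threading.Thread(target=self._finish_create, args=(vol_id,)).start()
        return dict(volume)

    def _finish_create(self, vol_id):
        time.sleep(0.05)  # stand-in for the real provisioning work
        self._volumes[vol_id]['status'] = 'available'

    def volume_get(self, vol_id):
        return dict(self._volumes[vol_id])


def create_and_poll(manager, vol_id, size, interval=0.01, timeout=1.0):
    """API side: create via a blocking call, then poll until done."""
    volume = manager.create_volume(vol_id, size)
    deadline = time.time() + timeout
    while volume['status'] == 'creating' and time.time() < deadline:
        time.sleep(interval)
        volume = manager.volume_get(vol_id)
    return volume
```

The caller gets the initial record synchronously, and the two remote operations (the create call and the status poll) are exactly the pair that the description proposes mapping onto POST and GET.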
This will allow us to communicate with an external REST API very easily, by simply turning the two rpc calls into POST and GET. It also allows us to separate out the database for volumes, so the volume worker doesn't need to write to the shared zone database.
Outstanding Issues for database split:
* we have to use something like remote_id to refer to the volume when making requests to nova-volume because the id in our local database will not match. Ultimately we've discussed switching to UUIDs, so it may be good to do that change at the same time.
* if the state of a volume changes after the initial poll has finished, we will never know about it. We ultimately need a way for the api to be notified when the volume status changes so that we can update our local data, or we need a background worker that polls the service for changes at a regular interval.
I'm open to feedback about this approach. I tried a few other versions and this seems like the simplest change set to get what we want. If this looks good, I will modify the other volume commands to work the same way and propose a similar set of changes for compute.
Vish Ishaya (vishvananda) wrote:
- 1078. By Eldar Nugaev

  Improved error notification in network create

- 1079. By Eldar Nugaev

  Added network_info into refresh_security_group_rules.
  That fixes https://bugs.launchpad.net/nova/+bug/773308

- 1080. By Mark Washenberger

  Convert instance_type_ids in the instances table from strings to integers to enable joins with instance_types. This in particular fixes a problem when using postgresql.
Dan Prince (dan-prince) wrote:
Hey Vish,
I just smoke stacked this and got some errors. It looks like the changes to models.py aren't quite right.
I'm getting the following in nova-api.log:
2011-05-17 17:17:18,574 ERROR nova.api.openstack [-] Caught error: 'AuthToken' object has no attribute 'server_
(nova.api.
--
Running a simple 'nova list' command (using the OSAPI) should allow you to reproduce this.
Vish Ishaya (vishvananda) wrote:
Thanks Dan, I'll look at this. I also had some feedback from termie offline, and I'm moving where the polling is happening.
- 1081. By Josh Kearney

  Added missing metadata join to instance_get calls.

- 1082. By Matt Dietz

  Fixes improper attribute naming around instance types that broke Resizes.

- 1083. By termie

  Docstring cleanup and formatting (nova/network dir). Minor style fixes as well.

- 1084. By Vish Ishaya

  Fixes the naming of the server_management_url in auth and tests.

- 1085. By Josh Kearney

  Added missing xenhost plugin. This was causing warnings to pop up in the compute logs during periodic_task runs. It must have not been bzr add'd when this code was merged.
- 1086. By Matt Dietz

  Implements a basic mechanism for pushing notifications out to interested parties. The rationale for implementing notifications this way is that the responsibility for them shouldn't fall to Nova. As such, we simply will be pushing messages to a queue where another worker entirely can be written to push messages around to subscribers.

- 1087. By Johannes Erdfelt

  Simple change to sort the list of controllers/methods before printing to make it easier to read

- 1088. By termie

  add support to rpc for multicall

- 1089. By termie

  make the test more explicit

- 1090. By termie

  add commented out unworking code for yield-based returns

- 1091. By Chris Behrens

  Add a connection pool for rpc cast/call.
  Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each.

- 1092. By Chris Behrens

  pep8 and comment fixes
- 1093. By Chris Behrens

  convert fanout_cast to ConnectionPool

- 1094. By Chris Behrens

  fakerabbit's declare_consumer should support more than 1 consumer. Also: make fakerabbit Backend.consume be an iterator like it should be.

- 1095. By Vish Ishaya

  fix consumers to actually be deleted and clean up cloud test

- 1096. By Chris Behrens

  catch greenlet.GreenletExit when shutting service down

- 1097. By Chris Behrens

  Always create Service consumers no matter if report_interval is 0.
  Fix tests to handle how Service loads Consumers now.

- 1098. By Chris Behrens

  Add rpc_conn_pool_size flag for the new connection pool
- 1099. By termie

  bring back commits lost in merge

- 1100. By termie

  almost everything working with fake_rabbit

- 1101. By termie

  don't need to use a separate connection

- 1102. By Vish Ishaya

  lots of fixes for rpc and extra imports

- 1103. By termie

  make sure that using multicall on a call with a single result still functions

- 1104. By termie

  cleanup the code for merging

- 1105. By Vish Ishaya

  update manager to use multicall

- 1106. By Vish Ishaya

  fix conversion of models to dicts

- 1107. By Vish Ishaya

  change volume_api to have delayed_create

- 1108. By Vish Ishaya

  make scheduler use multicall

- 1109. By Vish Ishaya

  remove the unnecessary try except in manager

- 1110. By Vish Ishaya

  pep8
Vish Ishaya (vishvananda) wrote:
Reproposing this with a prereq branch.
Unmerged revisions
- 1123. By Vish Ishaya

  merged trunk

- 1122. By Vish Ishaya

  remove merge error calling failing test

- 1121. By Vish Ishaya

  fix snapshot test

- 1120. By Vish Ishaya

  return not yield in scheduler shortcut

- 1119. By Vish Ishaya

  make sure to handle VolumeIsBusy

- 1118. By Vish Ishaya

  merged trunk and removed conflicts

- 1117. By Vish Ishaya

  lost some changes from rpc branch, bring them in manually

- 1116. By Vish Ishaya

  use strtime for passing datetimes back and forth through the queue

- 1115. By Vish Ishaya

  fix tests

- 1114. By Vish Ishaya

  keep the database on the receiving end as well
Preview Diff
=== modified file 'nova/db/sqlalchemy/models.py'
--- nova/db/sqlalchemy/models.py	2011-05-17 20:50:12 +0000
+++ nova/db/sqlalchemy/models.py	2011-05-20 01:34:25 +0000
@@ -77,17 +77,25 @@
         return getattr(self, key, default)
 
     def __iter__(self):
-        self._i = iter(object_mapper(self).columns)
+        # NOTE(vish): include name property in the iterator
+        columns = dict(object_mapper(self).columns).keys()
+        name = self.get('name')
+        if name:
+            columns.append('name')
+        self._i = iter(columns)
         return self
 
     def next(self):
-        n = self._i.next().name
+        n = self._i.next()
         return n, getattr(self, n)
 
     def update(self, values):
         """Make the model object behave like a dict"""
-        for k, v in values.iteritems():
-            setattr(self, k, v)
+        columns = dict(object_mapper(self).columns).keys()
+        for key, value in values.iteritems():
+            # NOTE(vish): don't update the 'name' property
+            if key in columns:
+                setattr(self, key, value)
 
     def iteritems(self):
         """Make the model object behave like a dict.
 
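The models.py hunk above makes a mapped object iterable as (column, value) pairs, including the computed `name` property, while `update()` refuses to write keys that are not real columns. The same dict-like protocol can be sketched standalone, without SQLAlchemy (the class and its `_columns` tuple here are invented for illustration):

```python
class DictLikeModel:
    """Sketch: an object that iterates (column, value) pairs so dict(obj)
    works, mirroring the __iter__/update changes in the patch."""

    _columns = ('id', 'size', 'status')  # stand-in for mapper columns

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    @property
    def name(self):
        # Computed property, like the model's 'name'.
        return 'volume-%s' % self.id

    def __iter__(self):
        columns = list(self._columns)
        # Include the computed 'name' property, as the patch does.
        if getattr(self, 'name', None):
            columns.append('name')
        return iter((col, getattr(self, col)) for col in columns)

    def update(self, values):
        for key, value in values.items():
            # Skip keys like 'name' that are not real columns.
            if key in self._columns:
                setattr(self, key, value)
```

Because `__iter__` yields pairs, `dict(obj)` produces the message payload directly, which is what lets a model row travel through the queue.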
=== modified file 'nova/fakerabbit.py'
--- nova/fakerabbit.py	2011-02-22 23:05:48 +0000
+++ nova/fakerabbit.py	2011-05-20 01:34:25 +0000
@@ -31,6 +31,7 @@
 
 EXCHANGES = {}
 QUEUES = {}
+CONSUMERS = {}
 
 
 class Message(base.BaseMessage):
@@ -96,17 +97,29 @@
                   ' key %(routing_key)s') % locals())
         EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
 
-    def declare_consumer(self, queue, callback, *args, **kwargs):
-        self.current_queue = queue
-        self.current_callback = callback
+    def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
+        global CONSUMERS
+        LOG.debug("Adding consumer %s", consumer_tag)
+        CONSUMERS[consumer_tag] = (queue, callback)
+
+    def cancel(self, consumer_tag):
+        global CONSUMERS
+        LOG.debug("Removing consumer %s", consumer_tag)
+        del CONSUMERS[consumer_tag]
 
     def consume(self, limit=None):
+        global CONSUMERS
+        num = 0
         while True:
-            item = self.get(self.current_queue)
-            if item:
-                self.current_callback(item)
-                raise StopIteration()
-            greenthread.sleep(0)
+            for (queue, callback) in CONSUMERS.itervalues():
+                item = self.get(queue)
+                if item:
+                    callback(item)
+                    num += 1
+                    yield
+            if limit and num == limit:
+                raise StopIteration()
+            greenthread.sleep(0.1)
 
     def get(self, queue, no_ack=False):
         global QUEUES
@@ -134,5 +147,7 @@
 def reset_all():
     global EXCHANGES
     global QUEUES
+    global CONSUMERS
     EXCHANGES = {}
     QUEUES = {}
+    CONSUMERS = {}
 
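The fakerabbit change above replaces the single `current_queue`/`current_callback` pair with a `CONSUMERS` registry that `consume()` round-robins over, yielding once per delivered message. A toy version of that registry (function names mirror the patch, but the module layout and the idle-exit shortcut are inventions of this sketch; the real backend sleeps and keeps polling instead of returning):

```python
from collections import deque

QUEUES = {}     # queue name -> deque of pending messages
CONSUMERS = {}  # consumer_tag -> (queue name, callback)


def declare_consumer(queue, callback, consumer_tag):
    CONSUMERS[consumer_tag] = (queue, callback)


def cancel(consumer_tag):
    del CONSUMERS[consumer_tag]


def publish(queue, message):
    QUEUES.setdefault(queue, deque()).append(message)


def consume(limit=None):
    """Round-robin over every registered consumer, yielding per delivery."""
    num = 0
    while True:
        idle = True
        for queue, callback in list(CONSUMERS.values()):
            if QUEUES.get(queue):
                callback(QUEUES[queue].popleft())
                num += 1
                idle = False
                yield
        if limit and num >= limit:
            return
        if idle:
            # A real broker would sleep and retry; stop for the sketch.
            return
```

This is what lets a single fake connection service the three consumers (topic, topic.host, fanout) that a Service now shares on one connection.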
=== modified file 'nova/rpc.py'
--- nova/rpc.py	2011-04-20 19:08:22 +0000
+++ nova/rpc.py	2011-05-20 01:34:25 +0000
@@ -33,7 +33,9 @@
 from carrot import connection as carrot_connection
 from carrot import messaging
 from eventlet import greenpool
-from eventlet import greenthread
+from eventlet import pools
+from eventlet import queue
+import greenlet
 
 from nova import context
 from nova import exception
@@ -47,7 +49,10 @@
 
 
 FLAGS = flags.FLAGS
-flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
+                     'Size of RPC thread pool')
+flags.DEFINE_integer('rpc_conn_pool_size', 30,
+                     'Size of RPC connection pool')
 
 
 class Connection(carrot_connection.BrokerConnection):
@@ -90,6 +95,17 @@
         return cls.instance()
 
 
+class Pool(pools.Pool):
+    """Class that implements a Pool of Connections."""
+
+    def create(self):
+        LOG.debug('Creating new connection')
+        return Connection.instance(new=True)
+
+
+ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size)
+
+
 class Consumer(messaging.Consumer):
     """Consumer base class.
 
@@ -131,7 +147,9 @@
                 self.connection = Connection.recreate()
                 self.backend = self.connection.create_backend()
                 self.declare()
-            super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
+            return super(Consumer, self).fetch(no_ack,
+                                               auto_ack,
+                                               enable_callbacks)
             if self.failed_connection:
                 LOG.error(_('Reconnected to queue'))
                 self.failed_connection = False
@@ -159,13 +177,13 @@
         self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
         super(AdapterConsumer, self).__init__(connection=connection,
                                               topic=topic)
-
-    def receive(self, *args, **kwargs):
-        self.pool.spawn_n(self._receive, *args, **kwargs)
-
-    @exception.wrap_exception
-    def _receive(self, message_data, message):
-        """Magically looks for a method on the proxy object and calls it.
+        self.register_callback(self.process_data)
+
+    def process_data(self, message_data, message):
+        """Consumer callback to call a method on a proxy object.
+
+        Parses the message for validity and fires off a thread to call the
+        proxy object method.
 
         Message data should be a dictionary with two keys:
             method: string representing the method to call
@@ -175,8 +193,8 @@
 
         """
         LOG.debug(_('received %s') % message_data)
-        msg_id = message_data.pop('_msg_id', None)
-
+        # This will be popped off in _unpack_context
+        msg_id = message_data.get('_msg_id', None)
         ctxt = _unpack_context(message_data)
 
         method = message_data.get('method')
@@ -190,6 +208,13 @@
             LOG.warn(_('no method for message: %s') % message_data)
             msg_reply(msg_id, _('No method for message: %s') % message_data)
             return
+        self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
+
+    @exception.wrap_exception
+    def _process_data(self, msg_id, ctxt, method, args):
+        """Thread that magically looks for a method on the proxy
+        object and calls it.
+        """
 
         node_func = getattr(self.proxy, str(method))
         node_args = dict((str(k), v) for k, v in args.iteritems())
@@ -197,7 +222,15 @@
         try:
             rval = node_func(context=ctxt, **node_args)
             if msg_id:
-                msg_reply(msg_id, rval, None)
+                # Check if the result was a generator
+                if hasattr(rval, 'send'):
+                    for x in rval:
+                        msg_reply(msg_id, x, None)
+                else:
+                    msg_reply(msg_id, rval, None)
+
+                # This final None tells multicall that it is done.
+                msg_reply(msg_id, None, None)
         except Exception as e:
             logging.exception('Exception during message handling')
             if msg_id:
@@ -205,11 +238,6 @@
             return
 
 
-class Publisher(messaging.Publisher):
-    """Publisher base class."""
-    pass
-
-
 class TopicAdapterConsumer(AdapterConsumer):
     """Consumes messages on a specific topic."""
 
@@ -242,6 +270,59 @@
                                                   topic=topic, proxy=proxy)
 
 
+class ConsumerSet(object):
+    """Groups consumers to listen on together on a single connection."""
+
+    def __init__(self, conn, consumer_list):
+        self.consumer_list = set(consumer_list)
+        self.consumer_set = None
+        self.enabled = True
+        self.init(conn)
+
+    def init(self, conn):
+        if not conn:
+            conn = Connection.instance(new=True)
+        if self.consumer_set:
+            self.consumer_set.close()
+        self.consumer_set = messaging.ConsumerSet(conn)
+        for consumer in self.consumer_list:
+            consumer.connection = conn
+            # consumer.backend is set for us
+            self.consumer_set.add_consumer(consumer)
+
+    def reconnect(self):
+        self.init(None)
+
+    def wait(self, limit=None):
+        running = True
+        while running:
+            it = self.consumer_set.iterconsume(limit=limit)
+            if not it:
+                break
+            while True:
+                try:
+                    it.next()
+                except StopIteration:
+                    return
+                except greenlet.GreenletExit:
+                    running = False
+                    break
+                except Exception as e:
+                    LOG.error(_("Received exception %s " % type(e) + \
+                                "while processing consumer"))
+                    self.reconnect()
+                    # Break to outer loop
+                    break
+
+    def close(self):
+        self.consumer_set.close()
+
+
+class Publisher(messaging.Publisher):
+    """Publisher base class."""
+    pass
+
+
 class TopicPublisher(Publisher):
     """Publishes messages on a specific topic."""
 
@@ -306,16 +387,18 @@
         LOG.error(_("Returning exception %s to caller"), message)
         LOG.error(tb)
         failure = (failure[0].__name__, str(failure[1]), tb)
-    conn = Connection.instance()
-    publisher = DirectPublisher(connection=conn, msg_id=msg_id)
-    try:
-        publisher.send({'result': reply, 'failure': failure})
-    except TypeError:
-        publisher.send(
-                {'result': dict((k, repr(v))
-                                for k, v in reply.__dict__.iteritems()),
-                 'failure': failure})
-    publisher.close()
+
+    with ConnectionPool.item() as conn:
+        publisher = DirectPublisher(connection=conn, msg_id=msg_id)
+        try:
+            publisher.send({'result': reply, 'failure': failure})
+        except TypeError:
+            publisher.send(
+                    {'result': dict((k, repr(v))
+                                    for k, v in reply.__dict__.iteritems()),
+                     'failure': failure})
+
+        publisher.close()
 
 
 class RemoteError(exception.Error):
@@ -347,8 +430,9 @@
         if key.startswith('_context_'):
             value = msg.pop(key)
             context_dict[key[9:]] = value
+    context_dict['msg_id'] = msg.pop('_msg_id', None)
     LOG.debug(_('unpacked context: %s'), context_dict)
-    return context.RequestContext.from_dict(context_dict)
+    return RpcContext.from_dict(context_dict)
 
 
 def _pack_context(msg, context):
@@ -360,70 +444,110 @@
     for args at some point.
 
     """
-    context = dict([('_context_%s' % key, value)
-                    for (key, value) in context.to_dict().iteritems()])
-    msg.update(context)
+    context_d = dict([('_context_%s' % key, value)
+                      for (key, value) in context.to_dict().iteritems()])
+    msg.update(context_d)
 
 
-def call(context, topic, msg):
-    """Sends a message on a topic and wait for a response."""
+class RpcContext(context.RequestContext):
+    def __init__(self, *args, **kwargs):
+        msg_id = kwargs.pop('msg_id', None)
+        self.msg_id = msg_id
+        super(RpcContext, self).__init__(*args, **kwargs)
+
+    def reply(self, *args, **kwargs):
+        msg_reply(self.msg_id, *args, **kwargs)
+
+
+def multicall(context, topic, msg):
+    """Make a call that returns multiple times."""
     LOG.debug(_('Making asynchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
     _pack_context(msg, context)
 
-    class WaitMessage(object):
-        def __call__(self, data, message):
-            """Acks message and sets result."""
-            message.ack()
-            if data['failure']:
-                self.result = RemoteError(*data['failure'])
-            else:
-                self.result = data['result']
-
-    wait_msg = WaitMessage()
-    conn = Connection.instance()
-    consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+    con_conn = ConnectionPool.get()
+    consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
+    wait_msg = MulticallWaiter(consumer)
     consumer.register_callback(wait_msg)
 
-    conn = Connection.instance()
-    publisher = TopicPublisher(connection=conn, topic=topic)
+    publisher = TopicPublisher(connection=con_conn, topic=topic)
     publisher.send(msg)
     publisher.close()
 
-    try:
-        consumer.wait(limit=1)
-    except StopIteration:
-        pass
-    consumer.close()
-    # NOTE(termie): this is a little bit of a change from the original
-    #               non-eventlet code where returning a Failure
-    #               instance from a deferred call is very similar to
-    #               raising an exception
-    if isinstance(wait_msg.result, Exception):
-        raise wait_msg.result
-    return wait_msg.result
+    return wait_msg
+
+
+class MulticallWaiter(object):
+    def __init__(self, consumer):
+        self._consumer = consumer
+        self._results = queue.Queue()
+        self._closed = False
+
+    def close(self):
+        self._closed = True
+        self._consumer.close()
+        ConnectionPool.put(self._consumer.connection)
+
+    def __call__(self, data, message):
+        """Acks message and sets result."""
+        message.ack()
+        if data['failure']:
+            self._results.put(RemoteError(*data['failure']))
+        else:
+            self._results.put(data['result'])
+
+    def __iter__(self):
+        return self.wait()
+
+    def wait(self):
+        while True:
+            rv = None
+            while rv is None and not self._closed:
+                try:
+                    rv = self._consumer.fetch(enable_callbacks=True)
+                except Exception:
+                    self.close()
+                    raise
+                time.sleep(0.01)
+
+            result = self._results.get()
+            if isinstance(result, Exception):
+                self.close()
+                raise result
+            if result == None:
+                self.close()
+                raise StopIteration
+            yield result
+
+
+def call(context, topic, msg):
+    """Sends a message on a topic and wait for a response."""
+    rv = multicall(context, topic, msg)
+    for x in rv:
+        rv.close()
+        return x
 
 
 def cast(context, topic, msg):
     """Sends a message on a topic without waiting for a response."""
     LOG.debug(_('Making asynchronous cast on %s...'), topic)
     _pack_context(msg, context)
-    conn = Connection.instance()
-    publisher = TopicPublisher(connection=conn, topic=topic)
-    publisher.send(msg)
-    publisher.close()
+    with ConnectionPool.item() as conn:
+        publisher = TopicPublisher(connection=conn, topic=topic)
+        publisher.send(msg)
+        publisher.close()
 
 
 def fanout_cast(context, topic, msg):
     """Sends a message on a fanout exchange without waiting for a response."""
     LOG.debug(_('Making asynchronous fanout cast...'))
     _pack_context(msg, context)
-    conn = Connection.instance()
-    publisher = FanoutPublisher(topic, connection=conn)
-    publisher.send(msg)
-    publisher.close()
+    with ConnectionPool.item() as conn:
+        publisher = FanoutPublisher(topic, connection=conn)
        publisher.send(msg)
        publisher.close()
 
 
 def generic_response(message_data, message):
@@ -459,6 +583,7 @@
 
     if wait:
         consumer.wait()
+    consumer.close()
 
 
 if __name__ == '__main__':
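The rpc.py diff above introduces `ConnectionPool`, an eventlet `pools.Pool` subclass whose `create()` builds a fresh `Connection` on demand; `cast`, `fanout_cast`, and `msg_reply` then borrow and return connections instead of opening a new one per message. The borrowing semantics can be roughed out with only the stdlib (class shape and the `factory` argument are hypothetical, not eventlet's API):

```python
import queue


class SimpleConnectionPool:
    """Sketch of a bounded, reusing connection pool: hand out a free
    connection if one exists, create up to max_size, then block."""

    def __init__(self, max_size, factory):
        self._free = queue.Queue()
        self._max_size = max_size
        self._created = 0
        self._factory = factory

    def get(self):
        try:
            # Reuse a returned connection when possible.
            return self._free.get_nowait()
        except queue.Empty:
            if self._created < self._max_size:
                self._created += 1
                return self._factory()
            # At capacity: block until someone calls put().
            return self._free.get()

    def put(self, conn):
        self._free.put(conn)
```

The `rpc_conn_pool_size` flag added in the diff plays the role of `max_size` here, capping how many broker connections a process will ever hold.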
=== modified file 'nova/scheduler/manager.py'
--- nova/scheduler/manager.py	2011-05-05 14:35:44 +0000
+++ nova/scheduler/manager.py	2011-05-20 01:34:25 +0000
@@ -83,11 +83,13 @@
         except AttributeError:
             host = self.driver.schedule(elevated, topic, *args, **kwargs)
 
-        rpc.cast(context,
-                 db.queue_get_for(context, topic, host),
-                 {"method": method,
-                  "args": kwargs})
-        LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals())
+        LOG.debug(_("Multicall %(topic)s %(host)s for %(method)s") % locals())
+        rvs = rpc.multicall(context,
+                            db.queue_get_for(context, topic, host),
+                            {"method": method,
+                             "args": kwargs})
+        for rv in rvs:
+            yield rv
 
     # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
     #    Based on bexar design summit discussion,
 
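The scheduler change above consumes `rpc.multicall` as a generator, and the rpc.py hunks show the wire protocol behind it: the server sends one reply per yielded value, then a final `None` that ends the stream, and `call()` collapses to "first result of multicall". That handshake can be sketched with a local queue standing in for the AMQP direct exchange (function names here are illustrative, not nova's):

```python
import queue


def serve(requests_func, reply_queue):
    """Server side: one reply per yielded value, then a None sentinel."""
    rval = requests_func()
    if hasattr(rval, 'send'):  # a generator: stream each result
        for item in rval:
            reply_queue.put(item)
    else:
        reply_queue.put(rval)
    reply_queue.put(None)  # tells multicall the stream is done


def multicall(reply_queue):
    """Client side: yield results until the None sentinel arrives."""
    while True:
        result = reply_queue.get()
        if result is None:
            return
        yield result


def call(reply_queue):
    """call() is just multicall() that returns the first result."""
    for result in multicall(reply_queue):
        return result
```

This is why the `hasattr(rval, 'send')` check appears in `_process_data`: it distinguishes a generator-returning manager method, whose results are streamed, from a plain return value.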
=== modified file 'nova/service.py'
--- nova/service.py	2011-04-20 19:08:22 +0000
+++ nova/service.py	2011-05-20 01:34:25 +0000
@@ -19,14 +19,11 @@

 """Generic Node baseclass for all workers that run on hosts."""

+import greenlet
 import inspect
 import os
-import sys
-import time

-from eventlet import event
 from eventlet import greenthread
-from eventlet import greenpool

 from nova import context
 from nova import db
@@ -91,27 +88,38 @@
         if 'nova-compute' == self.binary:
             self.manager.update_available_resource(ctxt)

-        conn1 = rpc.Connection.instance(new=True)
-        conn2 = rpc.Connection.instance(new=True)
-        conn3 = rpc.Connection.instance(new=True)
+        self.conn = rpc.Connection.instance(new=True)
+        logging.debug("Creating Consumer connection for Service %s" %
+                      self.topic)
+
+        # Share this same connection for these Consumers
+        consumer_all = rpc.TopicAdapterConsumer(
+                connection=self.conn,
+                topic=self.topic,
+                proxy=self)
+        consumer_node = rpc.TopicAdapterConsumer(
+                connection=self.conn,
+                topic='%s.%s' % (self.topic, self.host),
+                proxy=self)
+        fanout = rpc.FanoutAdapterConsumer(
+                connection=self.conn,
+                topic=self.topic,
+                proxy=self)
+
+        cset = rpc.ConsumerSet(self.conn, [consumer_all,
+                                           consumer_node,
+                                           fanout])
+
+        # Wait forever, processing these consumers
+        def _wait():
+            try:
+                cset.wait()
+            finally:
+                cset.close()
+
+        self.csetthread = greenthread.spawn(_wait)
+
         if self.report_interval:
-            consumer_all = rpc.TopicAdapterConsumer(
-                    connection=conn1,
-                    topic=self.topic,
-                    proxy=self)
-            consumer_node = rpc.TopicAdapterConsumer(
-                    connection=conn2,
-                    topic='%s.%s' % (self.topic, self.host),
-                    proxy=self)
-            fanout = rpc.FanoutAdapterConsumer(
-                    connection=conn3,
-                    topic=self.topic,
-                    proxy=self)
-
-            self.timers.append(consumer_all.attach_to_eventlet())
-            self.timers.append(consumer_node.attach_to_eventlet())
-            self.timers.append(fanout.attach_to_eventlet())
-
             pulse = utils.LoopingCall(self.report_state)
             pulse.start(interval=self.report_interval, now=False)
             self.timers.append(pulse)
@@ -167,7 +175,13 @@

     def kill(self):
         """Destroy the service object in the datastore."""
+        self.csetthread.kill()
+        try:
+            self.csetthread.wait()
+        except greenlet.GreenletExit:
+            pass
         self.stop()
+        rpc.ConnectionPool.put(self.conn)
         try:
             db.service_destroy(context.get_admin_context(), self.service_id)
         except exception.NotFound:

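The new Service.start wiring spawns a single greenthread that blocks in ConsumerSet.wait() and closes the set on exit; kill() later kills and joins that thread. A sketch of the same lifecycle using the standard threading module as a stand-in for eventlet (this ConsumerSet is a hypothetical minimal class, not nova's):

```python
import threading


class ConsumerSet:
    """Minimal stand-in for rpc.ConsumerSet (names are illustrative)."""

    def __init__(self):
        self._stop = threading.Event()
        self.closed = False

    def wait(self):
        self._stop.wait()   # block until asked to stop, like cset.wait()

    def close(self):
        self.closed = True

    def kill(self):
        self._stop.set()    # analogue of killing the waiting greenthread


cset = ConsumerSet()


def _wait():
    try:
        cset.wait()
    finally:
        cset.close()        # mirrors the try/finally in Service.start


t = threading.Thread(target=_wait)
t.start()
cset.kill()                 # what Service.kill() does on shutdown
t.join()
```

The try/finally guarantees the consumer set is closed whether the wait loop exits normally or the thread is killed.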
=== modified file 'nova/test.py'
--- nova/test.py	2011-04-20 19:08:22 +0000
+++ nova/test.py	2011-05-20 01:34:25 +0000
@@ -31,17 +31,15 @@
 import unittest

 import mox
-import shutil
 import stubout
 from eventlet import greenthread

-from nova import context
-from nova import db
 from nova import fakerabbit
 from nova import flags
 from nova import rpc
 from nova import service
 from nova import wsgi
+from nova.virt import fake


 FLAGS = flags.FLAGS
@@ -85,6 +83,7 @@
         self._monkey_patch_attach()
         self._monkey_patch_wsgi()
         self._original_flags = FLAGS.FlagValuesDict()
+        rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)

     def tearDown(self):
         """Runs after each test method to tear down test environment."""
@@ -99,6 +98,10 @@
         if FLAGS.fake_rabbit:
             fakerabbit.reset_all()

+        if FLAGS.connection_type == 'fake':
+            if hasattr(fake.FakeConnection, '_instance'):
+                del fake.FakeConnection._instance
+
         # Reset any overriden flags
         self.reset_flags()

=== modified file 'nova/tests/integrated/integrated_helpers.py'
--- nova/tests/integrated/integrated_helpers.py	2011-03-30 01:13:04 +0000
+++ nova/tests/integrated/integrated_helpers.py	2011-05-20 01:34:25 +0000
@@ -154,10 +154,7 @@
         # set up services
         self.start_service('compute')
         self.start_service('volume')
-        # NOTE(justinsb): There's a bug here which is eluding me...
-        # If we start the network_service, all is good, but then subsequent
-        # tests fail: CloudTestCase.test_ajax_console in particular.
-        #self.start_service('network')
+        self.start_service('network')
         self.start_service('scheduler')

         self.auth_url = self._start_api_service()

=== modified file 'nova/tests/test_cloud.py'
--- nova/tests/test_cloud.py	2011-05-16 20:30:40 +0000
+++ nova/tests/test_cloud.py	2011-05-20 01:34:25 +0000
@@ -17,13 +17,8 @@
 # under the License.

 from base64 import b64decode
-import json
 from M2Crypto import BIO
 from M2Crypto import RSA
-import os
-import shutil
-import tempfile
-import time

 from eventlet import greenthread

@@ -33,12 +28,10 @@
 from nova import flags
 from nova import log as logging
 from nova import rpc
-from nova import service
 from nova import test
 from nova import utils
 from nova import exception
 from nova.auth import manager
-from nova.compute import power_state
 from nova.api.ec2 import cloud
 from nova.api.ec2 import ec2utils
 from nova.image import local
@@ -79,14 +72,21 @@
         self.stubs.Set(local.LocalImageService, 'show', fake_show)
         self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)

+        # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
+        rpc_cast = rpc.cast
+
+        def finish_cast(*args, **kwargs):
+            rpc_cast(*args, **kwargs)
+            greenthread.sleep(0.2)
+
+        self.stubs.Set(rpc, 'cast', finish_cast)
+
     def tearDown(self):
         network_ref = db.project_get_network(self.context,
                                              self.project.id)
         db.network_disassociate(self.context, network_ref['id'])
         self.manager.delete_project(self.project)
         self.manager.delete_user(self.user)
-        self.compute.kill()
-        self.network.kill()
         super(CloudTestCase, self).tearDown()

     def _create_key(self, name):
@@ -113,7 +113,6 @@
         self.cloud.describe_addresses(self.context)
         self.cloud.release_address(self.context,
                                    public_ip=address)
-        greenthread.sleep(0.3)
         db.floating_ip_destroy(self.context, address)

     def test_associate_disassociate_address(self):
@@ -129,12 +128,10 @@
         self.cloud.associate_address(self.context,
                                      instance_id=ec2_id,
                                      public_ip=address)
-        greenthread.sleep(0.3)
         self.cloud.disassociate_address(self.context,
                                         public_ip=address)
         self.cloud.release_address(self.context,
                                    public_ip=address)
-        greenthread.sleep(0.3)
         self.network.deallocate_fixed_ip(self.context, fixed)
         db.instance_destroy(self.context, inst['id'])
         db.floating_ip_destroy(self.context, address)
@@ -306,31 +303,25 @@
                   'instance_type': instance_type,
                   'max_count': max_count}
         rv = self.cloud.run_instances(self.context, **kwargs)
-        greenthread.sleep(0.3)
         instance_id = rv['instancesSet'][0]['instanceId']
         output = self.cloud.get_console_output(context=self.context,
                                                instance_id=[instance_id])
         self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
         # TODO(soren): We need this until we can stop polling in the rpc code
         #              for unit tests.
-        greenthread.sleep(0.3)
         rv = self.cloud.terminate_instances(self.context, [instance_id])
-        greenthread.sleep(0.3)

     def test_ajax_console(self):
         kwargs = {'image_id': 'ami-1'}
         rv = self.cloud.run_instances(self.context, **kwargs)
         instance_id = rv['instancesSet'][0]['instanceId']
-        greenthread.sleep(0.3)
         output = self.cloud.get_ajax_console(context=self.context,
                                             instance_id=[instance_id])
         self.assertEquals(output['url'],
                           '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
         # TODO(soren): We need this until we can stop polling in the rpc code
         #              for unit tests.
-        greenthread.sleep(0.3)
         rv = self.cloud.terminate_instances(self.context, [instance_id])
-        greenthread.sleep(0.3)

     def test_key_generation(self):
         result = self._create_key('test')

=== modified file 'nova/tests/test_rpc.py'
--- nova/tests/test_rpc.py	2011-02-23 22:41:11 +0000
+++ nova/tests/test_rpc.py	2011-05-20 01:34:25 +0000
@@ -49,6 +49,59 @@
                                         "args": {"value": value}})
         self.assertEqual(value, result)

+    def test_call_succeed_despite_multiple_returns(self):
+        """Get a value through rpc call"""
+        value = 42
+        result = rpc.call(self.context, 'test', {"method": "echo_three_times",
+                                                 "args": {"value": value}})
+        self.assertEqual(value, result)
+
+    def test_call_succeed_despite_multiple_returns_yield(self):
+        """Get a value through rpc call"""
+        value = 42
+        result = rpc.call(self.context, 'test',
+                          {"method": "echo_three_times_yield",
+                           "args": {"value": value}})
+        self.assertEqual(value, result)
+
+    def test_multicall_succeed_once(self):
+        """Get a value through rpc call"""
+        value = 42
+        result = rpc.multicall(self.context,
+                               'test',
+                               {"method": "echo",
+                                "args": {"value": value}})
+        i = 0
+        for x in result:
+            if i > 0:
+                self.fail('should only receive one response')
+            self.assertEqual(value + i, x)
+            i += 1
+
+    def test_multicall_succeed_three_times(self):
+        """Get a value through rpc call"""
+        value = 42
+        result = rpc.multicall(self.context,
+                               'test',
+                               {"method": "echo_three_times",
+                                "args": {"value": value}})
+        i = 0
+        for x in result:
+            self.assertEqual(value + i, x)
+            i += 1
+
+    def test_multicall_succeed_three_times_yield(self):
+        """Get a value through rpc call"""
+        value = 42
+        result = rpc.multicall(self.context,
+                               'test',
+                               {"method": "echo_three_times_yield",
+                                "args": {"value": value}})
+        i = 0
+        for x in result:
+            self.assertEqual(value + i, x)
+            i += 1
+
     def test_context_passed(self):
         """Makes sure a context is passed through rpc call"""
         value = 42
@@ -127,6 +180,18 @@
         return context.to_dict()

     @staticmethod
+    def echo_three_times(context, value):
+        context.reply(value)
+        context.reply(value + 1)
+        context.reply(value + 2)
+
+    @staticmethod
+    def echo_three_times_yield(context, value):
+        yield value
+        yield value + 1
+        yield value + 2
+
+    @staticmethod
     def fail(context, value):
         """Raises an exception with the value sent in"""
         raise Exception(value)

=== modified file 'nova/tests/test_service.py'
--- nova/tests/test_service.py	2011-03-17 13:35:00 +0000
+++ nova/tests/test_service.py	2011-05-20 01:34:25 +0000
@@ -106,7 +106,10 @@

         # NOTE(vish): Create was moved out of mox replay to make sure that
         #             the looping calls are created in StartService.
-        app = service.Service.create(host=host, binary=binary)
+        app = service.Service.create(host=host, binary=binary, topic=topic)
+
+        self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
+        service.rpc.Connection.instance(new=mox.IgnoreArg())

         self.mox.StubOutWithMock(rpc,
                                  'TopicAdapterConsumer',
@@ -114,6 +117,11 @@
         self.mox.StubOutWithMock(rpc,
                                  'FanoutAdapterConsumer',
                                  use_mock_anything=True)
+
+        self.mox.StubOutWithMock(rpc,
+                                 'ConsumerSet',
+                                 use_mock_anything=True)
+
         rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
                                  topic=topic,
                                  proxy=mox.IsA(service.Service)).AndReturn(
@@ -129,9 +137,13 @@
                                  proxy=mox.IsA(service.Service)).AndReturn(
                                          rpc.FanoutAdapterConsumer)

-        rpc.TopicAdapterConsumer.attach_to_eventlet()
-        rpc.TopicAdapterConsumer.attach_to_eventlet()
-        rpc.FanoutAdapterConsumer.attach_to_eventlet()
+        def wait_func(self, limit=None):
+            return None
+
+        mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+                                        {'wait': wait_func})
+        rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
+        wait_func(mox.IgnoreArg())

         service_create = {'host': host,
                           'binary': binary,
@@ -287,8 +299,41 @@
         # Creating mocks
         self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
         service.rpc.Connection.instance(new=mox.IgnoreArg())
-        service.rpc.Connection.instance(new=mox.IgnoreArg())
-        service.rpc.Connection.instance(new=mox.IgnoreArg())
+
+        self.mox.StubOutWithMock(rpc,
+                                 'TopicAdapterConsumer',
+                                 use_mock_anything=True)
+        self.mox.StubOutWithMock(rpc,
+                                 'FanoutAdapterConsumer',
+                                 use_mock_anything=True)
+
+        self.mox.StubOutWithMock(rpc,
+                                 'ConsumerSet',
+                                 use_mock_anything=True)
+
+        rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+                                 topic=topic,
+                                 proxy=mox.IsA(service.Service)).AndReturn(
+                                         rpc.TopicAdapterConsumer)
+
+        rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+                                 topic='%s.%s' % (topic, host),
+                                 proxy=mox.IsA(service.Service)).AndReturn(
+                                         rpc.TopicAdapterConsumer)
+
+        rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
+                                  topic=topic,
+                                  proxy=mox.IsA(service.Service)).AndReturn(
+                                          rpc.FanoutAdapterConsumer)
+
+        def wait_func(self, limit=None):
+            return None
+
+        mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+                                        {'wait': wait_func})
+        rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
+        wait_func(mox.IgnoreArg())

         self.mox.StubOutWithMock(serv.manager.driver,
                                  'update_available_resource')
         serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)

=== modified file 'nova/volume/api.py'
--- nova/volume/api.py	2011-03-31 08:39:00 +0000
+++ nova/volume/api.py	2011-05-20 01:34:25 +0000
@@ -20,14 +20,14 @@
 Handles all requests relating to volumes.
 """

-import datetime
+import eventlet

-from nova import db
 from nova import exception
 from nova import flags
 from nova import log as logging
 from nova import quota
 from nova import rpc
+from nova import utils
 from nova.db import base

 FLAGS = flags.FLAGS
@@ -57,26 +57,60 @@
                    'display_name': name,
                    'display_description': description}

-        volume = self.db.volume_create(context, options)
-        rpc.cast(context,
-                 FLAGS.scheduler_topic,
-                 {"method": "create_volume",
-                  "args": {"topic": FLAGS.volume_topic,
-                           "volume_id": volume['id']}})
-        return volume
+        volume_ref = self.db.volume_create(context, options)
+        volume_ref = utils.to_primitive(dict(volume_ref))
+
+        def delayed_create(volume_ref):
+            vid = volume_ref['id']
+            try:
+                rvs = rpc.multicall(context,
+                                    FLAGS.scheduler_topic,
+                                    {"method": "create_volume",
+                                     "args": {"topic": FLAGS.volume_topic,
+                                              "volume_ref": volume_ref}})
+                for volume_ref in rvs:
+                    self.db.volume_update(context, vid, volume_ref)
+                volume_ref['launched_at'] = utils.utcnow()
+                self.db.volume_update(context, vid, volume_ref)
+
+            except rpc.RemoteError:
+                self.db.volume_update(context, vid, {'status': 'error'})
+
+        eventlet.spawn_n(delayed_create, volume_ref)
+        return volume_ref

     def delete(self, context, volume_id):
-        volume = self.get(context, volume_id)
-        if volume['status'] != "available":
+        volume_ref = self.get(context, volume_id)
+        if volume_ref['status'] != "available":
             raise exception.ApiError(_("Volume status must be available"))
-        now = datetime.datetime.utcnow()
-        self.db.volume_update(context, volume_id, {'status': 'deleting',
-                                                   'terminated_at': now})
-        host = volume['host']
-        rpc.cast(context,
-                 self.db.queue_get_for(context, FLAGS.volume_topic, host),
-                 {"method": "delete_volume",
-                  "args": {"volume_id": volume_id}})
+        if volume_ref['attach_status'] == "attached":
+            raise exception.Error(_("Volume is still attached"))
+
+        volume_ref['status'] = 'deleting'
+        volume_ref['terminated_at'] = utils.utcnow()
+        self.db.volume_update(context, volume_ref['id'], volume_ref)
+        volume_ref = utils.to_primitive(dict(volume_ref))
+
+        def delayed_delete(volume_ref):
+            vid = volume_ref['id']
+            try:
+                topic = self.db.queue_get_for(context,
+                                              FLAGS.volume_topic,
+                                              volume_ref['host'])
+                rvs = rpc.multicall(context,
+                                    topic,
+                                    {"method": "delete_volume",
+                                     "args": {"volume_ref": volume_ref}})
+                for volume_ref in rvs:
+                    self.db.volume_update(context, vid, volume_ref)
+
+                self.db.volume_destroy(context, vid)
+
+            except rpc.RemoteError:
+                self.db.volume_update(context, vid, {'status': 'err_delete'})
+
+        eventlet.spawn_n(delayed_delete, volume_ref)
+        return True

     def update(self, context, volume_id, fields):
         self.db.volume_update(context, volume_id, fields)

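volume.api now returns the freshly created record immediately and lets a background greenthread apply each update streamed back over multicall, falling back to an error status on failure. A simplified stand-in using a plain dict for the database and a thread instead of eventlet.spawn_n (all names here are illustrative, not the nova API):

```python
import threading

db = {}  # stand-in for self.db.volume_update's backing store


def fake_multicall(volume_ref):
    """Stand-in for rpc.multicall: each yield is one partial update
    streamed back by the volume worker."""
    yield dict(volume_ref, host="volume-host")
    yield dict(volume_ref, host="volume-host", status="available")


def delayed_create(volume_ref):
    vid = volume_ref["id"]
    # Apply each intermediate state as it arrives, like the real
    # delayed_create does with self.db.volume_update.
    for update in fake_multicall(volume_ref):
        db[vid] = update


volume_ref = {"id": 1, "status": "creating"}
worker = threading.Thread(target=delayed_create, args=(volume_ref,))
worker.start()   # the API call would return volume_ref here, immediately
worker.join()
```

The caller gets the initial record without waiting; the database converges to the worker's final state in the background.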
=== modified file 'nova/volume/manager.py'
--- nova/volume/manager.py	2011-03-17 13:35:00 +0000
+++ nova/volume/manager.py	2011-05-20 01:34:25 +0000
@@ -90,67 +90,40 @@
         else:
             LOG.info(_("volume %s: skipping export"), volume['name'])

-    def create_volume(self, context, volume_id):
+    def create_volume(self, context, volume_ref):
         """Creates and exports the volume."""
-        context = context.elevated()
-        volume_ref = self.db.volume_get(context, volume_id)
         LOG.info(_("volume %s: creating"), volume_ref['name'])

-        self.db.volume_update(context,
-                              volume_id,
-                              {'host': self.host})
-        # NOTE(vish): so we don't have to get volume from db again
-        #             before passing it to the driver.
         volume_ref['host'] = self.host
-
-        try:
-            vol_name = volume_ref['name']
-            vol_size = volume_ref['size']
-            LOG.debug(_("volume %(vol_name)s: creating lv of"
-                        " size %(vol_size)sG") % locals())
-            model_update = self.driver.create_volume(volume_ref)
-            if model_update:
-                self.db.volume_update(context, volume_ref['id'], model_update)
-
-            LOG.debug(_("volume %s: creating export"), volume_ref['name'])
-            model_update = self.driver.create_export(context, volume_ref)
-            if model_update:
-                self.db.volume_update(context, volume_ref['id'], model_update)
-        except Exception:
-            self.db.volume_update(context,
-                                  volume_ref['id'], {'status': 'error'})
-            raise
-
-        now = datetime.datetime.utcnow()
-        self.db.volume_update(context,
-                              volume_ref['id'], {'status': 'available',
-                                                 'launched_at': now})
+        yield volume_ref
+
+        vol_name = volume_ref['name']
+        vol_size = volume_ref['size']
+        LOG.debug(_("volume %(vol_name)s: creating lv of"
+                    " size %(vol_size)sG") % locals())
+        model_update = self.driver.create_volume(volume_ref)
+        if model_update:
+            volume_ref.update(model_update)
+            yield volume_ref
+
+        LOG.debug(_("volume %s: creating export"), volume_ref['name'])
+        model_update = self.driver.create_export(context, volume_ref)
+        if model_update:
+            volume_ref.update(model_update)
+            yield volume_ref
+
         LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
-        return volume_id
+        volume_ref['status'] = 'available'
+        yield volume_ref

-    def delete_volume(self, context, volume_id):
+    def delete_volume(self, context, volume_ref):
         """Deletes and unexports volume."""
-        context = context.elevated()
-        volume_ref = self.db.volume_get(context, volume_id)
-        if volume_ref['attach_status'] == "attached":
-            raise exception.Error(_("Volume is still attached"))
-        if volume_ref['host'] != self.host:
-            raise exception.Error(_("Volume is not local to this node"))
-
-        try:
-            LOG.debug(_("volume %s: removing export"), volume_ref['name'])
-            self.driver.remove_export(context, volume_ref)
-            LOG.debug(_("volume %s: deleting"), volume_ref['name'])
-            self.driver.delete_volume(volume_ref)
-        except Exception:
-            self.db.volume_update(context,
-                                  volume_ref['id'],
-                                  {'status': 'error_deleting'})
-            raise
-
-        self.db.volume_destroy(context, volume_id)
+        LOG.debug(_("volume %s: removing export"), volume_ref['name'])
+        self.driver.remove_export(context, volume_ref)
+        LOG.debug(_("volume %s: deleting"), volume_ref['name'])
+        self.driver.delete_volume(volume_ref)
         LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
-        return True
+        yield volume_ref

     def setup_compute_volume(self, context, volume_id):
         """Setup remote volume on compute host.
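On the manager side, create_volume becomes a generator: each yield is one reply message, so the caller receives the initial record at once and further state as the driver makes progress, with no database writes on the worker. A self-contained sketch of that streaming-handler shape (the driver steps are faked; names are illustrative):

```python
def create_volume(volume_ref):
    """Generator-style RPC handler: each yield models one reply
    sent back to the caller over the queue."""
    volume_ref = dict(volume_ref)
    volume_ref["host"] = "this-host"
    yield dict(volume_ref)            # initial data, returned immediately

    volume_ref["export"] = "iscsi-target-0"   # fake driver.create_export
    yield dict(volume_ref)            # intermediate state

    volume_ref["status"] = "available"
    yield dict(volume_ref)            # final state


states = list(create_volume({"id": 1, "size": 10}))
```

Dispatch code that iterates the handler's return value works unchanged for one-shot methods (one yield) and streaming methods alike, which is what lets rpc.call and rpc.multicall share the same worker-side path.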
accidentally pushed an old version. This is the right one.