Merge lp:~sandy-walsh/nova/zones4 into lp:~hudson-openstack/nova/trunk

Proposed by Sandy Walsh
Status: Superseded
Proposed branch: lp:~sandy-walsh/nova/zones4
Merge into: lp:~hudson-openstack/nova/trunk
Prerequisite: lp:~sandy-walsh/nova/zones3
Diff against target: 1345 lines (+675/-86)
21 files modified
nova/api/openstack/servers.py (+20/-1)
nova/api/openstack/zones.py (+19/-14)
nova/compute/api.py (+16/-1)
nova/compute/manager.py (+3/-2)
nova/db/api.py (+1/-0)
nova/flags.py (+3/-2)
nova/manager.py (+27/-1)
nova/network/manager.py (+3/-2)
nova/rpc.py (+59/-18)
nova/scheduler/api.py (+214/-22)
nova/scheduler/driver.py (+7/-0)
nova/scheduler/manager.py (+13/-1)
nova/scheduler/zone_manager.py (+36/-3)
nova/service.py (+8/-2)
nova/tests/api/openstack/test_zones.py (+26/-3)
nova/tests/test_rpc.py (+2/-2)
nova/tests/test_scheduler.py (+161/-0)
nova/tests/test_service.py (+19/-9)
nova/tests/test_test.py (+1/-1)
nova/tests/test_zones.py (+34/-0)
nova/volume/manager.py (+3/-2)
To merge this branch: bzr merge lp:~sandy-walsh/nova/zones4
Reviewer                  Review Type   Date Requested   Status
Rick Harris (community)                                  Approve
Matt Dietz (community)                                   Approve
Review via email: mp+53726@code.launchpad.net

This proposal has been superseded by a proposal from 2011-03-24.

Commit message

In this branch we are forwarding incoming requests to child zones when the requested resource is not found in the current zone.

Description of the change

In this branch we are forwarding incoming requests to child zones when the requested resource is not found in the current zone.

For example: If 'nova pause 123' is issued against Zone 1, but instance 123 does not live in Zone 1, the call will be forwarded to all child zones hoping someone can deal with it.
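
The forwarding in that example can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses a standard-library thread pool rather than the eventlet green pool the branch uses, it drops None answers (the branch may keep them), and ask_child_zones, ZONES and pause_123 are hypothetical names, not part of Nova.

```python
from concurrent.futures import ThreadPoolExecutor


def ask_child_zones(zones, command):
    """Run command(zone) against every child zone in parallel.

    Returns the non-None answers, in the same order as zones.
    """
    if not zones:
        return []
    with ThreadPoolExecutor(max_workers=len(zones)) as pool:
        answers = list(pool.map(command, zones))
    return [answer for answer in answers if answer is not None]


# Hypothetical data: zone name -> instance ids that live there.
ZONES = {"zone1": {123}, "zone2": {456}}


def pause_123(zone):
    """Pretend to pause instance 123; only the owning zone answers."""
    return "paused in %s" % zone if 123 in ZONES[zone] else None
```

Calling ask_child_zones(["zone1", "zone2"], pause_123) asks both zones, and only the zone that owns instance 123 contributes an answer.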

NOTE: This currently only works with OpenStack API requests, and routing checks are only done for Compute calls that look up an instance_id.
Specifically:
* servers.get/pause/unpause/diagnostics/suspend/resume/rescue/unrescue/delete
* servers.create is pending for distributed scheduler
* servers.get_all will get added early in Diablo.

What I've been doing for testing:
1. Set up a Nova deployment in a VM (Zone0)
2. Clone the VM and set --zone_name=zone1 (and change all the IP addresses to the new address in nova.conf, glance.conf and novarc)
3. Set --enable_zone_routing=true on all zones
4. Use the --connection_type=fake driver for compute to keep things easy
5. Add Zone1 as a child of Zone0 (nova zone-add)

(Make sure the instance IDs are different in each zone.)

Example of calls being sent to child zones:
http://paste.openstack.org/show/964/

Eric Day (eday) wrote :

Hi Sandy,

I don't think WSGI middleware is the correct place to be handling zones forwarding. What if we write a tool directly against nova.compute.api or if some other component (like a compute or network worker) issues a nova.compute.api call? I think all logic and routing should instead be handled inside nova.compute and/or nova.scheduler. Nothing in nova.api should ever be aware of what is happening.

Sandy Walsh (sandy-walsh) wrote :

Thanks Eric,

That was the way I was initially intending to go.

The closer I got to it, I noticed that all calls would need to be re-marshaled to be sent to the children. We'd need a new client library abstraction layer (since forwarded requests may be OS API or EC2 API). I was hoping to avoid all that by simply forwarding the already marshaled message.

This client library abstraction is likely to get a little unwieldy since it needs to support OS/EC2 and whoever else comes along (actually, sounds like another endorsement for DirectAPI).

Also, with this approach I was assuming an idiom of "API checks parameters and bails if they're wrong. Work is done in the services." ... but I see the counter-argument.

Let me think more about client library abstraction. Since we have big API changes coming soon, might be timely to consider this.

Sandy Walsh (sandy-walsh) wrote :

Thinking a little more about it, I've got a concern about that approach. Not for technical reasons, but for schedule/political reasons: since the API is such a touchy subject and still under development, this could turn into a major blocker against getting anything practical done with Zones for a long time.

Eric Day (eday) wrote :

FWIW, I think making the call to novaclient (and re-marshal) from nova.compute.* is the correct thing to do. I would move forward with the OpenStack API for this as well, as that seems to be where we are committed at the moment. If we need to change this in the future, it will be a simple HTTP formatting issue inside novaclient (the API into novaclient shouldn't change).

Sandy Walsh (sandy-walsh) wrote :

Yup ... that's the direction I'll take. Thanks again Eric.

Sandy Walsh (sandy-walsh) wrote :

Ok, all refactored out of middleware now. The <service>/api.py does the checking now and the external api just bails out after a redirect. It's all handled in decorators now.

Rick Harris (rconradharris) wrote :

Nice work, Sandy.

Some really minor nits/suggestions:

> 402 + try:
> 403 + manager = getattr(nova, collection)

The getattr can be outside of the try-block (AFAICT).

> 404 + if isinstance(item_id, int) or item_id.isdigit():
> 405 + result = manager.get(int(item_id))
> 406 + else:
> 407 + result = manager.find(name=item_id)

One common way of handling this is to just go ahead and cast to int and
handle the possible ValueError (EAFP), like:

    try:
        result = manager.get(int(item_id))
    except ValueError:
        result = manager.find(name=item_id)
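
A slightly fuller, runnable sketch of that EAFP idea (Python 3 here, although the branch is Python 2). FakeManager and lookup are hypothetical stand-ins for the novaclient collection manager and the surrounding helper, not names from the branch. The try block is kept narrow so that a ValueError raised inside get() itself would not be mistaken for a non-numeric id.

```python
class FakeManager:
    """Hypothetical stand-in for a novaclient-style collection manager."""

    def __init__(self, items):
        self._by_id = {item["id"]: item for item in items}
        self._by_name = {item["name"]: item for item in items}

    def get(self, item_id):
        return self._by_id[item_id]

    def find(self, name):
        return self._by_name[name]


def lookup(manager, item_id):
    """Try the id as an integer first; fall back to a lookup by name."""
    try:
        numeric_id = int(item_id)
    except ValueError:
        # Not a number, so treat it as a name.
        return manager.find(name=item_id)
    return manager.get(numeric_id)
```

With this, lookup(manager, "123") and lookup(manager, 123) both go through get(), while lookup(manager, "web") goes through find().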

> 371 +def _wrap_method(function, self):
> 372 + """Wrap method to supply self."""
> 373 + def _wrap(*args, **kwargs):
> 374 + return function(self, *args, **kwargs)
> 375 + return _wrap

For the newly added decorators, it might be worth using @functools.wraps so
that the outer func inherits the inner func's attributes (name, docstring).
Can make debugging a little easier.
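
To illustrate what functools.wraps changes, here is a minimal sketch in Python 3. The decorator and function names are made up for the example and are not taken from the branch.

```python
import functools


def plain_decorator(f):
    """Decorator without functools.wraps: metadata of f is lost."""
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper


def wrapping_decorator(f):
    """Decorator with functools.wraps: metadata of f is copied over."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper


@plain_decorator
def pause(instance_id):
    """Pause the given instance."""
    return instance_id


@wrapping_decorator
def unpause(instance_id):
    """Unpause the given instance."""
    return instance_id
```

Here pause.__name__ is "wrapper" and its docstring is gone, while unpause keeps its own name and docstring, which is what shows up in tracebacks and log messages.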

> 397 +def _issue_novaclient_command(nova, zone, collection, method_name, \
> 398 + item_id):

Trailing '\' isn't needed.

Could line-up item_id with opening param.

> 264 + @scheduler_api.reroute_compute("diagnostics")
> 265 def get_diagnostics(self, context, instance_id):

Should that be 'get_diagnostics'? I ask because it's the only one that differs
in that regard.

review: Needs Fixing
Matt Dietz (cerberus) wrote :

The exception raising idea is really interesting. Hard to follow at first, but the tests make it clear IMO. I think this looks pretty promising.

review: Approve
Sandy Walsh (sandy-walsh) wrote :

Thanks Rick ... great feedback. I've implemented your changes.

re: get_diagnostics, the name in the decorator is the name of the method in novaclient, so it may not map 1:1 to the method being wrapped (as in this case).

Basically the decorator is saying, "if this method can't find the instance, check with the child zones by calling <method> in novaclient"
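
As a rough, self-contained sketch of that idea (Python 3, simplified; not the code in the branch): the decorator takes the name of the method to call on the children, tries the local call first, and on a local miss raises a result-carrying exception for the API layer to return directly. InstanceNotFound, reroute, fake_children and ComputeAPI below are hypothetical stand-ins.

```python
import functools


class InstanceNotFound(Exception):
    """Hypothetical stand-in for the local 'not found' error."""


class RedirectResult(Exception):
    """Carries results already produced by child zones."""

    def __init__(self, results):
        super().__init__("zone redirection")
        self.results = results


def reroute(method_name, ask_children):
    """On a local miss, ask child zones to run method_name instead.

    ask_children is a hypothetical callable taking
    (method_name, instance_id) and returning a list of results.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapped(self, context, instance_id):
            try:
                return f(self, context, instance_id)
            except InstanceNotFound:
                results = ask_children(method_name, instance_id)
                if not results:
                    raise  # nobody knows this instance
                raise RedirectResult(results)
        return wrapped
    return decorator


def fake_children(method_name, instance_id):
    # Pretend one child zone owns instance 123.
    if instance_id == 123:
        return [{"zone": "zone1", "method": method_name}]
    return []


class ComputeAPI:
    local = {1: "local-instance"}

    # The decorator argument names the child-zone method, so it need
    # not match the name of the wrapped method.
    @reroute("diagnostics", fake_children)
    def get_diagnostics(self, context, instance_id):
        if instance_id not in self.local:
            raise InstanceNotFound(instance_id)
        return {"instance": self.local[instance_id]}
```

A local instance returns normally, instance 123 surfaces as a RedirectResult holding the child's answer, and an id unknown everywhere still raises InstanceNotFound.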

Sandy Walsh (sandy-walsh) wrote :

(oh, push pending, will change WIP to let you know)

Rick Harris (rconradharris) wrote :

> 788 +def redirect_handler(f):
> 648 +def _wrap_method(function, self):

Didn't like the idea of using @functools.wraps to propagate the inner func's metadata?

review: Needs Information
Sandy Walsh (sandy-walsh) wrote :

I do. Good suggestion. But I'll fix that up on the next branch. Haven't spent enough time with it and don't want to miss the window today.

Rick Harris (rconradharris) wrote :

Good deal, lgtm. Thanks!

review: Approve
OpenStack Infra (hudson-openstack) wrote :

No proposals found for merge of lp:~sandy-walsh/nova/zones3 into lp:nova.

lp:~sandy-walsh/nova/zones4 updated
720. By Sandy Walsh

trunk merge

Unmerged revisions

Preview Diff

1=== modified file 'nova/api/openstack/servers.py'
2--- nova/api/openstack/servers.py 2011-03-24 05:08:17 +0000
3+++ nova/api/openstack/servers.py 2011-03-24 12:14:14 +0000
4@@ -38,6 +38,7 @@
5 from nova.compute import power_state
6 from nova.quota import QuotaError
7 import nova.api.openstack
8+from nova.scheduler import api as scheduler_api
9
10
11 LOG = logging.getLogger('server')
12@@ -88,15 +89,18 @@
13 for inst in limited_list]
14 return dict(servers=servers)
15
16+ @scheduler_api.redirect_handler
17 def show(self, req, id):
18 """ Returns server details by server id """
19 try:
20- instance = self.compute_api.get(req.environ['nova.context'], id)
21+ instance = self.compute_api.routing_get(
22+ req.environ['nova.context'], id)
23 builder = self._get_view_builder(req)
24 return builder.build(instance, is_detail=True)
25 except exception.NotFound:
26 return faults.Fault(exc.HTTPNotFound())
27
28+ @scheduler_api.redirect_handler
29 def delete(self, req, id):
30 """ Destroys a server """
31 try:
32@@ -227,6 +231,7 @@
33 # if the original error is okay, just reraise it
34 raise error
35
36+ @scheduler_api.redirect_handler
37 def update(self, req, id):
38 """ Updates the server name or password """
39 if len(req.body) == 0:
40@@ -252,6 +257,7 @@
41 return faults.Fault(exc.HTTPNotFound())
42 return exc.HTTPNoContent()
43
44+ @scheduler_api.redirect_handler
45 def action(self, req, id):
46 """Multi-purpose method used to reboot, rebuild, or
47 resize a server"""
48@@ -317,6 +323,7 @@
49 return faults.Fault(exc.HTTPUnprocessableEntity())
50 return exc.HTTPAccepted()
51
52+ @scheduler_api.redirect_handler
53 def lock(self, req, id):
54 """
55 lock the instance with id
56@@ -332,6 +339,7 @@
57 return faults.Fault(exc.HTTPUnprocessableEntity())
58 return exc.HTTPAccepted()
59
60+ @scheduler_api.redirect_handler
61 def unlock(self, req, id):
62 """
63 unlock the instance with id
64@@ -347,6 +355,7 @@
65 return faults.Fault(exc.HTTPUnprocessableEntity())
66 return exc.HTTPAccepted()
67
68+ @scheduler_api.redirect_handler
69 def get_lock(self, req, id):
70 """
71 return the boolean state of (instance with id)'s lock
72@@ -361,6 +370,7 @@
73 return faults.Fault(exc.HTTPUnprocessableEntity())
74 return exc.HTTPAccepted()
75
76+ @scheduler_api.redirect_handler
77 def reset_network(self, req, id):
78 """
79 Reset networking on an instance (admin only).
80@@ -375,6 +385,7 @@
81 return faults.Fault(exc.HTTPUnprocessableEntity())
82 return exc.HTTPAccepted()
83
84+ @scheduler_api.redirect_handler
85 def inject_network_info(self, req, id):
86 """
87 Inject network info for an instance (admin only).
88@@ -389,6 +400,7 @@
89 return faults.Fault(exc.HTTPUnprocessableEntity())
90 return exc.HTTPAccepted()
91
92+ @scheduler_api.redirect_handler
93 def pause(self, req, id):
94 """ Permit Admins to Pause the server. """
95 ctxt = req.environ['nova.context']
96@@ -400,6 +412,7 @@
97 return faults.Fault(exc.HTTPUnprocessableEntity())
98 return exc.HTTPAccepted()
99
100+ @scheduler_api.redirect_handler
101 def unpause(self, req, id):
102 """ Permit Admins to Unpause the server. """
103 ctxt = req.environ['nova.context']
104@@ -411,6 +424,7 @@
105 return faults.Fault(exc.HTTPUnprocessableEntity())
106 return exc.HTTPAccepted()
107
108+ @scheduler_api.redirect_handler
109 def suspend(self, req, id):
110 """permit admins to suspend the server"""
111 context = req.environ['nova.context']
112@@ -422,6 +436,7 @@
113 return faults.Fault(exc.HTTPUnprocessableEntity())
114 return exc.HTTPAccepted()
115
116+ @scheduler_api.redirect_handler
117 def resume(self, req, id):
118 """permit admins to resume the server from suspend"""
119 context = req.environ['nova.context']
120@@ -433,6 +448,7 @@
121 return faults.Fault(exc.HTTPUnprocessableEntity())
122 return exc.HTTPAccepted()
123
124+ @scheduler_api.redirect_handler
125 def rescue(self, req, id):
126 """Permit users to rescue the server."""
127 context = req.environ["nova.context"]
128@@ -444,6 +460,7 @@
129 return faults.Fault(exc.HTTPUnprocessableEntity())
130 return exc.HTTPAccepted()
131
132+ @scheduler_api.redirect_handler
133 def unrescue(self, req, id):
134 """Permit users to unrescue the server."""
135 context = req.environ["nova.context"]
136@@ -455,6 +472,7 @@
137 return faults.Fault(exc.HTTPUnprocessableEntity())
138 return exc.HTTPAccepted()
139
140+ @scheduler_api.redirect_handler
141 def get_ajax_console(self, req, id):
142 """ Returns a url to an instance's ajaxterm console. """
143 try:
144@@ -464,6 +482,7 @@
145 return faults.Fault(exc.HTTPNotFound())
146 return exc.HTTPAccepted()
147
148+ @scheduler_api.redirect_handler
149 def diagnostics(self, req, id):
150 """Permit Admins to retrieve server diagnostics."""
151 ctxt = req.environ["nova.context"]
152
153=== modified file 'nova/api/openstack/zones.py'
154--- nova/api/openstack/zones.py 2011-03-11 19:49:32 +0000
155+++ nova/api/openstack/zones.py 2011-03-24 12:14:14 +0000
156@@ -16,8 +16,8 @@
157 import common
158
159 from nova import flags
160+from nova import log as logging
161 from nova import wsgi
162-from nova import db
163 from nova.scheduler import api
164
165
166@@ -38,7 +38,8 @@
167
168
169 def _scrub_zone(zone):
170- return _filter_keys(zone, ('id', 'api_url'))
171+ return _exclude_keys(zone, ('username', 'password', 'created_at',
172+ 'deleted', 'deleted_at', 'updated_at'))
173
174
175 class Controller(wsgi.Controller):
176@@ -52,13 +53,9 @@
177 """Return all zones in brief"""
178 # Ask the ZoneManager in the Scheduler for most recent data,
179 # or fall-back to the database ...
180- items = api.API().get_zone_list(req.environ['nova.context'])
181- if not items:
182- items = db.zone_get_all(req.environ['nova.context'])
183-
184+ items = api.get_zone_list(req.environ['nova.context'])
185 items = common.limited(items, req)
186- items = [_exclude_keys(item, ['username', 'password'])
187- for item in items]
188+ items = [_scrub_zone(item) for item in items]
189 return dict(zones=items)
190
191 def detail(self, req):
192@@ -67,29 +64,37 @@
193
194 def info(self, req):
195 """Return name and capabilities for this zone."""
196- return dict(zone=dict(name=FLAGS.zone_name,
197- capabilities=FLAGS.zone_capabilities))
198+ items = api.get_zone_capabilities(req.environ['nova.context'])
199+
200+ zone = dict(name=FLAGS.zone_name)
201+ caps = FLAGS.zone_capabilities
202+ for cap in caps:
203+ key_values = cap.split('=')
204+ zone[key_values[0]] = key_values[1]
205+ for item, (min_value, max_value) in items.iteritems():
206+ zone[item] = "%s,%s" % (min_value, max_value)
207+ return dict(zone=zone)
208
209 def show(self, req, id):
210 """Return data about the given zone id"""
211 zone_id = int(id)
212- zone = db.zone_get(req.environ['nova.context'], zone_id)
213+ zone = api.zone_get(req.environ['nova.context'], zone_id)
214 return dict(zone=_scrub_zone(zone))
215
216 def delete(self, req, id):
217 zone_id = int(id)
218- db.zone_delete(req.environ['nova.context'], zone_id)
219+ api.zone_delete(req.environ['nova.context'], zone_id)
220 return {}
221
222 def create(self, req):
223 context = req.environ['nova.context']
224 env = self._deserialize(req.body, req.get_content_type())
225- zone = db.zone_create(context, env["zone"])
226+ zone = api.zone_create(context, env["zone"])
227 return dict(zone=_scrub_zone(zone))
228
229 def update(self, req, id):
230 context = req.environ['nova.context']
231 env = self._deserialize(req.body, req.get_content_type())
232 zone_id = int(id)
233- zone = db.zone_update(context, zone_id, env["zone"])
234+ zone = api.zone_update(context, zone_id, env["zone"])
235 return dict(zone=_scrub_zone(zone))
236
237=== modified file 'nova/compute/api.py'
238--- nova/compute/api.py 2011-03-23 21:04:42 +0000
239+++ nova/compute/api.py 2011-03-24 12:14:14 +0000
240@@ -34,6 +34,7 @@
241 from nova import utils
242 from nova import volume
243 from nova.compute import instance_types
244+from nova.scheduler import api as scheduler_api
245 from nova.db import base
246
247 FLAGS = flags.FLAGS
248@@ -352,6 +353,7 @@
249 rv = self.db.instance_update(context, instance_id, kwargs)
250 return dict(rv.iteritems())
251
252+ @scheduler_api.reroute_compute("delete")
253 def delete(self, context, instance_id):
254 LOG.debug(_("Going to try to terminate %s"), instance_id)
255 try:
256@@ -384,6 +386,13 @@
257 rv = self.db.instance_get(context, instance_id)
258 return dict(rv.iteritems())
259
260+ @scheduler_api.reroute_compute("get")
261+ def routing_get(self, context, instance_id):
262+ """Use this method instead of get() if this is the only
263+ operation you intend to do. It will route to novaclient.get
264+ if the instance is not found."""
265+ return self.get(context, instance_id)
266+
267 def get_all(self, context, project_id=None, reservation_id=None,
268 fixed_ip=None):
269 """Get all instances, possibly filtered by one of the
270@@ -527,14 +536,17 @@
271 "instance_id": instance_id,
272 "flavor_id": flavor_id}})
273
274+ @scheduler_api.reroute_compute("pause")
275 def pause(self, context, instance_id):
276 """Pause the given instance."""
277 self._cast_compute_message('pause_instance', context, instance_id)
278
279+ @scheduler_api.reroute_compute("unpause")
280 def unpause(self, context, instance_id):
281 """Unpause the given instance."""
282 self._cast_compute_message('unpause_instance', context, instance_id)
283
284+ @scheduler_api.reroute_compute("diagnostics")
285 def get_diagnostics(self, context, instance_id):
286 """Retrieve diagnostics for the given instance."""
287 return self._call_compute_message(
288@@ -546,18 +558,22 @@
289 """Retrieve actions for the given instance."""
290 return self.db.instance_get_actions(context, instance_id)
291
292+ @scheduler_api.reroute_compute("suspend")
293 def suspend(self, context, instance_id):
294 """suspend the instance with instance_id"""
295 self._cast_compute_message('suspend_instance', context, instance_id)
296
297+ @scheduler_api.reroute_compute("resume")
298 def resume(self, context, instance_id):
299 """resume the instance with instance_id"""
300 self._cast_compute_message('resume_instance', context, instance_id)
301
302+ @scheduler_api.reroute_compute("rescue")
303 def rescue(self, context, instance_id):
304 """Rescue the given instance."""
305 self._cast_compute_message('rescue_instance', context, instance_id)
306
307+ @scheduler_api.reroute_compute("unrescue")
308 def unrescue(self, context, instance_id):
309 """Unrescue the given instance."""
310 self._cast_compute_message('unrescue_instance', context, instance_id)
311@@ -573,7 +589,6 @@
312
313 def get_ajax_console(self, context, instance_id):
314 """Get a url to an AJAX Console"""
315- instance = self.get(context, instance_id)
316 output = self._call_compute_message('get_ajax_console',
317 context,
318 instance_id)
319
320=== modified file 'nova/compute/manager.py'
321--- nova/compute/manager.py 2011-03-24 10:01:22 +0000
322+++ nova/compute/manager.py 2011-03-24 12:14:14 +0000
323@@ -111,7 +111,7 @@
324 return decorated_function
325
326
327-class ComputeManager(manager.Manager):
328+class ComputeManager(manager.SchedulerDependentManager):
329
330 """Manages the running instances from creation to destruction."""
331
332@@ -132,7 +132,8 @@
333
334 self.network_manager = utils.import_object(FLAGS.network_manager)
335 self.volume_manager = utils.import_object(FLAGS.volume_manager)
336- super(ComputeManager, self).__init__(*args, **kwargs)
337+ super(ComputeManager, self).__init__(service_name="compute",
338+ *args, **kwargs)
339
340 def init_host(self):
341 """Do any initialization that needs to be run if this is a
342
343=== modified file 'nova/db/api.py'
344--- nova/db/api.py 2011-03-23 04:31:50 +0000
345+++ nova/db/api.py 2011-03-24 12:14:14 +0000
346@@ -71,6 +71,7 @@
347 """No more available blades"""
348 pass
349
350+
351 ###################
352
353
354
355=== modified file 'nova/flags.py'
356--- nova/flags.py 2011-03-18 11:35:00 +0000
357+++ nova/flags.py 2011-03-24 12:14:14 +0000
358@@ -358,5 +358,6 @@
359 'availability zone of this node')
360
361 DEFINE_string('zone_name', 'nova', 'name of this zone')
362-DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
363- 'Key/Value tags which represent capabilities of this zone')
364+DEFINE_list('zone_capabilities',
365+ ['hypervisor=xenserver;kvm', 'os=linux;windows'],
366+ 'Key/Multi-value list representing capabilities of this zone')
367
368=== modified file 'nova/manager.py'
369--- nova/manager.py 2010-12-15 00:05:39 +0000
370+++ nova/manager.py 2011-03-24 12:14:14 +0000
371@@ -53,8 +53,9 @@
372
373 from nova import utils
374 from nova import flags
375+from nova import log as logging
376 from nova.db import base
377-
378+from nova.scheduler import api
379
380 FLAGS = flags.FLAGS
381
382@@ -74,3 +75,28 @@
383 """Do any initialization that needs to be run if this is a standalone
384 service. Child classes should override this method."""
385 pass
386+
387+
388+class SchedulerDependentManager(Manager):
389+ """Periodically send capability updates to the Scheduler services.
390+ Services that need to update the Scheduler of their capabilities
391+ should derive from this class. Otherwise they can derive from
392+ manager.Manager directly. Updates are only sent after
393+ update_service_capabilities is called with non-None values."""
394+ def __init__(self, host=None, db_driver=None, service_name="undefined"):
395+ self.last_capabilities = None
396+ self.service_name = service_name
397+ super(SchedulerDependentManager, self).__init__(host, db_driver)
398+
399+ def update_service_capabilities(self, capabilities):
400+ """Remember these capabilities to send on next periodic update."""
401+ self.last_capabilities = capabilities
402+
403+ def periodic_tasks(self, context=None):
404+ """Pass data back to the scheduler at a periodic interval"""
405+ if self.last_capabilities:
406+ logging.debug(_("Notifying Schedulers of capabilities ..."))
407+ api.update_service_capabilities(context, self.service_name,
408+ self.host, self.last_capabilities)
409+
410+ super(SchedulerDependentManager, self).periodic_tasks(context)
411
412=== modified file 'nova/network/manager.py'
413--- nova/network/manager.py 2011-03-23 05:29:32 +0000
414+++ nova/network/manager.py 2011-03-24 12:14:14 +0000
415@@ -105,7 +105,7 @@
416 pass
417
418
419-class NetworkManager(manager.Manager):
420+class NetworkManager(manager.SchedulerDependentManager):
421 """Implements common network manager functionality.
422
423 This class must be subclassed to support specific topologies.
424@@ -116,7 +116,8 @@
425 if not network_driver:
426 network_driver = FLAGS.network_driver
427 self.driver = utils.import_object(network_driver)
428- super(NetworkManager, self).__init__(*args, **kwargs)
429+ super(NetworkManager, self).__init__(service_name='network',
430+ *args, **kwargs)
431
432 def init_host(self):
433 """Do any initialization that needs to be run if this is a
434
435=== modified file 'nova/rpc.py'
436--- nova/rpc.py 2011-03-18 13:56:05 +0000
437+++ nova/rpc.py 2011-03-24 12:14:14 +0000
438@@ -137,24 +137,7 @@
439 return timer
440
441
442-class Publisher(messaging.Publisher):
443- """Publisher base class"""
444- pass
445-
446-
447-class TopicConsumer(Consumer):
448- """Consumes messages on a specific topic"""
449- exchange_type = "topic"
450-
451- def __init__(self, connection=None, topic="broadcast"):
452- self.queue = topic
453- self.routing_key = topic
454- self.exchange = FLAGS.control_exchange
455- self.durable = False
456- super(TopicConsumer, self).__init__(connection=connection)
457-
458-
459-class AdapterConsumer(TopicConsumer):
460+class AdapterConsumer(Consumer):
461 """Calls methods on a proxy object based on method and args"""
462 def __init__(self, connection=None, topic="broadcast", proxy=None):
463 LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
464@@ -207,6 +190,41 @@
465 return
466
467
468+class Publisher(messaging.Publisher):
469+ """Publisher base class"""
470+ pass
471+
472+
473+class TopicAdapterConsumer(AdapterConsumer):
474+ """Consumes messages on a specific topic"""
475+ exchange_type = "topic"
476+
477+ def __init__(self, connection=None, topic="broadcast", proxy=None):
478+ self.queue = topic
479+ self.routing_key = topic
480+ self.exchange = FLAGS.control_exchange
481+ self.durable = False
482+ super(TopicAdapterConsumer, self).__init__(connection=connection,
483+ topic=topic, proxy=proxy)
484+
485+
486+class FanoutAdapterConsumer(AdapterConsumer):
487+ """Consumes messages from a fanout exchange"""
488+ exchange_type = "fanout"
489+
490+ def __init__(self, connection=None, topic="broadcast", proxy=None):
491+ self.exchange = "%s_fanout" % topic
492+ self.routing_key = topic
493+ unique = uuid.uuid4().hex
494+ self.queue = "%s_fanout_%s" % (topic, unique)
495+ self.durable = False
496+ LOG.info(_("Created '%(exchange)s' fanout exchange "
497+ "with '%(key)s' routing key"),
498+ dict(exchange=self.exchange, key=self.routing_key))
499+ super(FanoutAdapterConsumer, self).__init__(connection=connection,
500+ topic=topic, proxy=proxy)
501+
502+
503 class TopicPublisher(Publisher):
504 """Publishes messages on a specific topic"""
505 exchange_type = "topic"
506@@ -218,6 +236,19 @@
507 super(TopicPublisher, self).__init__(connection=connection)
508
509
510+class FanoutPublisher(Publisher):
511+ """Publishes messages to a fanout exchange."""
512+ exchange_type = "fanout"
513+
514+ def __init__(self, topic, connection=None):
515+ self.exchange = "%s_fanout" % topic
516+ self.queue = "%s_fanout" % topic
517+ self.durable = False
518+ LOG.info(_("Creating '%(exchange)s' fanout exchange"),
519+ dict(exchange=self.exchange))
520+ super(FanoutPublisher, self).__init__(connection=connection)
521+
522+
523 class DirectConsumer(Consumer):
524 """Consumes messages directly on a channel specified by msg_id"""
525 exchange_type = "direct"
526@@ -360,6 +391,16 @@
527 publisher.close()
528
529
530+def fanout_cast(context, topic, msg):
531+ """Sends a message on a fanout exchange without waiting for a response"""
532+ LOG.debug(_("Making asynchronous fanout cast..."))
533+ _pack_context(msg, context)
534+ conn = Connection.instance()
535+ publisher = FanoutPublisher(topic, connection=conn)
536+ publisher.send(msg)
537+ publisher.close()
538+
539+
540 def generic_response(message_data, message):
541 """Logs a result and exits"""
542 LOG.debug(_('response %s'), message_data)
543
544=== modified file 'nova/scheduler/api.py'
545--- nova/scheduler/api.py 2011-02-25 21:40:15 +0000
546+++ nova/scheduler/api.py 2011-03-24 12:14:14 +0000
547@@ -17,33 +17,225 @@
548 Handles all requests relating to schedulers.
549 """
550
551+import novaclient
552+
553+from nova import db
554+from nova import exception
555 from nova import flags
556 from nova import log as logging
557 from nova import rpc
558
559+from eventlet import greenpool
560+
561 FLAGS = flags.FLAGS
562+flags.DEFINE_bool('enable_zone_routing',
563+ False,
564+ 'When True, routing to child zones will occur.')
565+
566 LOG = logging.getLogger('nova.scheduler.api')
567
568
569-class API(object):
570- """API for interacting with the scheduler."""
571-
572- def _call_scheduler(self, method, context, params=None):
573- """Generic handler for RPC calls to the scheduler.
574-
575- :param params: Optional dictionary of arguments to be passed to the
576- scheduler worker
577-
578- :retval: Result returned by scheduler worker
579- """
580- if not params:
581- params = {}
582- queue = FLAGS.scheduler_topic
583- kwargs = {'method': method, 'args': params}
584- return rpc.call(context, queue, kwargs)
585-
586- def get_zone_list(self, context):
587- items = self._call_scheduler('get_zone_list', context)
588- for item in items:
589- item['api_url'] = item['api_url'].replace('\\/', '/')
590- return items
591+def _call_scheduler(method, context, params=None):
592+ """Generic handler for RPC calls to the scheduler.
593+
594+ :param params: Optional dictionary of arguments to be passed to the
595+ scheduler worker
596+
597+ :retval: Result returned by scheduler worker
598+ """
599+ if not params:
600+ params = {}
601+ queue = FLAGS.scheduler_topic
602+ kwargs = {'method': method, 'args': params}
603+ return rpc.call(context, queue, kwargs)
604+
605+
606+def get_zone_list(context):
607+ """Return a list of zones associated with this zone."""
608+ items = _call_scheduler('get_zone_list', context)
609+ for item in items:
610+ item['api_url'] = item['api_url'].replace('\\/', '/')
611+ if not items:
612+ items = db.zone_get_all(context)
613+ return items
614+
615+
616+def zone_get(context, zone_id):
617+ return db.zone_get(context, zone_id)
618+
619+
620+def zone_delete(context, zone_id):
621+ return db.zone_delete(context, zone_id)
622+
623+
624+def zone_create(context, data):
625+ return db.zone_create(context, data)
626+
627+
628+def zone_update(context, zone_id, data):
629+ return db.zone_update(context, zone_id, data)
630+
631+
632+def get_zone_capabilities(context, service=None):
633+ """Returns a dict of key, value capabilities for this zone,
634+ or for a particular class of services running in this zone."""
635+ return _call_scheduler('get_zone_capabilities', context=context,
636+ params=dict(service=service))
637+
638+
639+def update_service_capabilities(context, service_name, host, capabilities):
640+ """Send an update to all the scheduler services informing them
641+ of the capabilities of this service."""
642+ kwargs = dict(method='update_service_capabilities',
643+ args=dict(service_name=service_name, host=host,
644+ capabilities=capabilities))
645+ return rpc.fanout_cast(context, 'scheduler', kwargs)
646+
647+
648+def _wrap_method(function, self):
649+ """Wrap method to supply self."""
650+ def _wrap(*args, **kwargs):
651+ return function(self, *args, **kwargs)
652+ return _wrap
653+
654+
655+def _process(func, zone):
656+ """Worker stub for green thread pool. Give the worker
657+ an authenticated nova client and zone info."""
658+ nova = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
659+ nova.authenticate()
660+ return func(nova, zone)
661+
662+
663+def child_zone_helper(zone_list, func):
664+ """Fire off a command to each zone in the list.
665+ The return is [novaclient return objects] from each child zone.
666+ For example, if you are calling server.pause(), the list will
667+ be whatever the response from server.pause() is. One entry
668+ per child zone called."""
669+ green_pool = greenpool.GreenPool()
670+ return [result for result in green_pool.imap(
671+ _wrap_method(_process, func), zone_list)]
672+
673+
674+def _issue_novaclient_command(nova, zone, collection, method_name, item_id):
675+ """Use novaclient to issue command to a single child zone.
676+ One of these will be run in parallel for each child zone."""
677+ manager = getattr(nova, collection)
678+ result = None
679+ try:
680+ try:
681+ result = manager.get(int(item_id))
682+ except ValueError, e:
683+ result = manager.find(name=item_id)
684+ except novaclient.NotFound:
685+ url = zone.api_url
686+ LOG.debug(_("%(collection)s '%(item_id)s' not found on '%(url)s'" %
687+ locals()))
688+ return None
689+
690+ if method_name.lower() not in ['get', 'find']:
691+ result = getattr(result, method_name)()
692+ return result
693+
694+
695+def wrap_novaclient_function(f, collection, method_name, item_id):
696+ """Appends collection, method_name and item_id to the incoming
697+ (nova, zone) call from child_zone_helper."""
698+ def inner(nova, zone):
699+ return f(nova, zone, collection, method_name, item_id)
700+
701+ return inner
702+
703+
704+class RedirectResult(exception.Error):
705+ """Used to let the HTTP API know that these results are pre-cooked
706+ and they can be returned to the caller directly."""
707+ def __init__(self, results):
708+ self.results = results
709+ super(RedirectResult, self).__init__(
710+ message=_("Uncaught Zone redirection exception"))
711+
712+
713+class reroute_compute(object):
714+ """Decorator used to indicate that the method should
715+ delegate the call to the child zones if the db query
716+ can't find anything."""
717+ def __init__(self, method_name):
718+ self.method_name = method_name
719+
720+ def __call__(self, f):
721+ def wrapped_f(*args, **kwargs):
722+ collection, context, item_id = \
723+ self.get_collection_context_and_id(args, kwargs)
724+ try:
725+ # Call the original function ...
726+ return f(*args, **kwargs)
727+ except exception.InstanceNotFound, e:
728+ LOG.debug(_("Instance %(item_id)s not found "
729+ "locally: '%(e)s'") % locals())
730+
731+ if not FLAGS.enable_zone_routing:
732+ raise
733+
734+ zones = db.zone_get_all(context)
735+ if not zones:
736+ raise
737+
738+ # Ask the children to provide an answer ...
739+ LOG.debug(_("Asking child zones ..."))
740+ result = self._call_child_zones(zones,
741+ wrap_novaclient_function(_issue_novaclient_command,
742+ collection, self.method_name, item_id))
743+ # Scrub the results and raise another exception
744+ # so the API layers can bail out gracefully ...
745+ raise RedirectResult(self.unmarshall_result(result))
746+ return wrapped_f
747+
748+ def _call_child_zones(self, zones, function):
749+ """Ask the child zones to perform this operation.
750+ Broken out for testing."""
751+ return child_zone_helper(zones, function)
752+
753+ def get_collection_context_and_id(self, args, kwargs):
754+ """Returns a tuple of (novaclient collection name, security
755+ context, resource id). Derived classes should override this."""
756+ context = kwargs.get('context', None)
757+ instance_id = kwargs.get('instance_id', None)
758+ if len(args) > 1 and not context:
759+ context = args[1]
760+ if len(args) > 2 and not instance_id:
761+ instance_id = args[2]
762+ return ("servers", context, instance_id)
763+
764+ def unmarshall_result(self, zone_responses):
765+ """Result is a list of responses from each child zone.
766+ Each decorator derivation is responsible for turning this
767+ into the format expected by the calling method. For
768+ example, this one is expected to return a single Server
769+ dict {'server':{k:v}}. Others may return a list of them, like
770+ {'servers':[{k:v}]}"""
771+ reduced_response = []
772+ for zone_response in zone_responses:
773+ if not zone_response:
774+ continue
775+
776+ server = zone_response.__dict__
777+
778+ for k in server.keys():
779+ if k[0] == '_' or k == 'manager':
780+ del server[k]
781+
782+ reduced_response.append(dict(server=server))
783+ if reduced_response:
784+ return reduced_response[0] # first for now.
785+ return {}
786+
787+
788+def redirect_handler(f):
789+ def new_f(*args, **kwargs):
790+ try:
791+ return f(*args, **kwargs)
792+ except RedirectResult, e:
793+ return e.results
794+ return new_f
795
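The rerouting flow in nova/scheduler/api.py above (try locally, fall back to the child zones, hand the pre-cooked answer back through an exception) can be sketched in isolation. This is a minimal Python 3 illustration, not the branch's code: InstanceNotFound and RedirectResult are local stand-ins for the nova classes, and reroute, fake_children, compute_get, api_show and LOCAL_SERVERS are hypothetical names invented for the example. No novaclient, flags or database are involved.

```python
class InstanceNotFound(Exception):
    """Stand-in for nova.exception.InstanceNotFound."""


class RedirectResult(Exception):
    """Carries pre-cooked child zone results up to the API layer."""
    def __init__(self, results):
        super().__init__("Uncaught zone redirection exception")
        self.results = results


def reroute(ask_children):
    """Hypothetical simplified decorator: on a local miss, ask the
    children and raise RedirectResult with the first non-empty answer."""
    def decorator(f):
        def wrapped(*args, **kwargs):
            try:
                # Call the original function first.
                return f(*args, **kwargs)
            except InstanceNotFound:
                responses = [r for r in ask_children(*args, **kwargs) if r]
                raise RedirectResult(responses[0] if responses else {})
        return wrapped
    return decorator


def redirect_handler(f):
    """API-side wrapper: turn RedirectResult into a normal return."""
    def new_f(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except RedirectResult as e:
            return e.results
    return new_f


# Hypothetical data: instance 1 lives in this zone, everything else
# is answered by the second of two pretend child zones.
LOCAL_SERVERS = {1: {"server": {"id": 1, "zone": "parent"}}}


def fake_children(instance_id):
    return [None, {"server": {"id": instance_id, "zone": "child"}}]


@reroute(fake_children)
def compute_get(instance_id):
    if instance_id not in LOCAL_SERVERS:
        raise InstanceNotFound(instance_id)
    return LOCAL_SERVERS[instance_id]


@redirect_handler
def api_show(instance_id):
    return compute_get(instance_id)
```

Calling api_show(1) returns the local record, while api_show(2) misses locally and returns the child zone's answer. Using an exception for the redirect lets the compute layer bail out without every intermediate caller having to know about zones.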
796=== modified file 'nova/scheduler/driver.py'
797--- nova/scheduler/driver.py 2011-03-10 04:30:52 +0000
798+++ nova/scheduler/driver.py 2011-03-24 12:14:14 +0000
799@@ -49,6 +49,13 @@
800 class Scheduler(object):
801 """The base class that all Scheduler clases should inherit from."""
802
803+ def __init__(self):
804+ self.zone_manager = None
805+
806+ def set_zone_manager(self, zone_manager):
807+ """Called by the Scheduler Service to supply a ZoneManager."""
808+ self.zone_manager = zone_manager
809+
810 @staticmethod
811 def service_is_up(service):
812 """Check whether a service is up based on last heartbeat."""
813
814=== modified file 'nova/scheduler/manager.py'
815--- nova/scheduler/manager.py 2011-03-14 17:59:41 +0000
816+++ nova/scheduler/manager.py 2011-03-24 12:14:14 +0000
817@@ -41,10 +41,11 @@
818 class SchedulerManager(manager.Manager):
819 """Chooses a host to run instances on."""
820 def __init__(self, scheduler_driver=None, *args, **kwargs):
821+ self.zone_manager = zone_manager.ZoneManager()
822 if not scheduler_driver:
823 scheduler_driver = FLAGS.scheduler_driver
824 self.driver = utils.import_object(scheduler_driver)
825- self.zone_manager = zone_manager.ZoneManager()
826+ self.driver.set_zone_manager(self.zone_manager)
827 super(SchedulerManager, self).__init__(*args, **kwargs)
828
829 def __getattr__(self, key):
830@@ -59,6 +60,17 @@
831 """Get a list of zones from the ZoneManager."""
832 return self.zone_manager.get_zone_list()
833
834+ def get_zone_capabilities(self, context=None, service=None):
835+ """Get the normalized set of capabilities for this zone,
836+ or for a particular service."""
837+ return self.zone_manager.get_zone_capabilities(context, service)
838+
839+ def update_service_capabilities(self, context=None, service_name=None,
840+ host=None, capabilities={}):
841+ """Process a capability update from a service node."""
842+ self.zone_manager.update_service_capabilities(service_name,
843+ host, capabilities)
844+
845 def _schedule(self, method, context, topic, *args, **kwargs):
846 """Tries to call schedule_* method on the driver to retrieve host.
847
848
849=== modified file 'nova/scheduler/zone_manager.py'
850--- nova/scheduler/zone_manager.py 2011-03-03 14:55:02 +0000
851+++ nova/scheduler/zone_manager.py 2011-03-24 12:14:14 +0000
852@@ -58,8 +58,9 @@
853 child zone."""
854 self.last_seen = datetime.now()
855 self.attempt = 0
856- self.name = zone_metadata["name"]
857- self.capabilities = zone_metadata["capabilities"]
858+ self.name = zone_metadata.get("name", "n/a")
859+ self.capabilities = ", ".join(["%s=%s" % (k, v)
860+ for k, v in zone_metadata.iteritems() if k != 'name'])
861 self.is_active = True
862
863 def to_dict(self):
864@@ -104,13 +105,37 @@
865 """Keeps the zone states updated."""
866 def __init__(self):
867 self.last_zone_db_check = datetime.min
868- self.zone_states = {}
869+ self.zone_states = {} # { <zone_id> : ZoneState }
870+ self.service_states = {} # { <service> : { <host> : { cap k : v }}}
871 self.green_pool = greenpool.GreenPool()
872
873 def get_zone_list(self):
874 """Return the list of zones we know about."""
875 return [zone.to_dict() for zone in self.zone_states.values()]
876
877+ def get_zone_capabilities(self, context, service=None):
878+ """Roll up all the individual host info to generic 'service'
879+ capabilities. Each capability is aggregated into
880+ <cap>_min and <cap>_max values."""
881+ service_dict = self.service_states
882+ if service:
883+ service_dict = {service: self.service_states.get(service, {})}
884+
885+ # TODO(sandy) - be smarter about fabricating this structure.
886+ # But it's likely to change once we understand what the Best-Match
887+ # code will need better.
888+ combined = {} # { <service>_<cap> : (min, max), ... }
889+ for service_name, host_dict in service_dict.iteritems():
890+ for host, caps_dict in host_dict.iteritems():
891+ for cap, value in caps_dict.iteritems():
892+ key = "%s_%s" % (service_name, cap)
893+ min_value, max_value = combined.get(key, (value, value))
894+ min_value = min(min_value, value)
895+ max_value = max(max_value, value)
896+ combined[key] = (min_value, max_value)
897+
898+ return combined
899+
900 def _refresh_from_db(self, context):
901 """Make our zone state map match the db."""
902 # Add/update existing zones ...
903@@ -141,3 +166,11 @@
904 self.last_zone_db_check = datetime.now()
905 self._refresh_from_db(context)
906 self._poll_zones(context)
907+
908+ def update_service_capabilities(self, service_name, host, capabilities):
909+ """Update the per-service capabilities based on this notification."""
910+ logging.debug(_("Received %(service_name)s service update from "
911+ "%(host)s: %(capabilities)s") % locals())
912+ service_caps = self.service_states.get(service_name, {})
913+ service_caps[host] = capabilities
914+ self.service_states[service_name] = service_caps
915
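The min/max roll-up performed by get_zone_capabilities above can be tried outside of nova. The following is a Python 3 sketch under the assumption that the service state map has the shape { service : { host : { cap : value }}} noted in the diff; rollup_capabilities is a hypothetical free function, not part of ZoneManager.

```python
def rollup_capabilities(service_states, service=None):
    """Aggregate per-host capabilities into
    { '<service>_<cap>': (min, max) }. Hypothetical helper."""
    if service:
        # Restrict the roll-up to a single service.
        service_states = {service: service_states.get(service, {})}

    combined = {}
    for service_name, host_dict in service_states.items():
        for caps_dict in host_dict.values():
            for cap, value in caps_dict.items():
                key = "%s_%s" % (service_name, cap)
                # First sighting of a key seeds both ends of the range.
                low, high = combined.get(key, (value, value))
                combined[key] = (min(low, value), max(high, value))
    return combined


# Same values as test_service_capabilities in nova/tests/test_zones.py.
states = {
    "svc1": {
        "host1": {"a": 2, "b": 3},
        "host2": {"a": 20, "b": 30},
        "host3": {"c": 5},
    },
    "svc10": {"host1": {"a": 99, "b": 99}},
}
svc1_caps = rollup_capabilities(states, "svc1")
all_caps = rollup_capabilities(states)
```

A capability reported by only one host (svc1's c here) ends up with identical min and max values, which matches the expectations in the test.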
916=== modified file 'nova/service.py'
917--- nova/service.py 2011-03-18 13:56:05 +0000
918+++ nova/service.py 2011-03-24 12:14:14 +0000
919@@ -97,18 +97,24 @@
920
921 conn1 = rpc.Connection.instance(new=True)
922 conn2 = rpc.Connection.instance(new=True)
923+ conn3 = rpc.Connection.instance(new=True)
924 if self.report_interval:
925- consumer_all = rpc.AdapterConsumer(
926+ consumer_all = rpc.TopicAdapterConsumer(
927 connection=conn1,
928 topic=self.topic,
929 proxy=self)
930- consumer_node = rpc.AdapterConsumer(
931+ consumer_node = rpc.TopicAdapterConsumer(
932 connection=conn2,
933 topic='%s.%s' % (self.topic, self.host),
934 proxy=self)
935+ fanout = rpc.FanoutAdapterConsumer(
936+ connection=conn3,
937+ topic=self.topic,
938+ proxy=self)
939
940 self.timers.append(consumer_all.attach_to_eventlet())
941 self.timers.append(consumer_node.attach_to_eventlet())
942+ self.timers.append(fanout.attach_to_eventlet())
943
944 pulse = utils.LoopingCall(self.report_state)
945 pulse.start(interval=self.report_interval, now=False)
946
947=== modified file 'nova/tests/api/openstack/test_zones.py'
948--- nova/tests/api/openstack/test_zones.py 2011-03-11 19:49:32 +0000
949+++ nova/tests/api/openstack/test_zones.py 2011-03-24 12:14:14 +0000
950@@ -75,6 +75,10 @@
951 ]
952
953
954+def zone_capabilities(method, context, params):
955+ return dict()
956+
957+
958 class ZonesTest(test.TestCase):
959 def setUp(self):
960 super(ZonesTest, self).setUp()
961@@ -93,13 +97,18 @@
962 self.stubs.Set(nova.db, 'zone_create', zone_create)
963 self.stubs.Set(nova.db, 'zone_delete', zone_delete)
964
965+ self.old_zone_name = FLAGS.zone_name
966+ self.old_zone_capabilities = FLAGS.zone_capabilities
967+
968 def tearDown(self):
969 self.stubs.UnsetAll()
970 FLAGS.allow_admin_api = self.allow_admin
971+ FLAGS.zone_name = self.old_zone_name
972+ FLAGS.zone_capabilities = self.old_zone_capabilities
973 super(ZonesTest, self).tearDown()
974
975 def test_get_zone_list_scheduler(self):
976- self.stubs.Set(api.API, '_call_scheduler', zone_get_all_scheduler)
977+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler)
978 req = webob.Request.blank('/v1.0/zones')
979 res = req.get_response(fakes.wsgi_app())
980 res_dict = json.loads(res.body)
981@@ -108,8 +117,7 @@
982 self.assertEqual(len(res_dict['zones']), 2)
983
984 def test_get_zone_list_db(self):
985- self.stubs.Set(api.API, '_call_scheduler',
986- zone_get_all_scheduler_empty)
987+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
988 self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
989 req = webob.Request.blank('/v1.0/zones')
990 req.headers["Content-Type"] = "application/json"
991@@ -167,3 +175,18 @@
992 self.assertEqual(res_dict['zone']['id'], 1)
993 self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
994 self.assertFalse('username' in res_dict['zone'])
995+
996+ def test_zone_info(self):
997+ FLAGS.zone_name = 'darksecret'
998+ FLAGS.zone_capabilities = ['cap1=a;b', 'cap2=c;d']
999+ self.stubs.Set(api, '_call_scheduler', zone_capabilities)
1000+
1001+ body = dict(zone=dict(username='zeb', password='sneaky'))
1002+ req = webob.Request.blank('/v1.0/zones/info')
1003+
1004+ res = req.get_response(fakes.wsgi_app())
1005+ res_dict = json.loads(res.body)
1006+ self.assertEqual(res.status_int, 200)
1007+ self.assertEqual(res_dict['zone']['name'], 'darksecret')
1008+ self.assertEqual(res_dict['zone']['cap1'], 'a;b')
1009+ self.assertEqual(res_dict['zone']['cap2'], 'c;d')
1010
1011=== modified file 'nova/tests/test_rpc.py'
1012--- nova/tests/test_rpc.py 2011-01-19 20:26:09 +0000
1013+++ nova/tests/test_rpc.py 2011-03-24 12:14:14 +0000
1014@@ -36,7 +36,7 @@
1015 super(RpcTestCase, self).setUp()
1016 self.conn = rpc.Connection.instance(True)
1017 self.receiver = TestReceiver()
1018- self.consumer = rpc.AdapterConsumer(connection=self.conn,
1019+ self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
1020 topic='test',
1021 proxy=self.receiver)
1022 self.consumer.attach_to_eventlet()
1023@@ -97,7 +97,7 @@
1024
1025 nested = Nested()
1026 conn = rpc.Connection.instance(True)
1027- consumer = rpc.AdapterConsumer(connection=conn,
1028+ consumer = rpc.TopicAdapterConsumer(connection=conn,
1029 topic='nested',
1030 proxy=nested)
1031 consumer.attach_to_eventlet()
1032
1033=== modified file 'nova/tests/test_scheduler.py'
1034--- nova/tests/test_scheduler.py 2011-03-10 06:23:13 +0000
1035+++ nova/tests/test_scheduler.py 2011-03-24 12:14:14 +0000
1036@@ -21,6 +21,9 @@
1037
1038 import datetime
1039 import mox
1040+import novaclient.exceptions
1041+import stubout
1042+import webob
1043
1044 from mox import IgnoreArg
1045 from nova import context
1046@@ -32,6 +35,7 @@
1047 from nova import rpc
1048 from nova import utils
1049 from nova.auth import manager as auth_manager
1050+from nova.scheduler import api
1051 from nova.scheduler import manager
1052 from nova.scheduler import driver
1053 from nova.compute import power_state
1054@@ -937,3 +941,160 @@
1055 db.instance_destroy(self.context, instance_id)
1056 db.service_destroy(self.context, s_ref['id'])
1057 db.service_destroy(self.context, s_ref2['id'])
1058+
1059+
1060+class FakeZone(object):
1061+ def __init__(self, api_url, username, password):
1062+ self.api_url = api_url
1063+ self.username = username
1064+ self.password = password
1065+
1066+
1067+def zone_get_all(context):
1068+ return [
1069+ FakeZone('http://example.com', 'bob', 'xxx'),
1070+ ]
1071+
1072+
1073+class FakeRerouteCompute(api.reroute_compute):
1074+ def _call_child_zones(self, zones, function):
1075+ return []
1076+
1077+ def get_collection_context_and_id(self, args, kwargs):
1078+ return ("servers", None, 1)
1079+
1080+ def unmarshall_result(self, zone_responses):
1081+ return dict(magic="found me")
1082+
1083+
1084+def go_boom(self, context, instance):
1085+ raise exception.InstanceNotFound("boom message", instance)
1086+
1087+
1088+def found_instance(self, context, instance):
1089+ return dict(name='myserver')
1090+
1091+
1092+class FakeResource(object):
1093+ def __init__(self, attribute_dict):
1094+ for k, v in attribute_dict.iteritems():
1095+ setattr(self, k, v)
1096+
1097+ def pause(self):
1098+ pass
1099+
1100+
1101+class ZoneRedirectTest(test.TestCase):
1102+ def setUp(self):
1103+ super(ZoneRedirectTest, self).setUp()
1104+ self.stubs = stubout.StubOutForTesting()
1105+
1106+ self.stubs.Set(db, 'zone_get_all', zone_get_all)
1107+
1108+ self.enable_zone_routing = FLAGS.enable_zone_routing
1109+ FLAGS.enable_zone_routing = True
1110+
1111+ def tearDown(self):
1112+ self.stubs.UnsetAll()
1113+ FLAGS.enable_zone_routing = self.enable_zone_routing
1114+ super(ZoneRedirectTest, self).tearDown()
1115+
1116+ def test_trap_found_locally(self):
1117+ decorator = FakeRerouteCompute("foo")
1118+ try:
1119+ result = decorator(found_instance)(None, None, 1)
1120+ except api.RedirectResult, e:
1121+ self.fail(_("Successful database hit should succeed"))
1122+
1123+ def test_trap_not_found_locally(self):
1124+ decorator = FakeRerouteCompute("foo")
1125+ try:
1126+ result = decorator(go_boom)(None, None, 1)
1127+ self.fail(_("Should have rerouted."))
1128+ except api.RedirectResult, e:
1129+ self.assertEquals(e.results['magic'], 'found me')
1130+
1131+ def test_routing_flags(self):
1132+ FLAGS.enable_zone_routing = False
1133+ decorator = FakeRerouteCompute("foo")
1134+ try:
1135+ result = decorator(go_boom)(None, None, 1)
1136+ self.fail(_("Should have thrown exception."))
1137+ except exception.InstanceNotFound, e:
1138+ self.assertEquals(e.message, 'boom message')
1139+
1140+ def test_get_collection_context_and_id(self):
1141+ decorator = api.reroute_compute("foo")
1142+ self.assertEquals(decorator.get_collection_context_and_id(
1143+ (None, 10, 20), {}), ("servers", 10, 20))
1144+ self.assertEquals(decorator.get_collection_context_and_id(
1145+ (None, 11,), dict(instance_id=21)), ("servers", 11, 21))
1146+ self.assertEquals(decorator.get_collection_context_and_id(
1147+ (None,), dict(context=12, instance_id=22)), ("servers", 12, 22))
1148+
1149+ def test_unmarshal_single_server(self):
1150+ decorator = api.reroute_compute("foo")
1151+ self.assertEquals(decorator.unmarshall_result([]), {})
1152+ self.assertEquals(decorator.unmarshall_result(
1153+ [FakeResource(dict(a=1, b=2)), ]),
1154+ dict(server=dict(a=1, b=2)))
1155+ self.assertEquals(decorator.unmarshall_result(
1156+ [FakeResource(dict(a=1, _b=2)), ]),
1157+ dict(server=dict(a=1,)))
1158+ self.assertEquals(decorator.unmarshall_result(
1159+ [FakeResource(dict(a=1, manager=2)), ]),
1160+ dict(server=dict(a=1,)))
1161+ self.assertEquals(decorator.unmarshall_result(
1162+ [FakeResource(dict(_a=1, manager=2)), ]),
1163+ dict(server={}))
1164+
1165+
1166+class FakeServerCollection(object):
1167+ def get(self, instance_id):
1168+ return FakeResource(dict(a=10, b=20))
1169+
1170+ def find(self, name):
1171+ return FakeResource(dict(a=11, b=22))
1172+
1173+
1174+class FakeEmptyServerCollection(object):
1175+ def get(self, f):
1176+ raise novaclient.NotFound(1)
1177+
1178+ def find(self, name):
1179+ raise novaclient.NotFound(2)
1180+
1181+
1182+class FakeNovaClient(object):
1183+ def __init__(self, collection):
1184+ self.servers = collection
1185+
1186+
1187+class DynamicNovaClientTest(test.TestCase):
1188+ def test_issue_novaclient_command_found(self):
1189+ zone = FakeZone('http://example.com', 'bob', 'xxx')
1190+ self.assertEquals(api._issue_novaclient_command(
1191+ FakeNovaClient(FakeServerCollection()),
1192+ zone, "servers", "get", 100).a, 10)
1193+
1194+ self.assertEquals(api._issue_novaclient_command(
1195+ FakeNovaClient(FakeServerCollection()),
1196+ zone, "servers", "find", "name").b, 22)
1197+
1198+ self.assertEquals(api._issue_novaclient_command(
1199+ FakeNovaClient(FakeServerCollection()),
1200+ zone, "servers", "pause", 100), None)
1201+
1202+ def test_issue_novaclient_command_not_found(self):
1203+ zone = FakeZone('http://example.com', 'bob', 'xxx')
1204+ self.assertEquals(api._issue_novaclient_command(
1205+ FakeNovaClient(FakeEmptyServerCollection()),
1206+ zone, "servers", "get", 100), None)
1207+
1208+ self.assertEquals(api._issue_novaclient_command(
1209+ FakeNovaClient(FakeEmptyServerCollection()),
1210+ zone, "servers", "find", "name"), None)
1211+
1212+ self.assertEquals(api._issue_novaclient_command(
1213+ FakeNovaClient(FakeEmptyServerCollection()),
1214+ zone, "servers", "any", "name"), None)
1215
1216=== modified file 'nova/tests/test_service.py'
1217--- nova/tests/test_service.py 2011-03-10 06:16:03 +0000
1218+++ nova/tests/test_service.py 2011-03-24 12:14:14 +0000
1219@@ -109,20 +109,29 @@
1220 app = service.Service.create(host=host, binary=binary)
1221
1222 self.mox.StubOutWithMock(rpc,
1223- 'AdapterConsumer',
1224- use_mock_anything=True)
1225- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
1226+ 'TopicAdapterConsumer',
1227+ use_mock_anything=True)
1228+ self.mox.StubOutWithMock(rpc,
1229+ 'FanoutAdapterConsumer',
1230+ use_mock_anything=True)
1231+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
1232 topic=topic,
1233 proxy=mox.IsA(service.Service)).AndReturn(
1234- rpc.AdapterConsumer)
1235+ rpc.TopicAdapterConsumer)
1236
1237- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
1238+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
1239 topic='%s.%s' % (topic, host),
1240 proxy=mox.IsA(service.Service)).AndReturn(
1241- rpc.AdapterConsumer)
1242-
1243- rpc.AdapterConsumer.attach_to_eventlet()
1244- rpc.AdapterConsumer.attach_to_eventlet()
1245+ rpc.TopicAdapterConsumer)
1246+
1247+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
1248+ topic=topic,
1249+ proxy=mox.IsA(service.Service)).AndReturn(
1250+ rpc.FanoutAdapterConsumer)
1251+
1252+ rpc.TopicAdapterConsumer.attach_to_eventlet()
1253+ rpc.TopicAdapterConsumer.attach_to_eventlet()
1254+ rpc.FanoutAdapterConsumer.attach_to_eventlet()
1255
1256 service_create = {'host': host,
1257 'binary': binary,
1258@@ -279,6 +288,7 @@
1259 self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
1260 service.rpc.Connection.instance(new=mox.IgnoreArg())
1261 service.rpc.Connection.instance(new=mox.IgnoreArg())
1262+ service.rpc.Connection.instance(new=mox.IgnoreArg())
1263 self.mox.StubOutWithMock(serv.manager.driver,
1264 'update_available_resource')
1265 serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
1266
1267=== modified file 'nova/tests/test_test.py'
1268--- nova/tests/test_test.py 2011-02-21 22:55:06 +0000
1269+++ nova/tests/test_test.py 2011-03-24 12:14:14 +0000
1270@@ -34,7 +34,7 @@
1271
1272 def test_rpc_consumer_isolation(self):
1273 connection = rpc.Connection.instance(new=True)
1274- consumer = rpc.TopicConsumer(connection, topic='compute')
1275+ consumer = rpc.TopicAdapterConsumer(connection, topic='compute')
1276 consumer.register_callback(
1277 lambda x, y: self.fail('I should never be called'))
1278 consumer.attach_to_eventlet()
1279
1280=== modified file 'nova/tests/test_zones.py'
1281--- nova/tests/test_zones.py 2011-03-03 14:55:02 +0000
1282+++ nova/tests/test_zones.py 2011-03-24 12:14:14 +0000
1283@@ -76,6 +76,40 @@
1284 self.assertEquals(len(zm.zone_states), 1)
1285 self.assertEquals(zm.zone_states[1].username, 'user1')
1286
1287+ def test_service_capabilities(self):
1288+ zm = zone_manager.ZoneManager()
1289+ caps = zm.get_zone_capabilities(self, None)
1290+ self.assertEquals(caps, {})
1291+
1292+ zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
1293+ caps = zm.get_zone_capabilities(self, None)
1294+ self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
1295+
1296+ zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
1297+ caps = zm.get_zone_capabilities(self, None)
1298+ self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
1299+
1300+ zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
1301+ caps = zm.get_zone_capabilities(self, None)
1302+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
1303+
1304+ zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
1305+ caps = zm.get_zone_capabilities(self, None)
1306+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
1307+ svc10_a=(99, 99), svc10_b=(99, 99)))
1308+
1309+ zm.update_service_capabilities("svc1", "host3", dict(c=5))
1310+ caps = zm.get_zone_capabilities(self, None)
1311+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
1312+ svc1_c=(5, 5), svc10_a=(99, 99),
1313+ svc10_b=(99, 99)))
1314+
1315+ caps = zm.get_zone_capabilities(self, 'svc1')
1316+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
1317+ svc1_c=(5, 5)))
1318+ caps = zm.get_zone_capabilities(self, 'svc10')
1319+ self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
1320+
1321 def test_refresh_from_db_replace_existing(self):
1322 zm = zone_manager.ZoneManager()
1323 zone_state = zone_manager.ZoneState()
1324
1325=== modified file 'nova/volume/manager.py'
1326--- nova/volume/manager.py 2011-03-03 13:54:11 +0000
1327+++ nova/volume/manager.py 2011-03-24 12:14:14 +0000
1328@@ -64,14 +64,15 @@
1329 'if True, will not discover local volumes')
1330
1331
1332-class VolumeManager(manager.Manager):
1333+class VolumeManager(manager.SchedulerDependentManager):
1334 """Manages attachable block storage devices."""
1335 def __init__(self, volume_driver=None, *args, **kwargs):
1336 """Load the driver from the one specified in args, or from flags."""
1337 if not volume_driver:
1338 volume_driver = FLAGS.volume_driver
1339 self.driver = utils.import_object(volume_driver)
1340- super(VolumeManager, self).__init__(*args, **kwargs)
1341+ super(VolumeManager, self).__init__(service_name='volume',
1342+ *args, **kwargs)
1343 # NOTE(vish): Implementation specific db handling is done
1344 # by the driver.
1345 self.driver.db = self.db