Merge lp:~sandy-walsh/nova/zones3 into lp:~hudson-openstack/nova/trunk

Proposed by Sandy Walsh
Status: Superseded
Proposed branch: lp:~sandy-walsh/nova/zones3
Merge into: lp:~hudson-openstack/nova/trunk
Prerequisite: lp:~sandy-walsh/nova/zones2
Diff against target: 681 lines (+292/-71)
17 files modified
nova/api/openstack/zones.py (+12/-4)
nova/compute/manager.py (+3/-2)
nova/flags.py (+3/-2)
nova/manager.py (+30/-1)
nova/network/manager.py (+3/-2)
nova/rpc.py (+59/-18)
nova/scheduler/api.py (+37/-22)
nova/scheduler/driver.py (+7/-0)
nova/scheduler/manager.py (+13/-1)
nova/scheduler/zone_manager.py (+32/-0)
nova/service.py (+8/-2)
nova/tests/api/openstack/test_zones.py (+26/-3)
nova/tests/test_rpc.py (+2/-2)
nova/tests/test_service.py (+19/-9)
nova/tests/test_test.py (+1/-1)
nova/tests/test_zones.py (+34/-0)
nova/volume/manager.py (+3/-2)
To merge this branch: bzr merge lp:~sandy-walsh/nova/zones3
Reviewer Review Type Date Requested Status
Rick Harris (community) Approve
Matt Dietz (community) Approve
Joshua McKenty (community) Needs Information
Todd Willey (community) Approve
Review via email: mp+52565@code.launchpad.net

This proposal has been superseded by a proposal from 2011-03-24.

Commit message

Aggregates capabilities from Compute, Network, Volume to the ZoneManager in Scheduler.

Description of the change

This branch adds a Fanout (broadcast) Queue to all services. This lets anyone talk to all services of a particular type without having to iterate through each.
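
As a rough illustration of the fanout idea, here is a toy in-memory model in Python 3. It is not the AMQP code in nova/rpc.py; `ToyFanoutExchange` and its methods are hypothetical names. The only point carried over is that each consumer gets its own queue bound to the exchange, so every consumer sees every message.

```python
from collections import deque


class ToyFanoutExchange:
    """In-memory stand-in for a fanout exchange: one queue per consumer."""

    def __init__(self):
        self.queues = {}

    def bind(self, consumer_name):
        # Each consumer gets a private queue bound to the exchange.
        self.queues[consumer_name] = deque()

    def publish(self, message):
        # A fanout exchange copies the message to every bound queue.
        for queue in self.queues.values():
            queue.append(message)


exchange = ToyFanoutExchange()
for name in ("scheduler-a", "scheduler-b"):
    exchange.bind(name)

# One publish, no iteration over individual consumers by the sender.
exchange.publish({"method": "update_service_capabilities",
                  "args": {"service_name": "compute", "host": "host1"}})

received = {name: list(queue) for name, queue in exchange.queues.items()}
```

With a topic queue shared by several consumers, only one of them would normally pick up a given message; in this model both schedulers end up holding a copy.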

Compute, Volume and Network now derive from nova.SchedulerDependentManager, which gives them the ability to update all the Scheduler nodes of their capabilities (via the fanout queue).

These capabilities are stored in each Scheduler Zone Manager and are available via the Scheduler.API.

The OS API '/zones/info' call now returns the aggregated (min, max) values of each of the service capabilities: http://paste.openstack.org/show/815/
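
The (min, max) roll-up can be sketched in a few lines. This is a minimal standalone illustration in Python 3, assuming per-host capabilities arrive as a nested dict of service -> host -> capability. The function name `aggregate_capabilities` and the sample values are hypothetical and not part of the branch.

```python
def aggregate_capabilities(service_states):
    """Roll per-host values up into (min, max) per service capability.

    service_states is assumed to look like:
        {service_name: {host: {capability: value}}}
    """
    combined = {}
    for service_name, hosts in service_states.items():
        for caps in hosts.values():
            for cap, value in caps.items():
                key = "%s_%s" % (service_name, cap)
                # The first value seen for a key seeds both min and max.
                low, high = combined.get(key, (value, value))
                combined[key] = (min(low, value), max(high, value))
    return combined


# Hypothetical sample data: two compute hosts and one volume host.
states = {
    "compute": {
        "host1": {"free_ram_mb": 2048, "free_disk_gb": 40},
        "host2": {"free_ram_mb": 512, "free_disk_gb": 120},
    },
    "volume": {"host3": {"free_gb": 500}},
}
result = aggregate_capabilities(states)
```

A capability reported by a single host comes back with identical min and max values.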

To integration test this, simply fire up another scheduler (or more), add some capabilities to Compute, Network or Volume via the service's update_service_capabilities() call and watch the events appear in each Scheduler node.

Todd Willey (xtoddx) wrote :

I think this looks pretty good.

Is there any information (RST docs) regarding what meaningful values for capabilities are?

Also, in some flags we are using x=y,a=b syntax for multi-valued flags (default_log_levels, etc). Perhaps we can keep the same format? Long term perhaps we could make a new DEFINE_dict or some-such that would parse those into a dict.

It looks like the only information that is being fanned out is the service capabilities, but would it make sense to keep abilities in the database pointing to the services themselves? In other words, why are service capabilities ephemeral with the services starting and stopping? If we keep them in the db we can modify them while running and modify the capabilities with nova-manage. This of course doesn't make sense for things like changing the hypervisor for compute since that would always require a restart, but would it be practical for other capabilities?

review: Needs Information
Sandy Walsh (sandy-walsh) wrote :

Todd, thanks for the feedback.

I think the meaningful values for caps will come from the distributed scheduler effort. That branch really needs to drive what is required from the services.

RE flags: do you mean "x=1,x=2,x=3,y=9,y=8,y=7" vs. "x:1,2,3;y:9,8,7;" ?

RE storing the capabilities: yes, storing it in the db is a possibility. Since many capabilities are not static (disk remaining, bandwidth usage, cpu, etc) I had concerns about the frequency of the updates and the added load on the db. But this could use the same periodic storage scheme as the fanout mechanism.

Sandy Walsh (sandy-walsh) wrote :

Oh, perhaps the multi-value keys were "x:1;2;3, y:9;8;7"? That would be an easy change. Perhaps move the parser into utils.py as well?

Todd Willey (xtoddx) wrote :

From nova/flags.py:

flags.DEFINE_list('default_log_levels',
                  ['amqplib=WARN',
                   'sqlalchemy=WARN',
                   'boto=WARN',
                   'eventlet.wsgi.server=WARN'],
                  'list of logger=LEVEL pairs')

I think that is a good pattern to follow. Your argument ends up looking like

--zone_capabilities=hypervisor=xenserver,os=linux

You'll end up with the list

['hypervisor=xenserver', 'os=linux']

You're definitely treating it as a list by splitting and iterating over it, so DEFINE_list seems reasonable. Based on your comments it looks like you might specify multiple values for any individual capability, in which case using a comma would break the flag list, maybe using a colon as the item separator would be reasonable?

Eric Day (eday) wrote :

8: Ahh! How did nova.db get into nova.api? I must have missed this in the last branch. It would be nice to abstract this in nova/scheduler or a nova/zones/ package.

Should SchedulerDependentManager just be in nova/manager.py?

As for using a DB to store this data (Todd's comment), we need to move away from compute/network/volume hosts writing to the DB directly for security concerns. We should be using the msg queue for all communications/data and let the scheduler verify the source once we have a mechanism to do so.

Sandy Walsh (sandy-walsh) wrote :

Eric, agree on moving SchedulerDependentManager into nova/manager.py and abstracting out the db call into the scheduler api. Will do.

Todd, colon is bad for URIs. I'll mess around with the List option.

Sandy Walsh (sandy-walsh) wrote :

Todd, how about x=1;2;3, y=9;8;7 .. that way we can use DEFINE_list and have multi-values?
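
A sketch of how that format could be parsed once the flag has been split on commas (as a list-style flag would do). The helper name `parse_capabilities` is hypothetical; it assumes ';' separates the values of one key and that only the first '=' separates key from values. Written for Python 3.

```python
def parse_capabilities(items):
    """Turn ['x=1;2;3', 'y=9;8;7'] into {'x': ['1', '2', '3'], ...}."""
    parsed = {}
    for item in items:
        # partition() splits on the first '=' only, so values may contain '='.
        key, _, values = item.strip().partition("=")
        parsed[key] = values.split(";") if values else []
    return parsed


flag_value = "x=1;2;3, y=9;8;7"
caps = parse_capabilities(flag_value.split(","))
```

Keeping ',' as the item separator and ';' as the value separator avoids colons entirely.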

Sandy Walsh (sandy-walsh) wrote :

All fixed up as requested ... thanks again for the feedback!

Todd Willey (xtoddx) wrote :

I'm on board with that style of flag.

review: Approve
Joshua McKenty (joshua-mckenty) wrote :

Typo on 79 (missing i).
Seems like service_name should be a class property rather than an instance property, any reason we can't do it this way? Then the super __init__ call doesn't need an extra arg.
I think TopicConsumer was the last of my code from the first nova-hacking weekend - I'm glad to see it go.

Log message on 231 seems wrong - is it actually "writing" at that point, or just initing?
I like how you've dropped _call_scheduler out of the API class - but do we even NEED that class any more? The rest of the class methods could be dropped down to functions as well.

467 - can we call this zone_capabilities? Caps seems like a pseudonym for "limits".
Looks like we need a simple test of the fanout in test_rpc.py.

Doesn't seem like there's any way to remove a host from service capabilities once it's been added - is this necessary?

review: Needs Information
Sandy Walsh (sandy-walsh) wrote :

Joshua, great feedback ... thanks for that. All good suggestions. I'll get on them.

Hmm, I was going to add something to decay the hosts after time. I may have forgotten that, lemme see.

If not, I can easily add it to Zones4 (coming soon).

If you'd like to see the direction this is heading have a look here (WIP):
https://code.launchpad.net/~sandy-walsh/nova/zones4/+merge/53726

Cheers!

Sandy Walsh (sandy-walsh) wrote :

Changes made ... the fanout test should be an integration test vs. a unit test. We'll need to revisit that once we get the integration test framework ironed out.

Rick Harris (rconradharris) wrote :

Overall this patch looks really good Sandy. Nice tests, and some really
helpful comments in there (as usual).

Generally speaking, I think a few whitespace cleanups would make this
patch even better (in particular lining up arg-lists). Not a huge deal,
obviously, but worth mentioning.

In terms of design and implementation, I think it looks good. This matches up
with what we whiteboarded (AFAICT), so I'm all for it.

Specifics
=========

> 10 +from nova import log as logging

Imported but not used.

> 35 + for cap in caps:
> 36 + key_values = cap.split('=')
> 37 + zone[key_values[0]] = key_values[1]

Might be slightly easier to read with tuple-unpacking (also, passing 1 to
split in case the value has a '=' in it):

    for cap in caps:
        key, value = cap.split('=', 1)
        zone[key] = value

> 62 + super(ComputeManager, self).__init__(service_name="compute",
> 63 + *args, **kwargs)

Might be better if args aligned:

    super(ComputeManager, self).__init__(service_name="compute",
                                         *args, **kwargs)
> 77 +DEFINE_list('zone_capabilities',
> 78 + ['hypervisor=xenserver;kvm', 'os=linux;windows'],
> 79 + 'Key/Multi-value list representng capabilities of this zone')

Same as above:

    DEFINE_list('zone_capabilities',
                ['hypervisor=xenserver;kvm', 'os=linux;windows'],
                'Key/Multi-value list representng capabilities of this zone')

> 88 +from nova import log as logging

Might be worth using the LOG pattern here to get more specific logging:

    LOG = logging.getLogger('nova.manager')

> 106 + update_service_capabilities is called with non-None values."""
> 107 + def __init__(self, host=None, db_driver=None, service_name="undefined"):

Spacing. Might be better as:

       update_service_capabilities is called with non-None values.
    """

    def __init__(self, host=None, db_driver=None, service_name="undefined"):

> 313 + params=dict(service=service))

Couple spaces ---> thataway (to line up with open paren +1)

> 209 + LOG.info(_("Created '%(exchange)s' fanout exchange "
> 210 + "with '%(key)s' routing key"),
> 211 + dict(exchange=self.exchange, key=self.routing_key))

Could use a bit of formatting work:

    LOG.info(_("Created '%(exchange)s' fanout exchange "
               "with '%(key)s' routing key"),
             dict(exchange=self.exchange, key=self.routing_key))

Although, this might be personal preference on my part, so, if that change
doesn't look good to you, please feel free to ignore! :-)

> 638 + svc10_a=(99, 99), svc10_b=(99, 99)))

Spacing, a couple of spaces <---- thattaway.

review: Needs Fixing
Matt Dietz (cerberus) wrote :

I don't see anything glaring ;-)

lgtm. Just waiting on Rick

review: Approve
Sandy Walsh (sandy-walsh) wrote :

Thanks for the feedback Rick ... I agree with your suggestions. Doing the code reviews on glance and seeing the work you and jay have been doing might make me change my style :)

Stay tuned.

Rick Harris (rconradharris) wrote :

Looks great, thanks for the fix-ups, Sandy!

review: Approve
OpenStack Infra (hudson-openstack) wrote :

No proposals found for merge of lp:~sandy-walsh/nova/zones2 into lp:nova.

Preview Diff

1=== modified file 'nova/api/openstack/zones.py'
2--- nova/api/openstack/zones.py 2011-03-11 19:49:32 +0000
3+++ nova/api/openstack/zones.py 2011-03-23 19:38:07 +0000
4@@ -15,9 +15,9 @@
5
6 import common
7
8+from nova import db
9 from nova import flags
10 from nova import wsgi
11-from nova import db
12 from nova.scheduler import api
13
14
15@@ -52,7 +52,7 @@
16 """Return all zones in brief"""
17 # Ask the ZoneManager in the Scheduler for most recent data,
18 # or fall-back to the database ...
19- items = api.API().get_zone_list(req.environ['nova.context'])
20+ items = api.get_zone_list(req.environ['nova.context'])
21 if not items:
22 items = db.zone_get_all(req.environ['nova.context'])
23
24@@ -67,8 +67,16 @@
25
26 def info(self, req):
27 """Return name and capabilities for this zone."""
28- return dict(zone=dict(name=FLAGS.zone_name,
29- capabilities=FLAGS.zone_capabilities))
30+ items = api.get_zone_capabilities(req.environ['nova.context'])
31+
32+ zone = dict(name=FLAGS.zone_name)
33+ caps = FLAGS.zone_capabilities
34+ for cap in caps:
35+ key, value = cap.split('=')
36+ zone[key] = value
37+ for item, (min_value, max_value) in items.iteritems():
38+ zone[item] = "%s,%s" % (min_value, max_value)
39+ return dict(zone=zone)
40
41 def show(self, req, id):
42 """Return data about the given zone id"""
43
44=== modified file 'nova/compute/manager.py'
45--- nova/compute/manager.py 2011-03-21 15:41:03 +0000
46+++ nova/compute/manager.py 2011-03-23 19:38:07 +0000
47@@ -105,7 +105,7 @@
48 return decorated_function
49
50
51-class ComputeManager(manager.Manager):
52+class ComputeManager(manager.SchedulerDependentManager):
53
54 """Manages the running instances from creation to destruction."""
55
56@@ -124,7 +124,8 @@
57
58 self.network_manager = utils.import_object(FLAGS.network_manager)
59 self.volume_manager = utils.import_object(FLAGS.volume_manager)
60- super(ComputeManager, self).__init__(*args, **kwargs)
61+ super(ComputeManager, self).__init__(service_name="compute",
62+ *args, **kwargs)
63
64 def init_host(self):
65 """Do any initialization that needs to be run if this is a
66
67=== modified file 'nova/flags.py'
68--- nova/flags.py 2011-03-18 11:35:00 +0000
69+++ nova/flags.py 2011-03-23 19:38:07 +0000
70@@ -358,5 +358,6 @@
71 'availability zone of this node')
72
73 DEFINE_string('zone_name', 'nova', 'name of this zone')
74-DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
75- 'Key/Value tags which represent capabilities of this zone')
76+DEFINE_list('zone_capabilities',
77+ ['hypervisor=xenserver;kvm', 'os=linux;windows'],
78+ 'Key/Multi-value list representng capabilities of this zone')
79
80=== modified file 'nova/manager.py'
81--- nova/manager.py 2010-12-15 00:05:39 +0000
82+++ nova/manager.py 2011-03-23 19:38:07 +0000
83@@ -53,11 +53,14 @@
84
85 from nova import utils
86 from nova import flags
87+from nova import log as logging
88 from nova.db import base
89-
90+from nova.scheduler import api
91
92 FLAGS = flags.FLAGS
93
94+LOG = logging.getLogger('nova.manager')
95+
96
97 class Manager(base.Base):
98 def __init__(self, host=None, db_driver=None):
99@@ -74,3 +77,29 @@
100 """Do any initialization that needs to be run if this is a standalone
101 service. Child classes should override this method."""
102 pass
103+
104+
105+class SchedulerDependentManager(Manager):
106+ """Periodically send capability updates to the Scheduler services.
107+ Services that need to update the Scheduler of their capabilities
108+ should derive from this class. Otherwise they can derive from
109+ manager.Manager directly. Updates are only sent after
110+ update_service_capabilities is called with non-None values."""
111+
112+ def __init__(self, host=None, db_driver=None, service_name="undefined"):
113+ self.last_capabilities = None
114+ self.service_name = service_name
115+ super(SchedulerDependentManager, self).__init__(host, db_driver)
116+
117+ def update_service_capabilities(self, capabilities):
118+ """Remember these capabilities to send on next periodic update."""
119+ self.last_capabilities = capabilities
120+
121+ def periodic_tasks(self, context=None):
122+ """Pass data back to the scheduler at a periodic interval"""
123+ if self.last_capabilities:
124+ LOG.debug(_("Notifying Schedulers of capabilities ..."))
125+ api.update_service_capabilities(context, self.service_name,
126+ self.host, self.last_capabilities)
127+
128+ super(SchedulerDependentManager, self).periodic_tasks(context)
129
130=== modified file 'nova/network/manager.py'
131--- nova/network/manager.py 2011-03-23 05:29:32 +0000
132+++ nova/network/manager.py 2011-03-23 19:38:07 +0000
133@@ -105,7 +105,7 @@
134 pass
135
136
137-class NetworkManager(manager.Manager):
138+class NetworkManager(manager.SchedulerDependentManager):
139 """Implements common network manager functionality.
140
141 This class must be subclassed to support specific topologies.
142@@ -116,7 +116,8 @@
143 if not network_driver:
144 network_driver = FLAGS.network_driver
145 self.driver = utils.import_object(network_driver)
146- super(NetworkManager, self).__init__(*args, **kwargs)
147+ super(NetworkManager, self).__init__(service_name='network',
148+ *args, **kwargs)
149
150 def init_host(self):
151 """Do any initialization that needs to be run if this is a
152
153=== modified file 'nova/rpc.py'
154--- nova/rpc.py 2011-03-18 13:56:05 +0000
155+++ nova/rpc.py 2011-03-23 19:38:07 +0000
156@@ -137,24 +137,7 @@
157 return timer
158
159
160-class Publisher(messaging.Publisher):
161- """Publisher base class"""
162- pass
163-
164-
165-class TopicConsumer(Consumer):
166- """Consumes messages on a specific topic"""
167- exchange_type = "topic"
168-
169- def __init__(self, connection=None, topic="broadcast"):
170- self.queue = topic
171- self.routing_key = topic
172- self.exchange = FLAGS.control_exchange
173- self.durable = False
174- super(TopicConsumer, self).__init__(connection=connection)
175-
176-
177-class AdapterConsumer(TopicConsumer):
178+class AdapterConsumer(Consumer):
179 """Calls methods on a proxy object based on method and args"""
180 def __init__(self, connection=None, topic="broadcast", proxy=None):
181 LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
182@@ -207,6 +190,41 @@
183 return
184
185
186+class Publisher(messaging.Publisher):
187+ """Publisher base class"""
188+ pass
189+
190+
191+class TopicAdapterConsumer(AdapterConsumer):
192+ """Consumes messages on a specific topic"""
193+ exchange_type = "topic"
194+
195+ def __init__(self, connection=None, topic="broadcast", proxy=None):
196+ self.queue = topic
197+ self.routing_key = topic
198+ self.exchange = FLAGS.control_exchange
199+ self.durable = False
200+ super(TopicAdapterConsumer, self).__init__(connection=connection,
201+ topic=topic, proxy=proxy)
202+
203+
204+class FanoutAdapterConsumer(AdapterConsumer):
205+ """Consumes messages from a fanout exchange"""
206+ exchange_type = "fanout"
207+
208+ def __init__(self, connection=None, topic="broadcast", proxy=None):
209+ self.exchange = "%s_fanout" % topic
210+ self.routing_key = topic
211+ unique = uuid.uuid4().hex
212+ self.queue = "%s_fanout_%s" % (topic, unique)
213+ self.durable = False
214+ LOG.info(_("Created '%(exchange)s' fanout exchange "
215+ "with '%(key)s' routing key"),
216+ dict(exchange=self.exchange, key=self.routing_key))
217+ super(FanoutAdapterConsumer, self).__init__(connection=connection,
218+ topic=topic, proxy=proxy)
219+
220+
221 class TopicPublisher(Publisher):
222 """Publishes messages on a specific topic"""
223 exchange_type = "topic"
224@@ -218,6 +236,19 @@
225 super(TopicPublisher, self).__init__(connection=connection)
226
227
228+class FanoutPublisher(Publisher):
229+ """Publishes messages to a fanout exchange."""
230+ exchange_type = "fanout"
231+
232+ def __init__(self, topic, connection=None):
233+ self.exchange = "%s_fanout" % topic
234+ self.queue = "%s_fanout" % topic
235+ self.durable = False
236+ LOG.info(_("Creating '%(exchange)s' fanout exchange"),
237+ dict(exchange=self.exchange))
238+ super(FanoutPublisher, self).__init__(connection=connection)
239+
240+
241 class DirectConsumer(Consumer):
242 """Consumes messages directly on a channel specified by msg_id"""
243 exchange_type = "direct"
244@@ -360,6 +391,16 @@
245 publisher.close()
246
247
248+def fanout_cast(context, topic, msg):
249+ """Sends a message on a fanout exchange without waiting for a response"""
250+ LOG.debug(_("Making asynchronous fanout cast..."))
251+ _pack_context(msg, context)
252+ conn = Connection.instance()
253+ publisher = FanoutPublisher(topic, connection=conn)
254+ publisher.send(msg)
255+ publisher.close()
256+
257+
258 def generic_response(message_data, message):
259 """Logs a result and exits"""
260 LOG.debug(_('response %s'), message_data)
261
262=== modified file 'nova/scheduler/api.py'
263--- nova/scheduler/api.py 2011-02-25 21:40:15 +0000
264+++ nova/scheduler/api.py 2011-03-23 19:38:07 +0000
265@@ -25,25 +25,40 @@
266 LOG = logging.getLogger('nova.scheduler.api')
267
268
269-class API(object):
270- """API for interacting with the scheduler."""
271-
272- def _call_scheduler(self, method, context, params=None):
273- """Generic handler for RPC calls to the scheduler.
274-
275- :param params: Optional dictionary of arguments to be passed to the
276- scheduler worker
277-
278- :retval: Result returned by scheduler worker
279- """
280- if not params:
281- params = {}
282- queue = FLAGS.scheduler_topic
283- kwargs = {'method': method, 'args': params}
284- return rpc.call(context, queue, kwargs)
285-
286- def get_zone_list(self, context):
287- items = self._call_scheduler('get_zone_list', context)
288- for item in items:
289- item['api_url'] = item['api_url'].replace('\\/', '/')
290- return items
291+def _call_scheduler(method, context, params=None):
292+ """Generic handler for RPC calls to the scheduler.
293+
294+ :param params: Optional dictionary of arguments to be passed to the
295+ scheduler worker
296+
297+ :retval: Result returned by scheduler worker
298+ """
299+ if not params:
300+ params = {}
301+ queue = FLAGS.scheduler_topic
302+ kwargs = {'method': method, 'args': params}
303+ return rpc.call(context, queue, kwargs)
304+
305+
306+def get_zone_list(context):
307+ """Return a list of zones assoicated with this zone."""
308+ items = _call_scheduler('get_zone_list', context)
309+ for item in items:
310+ item['api_url'] = item['api_url'].replace('\\/', '/')
311+ return items
312+
313+
314+def get_zone_capabilities(context, service=None):
315+ """Returns a dict of key, value capabilities for this zone,
316+ or for a particular class of services running in this zone."""
317+ return _call_scheduler('get_zone_capabilities', context=context,
318+ params=dict(service=service))
319+
320+
321+def update_service_capabilities(context, service_name, host, capabilities):
322+ """Send an update to all the scheduler services informing them
323+ of the capabilities of this service."""
324+ kwargs = dict(method='update_service_capabilities',
325+ args=dict(service_name=service_name, host=host,
326+ capabilities=capabilities))
327+ return rpc.fanout_cast(context, 'scheduler', kwargs)
328
329=== modified file 'nova/scheduler/driver.py'
330--- nova/scheduler/driver.py 2011-03-10 04:30:52 +0000
331+++ nova/scheduler/driver.py 2011-03-23 19:38:07 +0000
332@@ -49,6 +49,13 @@
333 class Scheduler(object):
334 """The base class that all Scheduler clases should inherit from."""
335
336+ def __init__(self):
337+ self.zone_manager = None
338+
339+ def set_zone_manager(self, zone_manager):
340+ """Called by the Scheduler Service to supply a ZoneManager."""
341+ self.zone_manager = zone_manager
342+
343 @staticmethod
344 def service_is_up(service):
345 """Check whether a service is up based on last heartbeat."""
346
347=== modified file 'nova/scheduler/manager.py'
348--- nova/scheduler/manager.py 2011-03-14 17:59:41 +0000
349+++ nova/scheduler/manager.py 2011-03-23 19:38:07 +0000
350@@ -41,10 +41,11 @@
351 class SchedulerManager(manager.Manager):
352 """Chooses a host to run instances on."""
353 def __init__(self, scheduler_driver=None, *args, **kwargs):
354+ self.zone_manager = zone_manager.ZoneManager()
355 if not scheduler_driver:
356 scheduler_driver = FLAGS.scheduler_driver
357 self.driver = utils.import_object(scheduler_driver)
358- self.zone_manager = zone_manager.ZoneManager()
359+ self.driver.set_zone_manager(self.zone_manager)
360 super(SchedulerManager, self).__init__(*args, **kwargs)
361
362 def __getattr__(self, key):
363@@ -59,6 +60,17 @@
364 """Get a list of zones from the ZoneManager."""
365 return self.zone_manager.get_zone_list()
366
367+ def get_zone_capabilities(self, context=None, service=None):
368+ """Get the normalized set of capabilites for this zone,
369+ or for a particular service."""
370+ return self.zone_manager.get_zone_capabilities(context, service)
371+
372+ def update_service_capabilities(self, context=None, service_name=None,
373+ host=None, capabilities={}):
374+ """Process a capability update from a service node."""
375+ self.zone_manager.update_service_capabilities(service_name,
376+ host, capabilities)
377+
378 def _schedule(self, method, context, topic, *args, **kwargs):
379 """Tries to call schedule_* method on the driver to retrieve host.
380
381
382=== modified file 'nova/scheduler/zone_manager.py'
383--- nova/scheduler/zone_manager.py 2011-03-03 14:55:02 +0000
384+++ nova/scheduler/zone_manager.py 2011-03-23 19:38:07 +0000
385@@ -105,12 +105,36 @@
386 def __init__(self):
387 self.last_zone_db_check = datetime.min
388 self.zone_states = {}
389+ self.service_states = {} # { <service> : { <host> : { cap k : v }}}
390 self.green_pool = greenpool.GreenPool()
391
392 def get_zone_list(self):
393 """Return the list of zones we know about."""
394 return [zone.to_dict() for zone in self.zone_states.values()]
395
396+ def get_zone_capabilities(self, context, service=None):
397+ """Roll up all the individual host info to generic 'service'
398+ capabilities. Each capability is aggregated into
399+ <cap>_min and <cap>_max values."""
400+ service_dict = self.service_states
401+ if service:
402+ service_dict = {service: self.service_states.get(service, {})}
403+
404+ # TODO(sandy) - be smarter about fabricating this structure.
405+ # But it's likely to change once we understand what the Best-Match
406+ # code will need better.
407+ combined = {} # { <service>_<cap> : (min, max), ... }
408+ for service_name, host_dict in service_dict.iteritems():
409+ for host, caps_dict in host_dict.iteritems():
410+ for cap, value in caps_dict.iteritems():
411+ key = "%s_%s" % (service_name, cap)
412+ min_value, max_value = combined.get(key, (value, value))
413+ min_value = min(min_value, value)
414+ max_value = max(max_value, value)
415+ combined[key] = (min_value, max_value)
416+
417+ return combined
418+
419 def _refresh_from_db(self, context):
420 """Make our zone state map match the db."""
421 # Add/update existing zones ...
422@@ -141,3 +165,11 @@
423 self.last_zone_db_check = datetime.now()
424 self._refresh_from_db(context)
425 self._poll_zones(context)
426+
427+ def update_service_capabilities(self, service_name, host, capabilities):
428+ """Update the per-service capabilities based on this notification."""
429+ logging.debug(_("Received %(service_name)s service update from "
430+ "%(host)s: %(capabilities)s") % locals())
431+ service_caps = self.service_states.get(service_name, {})
432+ service_caps[host] = capabilities
433+ self.service_states[service_name] = service_caps
434
435=== modified file 'nova/service.py'
436--- nova/service.py 2011-03-18 13:56:05 +0000
437+++ nova/service.py 2011-03-23 19:38:07 +0000
438@@ -97,18 +97,24 @@
439
440 conn1 = rpc.Connection.instance(new=True)
441 conn2 = rpc.Connection.instance(new=True)
442+ conn3 = rpc.Connection.instance(new=True)
443 if self.report_interval:
444- consumer_all = rpc.AdapterConsumer(
445+ consumer_all = rpc.TopicAdapterConsumer(
446 connection=conn1,
447 topic=self.topic,
448 proxy=self)
449- consumer_node = rpc.AdapterConsumer(
450+ consumer_node = rpc.TopicAdapterConsumer(
451 connection=conn2,
452 topic='%s.%s' % (self.topic, self.host),
453 proxy=self)
454+ fanout = rpc.FanoutAdapterConsumer(
455+ connection=conn3,
456+ topic=self.topic,
457+ proxy=self)
458
459 self.timers.append(consumer_all.attach_to_eventlet())
460 self.timers.append(consumer_node.attach_to_eventlet())
461+ self.timers.append(fanout.attach_to_eventlet())
462
463 pulse = utils.LoopingCall(self.report_state)
464 pulse.start(interval=self.report_interval, now=False)
465
466=== modified file 'nova/tests/api/openstack/test_zones.py'
467--- nova/tests/api/openstack/test_zones.py 2011-03-11 19:49:32 +0000
468+++ nova/tests/api/openstack/test_zones.py 2011-03-23 19:38:07 +0000
469@@ -75,6 +75,10 @@
470 ]
471
472
473+def zone_capabilities(method, context, params):
474+ return dict()
475+
476+
477 class ZonesTest(test.TestCase):
478 def setUp(self):
479 super(ZonesTest, self).setUp()
480@@ -93,13 +97,18 @@
481 self.stubs.Set(nova.db, 'zone_create', zone_create)
482 self.stubs.Set(nova.db, 'zone_delete', zone_delete)
483
484+ self.old_zone_name = FLAGS.zone_name
485+ self.old_zone_capabilities = FLAGS.zone_capabilities
486+
487 def tearDown(self):
488 self.stubs.UnsetAll()
489 FLAGS.allow_admin_api = self.allow_admin
490+ FLAGS.zone_name = self.old_zone_name
491+ FLAGS.zone_capabilities = self.old_zone_capabilities
492 super(ZonesTest, self).tearDown()
493
494 def test_get_zone_list_scheduler(self):
495- self.stubs.Set(api.API, '_call_scheduler', zone_get_all_scheduler)
496+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler)
497 req = webob.Request.blank('/v1.0/zones')
498 res = req.get_response(fakes.wsgi_app())
499 res_dict = json.loads(res.body)
500@@ -108,8 +117,7 @@
501 self.assertEqual(len(res_dict['zones']), 2)
502
503 def test_get_zone_list_db(self):
504- self.stubs.Set(api.API, '_call_scheduler',
505- zone_get_all_scheduler_empty)
506+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
507 self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
508 req = webob.Request.blank('/v1.0/zones')
509 req.headers["Content-Type"] = "application/json"
510@@ -167,3 +175,18 @@
511 self.assertEqual(res_dict['zone']['id'], 1)
512 self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
513 self.assertFalse('username' in res_dict['zone'])
514+
515+ def test_zone_info(self):
516+ FLAGS.zone_name = 'darksecret'
517+ FLAGS.zone_capabilities = ['cap1=a;b', 'cap2=c;d']
518+ self.stubs.Set(api, '_call_scheduler', zone_capabilities)
519+
520+ body = dict(zone=dict(username='zeb', password='sneaky'))
521+ req = webob.Request.blank('/v1.0/zones/info')
522+
523+ res = req.get_response(fakes.wsgi_app())
524+ res_dict = json.loads(res.body)
525+ self.assertEqual(res.status_int, 200)
526+ self.assertEqual(res_dict['zone']['name'], 'darksecret')
527+ self.assertEqual(res_dict['zone']['cap1'], 'a;b')
528+ self.assertEqual(res_dict['zone']['cap2'], 'c;d')
529
530=== modified file 'nova/tests/test_rpc.py'
531--- nova/tests/test_rpc.py 2011-01-19 20:26:09 +0000
532+++ nova/tests/test_rpc.py 2011-03-23 19:38:07 +0000
533@@ -36,7 +36,7 @@
534 super(RpcTestCase, self).setUp()
535 self.conn = rpc.Connection.instance(True)
536 self.receiver = TestReceiver()
537- self.consumer = rpc.AdapterConsumer(connection=self.conn,
538+ self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
539 topic='test',
540 proxy=self.receiver)
541 self.consumer.attach_to_eventlet()
542@@ -97,7 +97,7 @@
543
544 nested = Nested()
545 conn = rpc.Connection.instance(True)
546- consumer = rpc.AdapterConsumer(connection=conn,
547+ consumer = rpc.TopicAdapterConsumer(connection=conn,
548 topic='nested',
549 proxy=nested)
550 consumer.attach_to_eventlet()
551
552=== modified file 'nova/tests/test_service.py'
553--- nova/tests/test_service.py 2011-03-10 06:16:03 +0000
554+++ nova/tests/test_service.py 2011-03-23 19:38:07 +0000
555@@ -109,20 +109,29 @@
556 app = service.Service.create(host=host, binary=binary)
557
558 self.mox.StubOutWithMock(rpc,
559- 'AdapterConsumer',
560- use_mock_anything=True)
561- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
562+ 'TopicAdapterConsumer',
563+ use_mock_anything=True)
564+ self.mox.StubOutWithMock(rpc,
565+ 'FanoutAdapterConsumer',
566+ use_mock_anything=True)
567+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
568 topic=topic,
569 proxy=mox.IsA(service.Service)).AndReturn(
570- rpc.AdapterConsumer)
571+ rpc.TopicAdapterConsumer)
572
573- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
574+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
575 topic='%s.%s' % (topic, host),
576 proxy=mox.IsA(service.Service)).AndReturn(
577- rpc.AdapterConsumer)
578-
579- rpc.AdapterConsumer.attach_to_eventlet()
580- rpc.AdapterConsumer.attach_to_eventlet()
581+ rpc.TopicAdapterConsumer)
582+
583+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
584+ topic=topic,
585+ proxy=mox.IsA(service.Service)).AndReturn(
586+ rpc.FanoutAdapterConsumer)
587+
588+ rpc.TopicAdapterConsumer.attach_to_eventlet()
589+ rpc.TopicAdapterConsumer.attach_to_eventlet()
590+ rpc.FanoutAdapterConsumer.attach_to_eventlet()
591
592 service_create = {'host': host,
593 'binary': binary,
594@@ -279,6 +288,7 @@
595 self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
596 service.rpc.Connection.instance(new=mox.IgnoreArg())
597 service.rpc.Connection.instance(new=mox.IgnoreArg())
598+ service.rpc.Connection.instance(new=mox.IgnoreArg())
599 self.mox.StubOutWithMock(serv.manager.driver,
600 'update_available_resource')
601 serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
602
603=== modified file 'nova/tests/test_test.py'
604--- nova/tests/test_test.py 2011-02-21 22:55:06 +0000
605+++ nova/tests/test_test.py 2011-03-23 19:38:07 +0000
606@@ -34,7 +34,7 @@
607
608 def test_rpc_consumer_isolation(self):
609 connection = rpc.Connection.instance(new=True)
610- consumer = rpc.TopicConsumer(connection, topic='compute')
611+ consumer = rpc.TopicAdapterConsumer(connection, topic='compute')
612 consumer.register_callback(
613 lambda x, y: self.fail('I should never be called'))
614 consumer.attach_to_eventlet()
615
616=== modified file 'nova/tests/test_zones.py'
617--- nova/tests/test_zones.py 2011-03-03 14:55:02 +0000
618+++ nova/tests/test_zones.py 2011-03-23 19:38:07 +0000
619@@ -76,6 +76,40 @@
620 self.assertEquals(len(zm.zone_states), 1)
621 self.assertEquals(zm.zone_states[1].username, 'user1')
622
623+ def test_service_capabilities(self):
624+ zm = zone_manager.ZoneManager()
625+ caps = zm.get_zone_capabilities(self, None)
626+ self.assertEquals(caps, {})
627+
628+ zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
629+ caps = zm.get_zone_capabilities(self, None)
630+ self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
631+
632+ zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
633+ caps = zm.get_zone_capabilities(self, None)
634+ self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
635+
636+ zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
637+ caps = zm.get_zone_capabilities(self, None)
638+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
639+
640+ zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
641+ caps = zm.get_zone_capabilities(self, None)
642+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
643+ svc10_a=(99, 99), svc10_b=(99, 99)))
644+
645+ zm.update_service_capabilities("svc1", "host3", dict(c=5))
646+ caps = zm.get_zone_capabilities(self, None)
647+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
648+ svc1_c=(5, 5), svc10_a=(99, 99),
649+ svc10_b=(99, 99)))
650+
651+ caps = zm.get_zone_capabilities(self, 'svc1')
652+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
653+ svc1_c=(5, 5)))
654+ caps = zm.get_zone_capabilities(self, 'svc10')
655+ self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
656+
657 def test_refresh_from_db_replace_existing(self):
658 zm = zone_manager.ZoneManager()
659 zone_state = zone_manager.ZoneState()
660
661=== modified file 'nova/volume/manager.py'
662--- nova/volume/manager.py 2011-03-03 13:54:11 +0000
663+++ nova/volume/manager.py 2011-03-23 19:38:07 +0000
664@@ -64,14 +64,15 @@
665 'if True, will not discover local volumes')
666
667
668-class VolumeManager(manager.Manager):
669+class VolumeManager(manager.SchedulerDependentManager):
670 """Manages attachable block storage devices."""
671 def __init__(self, volume_driver=None, *args, **kwargs):
672 """Load the driver from the one specified in args, or from flags."""
673 if not volume_driver:
674 volume_driver = FLAGS.volume_driver
675 self.driver = utils.import_object(volume_driver)
676- super(VolumeManager, self).__init__(*args, **kwargs)
677+ super(VolumeManager, self).__init__(service_name='volume',
678+ *args, **kwargs)
679 # NOTE(vish): Implementation specific db handling is done
680 # by the driver.
681 self.driver.db = self.db
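
Note on test_service_capabilities above: the expected values imply that capabilities are kept per service and per host, that a later report from the same host replaces its earlier one, and that each capability is reported as a (min, max) tuple across hosts under a "<service>_<capability>" key. A minimal sketch of that behaviour follows, assuming nothing about the real code in nova/scheduler/zone_manager.py. The class name CapabilityStore is hypothetical, and the context argument that the test passes to get_zone_capabilities() is left out for brevity.

```python
# Hypothetical illustration only; this is not the ZoneManager implementation.
class CapabilityStore(object):
    def __init__(self):
        # service_name -> host -> {capability_name: value}
        self.service_states = {}

    def update_service_capabilities(self, service_name, host, capabilities):
        # A new report from a host overwrites that host's previous report.
        hosts = self.service_states.setdefault(service_name, {})
        hosts[host] = dict(capabilities)

    def get_zone_capabilities(self, service=None):
        # Collapse every host's values into (min, max) per capability,
        # optionally restricted to a single service.
        combined = {}
        for service_name, hosts in self.service_states.items():
            if service is not None and service != service_name:
                continue
            for caps in hosts.values():
                for cap, value in caps.items():
                    key = "%s_%s" % (service_name, cap)
                    low, high = combined.get(key, (value, value))
                    combined[key] = (min(low, value), max(high, value))
        return combined


store = CapabilityStore()
store.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
store.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
zone_caps = store.get_zone_capabilities()
```

With the two reports above, zone_caps holds svc1_a=(2, 20) and svc1_b=(3, 30), matching the third assertion group in the test.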