Merge lp:~sateesh-chodapuneedi/nova/lp831497 into lp:~citrix-openstack/nova/peer-review

Proposed by Salvatore Orlando
Status: Needs review
Proposed branch: lp:~sateesh-chodapuneedi/nova/lp831497
Merge into: lp:~citrix-openstack/nova/peer-review
Diff against target: 11607 lines (+7714/-1102)
92 files modified
bin/instance-usage-audit (+2/-3)
bin/nova-ajax-console-proxy (+6/-6)
bin/nova-manage (+477/-1)
bin/nova-vsa (+49/-0)
contrib/nova.sh (+1/-1)
nova/api/ec2/__init__.py (+17/-10)
nova/api/ec2/cloud.py (+38/-12)
nova/api/openstack/common.py (+54/-28)
nova/api/openstack/contrib/floating_ips.py (+10/-5)
nova/api/openstack/contrib/simple_tenant_usage.py (+236/-0)
nova/api/openstack/contrib/virtual_storage_arrays.py (+606/-0)
nova/api/openstack/create_instance_helper.py (+26/-17)
nova/api/openstack/servers.py (+28/-14)
nova/api/openstack/views/servers.py (+5/-9)
nova/compute/api.py (+129/-59)
nova/compute/manager.py (+252/-235)
nova/compute/task_states.py (+59/-0)
nova/compute/vm_states.py (+39/-0)
nova/context.py (+1/-1)
nova/db/api.py (+49/-4)
nova/db/sqlalchemy/api.py (+132/-23)
nova/db/sqlalchemy/migrate_repo/versions/043_add_vsa_data.py (+75/-0)
nova/db/sqlalchemy/migrate_repo/versions/044_update_instance_states.py (+138/-0)
nova/db/sqlalchemy/migration.py (+1/-0)
nova/db/sqlalchemy/models.py (+36/-13)
nova/exception.py (+17/-1)
nova/flags.py (+18/-2)
nova/image/glance.py (+15/-4)
nova/ipv6/account_identifier.py (+2/-1)
nova/network/api.py (+6/-0)
nova/network/manager.py (+13/-6)
nova/notifier/api.py (+2/-1)
nova/quota.py (+3/-2)
nova/rpc/__init__.py (+17/-26)
nova/rpc/common.py (+6/-0)
nova/rpc/impl_carrot.py (+81/-21)
nova/rpc/impl_kombu.py (+781/-0)
nova/scheduler/abstract_scheduler.py (+0/-1)
nova/scheduler/driver.py (+4/-6)
nova/scheduler/vsa.py (+535/-0)
nova/service.py (+11/-21)
nova/test.py (+18/-0)
nova/tests/api/openstack/contrib/test_createserverext.py (+44/-0)
nova/tests/api/openstack/contrib/test_floating_ips.py (+127/-12)
nova/tests/api/openstack/contrib/test_security_groups.py (+36/-36)
nova/tests/api/openstack/contrib/test_simple_tenant_usage.py (+172/-0)
nova/tests/api/openstack/contrib/test_vsa.py (+450/-0)
nova/tests/api/openstack/fakes.py (+9/-2)
nova/tests/api/openstack/test_extensions.py (+2/-0)
nova/tests/api/openstack/test_server_actions.py (+22/-33)
nova/tests/api/openstack/test_servers.py (+117/-41)
nova/tests/image/test_glance.py (+22/-7)
nova/tests/integrated/test_servers.py (+25/-12)
nova/tests/scheduler/test_scheduler.py (+9/-4)
nova/tests/scheduler/test_vsa_scheduler.py (+641/-0)
nova/tests/test_adminapi.py (+0/-2)
nova/tests/test_cloud.py (+10/-7)
nova/tests/test_compute.py (+21/-17)
nova/tests/test_context.py (+33/-0)
nova/tests/test_ipv6.py (+1/-1)
nova/tests/test_network.py (+16/-0)
nova/tests/test_rpc.py (+6/-158)
nova/tests/test_rpc_amqp.py (+0/-88)
nova/tests/test_rpc_carrot.py (+45/-0)
nova/tests/test_rpc_common.py (+189/-0)
nova/tests/test_rpc_kombu.py (+110/-0)
nova/tests/test_test.py (+2/-3)
nova/tests/test_vsa.py (+182/-0)
nova/tests/test_vsa_volumes.py (+136/-0)
nova/tests/test_xenapi.py (+0/-37)
nova/tests/vmwareapi/db_fakes.py (+4/-1)
nova/tests/xenapi/stubs.py (+4/-0)
nova/virt/libvirt.xml.template (+3/-1)
nova/virt/libvirt/connection.py (+5/-0)
nova/virt/vmwareapi/fake.py (+1/-1)
nova/virt/vmwareapi/vif.py (+9/-2)
nova/virt/vmwareapi/vm_util.py (+12/-16)
nova/virt/vmwareapi/vmops.py (+61/-43)
nova/virt/xenapi/fake.py (+3/-0)
nova/volume/api.py (+14/-4)
nova/volume/driver.py (+272/-0)
nova/volume/manager.py (+78/-0)
nova/volume/volume_types.py (+40/-3)
nova/vsa/__init__.py (+18/-0)
nova/vsa/api.py (+411/-0)
nova/vsa/connection.py (+25/-0)
nova/vsa/fake.py (+22/-0)
nova/vsa/manager.py (+179/-0)
nova/vsa/utils.py (+80/-0)
run_tests.sh (+10/-8)
tools/esx/guest_tool.py (+40/-30)
tools/pip-requires (+1/-0)
To merge this branch: bzr merge lp:~sateesh-chodapuneedi/nova/lp831497
Reviewer Review Type Date Requested Status
Salvatore Orlando (community) Approve
Review via email: mp+73023@code.launchpad.net

Description of the change

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
NOVA-CORE DEVELOPERS SHOULD NOT REVIEW THIS BRANCH
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is a lp:nova cache used only by the Citrix OpenStack
team. It is used for internal peer reviews to generate diffs
between lp:nova and whatever feature or bug fix we are
planning to add to OpenStack Nova.

1495. By Sateesh

Updated vlan bridge_interface to an ESX-specific name starting with 'vmnic'

Revision history for this message
Salvatore Orlando (salvatore-orlando) wrote :

17 + # vlan_num, bridge, net_attrs=None):

Commented-out code lines usually cause a "Needs fixing" review; better to remove them :)

VMWareVlanBridgeDriver.unplug

I understand leaving it unimplemented, as it is the same approach adopted for libvirt and xenapi. We might think in the future to add something which removes the port group if the VIF being unplugged is the last one on that port group. I don't see a case for doing it now, though.
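
For the record, the idea would look roughly like this (a sketch with hypothetical helpers; explicitly not proposed for this branch):

    # Sketch only: remove the port group once the last VIF on it is unplugged.
    def unplug(self, instance, network, mapping):
        port_group = mapping['bridge']
        # Both helpers below are hypothetical, not existing driver methods.
        if self._count_vifs_on_port_group(port_group) <= 1:
            self._remove_port_group(port_group)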

47 + vif_spec_list = []
48 + for vif_info in vif_infos:
49 + vif_spec = create_network_spec(client_factory,
50 + vif_info[0],
51 + vif_info[1],
52 + vif_info[2])

This is another minor thing. Can we use a dictionary instead of a sequence for vif_info? That would improve readability, and make the code less error-prone.
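
For illustration, the dict-based version might read like this (a sketch; the key names are placeholders, not necessarily the final ones):

    vif_spec_list = []
    for vif_info in vif_infos:
        # Named fields instead of positional indexes into a sequence.
        vif_spec = create_network_spec(client_factory,
                                       vif_info['network_name'],
                                       vif_info['mac_address'],
                                       vif_info['network_ref'])
        vif_spec_list.append(vif_spec)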

Also, it would be great if we could address Arvind Somya's comment in vm_util.create_network_spec, line 109.

In vm_util.create_network_spec we use fixed temporary key for the device (-47). Isn't there a chance to have issues if two network specs are created for the same vm with the same key?

190 + interface_str = "%s;%s;%s;%s;%s;%s" % \
191 + (info['mac'],
192 + ip_v4 and ip_v4['ip'] or '',
193 + ip_v4 and ip_v4['netmask'] or '',
194 + info['gateway'],
195 + info['broadcast'],
196 + dns)

The machine_id_str is now the concatenation of MAC, ip, netmask, gateway, broadcast, and DNS for each interface. Is there a limit on the number of characters for this string? We will have roughly 92 characters for each interface. I understand this is important since the guest tools require the machine id to be structured like this.
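
As a back-of-the-envelope check (a standalone sketch, not code from this branch), the worst case per interface would be:

    # MAC is 17 chars; each IPv4 field is at most 15; plus 5 ';' separators.
    fields = ['ff:ff:ff:ff:ff:ff',   # mac
              '255.255.255.255',     # ip
              '255.255.255.255',     # netmask
              '255.255.255.255',     # gateway
              '255.255.255.255',     # broadcast
              '255.255.255.255']     # dns (a single server)
    print len(';'.join(fields))      # 97 characters per interface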

However, several tests in VMWareAPITestCase (10 of them) fail. This might be due to something being wrong on my dev machine. Do they pass on your dev machine?

Cheers,
Salvatore

1496. By Sateesh

Addressed Salvatore's review comments.
1) Used vif_info dict object instead of passing list members as vif_info[index]
2) Removed stale comments like + # vlan_num, bridge, net_attrs=None):
3) Fixed unit test failures that arose due to increased code coverage.

Also check for flat_inject state before attempting an inject operation.
Also updated some comments to be more meaningful.
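
Conceptually the flat_inject guard is just this (a sketch assuming nova's existing flat_injected flag; the inject helper name below is hypothetical):

    # Only attempt guest network injection when the deployment asks for it.
    if FLAGS.flat_injected:
        self._inject_network_info(instance, network_info)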

Revision history for this message
Sateesh (sateesh-chodapuneedi) wrote :

Thanks Salvatore for reviewing the branch.
I have addressed your comments and updated the branch.
Please find my comments inline.

> 17 + # vlan_num, bridge, net_attrs=None):
>
> Commented code lines usually cause a "Needs fixing" review, better remove them
> :)

Removed this comment.

> VMWareVlanBridgeDriver.unplug
>
> I understand leaving it unimplemented, as it is the same approach adopted for
> libvirt and xenapi. We might think in the future to add something which
> removes the port group if the VIF being unplugged is the last one on that port
> group. I don't see a case for doing it now, though.

I think it's better to address this as a separate bug, leaving this patch to address basic multi-NIC only.

> 47 + vif_spec_list = []
> 48 + for vif_info in vif_infos:
> 49 + vif_spec = create_network_spec(client_factory,
> 50 + vif_info[0],
> 51 + vif_info[1],
> 52 + vif_info[2])
>
> This is another minor thing. Can we use a dictionary instead of a sequence for
> vif_info? That would improve readability, and make the code less error-prone.

True, addressed now.

> Also, It would be great if we can address Arvind Somya's comment in
> vm_util.create_network_spec, line 109

Yes, Arvind's fix adding support for distributed virtual port groups works only if the binding type is 'ephemeral'. We can try to address the other binding types, static and dynamic, as well, but I think it would be better to have a separate launchpad bug to track that effort.

>
> In vm_util.create_network_spec we use fixed temporary key for the device
> (-47). Isn't there a chance to have issues if two network specs are created
> for the same vm with the same key?
No, there is no chance of a key clash: the key is only temporary within the specification, and it cannot collide with the key of an existing device, because all existing devices have positive keys.
Here is some text from the API documentation about device keys.

"When adding new devices, it may be necessary for a client to assign keys temporarily in order to associate controllers with devices in configuring a virtual machine. However, the server does not allow a client to reassign a device key, and the server may assign a different value from the one passed during configuration. Clients should ensure that existing device keys are not reused as temporary key values for the new device to be added (for example, by using negative integers as temporary keys)."

> 190 + interface_str = "%s;%s;%s;%s;%s;%s" % \
> 191 + (info['mac'],
> 192 + ip_v4 and ip_v4['ip'] or '',
> 193 + ip_v4 and ip_v4['netmask'] or '',
> 194 + info['gateway'],
> 195 + info['broadcast'],
> 196 + dns)
>
> The machine_id_str now is the concatenation of
> MAC,ip,netmask,gateway,broadcast, and DNS for each interface. Is there a limit
> on the number of characters for this string? We will roughly have 92
> characters for each interface. I understand this is important since the guest
> tools require the machine id to be structured like this.

The machine_id_str is set as the machine.id configuration parameter of the VM, which is written to the vmx file. I have searched for any maximum limit...


Revision history for this message
Salvatore Orlando (salvatore-orlando) wrote :

>
> Thanks Salvatore for reviewing the branch.
> I have addressed your comments and updated the branch.
> Please find my comments inline.
>
> > 17 + # vlan_num, bridge, net_attrs=None):
> >
> > Commented code lines usually cause a "Needs fixing" review, better remove
> them
> > :)
>
> Removed this comment.
>
>
> > VMWareVlanBridgeDriver.unplug
> >
> > I understand leaving it unimplemented, as it is the same approach adopted
> for
> > libvirt and xenapi. We might think in the future to add something which
> > removes the port group if the VIF being unplugged is the last one on that
> port
> > group. I don't see a case for doing it now, though.
>
> I think it's better to address this as separate bug, leaving this patch to
> address basic multi-nic only.

Agreed.

>
> > 47 + vif_spec_list = []
> > 48 + for vif_info in vif_infos:
> > 49 + vif_spec = create_network_spec(client_factory,
> > 50 + vif_info[0],
> > 51 + vif_info[1],
> > 52 + vif_info[2])
> >
> > This is another minor thing. Can we use a dictionary instead of a sequence
> for
> > vif_info? That would improve readability, and make the code less error-
> prone.
>
> True, addressed now.
>
> > Also, It would be great if we can address Arvind Somya's comment in
> > vm_util.create_network_spec, line 109
>
> Yes, Arvind's fix to add support to distribute virtual port group does support
> only if the binding type is ' ephemeral'. We can try to address this for other
> types of binding that are, static and dynamic. But I think it would be better
> to have separate launchpad bug to address this effort.

That probably makes sense. I'm not an ESX expert. How frequently do non-ephemeral bindings occur? Or are they ephemeral simply because we create them this way?

>
> >
> > In vm_util.create_network_spec we use fixed temporary key for the device
> > (-47). Isn't there a chance to have issues if two network specs are created
> > for the same vm with the same key?
> No, there is no chance of key clash, as it's temporary in the specification.
> And we are not colliding with key of existing device. Because all existing
> devices will have +ve keys.
> Here is some text from API documentation about the device keys.
>
> "When adding new devices, it may be necessary for a client to assign keys
> temporarily in order to associate controllers with devices in configuring a
> virtual machine. However, the server does not allow a client to reassign a
> device key, and the server may assign a different value from the one passed
> during configuration. Clients should ensure that existing device keys are not
> reused as temporary key values for the new device to be added (for example, by
> using negative integers as temporary keys)."

Very good.

>
>
> > 190 + interface_str = "%s;%s;%s;%s;%s;%s" % \
> > 191 + (info['mac'],
> > 192 + ip_v4 and ip_v4['ip'] or '',
> > 193 + ip_v4 and ip_v4['netmask'] or '',
> > 194 + info['gateway'],
> > 195 + info['broadcast'],
> > 196 + dns)
> >
> > The machine_id_str now is the concatenation of
> > MAC,ip,netmask,gateway,broadcast, and DNS for each interface. Is there a
> limit
> > on the number of characters...


review: Approve
1497. By Sateesh

Rebasing to nova mainline revision 1519

Unmerged revisions

1497. By Sateesh

Rebasing to nova mainline revision 1519

1496. By Sateesh

Addressed Salvatore's review comments.
1) Used vif_info dict object instead of passing list members as vif_info[index]
2) Removed stale comments like + # vlan_num, bridge, net_attrs=None):
3) Fixed unit test failures that arose due to increased code coverage.

Also check for flat_inject state before attempting an inject operation.
Also updated some comments to be more meaningful.

1495. By Sateesh

Updated vlan bridge_interface to an ESX-specific name starting with 'vmnic'

1494. By Sateesh

Minor nit.

1493. By Sateesh

Multi-NIC support for the vmwareapi virt driver in nova.
Injects multi-NIC information into instances with operating system flavors Ubuntu, Windows, and RHEL.
The vmwareapi virt driver now relies on calls to the network manager instead of nova db calls for instance network configuration information.
Ensures the vlan bridge exists on the specified interface in the case of VLAN networking for instances.

Preview Diff

1=== modified file 'bin/instance-usage-audit'
2--- bin/instance-usage-audit 2011-06-28 20:37:05 +0000
3+++ bin/instance-usage-audit 2011-09-01 10:57:46 +0000
4@@ -102,9 +102,8 @@
5 logging.setup()
6 begin, end = time_period(FLAGS.instance_usage_audit_period)
7 print "Creating usages for %s until %s" % (str(begin), str(end))
8- instances = db.instance_get_active_by_window(context.get_admin_context(),
9- begin,
10- end)
11+ ctxt = context.get_admin_context()
12+ instances = db.instance_get_active_by_window_joined(ctxt, begin, end)
13 print "%s instances" % len(instances)
14 for instance_ref in instances:
15 usage_info = utils.usage_from_instance(instance_ref,
16
17=== modified file 'bin/nova-ajax-console-proxy'
18--- bin/nova-ajax-console-proxy 2011-08-18 17:55:39 +0000
19+++ bin/nova-ajax-console-proxy 2011-09-01 10:57:46 +0000
20@@ -113,11 +113,10 @@
21 AjaxConsoleProxy.tokens[kwargs['token']] = \
22 {'args': kwargs, 'last_activity': time.time()}
23
24- conn = rpc.create_connection(new=True)
25- consumer = rpc.create_consumer(
26- conn,
27- FLAGS.ajax_console_proxy_topic,
28- TopicProxy)
29+ self.conn = rpc.create_connection(new=True)
30+ self.conn.create_consumer(
31+ FLAGS.ajax_console_proxy_topic,
32+ TopicProxy)
33
34 def delete_expired_tokens():
35 now = time.time()
36@@ -129,7 +128,7 @@
37 for k in to_delete:
38 del AjaxConsoleProxy.tokens[k]
39
40- utils.LoopingCall(consumer.fetch, enable_callbacks=True).start(0.1)
41+ self.conn.consume_in_thread()
42 utils.LoopingCall(delete_expired_tokens).start(1)
43
44 if __name__ == '__main__':
45@@ -142,3 +141,4 @@
46 server = wsgi.Server("AJAX Console Proxy", acp, port=acp_port)
47 service.serve(server)
48 service.wait()
49+ self.conn.close()
50
51=== modified file 'bin/nova-manage'
52--- bin/nova-manage 2011-08-24 21:01:33 +0000
53+++ bin/nova-manage 2011-09-01 10:57:46 +0000
54@@ -53,6 +53,7 @@
55 CLI interface for nova management.
56 """
57
58+import ast
59 import gettext
60 import glob
61 import json
62@@ -85,11 +86,13 @@
63 from nova import rpc
64 from nova import utils
65 from nova import version
66+from nova import vsa
67 from nova.api.ec2 import ec2utils
68 from nova.auth import manager
69 from nova.cloudpipe import pipelib
70 from nova.compute import instance_types
71 from nova.db import migration
72+from nova.volume import volume_types
73
74 FLAGS = flags.FLAGS
75 flags.DECLARE('fixed_range', 'nova.network.manager')
76@@ -1097,6 +1100,477 @@
77 self.list()
78
79
80+class VsaCommands(object):
81+ """Methods for dealing with VSAs"""
82+
83+ def __init__(self, *args, **kwargs):
84+ self.manager = manager.AuthManager()
85+ self.vsa_api = vsa.API()
86+ self.context = context.get_admin_context()
87+
88+ self._format_str_vsa = "%(id)-5s %(vsa_id)-15s %(name)-25s "\
89+ "%(type)-10s %(vcs)-6s %(drives)-9s %(stat)-10s "\
90+ "%(az)-10s %(time)-10s"
91+ self._format_str_volume = "\t%(id)-4s %(name)-15s %(size)-5s "\
92+ "%(stat)-10s %(att)-20s %(time)s"
93+ self._format_str_drive = "\t%(id)-4s %(name)-15s %(size)-5s "\
94+ "%(stat)-10s %(host)-20s %(type)-4s %(tname)-10s %(time)s"
95+ self._format_str_instance = "\t%(id)-4s %(name)-10s %(dname)-20s "\
96+ "%(image)-12s %(type)-10s %(fl_ip)-15s %(fx_ip)-15s "\
97+ "%(stat)-10s %(host)-15s %(time)s"
98+
99+ def _print_vsa_header(self):
100+ print self._format_str_vsa %\
101+ dict(id=_('ID'),
102+ vsa_id=_('vsa_id'),
103+ name=_('displayName'),
104+ type=_('vc_type'),
105+ vcs=_('vc_cnt'),
106+ drives=_('drive_cnt'),
107+ stat=_('status'),
108+ az=_('AZ'),
109+ time=_('createTime'))
110+
111+ def _print_vsa(self, vsa):
112+ print self._format_str_vsa %\
113+ dict(id=vsa['id'],
114+ vsa_id=vsa['name'],
115+ name=vsa['display_name'],
116+ type=vsa['vsa_instance_type'].get('name', None),
117+ vcs=vsa['vc_count'],
118+ drives=vsa['vol_count'],
119+ stat=vsa['status'],
120+ az=vsa['availability_zone'],
121+ time=str(vsa['created_at']))
122+
123+ def _print_volume_header(self):
124+ print _(' === Volumes ===')
125+ print self._format_str_volume %\
126+ dict(id=_('ID'),
127+ name=_('name'),
128+ size=_('size'),
129+ stat=_('status'),
130+ att=_('attachment'),
131+ time=_('createTime'))
132+
133+ def _print_volume(self, vol):
134+ print self._format_str_volume %\
135+ dict(id=vol['id'],
136+ name=vol['display_name'] or vol['name'],
137+ size=vol['size'],
138+ stat=vol['status'],
139+ att=vol['attach_status'],
140+ time=str(vol['created_at']))
141+
142+ def _print_drive_header(self):
143+ print _(' === Drives ===')
144+ print self._format_str_drive %\
145+ dict(id=_('ID'),
146+ name=_('name'),
147+ size=_('size'),
148+ stat=_('status'),
149+ host=_('host'),
150+ type=_('type'),
151+ tname=_('typeName'),
152+ time=_('createTime'))
153+
154+ def _print_drive(self, drive):
155+ if drive['volume_type_id'] is not None and drive.get('volume_type'):
156+ drive_type_name = drive['volume_type'].get('name')
157+ else:
158+ drive_type_name = ''
159+
160+ print self._format_str_drive %\
161+ dict(id=drive['id'],
162+ name=drive['display_name'],
163+ size=drive['size'],
164+ stat=drive['status'],
165+ host=drive['host'],
166+ type=drive['volume_type_id'],
167+ tname=drive_type_name,
168+ time=str(drive['created_at']))
169+
170+ def _print_instance_header(self):
171+ print _(' === Instances ===')
172+ print self._format_str_instance %\
173+ dict(id=_('ID'),
174+ name=_('name'),
175+ dname=_('disp_name'),
176+ image=_('image'),
177+ type=_('type'),
178+ fl_ip=_('floating_IP'),
179+ fx_ip=_('fixed_IP'),
180+ stat=_('status'),
181+ host=_('host'),
182+ time=_('createTime'))
183+
184+ def _print_instance(self, vc):
185+
186+ fixed_addr = None
187+ floating_addr = None
188+ if vc['fixed_ips']:
189+ fixed = vc['fixed_ips'][0]
190+ fixed_addr = fixed['address']
191+ if fixed['floating_ips']:
192+ floating_addr = fixed['floating_ips'][0]['address']
193+ floating_addr = floating_addr or fixed_addr
194+
195+ print self._format_str_instance %\
196+ dict(id=vc['id'],
197+ name=ec2utils.id_to_ec2_id(vc['id']),
198+ dname=vc['display_name'],
199+ image=('ami-%08x' % int(vc['image_ref'])),
200+ type=vc['instance_type']['name'],
201+ fl_ip=floating_addr,
202+ fx_ip=fixed_addr,
203+ stat=vc['state_description'],
204+ host=vc['host'],
205+ time=str(vc['created_at']))
206+
207+ def _list(self, context, vsas, print_drives=False,
208+ print_volumes=False, print_instances=False):
209+ if vsas:
210+ self._print_vsa_header()
211+
212+ for vsa in vsas:
213+ self._print_vsa(vsa)
214+ vsa_id = vsa.get('id')
215+
216+ if print_instances:
217+ instances = self.vsa_api.get_all_vsa_instances(context, vsa_id)
218+ if instances:
219+ print
220+ self._print_instance_header()
221+ for instance in instances:
222+ self._print_instance(instance)
223+ print
224+
225+ if print_drives:
226+ drives = self.vsa_api.get_all_vsa_drives(context, vsa_id)
227+ if drives:
228+ self._print_drive_header()
229+ for drive in drives:
230+ self._print_drive(drive)
231+ print
232+
233+ if print_volumes:
234+ volumes = self.vsa_api.get_all_vsa_volumes(context, vsa_id)
235+ if volumes:
236+ self._print_volume_header()
237+ for volume in volumes:
238+ self._print_volume(volume)
239+ print
240+
241+ @args('--storage', dest='storage',
242+ metavar="[{'drive_name': 'type', 'num_drives': N, 'size': M},..]",
243+ help='Initial storage allocation for VSA')
244+ @args('--name', dest='name', metavar="<name>", help='VSA name')
245+ @args('--description', dest='description', metavar="<description>",
246+ help='VSA description')
247+ @args('--vc', dest='vc_count', metavar="<number>", help='Number of VCs')
248+ @args('--instance_type', dest='instance_type_name', metavar="<name>",
249+ help='Instance type name')
250+ @args('--image', dest='image_name', metavar="<name>", help='Image name')
251+ @args('--shared', dest='shared', action="store_true", default=False,
252+ help='Use shared drives')
253+ @args('--az', dest='az', metavar="<zone:host>", help='Availability zone')
254+ @args('--user', dest="user_id", metavar='<User name>',
255+ help='User name')
256+ @args('--project', dest="project_id", metavar='<Project name>',
257+ help='Project name')
258+ def create(self, storage='[]', name=None, description=None, vc_count=1,
259+ instance_type_name=None, image_name=None, shared=None,
260+ az=None, user_id=None, project_id=None):
261+ """Create a VSA."""
262+
263+ if project_id is None:
264+ try:
265+ project_id = os.getenv("EC2_ACCESS_KEY").split(':')[1]
266+ except Exception as exc:
267+ print _("Failed to retrieve project id: %(exc)s") % exc
268+ raise
269+
270+ if user_id is None:
271+ try:
272+ project = self.manager.get_project(project_id)
273+ user_id = project.project_manager_id
274+ except Exception as exc:
275+ print _("Failed to retrieve user info: %(exc)s") % exc
276+ raise
277+
278+ is_admin = self.manager.is_admin(user_id)
279+ ctxt = context.RequestContext(user_id, project_id, is_admin)
280+ if not is_admin and \
281+ not self.manager.is_project_member(user_id, project_id):
282+ msg = _("%(user_id)s must be an admin or a "
283+ "member of %(project_id)s")
284+ LOG.warn(msg % locals())
285+ raise ValueError(msg % locals())
286+
287+ # Sanity check for storage string
288+ storage_list = []
289+ if storage is not None:
290+ try:
291+ storage_list = ast.literal_eval(storage)
292+ except:
293+ print _("Invalid string format %s") % storage
294+ raise
295+
296+ for node in storage_list:
297+ if ('drive_name' not in node) or ('num_drives' not in node):
298+ print (_("Invalid string format for element %s. " \
299+ "Expecting keys 'drive_name' & 'num_drives'"),
300+ str(node))
301+ raise KeyError
302+
303+ if instance_type_name == '':
304+ instance_type_name = None
305+ instance_type = instance_types.get_instance_type_by_name(
306+ instance_type_name)
307+
308+ if image_name == '':
309+ image_name = None
310+
311+ if shared in [None, False, "--full_drives"]:
312+ shared = False
313+ elif shared in [True, "--shared"]:
314+ shared = True
315+ else:
316+ raise ValueError(_('Shared parameter should be set either to "\
317+ "--shared or --full_drives'))
318+
319+ values = {
320+ 'display_name': name,
321+ 'display_description': description,
322+ 'vc_count': int(vc_count),
323+ 'instance_type': instance_type,
324+ 'image_name': image_name,
325+ 'availability_zone': az,
326+ 'storage': storage_list,
327+ 'shared': shared,
328+ }
329+
330+ result = self.vsa_api.create(ctxt, **values)
331+ self._list(ctxt, [result])
332+
333+ @args('--id', dest='vsa_id', metavar="<vsa_id>", help='VSA ID')
334+ @args('--name', dest='name', metavar="<name>", help='VSA name')
335+ @args('--description', dest='description', metavar="<description>",
336+ help='VSA description')
337+ @args('--vc', dest='vc_count', metavar="<number>", help='Number of VCs')
338+ def update(self, vsa_id, name=None, description=None, vc_count=None):
339+ """Updates name/description of vsa and number of VCs."""
340+
341+ values = {}
342+ if name is not None:
343+ values['display_name'] = name
344+ if description is not None:
345+ values['display_description'] = description
346+ if vc_count is not None:
347+ values['vc_count'] = int(vc_count)
348+
349+ vsa_id = ec2utils.ec2_id_to_id(vsa_id)
350+ result = self.vsa_api.update(self.context, vsa_id=vsa_id, **values)
351+ self._list(self.context, [result])
352+
353+ @args('--id', dest='vsa_id', metavar="<vsa_id>", help='VSA ID')
354+ def delete(self, vsa_id):
355+ """Delete a VSA."""
356+ vsa_id = ec2utils.ec2_id_to_id(vsa_id)
357+ self.vsa_api.delete(self.context, vsa_id)
358+
359+ @args('--id', dest='vsa_id', metavar="<vsa_id>",
360+ help='VSA ID (optional)')
361+ @args('--all', dest='all', action="store_true", default=False,
362+ help='Show all available details')
363+ @args('--drives', dest='drives', action="store_true",
364+ help='Include drive-level details')
365+ @args('--volumes', dest='volumes', action="store_true",
366+ help='Include volume-level details')
367+ @args('--instances', dest='instances', action="store_true",
368+ help='Include instance-level details')
369+ def list(self, vsa_id=None, all=False,
370+ drives=False, volumes=False, instances=False):
371+ """Describe all available VSAs (or particular one)."""
372+
373+ vsas = []
374+ if vsa_id is not None:
375+ internal_id = ec2utils.ec2_id_to_id(vsa_id)
376+ vsa = self.vsa_api.get(self.context, internal_id)
377+ vsas.append(vsa)
378+ else:
379+ vsas = self.vsa_api.get_all(self.context)
380+
381+ if all:
382+ drives = volumes = instances = True
383+
384+ self._list(self.context, vsas, drives, volumes, instances)
385+
386+ def update_capabilities(self):
387+ """Forces updates capabilities on all nova-volume nodes."""
388+
389+ rpc.fanout_cast(context.get_admin_context(),
390+ FLAGS.volume_topic,
391+ {"method": "notification",
392+ "args": {"event": "startup"}})
393+
394+
395+class VsaDriveTypeCommands(object):
396+ """Methods for dealing with VSA drive types"""
397+
398+ def __init__(self, *args, **kwargs):
399+ super(VsaDriveTypeCommands, self).__init__(*args, **kwargs)
400+ self.context = context.get_admin_context()
401+ self._drive_type_template = '%s_%sGB_%sRPM'
402+
403+ def _list(self, drives):
404+ format_str = "%-5s %-30s %-10s %-10s %-10s %-20s %-10s %s"
405+ if len(drives):
406+ print format_str %\
407+ (_('ID'),
408+ _('name'),
409+ _('type'),
410+ _('size_gb'),
411+ _('rpm'),
412+ _('capabilities'),
413+ _('visible'),
414+ _('createTime'))
415+
416+ for name, vol_type in drives.iteritems():
417+ drive = vol_type.get('extra_specs')
418+ print format_str %\
419+ (str(vol_type['id']),
420+ drive['drive_name'],
421+ drive['drive_type'],
422+ drive['drive_size'],
423+ drive['drive_rpm'],
424+ drive.get('capabilities', ''),
425+ str(drive.get('visible', '')),
426+ str(vol_type['created_at']))
427+
428+ @args('--type', dest='type', metavar="<type>",
429+ help='Drive type (SATA, SAS, SSD, etc.)')
430+ @args('--size', dest='size_gb', metavar="<gb>", help='Drive size in GB')
431+ @args('--rpm', dest='rpm', metavar="<rpm>", help='RPM')
432+ @args('--capabilities', dest='capabilities', default=None,
433+ metavar="<string>", help='Different capabilities')
434+ @args('--hide', dest='hide', action="store_true", default=False,
435+ help='Show or hide drive')
436+ @args('--name', dest='name', metavar="<name>", help='Drive name')
437+ def create(self, type, size_gb, rpm, capabilities=None,
438+ hide=False, name=None):
439+ """Create drive type."""
440+
441+ hide = True if hide in [True, "True", "--hide", "hide"] else False
442+
443+ if name is None:
444+ name = self._drive_type_template % (type, size_gb, rpm)
445+
446+ extra_specs = {'type': 'vsa_drive',
447+ 'drive_name': name,
448+ 'drive_type': type,
449+ 'drive_size': size_gb,
450+ 'drive_rpm': rpm,
451+ 'visible': True,
452+ }
453+ if hide:
454+ extra_specs['visible'] = False
455+
456+ if capabilities is not None and capabilities != '':
457+ extra_specs['capabilities'] = capabilities
458+
459+ volume_types.create(self.context, name, extra_specs)
460+ result = volume_types.get_volume_type_by_name(self.context, name)
461+ self._list({name: result})
462+
463+ @args('--name', dest='name', metavar="<name>", help='Drive name')
464+ @args('--purge', action="store_true", dest='purge', default=False,
465+ help='purge record from database')
466+ def delete(self, name, purge):
467+ """Marks instance types / flavors as deleted"""
468+ try:
469+ if purge:
470+ volume_types.purge(self.context, name)
471+ verb = "purged"
472+ else:
473+ volume_types.destroy(self.context, name)
474+ verb = "deleted"
475+ except exception.ApiError:
476+ print "Valid volume type name is required"
477+ sys.exit(1)
478+ except exception.DBError, e:
479+ print "DB Error: %s" % e
480+ sys.exit(2)
481+ except:
482+ sys.exit(3)
483+ else:
484+ print "%s %s" % (name, verb)
485+
486+ @args('--all', dest='all', action="store_true", default=False,
487+ help='Show all drives (including invisible)')
488+ @args('--name', dest='name', metavar="<name>",
489+ help='Show only specified drive')
490+ def list(self, all=False, name=None):
491+ """Describe all available VSA drive types (or particular one)."""
492+
493+ all = False if all in ["--all", False, "False"] else True
494+
495+ search_opts = {'extra_specs': {'type': 'vsa_drive'}}
496+ if name is not None:
497+ search_opts['extra_specs']['name'] = name
498+
499+ if all == False:
500+ search_opts['extra_specs']['visible'] = '1'
501+
502+ drives = volume_types.get_all_types(self.context,
503+ search_opts=search_opts)
504+ self._list(drives)
505+
506+ @args('--name', dest='name', metavar="<name>", help='Drive name')
507+ @args('--type', dest='type', metavar="<type>",
508+ help='Drive type (SATA, SAS, SSD, etc.)')
509+ @args('--size', dest='size_gb', metavar="<gb>", help='Drive size in GB')
510+ @args('--rpm', dest='rpm', metavar="<rpm>", help='RPM')
511+ @args('--capabilities', dest='capabilities', default=None,
512+ metavar="<string>", help='Different capabilities')
513+ @args('--visible', dest='visible',
514+ metavar="<show|hide>", help='Show or hide drive')
515+ def update(self, name, type=None, size_gb=None, rpm=None,
516+ capabilities=None, visible=None):
517+ """Update drive type."""
518+
519+ volume_type = volume_types.get_volume_type_by_name(self.context, name)
520+
521+ extra_specs = {'type': 'vsa_drive'}
522+
523+ if type:
524+ extra_specs['drive_type'] = type
525+
526+ if size_gb:
527+ extra_specs['drive_size'] = size_gb
528+
529+ if rpm:
530+ extra_specs['drive_rpm'] = rpm
531+
532+ if capabilities:
533+ extra_specs['capabilities'] = capabilities
534+
535+ if visible is not None:
536+ if visible in ["show", True, "True"]:
537+ extra_specs['visible'] = True
538+ elif visible in ["hide", False, "False"]:
539+ extra_specs['visible'] = False
540+ else:
541+ raise ValueError(_('visible parameter should be set to '\
542+ 'show or hide'))
543+
544+ db.api.volume_type_extra_specs_update_or_create(self.context,
545+ volume_type['id'],
546+ extra_specs)
547+ result = volume_types.get_volume_type_by_name(self.context, name)
548+ self._list({name: result})
549+
550+
551 class VolumeCommands(object):
552 """Methods for dealing with a cloud in an odd state"""
553
554@@ -1483,6 +1957,7 @@
555 ('agent', AgentBuildCommands),
556 ('config', ConfigCommands),
557 ('db', DbCommands),
558+ ('drive', VsaDriveTypeCommands),
559 ('fixed', FixedIpCommands),
560 ('flavor', InstanceTypeCommands),
561 ('floating', FloatingIpCommands),
562@@ -1498,7 +1973,8 @@
563 ('version', VersionCommands),
564 ('vm', VmCommands),
565 ('volume', VolumeCommands),
566- ('vpn', VpnCommands)]
567+ ('vpn', VpnCommands),
568+ ('vsa', VsaCommands)]
569
570
571 def lazy_match(name, key_value_tuples):
572
573=== added file 'bin/nova-vsa'
574--- bin/nova-vsa 1970-01-01 00:00:00 +0000
575+++ bin/nova-vsa 2011-09-01 10:57:46 +0000
576@@ -0,0 +1,49 @@
577+#!/usr/bin/env python
578+# vim: tabstop=4 shiftwidth=4 softtabstop=4
579+
580+# Copyright (c) 2011 Zadara Storage Inc.
581+# Copyright (c) 2011 OpenStack LLC.
582+#
583+#
584+# Licensed under the Apache License, Version 2.0 (the "License"); you may
585+# not use this file except in compliance with the License. You may obtain
586+# a copy of the License at
587+#
588+# http://www.apache.org/licenses/LICENSE-2.0
589+#
590+# Unless required by applicable law or agreed to in writing, software
591+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
592+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
593+# License for the specific language governing permissions and limitations
594+# under the License.
595+
596+"""Starter script for Nova VSA."""
597+
598+import eventlet
599+eventlet.monkey_patch()
600+
601+import os
602+import sys
603+
604+# If ../nova/__init__.py exists, add ../ to Python search path, so that
605+# it will override what happens to be installed in /usr/(local/)lib/python...
606+possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
607+ os.pardir,
608+ os.pardir))
609+if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
610+ sys.path.insert(0, possible_topdir)
611+
612+
613+from nova import flags
614+from nova import log as logging
615+from nova import service
616+from nova import utils
617+
618+if __name__ == '__main__':
619+ utils.default_flagfile()
620+ flags.FLAGS(sys.argv)
621+ logging.setup()
622+ utils.monkey_patch()
623+ server = service.Service.create(binary='nova-vsa')
624+ service.serve(server)
625+ service.wait()
626
627=== modified file 'contrib/nova.sh'
628--- contrib/nova.sh 2011-08-02 14:09:58 +0000
629+++ contrib/nova.sh 2011-09-01 10:57:46 +0000
630@@ -81,7 +81,7 @@
631 sudo apt-get install -y python-netaddr python-pastedeploy python-eventlet
632 sudo apt-get install -y python-novaclient python-glance python-cheetah
633 sudo apt-get install -y python-carrot python-tempita python-sqlalchemy
634- sudo apt-get install -y python-suds
635+ sudo apt-get install -y python-suds python-kombu
636
637
638 if [ "$USE_IPV6" == 1 ]; then
639
640=== modified file 'nova/api/ec2/__init__.py'
641--- nova/api/ec2/__init__.py 2011-08-22 21:24:59 +0000
642+++ nova/api/ec2/__init__.py 2011-09-01 10:57:46 +0000
643@@ -20,7 +20,10 @@
644
645 """
646
647-import httplib2
648+from urlparse import urlparse
649+
650+import eventlet
651+from eventlet.green import httplib
652 import webob
653 import webob.dec
654 import webob.exc
655@@ -35,7 +38,6 @@
656 from nova.api.ec2 import ec2utils
657 from nova.auth import manager
658
659-
660 FLAGS = flags.FLAGS
661 LOG = logging.getLogger("nova.api")
662 flags.DEFINE_integer('lockout_attempts', 5,
663@@ -158,7 +160,6 @@
664 auth_params.pop('Signature')
665
666 # Authenticate the request.
667- client = httplib2.Http()
668 creds = {'ec2Credentials': {'access': access,
669 'signature': signature,
670 'host': req.host,
671@@ -166,18 +167,24 @@
672 'path': req.path,
673 'params': auth_params,
674 }}
675- headers = {'Content-Type': 'application/json'},
676- resp, content = client.request(FLAGS.keystone_ec2_url,
677- 'POST',
678- headers=headers,
679- body=utils.dumps(creds))
680+ creds_json = utils.dumps(creds)
681+ headers = {'Content-Type': 'application/json'}
682+ o = urlparse(FLAGS.keystone_ec2_url)
683+ if o.scheme == "http":
684+ conn = httplib.HTTPConnection(o.netloc)
685+ else:
686+ conn = httplib.HTTPSConnection(o.netloc)
687+ conn.request('POST', o.path, body=creds_json, headers=headers)
688+ response = conn.getresponse().read()
689+ conn.close()
690+
691 # NOTE(vish): We could save a call to keystone by
692 # having keystone return token, tenant,
693 # user, and roles from this call.
694- result = utils.loads(content)
695+ result = utils.loads(response)
696 # TODO(vish): check for errors
697+
698 token_id = result['auth']['token']['id']
699-
700 # Authenticated!
701 req.headers['X-Auth-Token'] = token_id
702 return self.application
703
704=== modified file 'nova/api/ec2/cloud.py'
705--- nova/api/ec2/cloud.py 2011-08-16 14:49:26 +0000
706+++ nova/api/ec2/cloud.py 2011-09-01 10:57:46 +0000
707@@ -47,6 +47,7 @@
708 from nova import volume
709 from nova.api.ec2 import ec2utils
710 from nova.compute import instance_types
711+from nova.compute import vm_states
712 from nova.image import s3
713
714
715@@ -78,6 +79,30 @@
716 return {'private_key': private_key, 'fingerprint': fingerprint}
717
718
719+# EC2 API can return the following values as documented in the EC2 API
720+# http://docs.amazonwebservices.com/AWSEC2/latest/APIReference/
721+# ApiReference-ItemType-InstanceStateType.html
722+# pending | running | shutting-down | terminated | stopping | stopped
723+_STATE_DESCRIPTION_MAP = {
724+ None: 'pending',
725+ vm_states.ACTIVE: 'running',
726+ vm_states.BUILDING: 'pending',
727+ vm_states.REBUILDING: 'pending',
728+ vm_states.DELETED: 'terminated',
729+ vm_states.STOPPED: 'stopped',
730+ vm_states.MIGRATING: 'migrate',
731+ vm_states.RESIZING: 'resize',
732+ vm_states.PAUSED: 'pause',
733+ vm_states.SUSPENDED: 'suspend',
734+ vm_states.RESCUED: 'rescue',
735+}
736+
737+
738+def state_description_from_vm_state(vm_state):
739+ """Map the vm state to the server status string"""
740+ return _STATE_DESCRIPTION_MAP.get(vm_state, vm_state)
741+
742+
743 # TODO(yamahata): hypervisor dependent default device name
744 _DEFAULT_ROOT_DEVICE_NAME = '/dev/sda1'
745 _DEFAULT_MAPPINGS = {'ami': 'sda1',
746@@ -1039,11 +1064,12 @@
747
748 def _format_attr_instance_initiated_shutdown_behavior(instance,
749 result):
750- state_description = instance['state_description']
751- state_to_value = {'stopping': 'stop',
752- 'stopped': 'stop',
753- 'terminating': 'terminate'}
754- value = state_to_value.get(state_description)
755+ vm_state = instance['vm_state']
756+ state_to_value = {
757+ vm_states.STOPPED: 'stopped',
758+ vm_states.DELETED: 'terminated',
759+ }
760+ value = state_to_value.get(vm_state)
761 if value:
762 result['instanceInitiatedShutdownBehavior'] = value
763
764@@ -1198,8 +1224,8 @@
765 self._format_kernel_id(instance, i, 'kernelId')
766 self._format_ramdisk_id(instance, i, 'ramdiskId')
767 i['instanceState'] = {
768- 'code': instance['state'],
769- 'name': instance['state_description']}
770+ 'code': instance['power_state'],
771+ 'name': state_description_from_vm_state(instance['vm_state'])}
772 fixed_addr = None
773 floating_addr = None
774 if instance['fixed_ips']:
775@@ -1618,22 +1644,22 @@
776 # stop the instance if necessary
777 restart_instance = False
778 if not no_reboot:
779- state_description = instance['state_description']
780+ vm_state = instance['vm_state']
781
782 # if the instance is in subtle state, refuse to proceed.
783- if state_description not in ('running', 'stopping', 'stopped'):
784+ if vm_state not in (vm_states.ACTIVE, vm_states.STOPPED):
785 raise exception.InstanceNotRunning(instance_id=ec2_instance_id)
786
787- if state_description == 'running':
788+ if vm_state == vm_states.ACTIVE:
789 restart_instance = True
790 self.compute_api.stop(context, instance_id=instance_id)
791
792 # wait instance for really stopped
793 start_time = time.time()
794- while state_description != 'stopped':
795+ while vm_state != vm_states.STOPPED:
796 time.sleep(1)
797 instance = self.compute_api.get(context, instance_id)
798- state_description = instance['state_description']
799+ vm_state = instance['vm_state']
800 # NOTE(yamahata): timeout and error. 1 hour for now for safety.
801 # Is it too short/long?
802 # Or is there any better way?
803
804=== modified file 'nova/api/openstack/common.py'
805--- nova/api/openstack/common.py 2011-08-17 07:41:17 +0000
806+++ nova/api/openstack/common.py 2011-09-01 10:57:46 +0000
807@@ -27,7 +27,8 @@
808 from nova import log as logging
809 from nova import quota
810 from nova.api.openstack import wsgi
811-from nova.compute import power_state as compute_power_state
812+from nova.compute import vm_states
813+from nova.compute import task_states
814
815
816 LOG = logging.getLogger('nova.api.openstack.common')
817@@ -38,36 +39,61 @@
818 XML_NS_V11 = 'http://docs.openstack.org/compute/api/v1.1'
819
820
821-_STATUS_MAP = {
822- None: 'BUILD',
823- compute_power_state.NOSTATE: 'BUILD',
824- compute_power_state.RUNNING: 'ACTIVE',
825- compute_power_state.BLOCKED: 'ACTIVE',
826- compute_power_state.SUSPENDED: 'SUSPENDED',
827- compute_power_state.PAUSED: 'PAUSED',
828- compute_power_state.SHUTDOWN: 'SHUTDOWN',
829- compute_power_state.SHUTOFF: 'SHUTOFF',
830- compute_power_state.CRASHED: 'ERROR',
831- compute_power_state.FAILED: 'ERROR',
832- compute_power_state.BUILDING: 'BUILD',
833+_STATE_MAP = {
834+ vm_states.ACTIVE: {
835+ 'default': 'ACTIVE',
836+ task_states.REBOOTING: 'REBOOT',
837+ task_states.UPDATING_PASSWORD: 'PASSWORD',
838+ task_states.RESIZE_VERIFY: 'VERIFY_RESIZE',
839+ },
840+ vm_states.BUILDING: {
841+ 'default': 'BUILD',
842+ },
843+ vm_states.REBUILDING: {
844+ 'default': 'REBUILD',
845+ },
846+ vm_states.STOPPED: {
847+ 'default': 'STOPPED',
848+ },
849+ vm_states.MIGRATING: {
850+ 'default': 'MIGRATING',
851+ },
852+ vm_states.RESIZING: {
853+ 'default': 'RESIZE',
854+ },
855+ vm_states.PAUSED: {
856+ 'default': 'PAUSED',
857+ },
858+ vm_states.SUSPENDED: {
859+ 'default': 'SUSPENDED',
860+ },
861+ vm_states.RESCUED: {
862+ 'default': 'RESCUE',
863+ },
864+ vm_states.ERROR: {
865+ 'default': 'ERROR',
866+ },
867+ vm_states.DELETED: {
868+ 'default': 'DELETED',
869+ },
870 }
871
872
873-def status_from_power_state(power_state):
874- """Map the power state to the server status string"""
875- return _STATUS_MAP[power_state]
876-
877-
878-def power_states_from_status(status):
879- """Map the server status string to a list of power states"""
880- power_states = []
881- for power_state, status_map in _STATUS_MAP.iteritems():
882- # Skip the 'None' state
883- if power_state is None:
884- continue
885- if status.lower() == status_map.lower():
886- power_states.append(power_state)
887- return power_states
888+def status_from_state(vm_state, task_state='default'):
889+ """Given vm_state and task_state, return a status string."""
890+ task_map = _STATE_MAP.get(vm_state, dict(default='UNKNOWN_STATE'))
891+ status = task_map.get(task_state, task_map['default'])
892+ LOG.debug("Generated %(status)s from vm_state=%(vm_state)s "
893+ "task_state=%(task_state)s." % locals())
894+ return status
895+
896+
897+def vm_state_from_status(status):
898+ """Map the server status string to a vm state."""
899+ for state, task_map in _STATE_MAP.iteritems():
900+ status_string = task_map.get("default")
901+ if status.lower() == status_string.lower():
902+ return state
903
904
905 def get_pagination_params(request):
906
907=== modified file 'nova/api/openstack/contrib/floating_ips.py'
908--- nova/api/openstack/contrib/floating_ips.py 2011-08-22 12:28:12 +0000
909+++ nova/api/openstack/contrib/floating_ips.py 2011-09-01 10:57:46 +0000
910@@ -36,9 +36,9 @@
911 result['fixed_ip'] = floating_ip['fixed_ip']['address']
912 except (TypeError, KeyError):
913 result['fixed_ip'] = None
914- if 'instance' in floating_ip:
915- result['instance_id'] = floating_ip['instance']['id']
916- else:
917+ try:
918+ result['instance_id'] = floating_ip['fixed_ip']['instance_id']
919+ except (TypeError, KeyError):
920 result['instance_id'] = None
921 return {'floating_ip': result}
922
923@@ -96,7 +96,8 @@
924 except rpc.RemoteError as ex:
925 # NOTE(tr3buchet) - why does this block exist?
926 if ex.exc_type == 'NoMoreFloatingIps':
927- raise exception.NoMoreFloatingIps()
928+ msg = _("No more floating ips available.")
929+ raise webob.exc.HTTPBadRequest(explanation=msg)
930 else:
931 raise
932
933@@ -138,7 +139,11 @@
934 msg = _("Address not specified")
935 raise webob.exc.HTTPBadRequest(explanation=msg)
936
937- self.compute_api.associate_floating_ip(context, instance_id, address)
938+ try:
939+ self.compute_api.associate_floating_ip(context, instance_id,
940+ address)
941+ except exception.ApiError, e:
942+ raise webob.exc.HTTPBadRequest(explanation=e.message)
943
944 return webob.Response(status_int=202)
945
946
947=== added file 'nova/api/openstack/contrib/simple_tenant_usage.py'
948--- nova/api/openstack/contrib/simple_tenant_usage.py 1970-01-01 00:00:00 +0000
949+++ nova/api/openstack/contrib/simple_tenant_usage.py 2011-09-01 10:57:46 +0000
950@@ -0,0 +1,236 @@
951+# vim: tabstop=4 shiftwidth=4 softtabstop=4
952+
953+# Copyright 2011 OpenStack LLC.
954+# All Rights Reserved.
955+#
956+# Licensed under the Apache License, Version 2.0 (the "License"); you may
957+# not use this file except in compliance with the License. You may obtain
958+# a copy of the License at
959+#
960+# http://www.apache.org/licenses/LICENSE-2.0
961+#
962+# Unless required by applicable law or agreed to in writing, software
963+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
964+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
965+# License for the specific language governing permissions and limitations
966+# under the License.
967+
968+import urlparse
969+import webob
970+
971+from datetime import datetime
972+from nova import exception
973+from nova import flags
974+from nova.compute import api
975+from nova.api.openstack import extensions
976+from nova.api.openstack import views
977+from nova.db.sqlalchemy.session import get_session
978+from webob import exc
979+
980+
981+FLAGS = flags.FLAGS
982+
983+
984+class SimpleTenantUsageController(object):
985+ def _hours_for(self, instance, period_start, period_stop):
986+ launched_at = instance['launched_at']
987+ terminated_at = instance['terminated_at']
988+ if terminated_at is not None:
989+ if not isinstance(terminated_at, datetime):
990+ terminated_at = datetime.strptime(terminated_at,
991+ "%Y-%m-%d %H:%M:%S.%f")
992+
993+ if launched_at is not None:
994+ if not isinstance(launched_at, datetime):
995+ launched_at = datetime.strptime(launched_at,
996+ "%Y-%m-%d %H:%M:%S.%f")
997+
998+ if terminated_at and terminated_at < period_start:
999+ return 0
1000+ # nothing if it started after the usage report ended
1001+ if launched_at and launched_at > period_stop:
1002+ return 0
1003+ if launched_at:
1004+ # if instance launched after period_started, don't charge for first
1005+ start = max(launched_at, period_start)
1006+ if terminated_at:
1007+ # if instance stopped before period_stop, don't charge after
1008+ stop = min(period_stop, terminated_at)
1009+ else:
1010+ # instance is still running, so charge them up to current time
1011+ stop = period_stop
1012+ dt = stop - start
1013+ seconds = dt.days * 3600 * 24 + dt.seconds\
1014+ + dt.microseconds / 100000.0
1015+
1016+ return seconds / 3600.0
1017+ else:
1018+ # instance hasn't launched, so no charge
1019+ return 0
1020+
1021+ def _tenant_usages_for_period(self, context, period_start,
1022+ period_stop, tenant_id=None, detailed=True):
1023+
1024+ compute_api = api.API()
1025+ instances = compute_api.get_active_by_window(context,
1026+ period_start,
1027+ period_stop,
1028+ tenant_id)
1029+ from nova import log as logging
1030+ logging.info(instances)
1031+ rval = {}
1032+ flavors = {}
1033+
1034+ for instance in instances:
1035+ info = {}
1036+ info['hours'] = self._hours_for(instance,
1037+ period_start,
1038+ period_stop)
1039+ flavor_type = instance['instance_type_id']
1040+
1041+ if not flavors.get(flavor_type):
1042+ try:
1043+ it_ref = compute_api.get_instance_type(context,
1044+ flavor_type)
1045+ flavors[flavor_type] = it_ref
1046+ except exception.InstanceTypeNotFound:
1047+ # can't bill if there is no instance type
1048+ continue
1049+
1050+ flavor = flavors[flavor_type]
1051+
1052+ info['name'] = instance['display_name']
1053+
1054+ info['memory_mb'] = flavor['memory_mb']
1055+ info['local_gb'] = flavor['local_gb']
1056+ info['vcpus'] = flavor['vcpus']
1057+
1058+ info['tenant_id'] = instance['project_id']
1059+
1060+ info['flavor'] = flavor['name']
1061+
1062+ info['started_at'] = instance['launched_at']
1063+
1064+ info['ended_at'] = instance['terminated_at']
1065+
1066+ if info['ended_at']:
1067+ info['state'] = 'terminated'
1068+ else:
1069+ info['state'] = instance['state_description']
1070+
1071+ now = datetime.utcnow()
1072+
1073+ if info['state'] == 'terminated':
1074+ delta = info['ended_at'] - info['started_at']
1075+ else:
1076+ delta = now - info['started_at']
1077+
1078+ info['uptime'] = delta.days * 24 * 60 + delta.seconds
1079+
1080+ if not info['tenant_id'] in rval:
1081+ summary = {}
1082+ summary['tenant_id'] = info['tenant_id']
1083+ if detailed:
1084+ summary['server_usages'] = []
1085+ summary['total_local_gb_usage'] = 0
1086+ summary['total_vcpus_usage'] = 0
1087+ summary['total_memory_mb_usage'] = 0
1088+ summary['total_hours'] = 0
1089+ summary['start'] = period_start
1090+ summary['stop'] = period_stop
1091+ rval[info['tenant_id']] = summary
1092+
1093+ summary = rval[info['tenant_id']]
1094+ summary['total_local_gb_usage'] += info['local_gb'] * info['hours']
1095+ summary['total_vcpus_usage'] += info['vcpus'] * info['hours']
1096+ summary['total_memory_mb_usage'] += info['memory_mb']\
1097+ * info['hours']
1098+
1099+ summary['total_hours'] += info['hours']
1100+ if detailed:
1101+ summary['server_usages'].append(info)
1102+
1103+ return rval.values()
1104+
1105+ def _parse_datetime(self, dtstr):
1106+ if isinstance(dtstr, datetime):
1107+ return dtstr
1108+ try:
1109+ return datetime.strptime(dtstr, "%Y-%m-%dT%H:%M:%S")
1110+ except:
1111+ try:
1112+ return datetime.strptime(dtstr, "%Y-%m-%dT%H:%M:%S.%f")
1113+ except:
1114+ return datetime.strptime(dtstr, "%Y-%m-%d %H:%M:%S.%f")
1115+
1116+ def _get_datetime_range(self, req):
1117+ qs = req.environ.get('QUERY_STRING', '')
1118+ env = urlparse.parse_qs(qs)
1119+ period_start = self._parse_datetime(env.get('start',
1120+ [datetime.utcnow().isoformat()])[0])
1121+ period_stop = self._parse_datetime(env.get('end',
1122+ [datetime.utcnow().isoformat()])[0])
1123+
1124+ detailed = bool(env.get('detailed', False))
1125+ return (period_start, period_stop, detailed)
1126+
1127+ def index(self, req):
1128+ """Retrive tenant_usage for all tenants"""
1129+ context = req.environ['nova.context']
1130+
1131+ if not context.is_admin and FLAGS.allow_admin_api:
1132+ return webob.Response(status_int=403)
1133+
1134+ (period_start, period_stop, detailed) = self._get_datetime_range(req)
1135+ usages = self._tenant_usages_for_period(context,
1136+ period_start,
1137+ period_stop,
1138+ detailed=detailed)
1139+ return {'tenant_usages': usages}
1140+
1141+ def show(self, req, id):
1142+ """Retrive tenant_usage for a specified tenant"""
1143+ tenant_id = id
1144+ context = req.environ['nova.context']
1145+
1146+ if not context.is_admin and FLAGS.allow_admin_api:
1147+ if tenant_id != context.project_id:
1148+ return webob.Response(status_int=403)
1149+
1150+ (period_start, period_stop, ignore) = self._get_datetime_range(req)
1151+ usage = self._tenant_usages_for_period(context,
1152+ period_start,
1153+ period_stop,
1154+ tenant_id=tenant_id,
1155+ detailed=True)
1156+ if len(usage):
1157+ usage = usage[0]
1158+ else:
1159+ usage = {}
1160+ return {'tenant_usage': usage}
1161+
1162+
1163+class Simple_tenant_usage(extensions.ExtensionDescriptor):
1164+ def get_name(self):
1165+ return "SimpleTenantUsage"
1166+
1167+ def get_alias(self):
1168+ return "os-simple-tenant-usage"
1169+
1170+ def get_description(self):
1171+ return "Simple tenant usage extension"
1172+
1173+ def get_namespace(self):
1174+ return "http://docs.openstack.org/ext/os-simple-tenant-usage/api/v1.1"
1175+
1176+ def get_updated(self):
1177+ return "2011-08-19T00:00:00+00:00"
1178+
1179+ def get_resources(self):
1180+ resources = []
1181+
1182+ res = extensions.ResourceExtension('os-simple-tenant-usage',
1183+ SimpleTenantUsageController())
1184+ resources.append(res)
1185+
1186+ return resources
1187
1188=== added file 'nova/api/openstack/contrib/virtual_storage_arrays.py'
1189--- nova/api/openstack/contrib/virtual_storage_arrays.py 1970-01-01 00:00:00 +0000
1190+++ nova/api/openstack/contrib/virtual_storage_arrays.py 2011-09-01 10:57:46 +0000
1191@@ -0,0 +1,606 @@
1192+# vim: tabstop=4 shiftwidth=4 softtabstop=4
1193+
1194+# Copyright (c) 2011 Zadara Storage Inc.
1195+# Copyright (c) 2011 OpenStack LLC.
1196+#
1197+# Licensed under the Apache License, Version 2.0 (the "License"); you may
1198+# not use this file except in compliance with the License. You may obtain
1199+# a copy of the License at
1200+#
1201+# http://www.apache.org/licenses/LICENSE-2.0
1202+#
1203+# Unless required by applicable law or agreed to in writing, software
1204+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1205+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1206+# License for the specific language governing permissions and limitations
1207+# under the License.
1208+
1209+""" The virtul storage array extension"""
1210+
1211+
1212+from webob import exc
1213+
1214+from nova import vsa
1215+from nova import volume
1216+from nova import compute
1217+from nova import network
1218+from nova import db
1219+from nova import quota
1220+from nova import exception
1221+from nova import log as logging
1222+from nova.api.openstack import common
1223+from nova.api.openstack import extensions
1224+from nova.api.openstack import faults
1225+from nova.api.openstack import wsgi
1226+from nova.api.openstack import servers
1227+from nova.api.openstack.contrib import volumes
1228+from nova.compute import instance_types
1229+
1230+from nova import flags
1231+FLAGS = flags.FLAGS
1232+
1233+LOG = logging.getLogger("nova.api.vsa")
1234+
1235+
1236+def _vsa_view(context, vsa, details=False, instances=None):
1237+ """Map keys for vsa summary/detailed view."""
1238+ d = {}
1239+
1240+ d['id'] = vsa.get('id')
1241+ d['name'] = vsa.get('name')
1242+ d['displayName'] = vsa.get('display_name')
1243+ d['displayDescription'] = vsa.get('display_description')
1244+
1245+ d['createTime'] = vsa.get('created_at')
1246+ d['status'] = vsa.get('status')
1247+
1248+ if 'vsa_instance_type' in vsa:
1249+ d['vcType'] = vsa['vsa_instance_type'].get('name', None)
1250+ else:
1251+ d['vcType'] = vsa['instance_type_id']
1252+
1253+ d['vcCount'] = vsa.get('vc_count')
1254+ d['driveCount'] = vsa.get('vol_count')
1255+
1256+ d['ipAddress'] = None
1257+ for instance in instances:
1258+ fixed_addr = None
1259+ floating_addr = None
1260+ if instance['fixed_ips']:
1261+ fixed = instance['fixed_ips'][0]
1262+ fixed_addr = fixed['address']
1263+ if fixed['floating_ips']:
1264+ floating_addr = fixed['floating_ips'][0]['address']
1265+
1266+ if floating_addr:
1267+ d['ipAddress'] = floating_addr
1268+ break
1269+ else:
1270+ d['ipAddress'] = d['ipAddress'] or fixed_addr
1271+
1272+ return d
1273+
1274+
1275+class VsaController(object):
1276+ """The Virtual Storage Array API controller for the OpenStack API."""
1277+
1278+ _serialization_metadata = {
1279+ 'application/xml': {
1280+ "attributes": {
1281+ "vsa": [
1282+ "id",
1283+ "name",
1284+ "displayName",
1285+ "displayDescription",
1286+ "createTime",
1287+ "status",
1288+ "vcType",
1289+ "vcCount",
1290+ "driveCount",
1291+ "ipAddress",
1292+ ]}}}
1293+
1294+ def __init__(self):
1295+ self.vsa_api = vsa.API()
1296+ self.compute_api = compute.API()
1297+ self.network_api = network.API()
1298+ super(VsaController, self).__init__()
1299+
1300+ def _get_instances_by_vsa_id(self, context, id):
1301+ return self.compute_api.get_all(context,
1302+ search_opts={'metadata': dict(vsa_id=str(id))})
1303+
1304+ def _items(self, req, details):
1305+ """Return summary or detailed list of VSAs."""
1306+ context = req.environ['nova.context']
1307+ vsas = self.vsa_api.get_all(context)
1308+ limited_list = common.limited(vsas, req)
1309+
1310+ vsa_list = []
1311+ for vsa in limited_list:
1312+ instances = self._get_instances_by_vsa_id(context, vsa.get('id'))
1313+ vsa_list.append(_vsa_view(context, vsa, details, instances))
1314+ return {'vsaSet': vsa_list}
1315+
1316+ def index(self, req):
1317+ """Return a short list of VSAs."""
1318+ return self._items(req, details=False)
1319+
1320+ def detail(self, req):
1321+ """Return a detailed list of VSAs."""
1322+ return self._items(req, details=True)
1323+
1324+ def show(self, req, id):
1325+ """Return data about the given VSA."""
1326+ context = req.environ['nova.context']
1327+
1328+ try:
1329+ vsa = self.vsa_api.get(context, vsa_id=id)
1330+ except exception.NotFound:
1331+ return faults.Fault(exc.HTTPNotFound())
1332+
1333+ instances = self._get_instances_by_vsa_id(context, vsa.get('id'))
1334+ return {'vsa': _vsa_view(context, vsa, True, instances)}
1335+
1336+ def create(self, req, body):
1337+ """Create a new VSA."""
1338+ context = req.environ['nova.context']
1339+
1340+ if not body or 'vsa' not in body:
1341+ LOG.debug(_("No body provided"), context=context)
1342+ return faults.Fault(exc.HTTPUnprocessableEntity())
1343+
1344+ vsa = body['vsa']
1345+
1346+ display_name = vsa.get('displayName')
1347+ vc_type = vsa.get('vcType', FLAGS.default_vsa_instance_type)
1348+ try:
1349+ instance_type = instance_types.get_instance_type_by_name(vc_type)
1350+ except exception.NotFound:
1351+ return faults.Fault(exc.HTTPNotFound())
1352+
1353+ LOG.audit(_("Create VSA %(display_name)s of type %(vc_type)s"),
1354+ locals(), context=context)
1355+
1356+ args = dict(display_name=display_name,
1357+ display_description=vsa.get('displayDescription'),
1358+ instance_type=instance_type,
1359+ storage=vsa.get('storage'),
1360+ shared=vsa.get('shared'),
1361+ availability_zone=vsa.get('placement', {}).\
1362+ get('AvailabilityZone'))
1363+
1364+ vsa = self.vsa_api.create(context, **args)
1365+
1366+ instances = self._get_instances_by_vsa_id(context, vsa.get('id'))
1367+ return {'vsa': _vsa_view(context, vsa, True, instances)}
1368+
1369+ def delete(self, req, id):
1370+ """Delete a VSA."""
1371+ context = req.environ['nova.context']
1372+
1373+ LOG.audit(_("Delete VSA with id: %s"), id, context=context)
1374+
1375+ try:
1376+ self.vsa_api.delete(context, vsa_id=id)
1377+ except exception.NotFound:
1378+ return faults.Fault(exc.HTTPNotFound())
1379+
1380+ def associate_address(self, req, id, body):
1381+ """ /zadr-vsa/{vsa_id}/associate_address
1382+ auto or manually associate an IP to VSA
1383+ """
1384+ context = req.environ['nova.context']
1385+
1386+ if body is None:
1387+ ip = 'auto'
1388+ else:
1389+ ip = body.get('ipAddress', 'auto')
1390+
1391+ LOG.audit(_("Associate address %(ip)s to VSA %(id)s"),
1392+ locals(), context=context)
1393+
1394+ try:
1395+ instances = self._get_instances_by_vsa_id(context, id)
1396+ if instances is None or len(instances) == 0:
1397+ return faults.Fault(exc.HTTPNotFound())
1398+
1399+ for instance in instances:
1400+ self.network_api.allocate_for_instance(context, instance,
1401+ vpn=False)
1402+ # Placeholder
1403+ return
1404+
1405+ except exception.NotFound:
1406+ return faults.Fault(exc.HTTPNotFound())
1407+
1408+ def disassociate_address(self, req, id, body):
1409+ """ /zadr-vsa/{vsa_id}/disassociate_address
1410+ auto or manually associate an IP to VSA
1411+ """
1412+ context = req.environ['nova.context']
1413+
1414+ if body is None:
1415+ ip = 'auto'
1416+ else:
1417+ ip = body.get('ipAddress', 'auto')
1418+
1419+ LOG.audit(_("Disassociate address from VSA %(id)s"),
1420+ locals(), context=context)
1421+ # Placeholder
1422+
1423+
1424+class VsaVolumeDriveController(volumes.VolumeController):
1425+ """The base class for VSA volumes & drives.
1426+
1427+ A child resource of the VSA object. Allows operations on
1428+ volumes and drives created to/from a particular VSA.
1429+
1430+ """
1431+
1432+ _serialization_metadata = {
1433+ 'application/xml': {
1434+ "attributes": {
1435+ "volume": [
1436+ "id",
1437+ "name",
1438+ "status",
1439+ "size",
1440+ "availabilityZone",
1441+ "createdAt",
1442+ "displayName",
1443+ "displayDescription",
1444+ "vsaId",
1445+ ]}}}
1446+
1447+ def __init__(self):
1448+ self.volume_api = volume.API()
1449+ self.vsa_api = vsa.API()
1450+ super(VsaVolumeDriveController, self).__init__()
1451+
1452+ def _translation(self, context, vol, vsa_id, details):
1453+ if details:
1454+ translation = volumes._translate_volume_detail_view
1455+ else:
1456+ translation = volumes._translate_volume_summary_view
1457+
1458+ d = translation(context, vol)
1459+ d['vsaId'] = vsa_id
1460+ d['name'] = vol['name']
1461+ return d
1462+
1463+ def _check_volume_ownership(self, context, vsa_id, id):
1464+ obj = self.object
1465+ try:
1466+ volume_ref = self.volume_api.get(context, volume_id=id)
1467+ except exception.NotFound:
1468+ LOG.error(_("%(obj)s with ID %(id)s not found"), locals())
1469+ raise
1470+
1471+ own_vsa_id = self.volume_api.get_volume_metadata_value(volume_ref,
1472+ self.direction)
1473+ if own_vsa_id != vsa_id:
1474+ LOG.error(_("%(obj)s with ID %(id)s belongs to VSA %(own_vsa_id)s"\
1475+ " and not to VSA %(vsa_id)s."), locals())
1476+ raise exception.Invalid()
1477+
1478+ def _items(self, req, vsa_id, details):
1479+ """Return summary or detailed list of volumes for particular VSA."""
1480+ context = req.environ['nova.context']
1481+
1482+ vols = self.volume_api.get_all(context,
1483+ search_opts={'metadata': {self.direction: str(vsa_id)}})
1484+ limited_list = common.limited(vols, req)
1485+
1486+ res = [self._translation(context, vol, vsa_id, details) \
1487+ for vol in limited_list]
1488+
1489+ return {self.objects: res}
1490+
1491+ def index(self, req, vsa_id):
1492+ """Return a short list of volumes created from particular VSA."""
1493+ LOG.audit(_("Index. vsa_id=%(vsa_id)s"), locals())
1494+ return self._items(req, vsa_id, details=False)
1495+
1496+ def detail(self, req, vsa_id):
1497+ """Return a detailed list of volumes created from particular VSA."""
1498+ LOG.audit(_("Detail. vsa_id=%(vsa_id)s"), locals())
1499+ return self._items(req, vsa_id, details=True)
1500+
1501+ def create(self, req, vsa_id, body):
1502+ """Create a new volume from VSA."""
1503+ LOG.audit(_("Create. vsa_id=%(vsa_id)s, body=%(body)s"), locals())
1504+ context = req.environ['nova.context']
1505+
1506+ if not body:
1507+ return faults.Fault(exc.HTTPUnprocessableEntity())
1508+
1509+ vol = body[self.object]
1510+ size = vol['size']
1511+ LOG.audit(_("Create volume of %(size)s GB from VSA ID %(vsa_id)s"),
1512+ locals(), context=context)
1513+ try:
1514+ # create is supported for volumes only (drives created through VSA)
1515+ volume_type = self.vsa_api.get_vsa_volume_type(context)
1516+ except exception.NotFound:
1517+ return faults.Fault(exc.HTTPNotFound())
1518+
1519+ new_volume = self.volume_api.create(context,
1520+ size,
1521+ None,
1522+ vol.get('displayName'),
1523+ vol.get('displayDescription'),
1524+ volume_type=volume_type,
1525+ metadata=dict(from_vsa_id=str(vsa_id)))
1526+
1527+ return {self.object: self._translation(context, new_volume,
1528+ vsa_id, True)}
1529+
1530+ def update(self, req, vsa_id, id, body):
1531+ """Update a volume."""
1532+ context = req.environ['nova.context']
1533+
1534+ try:
1535+ self._check_volume_ownership(context, vsa_id, id)
1536+ except exception.NotFound:
1537+ return faults.Fault(exc.HTTPNotFound())
1538+ except exception.Invalid:
1539+ return faults.Fault(exc.HTTPBadRequest())
1540+
1541+ vol = body[self.object]
1542+ updatable_fields = [{'displayName': 'display_name'},
1543+ {'displayDescription': 'display_description'},
1544+ {'status': 'status'},
1545+ {'providerLocation': 'provider_location'},
1546+ {'providerAuth': 'provider_auth'}]
1547+ changes = {}
1548+ for field in updatable_fields:
1549+ key = field.keys()[0]
1550+ val = field[key]
1551+ if key in vol:
1552+ changes[val] = vol[key]
1553+
1554+ obj = self.object
1555+ LOG.audit(_("Update %(obj)s with id: %(id)s, changes: %(changes)s"),
1556+ locals(), context=context)
1557+
1558+ try:
1559+ self.volume_api.update(context, volume_id=id, fields=changes)
1560+ except exception.NotFound:
1561+ return faults.Fault(exc.HTTPNotFound())
1562+ return exc.HTTPAccepted()
1563+
1564+ def delete(self, req, vsa_id, id):
1565+ """Delete a volume."""
1566+ context = req.environ['nova.context']
1567+
1568+ LOG.audit(_("Delete. vsa_id=%(vsa_id)s, id=%(id)s"), locals())
1569+
1570+ try:
1571+ self._check_volume_ownership(context, vsa_id, id)
1572+ except exception.NotFound:
1573+ return faults.Fault(exc.HTTPNotFound())
1574+ except exception.Invalid:
1575+ return faults.Fault(exc.HTTPBadRequest())
1576+
1577+ return super(VsaVolumeDriveController, self).delete(req, id)
1578+
1579+ def show(self, req, vsa_id, id):
1580+ """Return data about the given volume."""
1581+ context = req.environ['nova.context']
1582+
1583+ LOG.audit(_("Show. vsa_id=%(vsa_id)s, id=%(id)s"), locals())
1584+
1585+ try:
1586+ self._check_volume_ownership(context, vsa_id, id)
1587+ except exception.NotFound:
1588+ return faults.Fault(exc.HTTPNotFound())
1589+ except exception.Invalid:
1590+ return faults.Fault(exc.HTTPBadRequest())
1591+
1592+ return super(VsaVolumeDriveController, self).show(req, id)
1593+
1594+
1595+class VsaVolumeController(VsaVolumeDriveController):
1596+ """The VSA volume API controller for the Openstack API.
1597+
1598+ A child resource of the VSA object. Allows operations with volumes created
1599+ by particular VSA
1600+
1601+ """
1602+
1603+ def __init__(self):
1604+ self.direction = 'from_vsa_id'
1605+ self.objects = 'volumes'
1606+ self.object = 'volume'
1607+ super(VsaVolumeController, self).__init__()
1608+
1609+
1610+class VsaDriveController(VsaVolumeDriveController):
1611+ """The VSA Drive API controller for the Openstack API.
1612+
1613+ A child resource of the VSA object. Allows operations with drives created
1614+ for particular VSA
1615+
1616+ """
1617+
1618+ def __init__(self):
1619+ self.direction = 'to_vsa_id'
1620+ self.objects = 'drives'
1621+ self.object = 'drive'
1622+ super(VsaDriveController, self).__init__()
1623+
1624+ def create(self, req, vsa_id, body):
1625+ """Create a new drive for a VSA. Should be done through VSA APIs."""
1626+ return faults.Fault(exc.HTTPBadRequest())
1627+
1628+ def update(self, req, vsa_id, id, body):
1629+ """Update a drive. Should be done through VSA APIs."""
1630+ return faults.Fault(exc.HTTPBadRequest())
1631+
1632+ def delete(self, req, vsa_id, id):
1633+ """Delete a drive. Should be done through VSA APIs."""
1634+ return faults.Fault(exc.HTTPBadRequest())
1635+
1636+
1637+class VsaVPoolController(object):
1638+ """The vPool VSA API controller for the OpenStack API."""
1639+
1640+ _serialization_metadata = {
1641+ 'application/xml': {
1642+ "attributes": {
1643+ "vpool": [
1644+ "id",
1645+ "vsaId",
1646+ "name",
1647+ "displayName",
1648+ "displayDescription",
1649+ "driveCount",
1650+ "driveIds",
1651+ "protection",
1652+ "stripeSize",
1653+ "stripeWidth",
1654+ "createTime",
1655+ "status",
1656+ ]}}}
1657+
1658+ def __init__(self):
1659+ self.vsa_api = vsa.API()
1660+ super(VsaVPoolController, self).__init__()
1661+
1662+ def index(self, req, vsa_id):
1663+ """Return a short list of vpools created from particular VSA."""
1664+ return {'vpools': []}
1665+
1666+ def create(self, req, vsa_id, body):
1667+ """Create a new vPool for VSA."""
1668+ return faults.Fault(exc.HTTPBadRequest())
1669+
1670+ def update(self, req, vsa_id, id, body):
1671+ """Update vPool parameters."""
1672+ return faults.Fault(exc.HTTPBadRequest())
1673+
1674+ def delete(self, req, vsa_id, id):
1675+ """Delete a vPool."""
1676+ return faults.Fault(exc.HTTPBadRequest())
1677+
1678+ def show(self, req, vsa_id, id):
1679+ """Return data about the given vPool."""
1680+ return faults.Fault(exc.HTTPBadRequest())
1681+
1682+
1683+class VsaVCController(servers.ControllerV11):
1684+ """The VSA Virtual Controller API controller for the OpenStack API."""
1685+
1686+ def __init__(self):
1687+ self.vsa_api = vsa.API()
1688+ self.compute_api = compute.API()
1689+ self.vsa_id = None # VP-TODO: temporary ugly hack
1690+ super(VsaVCController, self).__init__()
1691+
1692+ def _get_servers(self, req, is_detail):
1693+ """Returns a list of servers, taking into account any search
1694+ options specified.
1695+ """
1696+
1697+ if self.vsa_id is None:
1698+ return super(VsaVCController, self)._get_servers(req, is_detail)
1699+
1700+ context = req.environ['nova.context']
1701+
1702+ search_opts = {'metadata': dict(vsa_id=str(self.vsa_id))}
1703+ instance_list = self.compute_api.get_all(
1704+ context, search_opts=search_opts)
1705+
1706+ limited_list = self._limit_items(instance_list, req)
1707+ servers = [self._build_view(req, inst, is_detail)['server']
1708+ for inst in limited_list]
1709+ return dict(servers=servers)
1710+
1711+ def index(self, req, vsa_id):
1712+ """Return list of instances for particular VSA."""
1713+
1714+ LOG.audit(_("Index instances for VSA %s"), vsa_id)
1715+
1716+ self.vsa_id = vsa_id # VP-TODO: temporary ugly hack
1717+ result = super(VsaVCController, self).detail(req)
1718+ self.vsa_id = None
1719+ return result
1720+
1721+ def create(self, req, vsa_id, body):
1722+ """Create a new instance for VSA."""
1723+ return faults.Fault(exc.HTTPBadRequest())
1724+
1725+ def update(self, req, vsa_id, id, body):
1726+ """Update VSA instance."""
1727+ return faults.Fault(exc.HTTPBadRequest())
1728+
1729+ def delete(self, req, vsa_id, id):
1730+ """Delete VSA instance."""
1731+ return faults.Fault(exc.HTTPBadRequest())
1732+
1733+ def show(self, req, vsa_id, id):
1734+ """Return data about the given instance."""
1735+ return super(VsaVCController, self).show(req, id)
1736+
1737+
1738+class Virtual_storage_arrays(extensions.ExtensionDescriptor):
1739+
1740+ def get_name(self):
1741+ return "VSAs"
1742+
1743+ def get_alias(self):
1744+ return "zadr-vsa"
1745+
1746+ def get_description(self):
1747+ return "Virtual Storage Arrays support"
1748+
1749+ def get_namespace(self):
1750+ return "http://docs.openstack.org/ext/vsa/api/v1.1"
1751+
1752+ def get_updated(self):
1753+ return "2011-08-25T00:00:00+00:00"
1754+
1755+ def get_resources(self):
1756+ resources = []
1757+ res = extensions.ResourceExtension(
1758+ 'zadr-vsa',
1759+ VsaController(),
1760+ collection_actions={'detail': 'GET'},
1761+ member_actions={'add_capacity': 'POST',
1762+ 'remove_capacity': 'POST',
1763+ 'associate_address': 'POST',
1764+ 'disassociate_address': 'POST'})
1765+ resources.append(res)
1766+
1767+ res = extensions.ResourceExtension('volumes',
1768+ VsaVolumeController(),
1769+ collection_actions={'detail': 'GET'},
1770+ parent=dict(
1771+ member_name='vsa',
1772+ collection_name='zadr-vsa'))
1773+ resources.append(res)
1774+
1775+ res = extensions.ResourceExtension('drives',
1776+ VsaDriveController(),
1777+ collection_actions={'detail': 'GET'},
1778+ parent=dict(
1779+ member_name='vsa',
1780+ collection_name='zadr-vsa'))
1781+ resources.append(res)
1782+
1783+ res = extensions.ResourceExtension('vpools',
1784+ VsaVPoolController(),
1785+ parent=dict(
1786+ member_name='vsa',
1787+ collection_name='zadr-vsa'))
1788+ resources.append(res)
1789+
1790+ res = extensions.ResourceExtension('instances',
1791+ VsaVCController(),
1792+ parent=dict(
1793+ member_name='vsa',
1794+ collection_name='zadr-vsa'))
1795+ resources.append(res)
1796+
1797+ return resources
1798
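Note on _vsa_view() above: the address loop prefers a floating IP and stops
at the first instance that has one; otherwise it keeps the first fixed IP it
saw. A standalone sketch of that rule (hypothetical records shaped like the
fixed_ips/floating_ips rows the view reads):

    def pick_vsa_address(instances):
        # Floating IP wins immediately; else remember the first fixed IP.
        chosen = None
        for instance in instances:
            fixed_ips = instance.get('fixed_ips') or []
            if not fixed_ips:
                continue
            fixed = fixed_ips[0]
            if fixed.get('floating_ips'):
                return fixed['floating_ips'][0]['address']
            chosen = chosen or fixed['address']
        return chosen

    instances = [
        {'fixed_ips': [{'address': '10.0.0.5', 'floating_ips': []}]},
        {'fixed_ips': [{'address': '10.0.0.6',
                        'floating_ips': [{'address': '172.24.4.10'}]}]},
    ]
    assert pick_vsa_address(instances) == '172.24.4.10'
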
1799=== modified file 'nova/api/openstack/create_instance_helper.py'
1800--- nova/api/openstack/create_instance_helper.py 2011-08-23 04:17:57 +0000
1801+++ nova/api/openstack/create_instance_helper.py 2011-09-01 10:57:46 +0000
1802@@ -19,7 +19,6 @@
1803 from webob import exc
1804 from xml.dom import minidom
1805
1806-from nova import db
1807 from nova import exception
1808 from nova import flags
1809 from nova import log as logging
1810@@ -74,20 +73,17 @@
1811 if not 'server' in body:
1812 raise exc.HTTPUnprocessableEntity()
1813
1814+ context = req.environ['nova.context']
1815 server_dict = body['server']
1816- context = req.environ['nova.context']
1817 password = self.controller._get_server_admin_password(server_dict)
1818
1819- key_name = None
1820- key_data = None
1821- # TODO(vish): Key pair access should move into a common library
1822- # instead of being accessed directly from the db.
1823- key_pairs = db.key_pair_get_all_by_user(context.elevated(),
1824- context.user_id)
1825- if key_pairs:
1826- key_pair = key_pairs[0]
1827- key_name = key_pair['name']
1828- key_data = key_pair['public_key']
1829+ if not 'name' in server_dict:
1830+ msg = _("Server name is not defined")
1831+ raise exc.HTTPBadRequest(explanation=msg)
1832+
1833+ name = server_dict['name']
1834+ self._validate_server_name(name)
1835+ name = name.strip()
1836
1837 image_href = self.controller._image_ref_from_req_data(body)
1838 # If the image href was generated by nova api, strip image_href
1839@@ -133,12 +129,13 @@
1840 msg = _("Invalid flavorRef provided.")
1841 raise exc.HTTPBadRequest(explanation=msg)
1842
1843- if not 'name' in server_dict:
1844- msg = _("Server name is not defined")
1845- raise exc.HTTPBadRequest(explanation=msg)
1846-
1847 zone_blob = server_dict.get('blob')
1848+
1849+ # optional openstack extensions:
1850+ key_name = server_dict.get('key_name')
1851 user_data = server_dict.get('user_data')
1852+ self._validate_user_data(user_data)
1853+
1854 availability_zone = server_dict.get('availability_zone')
1855 name = server_dict['name']
1856 self._validate_server_name(name)
1857@@ -173,7 +170,6 @@
1858 display_name=name,
1859 display_description=name,
1860 key_name=key_name,
1861- key_data=key_data,
1862 metadata=server_dict.get('metadata', {}),
1863 access_ip_v4=server_dict.get('accessIPv4'),
1864 access_ip_v6=server_dict.get('accessIPv6'),
1865@@ -196,6 +192,9 @@
1866 except exception.FlavorNotFound as error:
1867 msg = _("Invalid flavorRef provided.")
1868 raise exc.HTTPBadRequest(explanation=msg)
1869+ except exception.KeypairNotFound as error:
1870+ msg = _("Invalid key_name provided.")
1871+ raise exc.HTTPBadRequest(explanation=msg)
1872 except exception.SecurityGroupNotFound as error:
1873 raise exc.HTTPBadRequest(explanation=unicode(error))
1874 except RemoteError as err:
1875@@ -370,6 +369,16 @@
1876
1877 return networks
1878
1879+ def _validate_user_data(self, user_data):
1880+ """Check if the user_data is encoded properly"""
1881+ if not user_data:
1882+ return
1883+ try:
1884+ user_data = base64.b64decode(user_data)
1885+ except TypeError:
1886+ expl = _('Userdata content cannot be decoded')
1887+ raise exc.HTTPBadRequest(explanation=expl)
1888+
1889
1890 class ServerXMLDeserializer(wsgi.XMLDeserializer):
1891 """
1892
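Note: _validate_user_data() above only checks that the payload
base64-decodes; it relies on a module-level base64 import that is outside
the visible hunk context. Clients must therefore encode user data before
sending it. A minimal client-side sketch (request body abbreviated, values
hypothetical):

    import base64

    script = '#!/bin/sh\necho hello\n'
    body = {'server': {
        'name': 'demo',
        'user_data': base64.b64encode(script.encode('utf-8')).decode('ascii'),
    }}

    # Server side, as in the hunk: decode and reject on failure. Python 2's
    # b64decode raises TypeError on invalid input, which is what is caught.
    base64.b64decode(body['server']['user_data'])
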
1893=== modified file 'nova/api/openstack/servers.py'
1894--- nova/api/openstack/servers.py 2011-08-24 14:37:59 +0000
1895+++ nova/api/openstack/servers.py 2011-09-01 10:57:46 +0000
1896@@ -22,6 +22,7 @@
1897 import webob
1898
1899 from nova import compute
1900+from nova import db
1901 from nova import exception
1902 from nova import flags
1903 from nova import log as logging
1904@@ -95,17 +96,15 @@
1905 search_opts['recurse_zones'] = utils.bool_from_str(
1906 search_opts.get('recurse_zones', False))
1907
1908- # If search by 'status', we need to convert it to 'state'
1909- # If the status is unknown, bail.
1910- # Leave 'state' in search_opts so compute can pass it on to
1911- # child zones..
1912+ # If search by 'status', we need to convert it to 'vm_state'
1913+ # to pass on to child zones.
1914 if 'status' in search_opts:
1915 status = search_opts['status']
1916- search_opts['state'] = common.power_states_from_status(status)
1917- if len(search_opts['state']) == 0:
1918+ state = common.vm_state_from_status(status)
1919+ if state is None:
1920 reason = _('Invalid server status: %(status)s') % locals()
1921- LOG.error(reason)
1922 raise exception.InvalidInput(reason=reason)
1923+ search_opts['vm_state'] = state
1924
1925 # By default, compute's get_all() will return deleted instances.
1926 # If an admin hasn't specified a 'deleted' search option, we need
1927@@ -143,10 +142,16 @@
1928 except exception.NotFound:
1929 raise exc.HTTPNotFound()
1930
1931+ def _get_key_name(self, req, body):
1932+ """ Get default keypair if not set """
1933+ raise NotImplementedError()
1934+
1935 def create(self, req, body):
1936 """ Creates a new server for a given user """
1937+ if 'server' in body:
1938+ body['server']['key_name'] = self._get_key_name(req, body)
1939+
1940 extra_values = None
1941- result = None
1942 extra_values, instances = self.helper.create_instance(
1943 req, body, self.compute_api.create)
1944
1945@@ -564,6 +569,13 @@
1946 raise exc.HTTPNotFound()
1947 return webob.Response(status_int=202)
1948
1949+ def _get_key_name(self, req, body):
1950+ context = req.environ["nova.context"]
1951+ keypairs = db.key_pair_get_all_by_user(context,
1952+ context.user_id)
1953+ if keypairs:
1954+ return keypairs[0]['name']
1955+
1956 def _image_ref_from_req_data(self, data):
1957 return data['server']['imageId']
1958
1959@@ -608,9 +620,8 @@
1960
1961 try:
1962 self.compute_api.rebuild(context, instance_id, image_id, password)
1963- except exception.BuildInProgress:
1964- msg = _("Instance %s is currently being rebuilt.") % instance_id
1965- LOG.debug(msg)
1966+ except exception.RebuildRequiresActiveInstance:
1967+ msg = _("Instance %s must be active to rebuild.") % instance_id
1968 raise exc.HTTPConflict(explanation=msg)
1969
1970 return webob.Response(status_int=202)
1971@@ -635,6 +646,10 @@
1972 except exception.NotFound:
1973 raise exc.HTTPNotFound()
1974
1975+ def _get_key_name(self, req, body):
1976+ if 'server' in body:
1977+ return body['server'].get('key_name')
1978+
1979 def _image_ref_from_req_data(self, data):
1980 try:
1981 return data['server']['imageRef']
1982@@ -750,9 +765,8 @@
1983 self.compute_api.rebuild(context, instance_id, image_href,
1984 password, name=name, metadata=metadata,
1985 files_to_inject=personalities)
1986- except exception.BuildInProgress:
1987- msg = _("Instance %s is currently being rebuilt.") % instance_id
1988- LOG.debug(msg)
1989+ except exception.RebuildRequiresActiveInstance:
1990+ msg = _("Instance %s must be active to rebuild.") % instance_id
1991 raise exc.HTTPConflict(explanation=msg)
1992 except exception.InstanceNotFound:
1993 msg = _("Instance %s could not be found") % instance_id
1994
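Note: the two _get_key_name() overrides give the API versions different
keypair defaults. v1.0 silently falls back to the user's first registered
keypair via db.key_pair_get_all_by_user(); v1.1 only honors an explicit
key_name in the request body, and create_instance_helper now turns an
unknown name into HTTP 400 via KeypairNotFound. A v1.1 sketch with
hypothetical values:

    body = {'server': {
        'name': 'demo',
        'imageRef': '1',
        'flavorRef': '1',
        'key_name': 'mykey',  # optional; must exist or the API returns 400
    }}
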
1995=== modified file 'nova/api/openstack/views/servers.py'
1996--- nova/api/openstack/views/servers.py 2011-08-23 04:17:57 +0000
1997+++ nova/api/openstack/views/servers.py 2011-09-01 10:57:46 +0000
1998@@ -21,13 +21,12 @@
1999 import os
2000
2001 from nova import exception
2002-import nova.compute
2003-import nova.context
2004 from nova.api.openstack import common
2005 from nova.api.openstack.views import addresses as addresses_view
2006 from nova.api.openstack.views import flavors as flavors_view
2007 from nova.api.openstack.views import images as images_view
2008 from nova import utils
2009+from nova.compute import vm_states
2010
2011
2012 class ViewBuilder(object):
2013@@ -61,17 +60,13 @@
2014
2015 def _build_detail(self, inst):
2016 """Returns a detailed model of a server."""
2017+ vm_state = inst.get('vm_state', vm_states.BUILDING)
2018+ task_state = inst.get('task_state')
2019
2020 inst_dict = {
2021 'id': inst['id'],
2022 'name': inst['display_name'],
2023- 'status': common.status_from_power_state(inst.get('state'))}
2024-
2025- ctxt = nova.context.get_admin_context()
2026- compute_api = nova.compute.API()
2027-
2028- if compute_api.has_finished_migration(ctxt, inst['uuid']):
2029- inst_dict['status'] = 'RESIZE-CONFIRM'
2030+ 'status': common.status_from_state(vm_state, task_state)}
2031
2032 # Return the metadata as a dictionary
2033 metadata = {}
2034@@ -188,6 +183,7 @@
2035 def _build_extra(self, response, inst):
2036 self._build_links(response, inst)
2037 response['uuid'] = inst['uuid']
2038+ response['key_name'] = inst.get('key_name', '')
2039 self._build_config_drive(response, inst)
2040
2041 def _build_links(self, response, inst):
2042
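Note: status_from_state(vm_state, task_state) replaces both the old
power-state mapping and the per-request has_finished_migration() probe
removed above. The authoritative table lives in nova/api/openstack/common.py
(not part of this excerpt); an assumed, abbreviated subset to illustrate the
shape:

    _STATE_MAP = {
        ('active', None): 'ACTIVE',
        ('active', 'rebooting'): 'REBOOT',
        ('active', 'resize_verify'): 'VERIFY_RESIZE',
        ('building', None): 'BUILD',
        ('resizing', None): 'RESIZE',
        ('stopped', None): 'STOPPED',
    }

    def status_from_state(vm_state, task_state=None):
        # Exact pair match first, then the vm_state's default status.
        return _STATE_MAP.get((vm_state, task_state),
                              _STATE_MAP.get((vm_state, None), 'UNKNOWN'))

    assert status_from_state('active', 'rebooting') == 'REBOOT'
    assert status_from_state('building', 'spawning') == 'BUILD'
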
2043=== modified file 'nova/compute/api.py'
2044--- nova/compute/api.py 2011-08-24 16:23:20 +0000
2045+++ nova/compute/api.py 2011-09-01 10:57:46 +0000
2046@@ -19,13 +19,11 @@
2047
2048 """Handles all requests relating to instances (guest vms)."""
2049
2050-import eventlet
2051 import novaclient
2052 import re
2053 import time
2054
2055 from nova import block_device
2056-from nova import db
2057 from nova import exception
2058 from nova import flags
2059 import nova.image
2060@@ -37,6 +35,8 @@
2061 from nova import volume
2062 from nova.compute import instance_types
2063 from nova.compute import power_state
2064+from nova.compute import task_states
2065+from nova.compute import vm_states
2066 from nova.compute.utils import terminate_volumes
2067 from nova.scheduler import api as scheduler_api
2068 from nova.db import base
2069@@ -75,12 +75,18 @@
2070
2071
2072 def _is_able_to_shutdown(instance, instance_id):
2073- states = {'terminating': "Instance %s is already being terminated",
2074- 'migrating': "Instance %s is being migrated",
2075- 'stopping': "Instance %s is being stopped"}
2076- msg = states.get(instance['state_description'])
2077- if msg:
2078- LOG.warning(_(msg), instance_id)
2079+ vm_state = instance["vm_state"]
2080+ task_state = instance["task_state"]
2081+
2082+ valid_shutdown_states = [
2083+ vm_states.ACTIVE,
2084+ vm_states.REBUILDING,
2085+ vm_states.BUILDING,
2086+ ]
2087+
2088+ if vm_state not in valid_shutdown_states:
2089+ LOG.warn(_("Instance %(instance_id)s is not in an 'active' state. It "
2090+ "is currently %(vm_state)s. Shutdown aborted.") % locals())
2091 return False
2092
2093 return True
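
Note: _is_able_to_shutdown() now gates on vm_state alone; only ACTIVE,
REBUILDING, or BUILDING instances may be stopped or deleted. The same
predicate as a standalone sketch (lowercase values assumed to match
nova.compute.vm_states):

    VALID_SHUTDOWN_STATES = ('active', 'rebuilding', 'building')

    def can_shutdown(instance):
        return instance['vm_state'] in VALID_SHUTDOWN_STATES

    assert can_shutdown({'vm_state': 'active'})
    assert not can_shutdown({'vm_state': 'migrating'})
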
2094@@ -237,7 +243,7 @@
2095 self.ensure_default_security_group(context)
2096
2097 if key_data is None and key_name:
2098- key_pair = db.key_pair_get(context, context.user_id, key_name)
2099+ key_pair = self.db.key_pair_get(context, context.user_id, key_name)
2100 key_data = key_pair['public_key']
2101
2102 if reservation_id is None:
2103@@ -251,10 +257,10 @@
2104 'image_ref': image_href,
2105 'kernel_id': kernel_id or '',
2106 'ramdisk_id': ramdisk_id or '',
2107+ 'power_state': power_state.NOSTATE,
2108+ 'vm_state': vm_states.BUILDING,
2109 'config_drive_id': config_drive_id or '',
2110 'config_drive': config_drive or '',
2111- 'state': 0,
2112- 'state_description': 'scheduling',
2113 'user_id': context.user_id,
2114 'project_id': context.project_id,
2115 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
2116@@ -389,9 +395,9 @@
2117
2118 security_groups = []
2119 for security_group_name in security_group:
2120- group = db.security_group_get_by_name(context,
2121- context.project_id,
2122- security_group_name)
2123+ group = self.db.security_group_get_by_name(context,
2124+ context.project_id,
2125+ security_group_name)
2126 security_groups.append(group['id'])
2127
2128 for security_group_id in security_groups:
2129@@ -415,6 +421,8 @@
2130 updates['display_name'] = "Server %s" % instance_id
2131 instance['display_name'] = updates['display_name']
2132 updates['hostname'] = self.hostname_factory(instance)
2133+ updates['vm_state'] = vm_states.BUILDING
2134+ updates['task_state'] = task_states.SCHEDULING
2135
2136 instance = self.update(context, instance_id, **updates)
2137 return instance
2138@@ -551,8 +559,9 @@
2139 def has_finished_migration(self, context, instance_uuid):
2140 """Returns true if an instance has a finished migration."""
2141 try:
2142- db.migration_get_by_instance_and_status(context, instance_uuid,
2143- 'finished')
2144+ self.db.migration_get_by_instance_and_status(context,
2145+ instance_uuid,
2146+ 'finished')
2147 return True
2148 except exception.NotFound:
2149 return False
2150@@ -566,14 +575,15 @@
2151 :param context: the security context
2152 """
2153 try:
2154- db.security_group_get_by_name(context, context.project_id,
2155- 'default')
2156+ self.db.security_group_get_by_name(context,
2157+ context.project_id,
2158+ 'default')
2159 except exception.NotFound:
2160 values = {'name': 'default',
2161 'description': 'default',
2162 'user_id': context.user_id,
2163 'project_id': context.project_id}
2164- db.security_group_create(context, values)
2165+ self.db.security_group_create(context, values)
2166
2167 def trigger_security_group_rules_refresh(self, context, security_group_id):
2168 """Called when a rule is added to or removed from a security_group."""
2169@@ -638,7 +648,7 @@
2170 """Called when a rule is added to or removed from a security_group"""
2171
2172 hosts = [x['host'] for (x, idx)
2173- in db.service_get_all_compute_sorted(context)]
2174+ in self.db.service_get_all_compute_sorted(context)]
2175 for host in hosts:
2176 rpc.cast(context,
2177 self.db.queue_get_for(context, FLAGS.compute_topic, host),
2178@@ -666,11 +676,11 @@
2179
2180 def add_security_group(self, context, instance_id, security_group_name):
2181 """Add security group to the instance"""
2182- security_group = db.security_group_get_by_name(context,
2183- context.project_id,
2184- security_group_name)
2185+ security_group = self.db.security_group_get_by_name(context,
2186+ context.project_id,
2187+ security_group_name)
2188 # check if the server exists
2189- inst = db.instance_get(context, instance_id)
2190+ inst = self.db.instance_get(context, instance_id)
2191 #check if the security group is associated with the server
2192 if self._is_security_group_associated_with_server(security_group,
2193 instance_id):
2194@@ -682,21 +692,21 @@
2195 if inst['state'] != power_state.RUNNING:
2196 raise exception.InstanceNotRunning(instance_id=instance_id)
2197
2198- db.instance_add_security_group(context.elevated(),
2199- instance_id,
2200- security_group['id'])
2201+ self.db.instance_add_security_group(context.elevated(),
2202+ instance_id,
2203+ security_group['id'])
2204 rpc.cast(context,
2205- db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
2206+ self.db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
2207 {"method": "refresh_security_group_rules",
2208 "args": {"security_group_id": security_group['id']}})
2209
2210 def remove_security_group(self, context, instance_id, security_group_name):
2211 """Remove the security group associated with the instance"""
2212- security_group = db.security_group_get_by_name(context,
2213- context.project_id,
2214- security_group_name)
2215+ security_group = self.db.security_group_get_by_name(context,
2216+ context.project_id,
2217+ security_group_name)
2218 # check if the server exists
2219- inst = db.instance_get(context, instance_id)
2220+ inst = self.db.instance_get(context, instance_id)
2221 #check if the security group is associated with the server
2222 if not self._is_security_group_associated_with_server(security_group,
2223 instance_id):
2224@@ -708,11 +718,11 @@
2225 if inst['state'] != power_state.RUNNING:
2226 raise exception.InstanceNotRunning(instance_id=instance_id)
2227
2228- db.instance_remove_security_group(context.elevated(),
2229- instance_id,
2230- security_group['id'])
2231+ self.db.instance_remove_security_group(context.elevated(),
2232+ instance_id,
2233+ security_group['id'])
2234 rpc.cast(context,
2235- db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
2236+ self.db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
2237 {"method": "refresh_security_group_rules",
2238 "args": {"security_group_id": security_group['id']}})
2239
2240@@ -750,10 +760,8 @@
2241 return
2242
2243 self.update(context,
2244- instance['id'],
2245- state_description='terminating',
2246- state=0,
2247- terminated_at=utils.utcnow())
2248+ instance_id,
2249+ task_state=task_states.DELETING)
2250
2251 host = instance['host']
2252 if host:
2253@@ -773,9 +781,9 @@
2254 return
2255
2256 self.update(context,
2257- instance['id'],
2258- state_description='stopping',
2259- state=power_state.NOSTATE,
2260+ instance_id,
2261+ vm_state=vm_states.ACTIVE,
2262+ task_state=task_states.STOPPING,
2263 terminated_at=utils.utcnow())
2264
2265 host = instance['host']
2266@@ -787,12 +795,18 @@
2267 """Start an instance."""
2268 LOG.debug(_("Going to try to start %s"), instance_id)
2269 instance = self._get_instance(context, instance_id, 'starting')
2270- if instance['state_description'] != 'stopped':
2271- _state_description = instance['state_description']
2272+ vm_state = instance["vm_state"]
2273+
2274+ if vm_state != vm_states.STOPPED:
2275 LOG.warning(_("Instance %(instance_id)s is not "
2276- "stopped(%(_state_description)s)") % locals())
2277+ "stopped. (%(vm_state)s)") % locals())
2278 return
2279
2280+ self.update(context,
2281+ instance_id,
2282+ vm_state=vm_states.STOPPED,
2283+ task_state=task_states.STARTING)
2284+
2285 # TODO(yamahata): injected_files isn't supported right now.
2286 # It is used only for osapi. not for ec2 api.
2287 # availability_zone isn't used by run_instance.
2288@@ -802,6 +816,15 @@
2289 "args": {"topic": FLAGS.compute_topic,
2290 "instance_id": instance_id}})
2291
2292+ def get_active_by_window(self, context, begin, end=None, project_id=None):
2293+ """Get instances that were continuously active over a window."""
2294+ return self.db.instance_get_active_by_window(context, begin, end,
2295+ project_id)
2296+
2297+ def get_instance_type(self, context, instance_type_id):
2298+ """Get an instance type by instance type id."""
2299+ return self.db.instance_type_get(context, instance_type_id)
2300+
2301 def get(self, context, instance_id):
2302 """Get a single instance with the given instance_id."""
2303 # NOTE(sirp): id used to be exclusively integer IDs; now we're
2304@@ -1001,7 +1024,7 @@
2305 :param extra_properties: dict of extra image properties to include
2306
2307 """
2308- instance = db.api.instance_get(context, instance_id)
2309+ instance = self.db.instance_get(context, instance_id)
2310 properties = {'instance_uuid': instance['uuid'],
2311 'user_id': str(context.user_id),
2312 'image_state': 'creating',
2313@@ -1020,32 +1043,39 @@
2314 @scheduler_api.reroute_compute("reboot")
2315 def reboot(self, context, instance_id):
2316 """Reboot the given instance."""
2317+ self.update(context,
2318+ instance_id,
2319+ vm_state=vm_states.ACTIVE,
2320+ task_state=task_states.REBOOTING)
2321 self._cast_compute_message('reboot_instance', context, instance_id)
2322
2323 @scheduler_api.reroute_compute("rebuild")
2324 def rebuild(self, context, instance_id, image_href, admin_password,
2325 name=None, metadata=None, files_to_inject=None):
2326 """Rebuild the given instance with the provided metadata."""
2327- instance = db.api.instance_get(context, instance_id)
2328+ instance = self.db.instance_get(context, instance_id)
2329+ name = name or instance["display_name"]
2330
2331- if instance["state"] == power_state.BUILDING:
2332- msg = _("Instance already building")
2333- raise exception.BuildInProgress(msg)
2334+ if instance["vm_state"] != vm_states.ACTIVE:
2335+ msg = _("Instance must be active to rebuild.")
2336+ raise exception.RebuildRequiresActiveInstance(msg)
2337
2338 files_to_inject = files_to_inject or []
2339+ metadata = metadata or {}
2340+
2341 self._check_injected_file_quota(context, files_to_inject)
2342+ self._check_metadata_properties_quota(context, metadata)
2343
2344- values = {}
2345- if metadata is not None:
2346- self._check_metadata_properties_quota(context, metadata)
2347- values['metadata'] = metadata
2348- if name is not None:
2349- values['display_name'] = name
2350- self.db.instance_update(context, instance_id, values)
2351+ self.update(context,
2352+ instance_id,
2353+ metadata=metadata,
2354+ display_name=name,
2355+ image_ref=image_href,
2356+ vm_state=vm_states.ACTIVE,
2357+ task_state=task_states.REBUILDING)
2358
2359 rebuild_params = {
2360 "new_pass": admin_password,
2361- "image_ref": image_href,
2362 "injected_files": files_to_inject,
2363 }
2364
2365@@ -1066,6 +1096,11 @@
2366 raise exception.MigrationNotFoundByStatus(instance_id=instance_id,
2367 status='finished')
2368
2369+ self.update(context,
2370+ instance_id,
2371+ vm_state=vm_states.ACTIVE,
2372+ task_state=None)
2373+
2374 params = {'migration_id': migration_ref['id']}
2375 self._cast_compute_message('revert_resize', context,
2376 instance_ref['uuid'],
2377@@ -1086,6 +1121,12 @@
2378 if not migration_ref:
2379 raise exception.MigrationNotFoundByStatus(instance_id=instance_id,
2380 status='finished')
2381+
2382+ self.update(context,
2383+ instance_id,
2384+ vm_state=vm_states.ACTIVE,
2385+ task_state=None)
2386+
2387 params = {'migration_id': migration_ref['id']}
2388 self._cast_compute_message('confirm_resize', context,
2389 instance_ref['uuid'],
2390@@ -1131,6 +1172,11 @@
2391 if (current_memory_mb == new_memory_mb) and flavor_id:
2392 raise exception.CannotResizeToSameSize()
2393
2394+ self.update(context,
2395+ instance_id,
2396+ vm_state=vm_states.RESIZING,
2397+ task_state=task_states.RESIZE_PREP)
2398+
2399 instance_ref = self._get_instance(context, instance_id, 'resize')
2400 self._cast_scheduler_message(context,
2401 {"method": "prep_resize",
2402@@ -1164,11 +1210,19 @@
2403 @scheduler_api.reroute_compute("pause")
2404 def pause(self, context, instance_id):
2405 """Pause the given instance."""
2406+ self.update(context,
2407+ instance_id,
2408+ vm_state=vm_states.ACTIVE,
2409+ task_state=task_states.PAUSING)
2410 self._cast_compute_message('pause_instance', context, instance_id)
2411
2412 @scheduler_api.reroute_compute("unpause")
2413 def unpause(self, context, instance_id):
2414 """Unpause the given instance."""
2415+ self.update(context,
2416+ instance_id,
2417+ vm_state=vm_states.PAUSED,
2418+ task_state=task_states.UNPAUSING)
2419 self._cast_compute_message('unpause_instance', context, instance_id)
2420
2421 def _call_compute_message_for_host(self, action, context, host, params):
2422@@ -1201,21 +1255,37 @@
2423 @scheduler_api.reroute_compute("suspend")
2424 def suspend(self, context, instance_id):
2425 """Suspend the given instance."""
2426+ self.update(context,
2427+ instance_id,
2428+ vm_state=vm_states.ACTIVE,
2429+ task_state=task_states.SUSPENDING)
2430 self._cast_compute_message('suspend_instance', context, instance_id)
2431
2432 @scheduler_api.reroute_compute("resume")
2433 def resume(self, context, instance_id):
2434 """Resume the given instance."""
2435+ self.update(context,
2436+ instance_id,
2437+ vm_state=vm_states.SUSPENDED,
2438+ task_state=task_states.RESUMING)
2439 self._cast_compute_message('resume_instance', context, instance_id)
2440
2441 @scheduler_api.reroute_compute("rescue")
2442 def rescue(self, context, instance_id):
2443 """Rescue the given instance."""
2444+ self.update(context,
2445+ instance_id,
2446+ vm_state=vm_states.ACTIVE,
2447+ task_state=task_states.RESCUING)
2448 self._cast_compute_message('rescue_instance', context, instance_id)
2449
2450 @scheduler_api.reroute_compute("unrescue")
2451 def unrescue(self, context, instance_id):
2452 """Unrescue the given instance."""
2453+ self.update(context,
2454+ instance_id,
2455+ vm_state=vm_states.RESCUED,
2456+ task_state=task_states.UNRESCUING)
2457 self._cast_compute_message('unrescue_instance', context, instance_id)
2458
2459 @scheduler_api.reroute_compute("set_admin_password")
2460
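Note: the compute API changes above share one shape: record the transition
in the database first (vm_state names the precondition, task_state the
operation in flight), then cast to the compute manager, which writes the
terminal vm_state and clears task_state when it finishes. Condensed from
the pause()/unpause() hunks:

    def pause(self, context, instance_id):
        self.update(context, instance_id,
                    vm_state=vm_states.ACTIVE,        # precondition
                    task_state=task_states.PAUSING)   # in flight
        self._cast_compute_message('pause_instance', context, instance_id)
        # The manager later sets vm_state=PAUSED, task_state=None.
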
2461=== modified file 'nova/compute/manager.py'
2462--- nova/compute/manager.py 2011-08-24 14:45:53 +0000
2463+++ nova/compute/manager.py 2011-09-01 10:57:46 +0000
2464@@ -56,6 +56,8 @@
2465 from nova import utils
2466 from nova import volume
2467 from nova.compute import power_state
2468+from nova.compute import task_states
2469+from nova.compute import vm_states
2470 from nova.notifier import api as notifier
2471 from nova.compute.utils import terminate_volumes
2472 from nova.virt import driver
2473@@ -146,6 +148,10 @@
2474 super(ComputeManager, self).__init__(service_name="compute",
2475 *args, **kwargs)
2476
2477+ def _instance_update(self, context, instance_id, **kwargs):
2478+ """Update an instance in the database using kwargs as value."""
2479+ return self.db.instance_update(context, instance_id, kwargs)
2480+
2481 def init_host(self):
2482 """Initialization for a standalone compute service."""
2483 self.driver.init_host(host=self.host)
2484@@ -153,8 +159,8 @@
2485 instances = self.db.instance_get_all_by_host(context, self.host)
2486 for instance in instances:
2487 inst_name = instance['name']
2488- db_state = instance['state']
2489- drv_state = self._update_state(context, instance['id'])
2490+ db_state = instance['power_state']
2491+ drv_state = self._get_power_state(context, instance)
2492
2493 expect_running = db_state == power_state.RUNNING \
2494 and drv_state != db_state
2495@@ -177,34 +183,13 @@
2496 LOG.warning(_('Hypervisor driver does not '
2497 'support firewall rules'))
2498
2499- def _update_state(self, context, instance_id, state=None):
2500- """Update the state of an instance from the driver info."""
2501- instance_ref = self.db.instance_get(context, instance_id)
2502-
2503- if state is None:
2504- try:
2505- LOG.debug(_('Checking state of %s'), instance_ref['name'])
2506- info = self.driver.get_info(instance_ref['name'])
2507- except exception.NotFound:
2508- info = None
2509-
2510- if info is not None:
2511- state = info['state']
2512- else:
2513- state = power_state.FAILED
2514-
2515- self.db.instance_set_state(context, instance_id, state)
2516- return state
2517-
2518- def _update_launched_at(self, context, instance_id, launched_at=None):
2519- """Update the launched_at parameter of the given instance."""
2520- data = {'launched_at': launched_at or utils.utcnow()}
2521- self.db.instance_update(context, instance_id, data)
2522-
2523- def _update_image_ref(self, context, instance_id, image_ref):
2524- """Update the image_id for the given instance."""
2525- data = {'image_ref': image_ref}
2526- self.db.instance_update(context, instance_id, data)
2527+ def _get_power_state(self, context, instance):
2528+ """Retrieve the power state for the given instance."""
2529+ LOG.debug(_('Checking state of %s'), instance['name'])
2530+ try:
2531+ return self.driver.get_info(instance['name'])["state"]
2532+ except exception.NotFound:
2533+ return power_state.FAILED
2534
2535 def get_console_topic(self, context, **kwargs):
2536 """Retrieves the console host for a project on this host.
2537@@ -256,11 +241,6 @@
2538
2539 def _setup_block_device_mapping(self, context, instance_id):
2540 """setup volumes for block device mapping"""
2541- self.db.instance_set_state(context,
2542- instance_id,
2543- power_state.NOSTATE,
2544- 'block_device_mapping')
2545-
2546 volume_api = volume.API()
2547 block_device_mapping = []
2548 swap = None
2549@@ -394,17 +374,12 @@
2550 updates = {}
2551 updates['host'] = self.host
2552 updates['launched_on'] = self.host
2553- instance = self.db.instance_update(context,
2554- instance_id,
2555- updates)
2556+ updates['vm_state'] = vm_states.BUILDING
2557+ updates['task_state'] = task_states.NETWORKING
2558+ instance = self.db.instance_update(context, instance_id, updates)
2559 instance['injected_files'] = kwargs.get('injected_files', [])
2560 instance['admin_pass'] = kwargs.get('admin_password', None)
2561
2562- self.db.instance_set_state(context,
2563- instance_id,
2564- power_state.NOSTATE,
2565- 'networking')
2566-
2567 is_vpn = instance['image_ref'] == str(FLAGS.vpn_image_id)
2568 try:
2569 # NOTE(vish): This could be a cast because we don't do anything
2570@@ -423,6 +398,11 @@
2571 # all vif creation and network injection, maybe this is correct
2572 network_info = []
2573
2574+ self._instance_update(context,
2575+ instance_id,
2576+ vm_state=vm_states.BUILDING,
2577+ task_state=task_states.BLOCK_DEVICE_MAPPING)
2578+
2579 (swap, ephemerals,
2580 block_device_mapping) = self._setup_block_device_mapping(
2581 context, instance_id)
2582@@ -432,9 +412,12 @@
2583 'ephemerals': ephemerals,
2584 'block_device_mapping': block_device_mapping}
2585
2586+ self._instance_update(context,
2587+ instance_id,
2588+ vm_state=vm_states.BUILDING,
2589+ task_state=task_states.SPAWNING)
2590+
2591 # TODO(vish) check to make sure the availability zone matches
2592- self._update_state(context, instance_id, power_state.BUILDING)
2593-
2594 try:
2595 self.driver.spawn(context, instance,
2596 network_info, block_device_info)
2597@@ -443,13 +426,21 @@
2598 "virtualization enabled in the BIOS? Details: "
2599 "%(ex)s") % locals()
2600 LOG.exception(msg)
2601-
2602- self._update_launched_at(context, instance_id)
2603- self._update_state(context, instance_id)
2604+ return
2605+
2606+ current_power_state = self._get_power_state(context, instance)
2607+ self._instance_update(context,
2608+ instance_id,
2609+ power_state=current_power_state,
2610+ vm_state=vm_states.ACTIVE,
2611+ task_state=None,
2612+ launched_at=utils.utcnow())
2613+
2614 usage_info = utils.usage_from_instance(instance)
2615 notifier.notify('compute.%s' % self.host,
2616 'compute.instance.create',
2617 notifier.INFO, usage_info)
2618+
2619 except exception.InstanceNotFound:
2620 # FIXME(wwolf): We are just ignoring InstanceNotFound
2621 # exceptions here in case the instance was immediately
2622@@ -485,8 +476,7 @@
2623 for volume in volumes:
2624 self._detach_volume(context, instance_id, volume['id'], False)
2625
2626- if (instance['state'] == power_state.SHUTOFF and
2627- instance['state_description'] != 'stopped'):
2628+ if instance['power_state'] == power_state.SHUTOFF:
2629 self.db.instance_destroy(context, instance_id)
2630 raise exception.Error(_('trying to destroy already destroyed'
2631 ' instance: %s') % instance_id)
2632@@ -501,9 +491,14 @@
2633 """Terminate an instance on this host."""
2634 self._shutdown_instance(context, instance_id, 'Terminating')
2635 instance = self.db.instance_get(context.elevated(), instance_id)
2636+ self._instance_update(context,
2637+ instance_id,
2638+ vm_state=vm_states.DELETED,
2639+ task_state=None,
2640+ terminated_at=utils.utcnow())
2641
2642- # TODO(ja): should we keep it in a terminated state for a bit?
2643 self.db.instance_destroy(context, instance_id)
2644+
2645 usage_info = utils.usage_from_instance(instance)
2646 notifier.notify('compute.%s' % self.host,
2647 'compute.instance.delete',
2648@@ -514,7 +509,10 @@
2649 def stop_instance(self, context, instance_id):
2650 """Stopping an instance on this host."""
2651 self._shutdown_instance(context, instance_id, 'Stopping')
2652- # instance state will be updated to stopped by _poll_instance_states()
2653+ self._instance_update(context,
2654+ instance_id,
2655+ vm_state=vm_states.STOPPED,
2656+ task_state=None)
2657
2658 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
2659 @checks_instance_lock
2660@@ -526,7 +524,7 @@
2661
2662 :param context: `nova.RequestContext` object
2663 :param instance_id: Instance identifier (integer)
2664- :param image_ref: Image identifier (href or integer)
2665+ :param injected_files: Files to inject
2666 :param new_pass: password to set on rebuilt instance
2667 """
2668 context = context.elevated()
2669@@ -534,29 +532,46 @@
2670 instance_ref = self.db.instance_get(context, instance_id)
2671 LOG.audit(_("Rebuilding instance %s"), instance_id, context=context)
2672
2673- self._update_state(context, instance_id, power_state.BUILDING)
2674+ current_power_state = self._get_power_state(context, instance_ref)
2675+ self._instance_update(context,
2676+ instance_id,
2677+ power_state=current_power_state,
2678+ vm_state=vm_states.REBUILDING,
2679+ task_state=None)
2680
2681 network_info = self._get_instance_nw_info(context, instance_ref)
2682-
2683 self.driver.destroy(instance_ref, network_info)
2684- image_ref = kwargs.get('image_ref')
2685- instance_ref.image_ref = image_ref
2686+
2687+ self._instance_update(context,
2688+ instance_id,
2689+ vm_state=vm_states.REBUILDING,
2690+ task_state=task_states.BLOCK_DEVICE_MAPPING)
2691+
2692 instance_ref.injected_files = kwargs.get('injected_files', [])
2693 network_info = self.network_api.get_instance_nw_info(context,
2694 instance_ref)
2695 bd_mapping = self._setup_block_device_mapping(context, instance_id)
2696
2697+ self._instance_update(context,
2698+ instance_id,
2699+ vm_state=vm_states.REBUILDING,
2700+ task_state=task_states.SPAWNING)
2701+
2702 # pull in new password here since the original password isn't in the db
2703 instance_ref.admin_pass = kwargs.get('new_pass',
2704 utils.generate_password(FLAGS.password_length))
2705
2706 self.driver.spawn(context, instance_ref, network_info, bd_mapping)
2707
2708- self._update_image_ref(context, instance_id, image_ref)
2709- self._update_launched_at(context, instance_id)
2710- self._update_state(context, instance_id)
2711- usage_info = utils.usage_from_instance(instance_ref,
2712- image_ref=image_ref)
2713+ current_power_state = self._get_power_state(context, instance_ref)
2714+ self._instance_update(context,
2715+ instance_id,
2716+ power_state=current_power_state,
2717+ vm_state=vm_states.ACTIVE,
2718+ task_state=None,
2719+ launched_at=utils.utcnow())
2720+
2721+ usage_info = utils.usage_from_instance(instance_ref)
2722 notifier.notify('compute.%s' % self.host,
2723 'compute.instance.rebuild',
2724 notifier.INFO,
2725@@ -566,26 +581,34 @@
2726 @checks_instance_lock
2727 def reboot_instance(self, context, instance_id):
2728 """Reboot an instance on this host."""
2729- context = context.elevated()
2730- self._update_state(context, instance_id)
2731- instance_ref = self.db.instance_get(context, instance_id)
2732 LOG.audit(_("Rebooting instance %s"), instance_id, context=context)
2733-
2734- if instance_ref['state'] != power_state.RUNNING:
2735- state = instance_ref['state']
2736+ context = context.elevated()
2737+ instance_ref = self.db.instance_get(context, instance_id)
2738+
2739+ current_power_state = self._get_power_state(context, instance_ref)
2740+ self._instance_update(context,
2741+ instance_id,
2742+ power_state=current_power_state,
2743+ vm_state=vm_states.ACTIVE,
2744+ task_state=task_states.REBOOTING)
2745+
2746+ if instance_ref['power_state'] != power_state.RUNNING:
2747+ state = instance_ref['power_state']
2748 running = power_state.RUNNING
2749 LOG.warn(_('trying to reboot a non-running '
2750 'instance: %(instance_id)s (state: %(state)s '
2751 'expected: %(running)s)') % locals(),
2752 context=context)
2753
2754- self.db.instance_set_state(context,
2755- instance_id,
2756- power_state.NOSTATE,
2757- 'rebooting')
2758 network_info = self._get_instance_nw_info(context, instance_ref)
2759 self.driver.reboot(instance_ref, network_info)
2760- self._update_state(context, instance_id)
2761+
2762+ current_power_state = self._get_power_state(context, instance_ref)
2763+ self._instance_update(context,
2764+ instance_id,
2765+ power_state=current_power_state,
2766+ vm_state=vm_states.ACTIVE,
2767+ task_state=None)
2768
2769 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
2770 def snapshot_instance(self, context, instance_id, image_id,
2771@@ -601,37 +624,45 @@
2772 :param rotation: int representing how many backups to keep around;
2773 None if rotation shouldn't be used (as in the case of snapshots)
2774 """
2775+ if image_type == "snapshot":
2776+ task_state = task_states.IMAGE_SNAPSHOT
2777+ elif image_type == "backup":
2778+ task_state = task_states.IMAGE_BACKUP
2779+ else:
2780+ raise Exception(_('Image type not recognized %s') % image_type)
2781+
2782 context = context.elevated()
2783 instance_ref = self.db.instance_get(context, instance_id)
2784
2785- #NOTE(sirp): update_state currently only refreshes the state field
2786- # if we add is_snapshotting, we will need this refreshed too,
2787- # potentially?
2788- self._update_state(context, instance_id)
2789+ current_power_state = self._get_power_state(context, instance_ref)
2790+ self._instance_update(context,
2791+ instance_id,
2792+ power_state=current_power_state,
2793+ vm_state=vm_states.ACTIVE,
2794+ task_state=task_state)
2795
2796 LOG.audit(_('instance %s: snapshotting'), instance_id,
2797 context=context)
2798- if instance_ref['state'] != power_state.RUNNING:
2799- state = instance_ref['state']
2800+
2801+ if instance_ref['power_state'] != power_state.RUNNING:
2802+ state = instance_ref['power_state']
2803 running = power_state.RUNNING
2804 LOG.warn(_('trying to snapshot a non-running '
2805 'instance: %(instance_id)s (state: %(state)s '
2806 'expected: %(running)s)') % locals())
2807
2808 self.driver.snapshot(context, instance_ref, image_id)
2809-
2810- if image_type == 'snapshot':
2811- if rotation:
2812- raise exception.ImageRotationNotAllowed()
2813+ self._instance_update(context, instance_id, task_state=None)
2814+
2815+ if image_type == 'snapshot' and rotation:
2816+ raise exception.ImageRotationNotAllowed()
2817+
2818+ elif image_type == 'backup' and rotation:
2819+ instance_uuid = instance_ref['uuid']
2820+ self.rotate_backups(context, instance_uuid, backup_type, rotation)
2821+
2822 elif image_type == 'backup':
2823- if rotation:
2824- instance_uuid = instance_ref['uuid']
2825- self.rotate_backups(context, instance_uuid, backup_type,
2826- rotation)
2827- else:
2828- raise exception.RotationRequiredForBackup()
2829- else:
2830- raise Exception(_('Image type not recognized %s') % image_type)
2831+ raise exception.RotationRequiredForBackup()
2832
2833 def rotate_backups(self, context, instance_uuid, backup_type, rotation):
2834 """Delete excess backups associated to an instance.
2835@@ -699,7 +730,7 @@
2836 for i in xrange(max_tries):
2837 instance_ref = self.db.instance_get(context, instance_id)
2838 instance_id = instance_ref["id"]
2839- instance_state = instance_ref["state"]
2840+ instance_state = instance_ref["power_state"]
2841 expected_state = power_state.RUNNING
2842
2843 if instance_state != expected_state:
2844@@ -734,7 +765,7 @@
2845 context = context.elevated()
2846 instance_ref = self.db.instance_get(context, instance_id)
2847 instance_id = instance_ref['id']
2848- instance_state = instance_ref['state']
2849+ instance_state = instance_ref['power_state']
2850 expected_state = power_state.RUNNING
2851 if instance_state != expected_state:
2852 LOG.warn(_('trying to inject a file into a non-running '
2853@@ -752,7 +783,7 @@
2854 context = context.elevated()
2855 instance_ref = self.db.instance_get(context, instance_id)
2856 instance_id = instance_ref['id']
2857- instance_state = instance_ref['state']
2858+ instance_state = instance_ref['power_state']
2859 expected_state = power_state.RUNNING
2860 if instance_state != expected_state:
2861 LOG.warn(_('trying to update agent on a non-running '
2862@@ -767,40 +798,41 @@
2863 @checks_instance_lock
2864 def rescue_instance(self, context, instance_id):
2865 """Rescue an instance on this host."""
2866- context = context.elevated()
2867- instance_ref = self.db.instance_get(context, instance_id)
2868 LOG.audit(_('instance %s: rescuing'), instance_id, context=context)
2869- self.db.instance_set_state(context,
2870- instance_id,
2871- power_state.NOSTATE,
2872- 'rescuing')
2873- _update_state = lambda result: self._update_state_callback(
2874- self, context, instance_id, result)
2875+ context = context.elevated()
2876+
2877+ instance_ref = self.db.instance_get(context, instance_id)
2878 network_info = self._get_instance_nw_info(context, instance_ref)
2879- self.driver.rescue(context, instance_ref, _update_state, network_info)
2880- self._update_state(context, instance_id)
2881+
2882+ # NOTE(blamar): None of the virt drivers use the 'callback' param
2883+ self.driver.rescue(context, instance_ref, None, network_info)
2884+
2885+ current_power_state = self._get_power_state(context, instance_ref)
2886+ self._instance_update(context,
2887+ instance_id,
2888+ vm_state=vm_states.RESCUED,
2889+ task_state=None,
2890+ power_state=current_power_state)
2891
2892 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
2893 @checks_instance_lock
2894 def unrescue_instance(self, context, instance_id):
2895 """Rescue an instance on this host."""
2896- context = context.elevated()
2897- instance_ref = self.db.instance_get(context, instance_id)
2898 LOG.audit(_('instance %s: unrescuing'), instance_id, context=context)
2899- self.db.instance_set_state(context,
2900- instance_id,
2901- power_state.NOSTATE,
2902- 'unrescuing')
2903- _update_state = lambda result: self._update_state_callback(
2904- self, context, instance_id, result)
2905+ context = context.elevated()
2906+
2907+ instance_ref = self.db.instance_get(context, instance_id)
2908 network_info = self._get_instance_nw_info(context, instance_ref)
2909- self.driver.unrescue(instance_ref, _update_state, network_info)
2910- self._update_state(context, instance_id)
2911-
2912- @staticmethod
2913- def _update_state_callback(self, context, instance_id, result):
2914- """Update instance state when async task completes."""
2915- self._update_state(context, instance_id)
2916+
2917+ # NOTE(blamar): None of the virt drivers use the 'callback' param
2918+ self.driver.unrescue(instance_ref, None, network_info)
2919+
2920+ current_power_state = self._get_power_state(context, instance_ref)
2921+ self._instance_update(context,
2922+ instance_id,
2923+ vm_state=vm_states.ACTIVE,
2924+ task_state=None,
2925+ power_state=current_power_state)
2926
2927 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
2928 @checks_instance_lock
2929@@ -859,11 +891,12 @@
2930
2931 # Just roll back the record. There's no need to resize down since
2932 # the 'old' VM already has the preferred attributes
2933- self.db.instance_update(context, instance_ref['uuid'],
2934- dict(memory_mb=instance_type['memory_mb'],
2935- vcpus=instance_type['vcpus'],
2936- local_gb=instance_type['local_gb'],
2937- instance_type_id=instance_type['id']))
2938+ self._instance_update(context,
2939+ instance_ref["uuid"],
2940+ memory_mb=instance_type['memory_mb'],
2941+ vcpus=instance_type['vcpus'],
2942+ local_gb=instance_type['local_gb'],
2943+ instance_type_id=instance_type['id'])
2944
2945 self.driver.revert_migration(instance_ref)
2946 self.db.migration_update(context, migration_id,
2947@@ -890,8 +923,11 @@
2948 instance_ref = self.db.instance_get_by_uuid(context, instance_id)
2949
2950 if instance_ref['host'] == FLAGS.host:
2951- raise exception.Error(_(
2952- 'Migration error: destination same as source!'))
2953+ self._instance_update(context,
2954+ instance_id,
2955+ vm_state=vm_states.ERROR)
2956+ msg = _('Migration error: destination same as source!')
2957+ raise exception.Error(msg)
2958
2959 old_instance_type = self.db.instance_type_get(context,
2960 instance_ref['instance_type_id'])
2961@@ -985,6 +1021,11 @@
2962 self.driver.finish_migration(context, instance_ref, disk_info,
2963 network_info, resize_instance)
2964
2965+ self._instance_update(context,
2966+ instance_id,
2967+ vm_state=vm_states.ACTIVE,
2968+ task_state=task_states.RESIZE_VERIFY)
2969+
2970 self.db.migration_update(context, migration_id,
2971 {'status': 'finished', })
2972
2973@@ -1016,35 +1057,35 @@
2974 @checks_instance_lock
2975 def pause_instance(self, context, instance_id):
2976 """Pause an instance on this host."""
2977- context = context.elevated()
2978- instance_ref = self.db.instance_get(context, instance_id)
2979 LOG.audit(_('instance %s: pausing'), instance_id, context=context)
2980- self.db.instance_set_state(context,
2981- instance_id,
2982- power_state.NOSTATE,
2983- 'pausing')
2984- self.driver.pause(instance_ref,
2985- lambda result: self._update_state_callback(self,
2986- context,
2987- instance_id,
2988- result))
2989+ context = context.elevated()
2990+
2991+ instance_ref = self.db.instance_get(context, instance_id)
2992+ self.driver.pause(instance_ref, lambda result: None)
2993+
2994+ current_power_state = self._get_power_state(context, instance_ref)
2995+ self._instance_update(context,
2996+ instance_id,
2997+ power_state=current_power_state,
2998+ vm_state=vm_states.PAUSED,
2999+ task_state=None)
3000
3001 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
3002 @checks_instance_lock
3003 def unpause_instance(self, context, instance_id):
3004 """Unpause a paused instance on this host."""
3005- context = context.elevated()
3006- instance_ref = self.db.instance_get(context, instance_id)
3007 LOG.audit(_('instance %s: unpausing'), instance_id, context=context)
3008- self.db.instance_set_state(context,
3009- instance_id,
3010- power_state.NOSTATE,
3011- 'unpausing')
3012- self.driver.unpause(instance_ref,
3013- lambda result: self._update_state_callback(self,
3014- context,
3015- instance_id,
3016- result))
3017+ context = context.elevated()
3018+
3019+ instance_ref = self.db.instance_get(context, instance_id)
3020+ self.driver.unpause(instance_ref, lambda result: None)
3021+
3022+ current_power_state = self._get_power_state(context, instance_ref)
3023+ self._instance_update(context,
3024+ instance_id,
3025+ power_state=current_power_state,
3026+ vm_state=vm_states.ACTIVE,
3027+ task_state=None)
3028
3029 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
3030 def host_power_action(self, context, host=None, action=None):
3031@@ -1060,7 +1101,7 @@
3032 def get_diagnostics(self, context, instance_id):
3033 """Retrieve diagnostics for an instance on this host."""
3034 instance_ref = self.db.instance_get(context, instance_id)
3035- if instance_ref["state"] == power_state.RUNNING:
3036+ if instance_ref["power_state"] == power_state.RUNNING:
3037 LOG.audit(_("instance %s: retrieving diagnostics"), instance_id,
3038 context=context)
3039 return self.driver.get_diagnostics(instance_ref)
3040@@ -1069,33 +1110,35 @@
3041 @checks_instance_lock
3042 def suspend_instance(self, context, instance_id):
3043 """Suspend the given instance."""
3044- context = context.elevated()
3045- instance_ref = self.db.instance_get(context, instance_id)
3046 LOG.audit(_('instance %s: suspending'), instance_id, context=context)
3047- self.db.instance_set_state(context, instance_id,
3048- power_state.NOSTATE,
3049- 'suspending')
3050- self.driver.suspend(instance_ref,
3051- lambda result: self._update_state_callback(self,
3052- context,
3053- instance_id,
3054- result))
3055+ context = context.elevated()
3056+
3057+ instance_ref = self.db.instance_get(context, instance_id)
3058+ self.driver.suspend(instance_ref, lambda result: None)
3059+
3060+ current_power_state = self._get_power_state(context, instance_ref)
3061+ self._instance_update(context,
3062+ instance_id,
3063+ power_state=current_power_state,
3064+ vm_state=vm_states.SUSPENDED,
3065+ task_state=None)
3066
3067 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
3068 @checks_instance_lock
3069 def resume_instance(self, context, instance_id):
3070 """Resume the given suspended instance."""
3071- context = context.elevated()
3072- instance_ref = self.db.instance_get(context, instance_id)
3073 LOG.audit(_('instance %s: resuming'), instance_id, context=context)
3074- self.db.instance_set_state(context, instance_id,
3075- power_state.NOSTATE,
3076- 'resuming')
3077- self.driver.resume(instance_ref,
3078- lambda result: self._update_state_callback(self,
3079- context,
3080- instance_id,
3081- result))
3082+ context = context.elevated()
3083+
3084+ instance_ref = self.db.instance_get(context, instance_id)
3085+ self.driver.resume(instance_ref, lambda result: None)
3086+
3087+ current_power_state = self._get_power_state(context, instance_ref)
3088+ self._instance_update(context,
3089+ instance_id,
3090+ power_state=current_power_state,
3091+ vm_state=vm_states.ACTIVE,
3092+ task_state=None)
3093
3094 @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
3095 def lock_instance(self, context, instance_id):
3096@@ -1506,11 +1549,14 @@
3097 'block_migration': block_migration}})
3098
3099 # Restore instance state
3100- self.db.instance_update(ctxt,
3101- instance_ref['id'],
3102- {'state_description': 'running',
3103- 'state': power_state.RUNNING,
3104- 'host': dest})
3105+ current_power_state = self._get_power_state(ctxt, instance_ref)
3106+ self._instance_update(ctxt,
3107+ instance_ref["id"],
3108+ host=dest,
3109+ power_state=current_power_state,
3110+ vm_state=vm_states.ACTIVE,
3111+ task_state=None)
3112+
3113 # Restore volume state
3114 for volume_ref in instance_ref['volumes']:
3115 volume_id = volume_ref['id']
3116@@ -1556,11 +1602,11 @@
3117 This param specifies destination host.
3118 """
3119 host = instance_ref['host']
3120- self.db.instance_update(context,
3121- instance_ref['id'],
3122- {'state_description': 'running',
3123- 'state': power_state.RUNNING,
3124- 'host': host})
3125+ self._instance_update(context,
3126+ instance_ref['id'],
3127+ host=host,
3128+ vm_state=vm_states.ACTIVE,
3129+ task_state=None)
3130
3131 for volume_ref in instance_ref['volumes']:
3132 volume_id = volume_ref['id']
3133@@ -1608,10 +1654,9 @@
3134 error_list.append(ex)
3135
3136 try:
3137- self._poll_instance_states(context)
3138+ self._sync_power_states(context)
3139 except Exception as ex:
3140- LOG.warning(_("Error during instance poll: %s"),
3141- unicode(ex))
3142+ LOG.warning(_("Error during power_state sync: %s"), unicode(ex))
3143 error_list.append(ex)
3144
3145 return error_list
3146@@ -1626,68 +1671,40 @@
3147 self.update_service_capabilities(
3148 self.driver.get_host_stats(refresh=True))
3149
3150- def _poll_instance_states(self, context):
3151+ def _sync_power_states(self, context):
3152+ """Align power states between the database and the hypervisor.
3153+
3154+ The hypervisor is authoritative for the power_state data, so we
3155+ simply loop over all known instances for this host and update the
3156+ power_state according to the hypervisor. If the instance is not found
3157+ then it will be set to power_state.NOSTATE, because it doesn't exist
3158+ on the hypervisor.
3159+
3160+ """
3161 vm_instances = self.driver.list_instances_detail()
3162 vm_instances = dict((vm.name, vm) for vm in vm_instances)
3163-
3164- # Keep a list of VMs not in the DB, cross them off as we find them
3165- vms_not_found_in_db = list(vm_instances.keys())
3166-
3167 db_instances = self.db.instance_get_all_by_host(context, self.host)
3168
3169+ num_vm_instances = len(vm_instances)
3170+ num_db_instances = len(db_instances)
3171+
3172+ if num_vm_instances != num_db_instances:
3173+            LOG.info(_("Found %(num_db_instances)s instances in the database "
3174+                       "and %(num_vm_instances)s on the hypervisor.") % locals())
3175+
3176 for db_instance in db_instances:
3177- name = db_instance['name']
3178- db_state = db_instance['state']
3179+ name = db_instance["name"]
3180+ db_power_state = db_instance['power_state']
3181 vm_instance = vm_instances.get(name)
3182
3183 if vm_instance is None:
3184- # NOTE(justinsb): We have to be very careful here, because a
3185- # concurrent operation could be in progress (e.g. a spawn)
3186- if db_state == power_state.BUILDING:
3187- # TODO(justinsb): This does mean that if we crash during a
3188- # spawn, the machine will never leave the spawning state,
3189- # but this is just the way nova is; this function isn't
3190- # trying to correct that problem.
3191- # We could have a separate task to correct this error.
3192- # TODO(justinsb): What happens during a live migration?
3193- LOG.info(_("Found instance '%(name)s' in DB but no VM. "
3194- "State=%(db_state)s, so assuming spawn is in "
3195- "progress.") % locals())
3196- vm_state = db_state
3197- else:
3198- LOG.info(_("Found instance '%(name)s' in DB but no VM. "
3199- "State=%(db_state)s, so setting state to "
3200- "shutoff.") % locals())
3201- vm_state = power_state.SHUTOFF
3202- if db_instance['state_description'] == 'stopping':
3203- self.db.instance_stop(context, db_instance['id'])
3204- continue
3205+ vm_power_state = power_state.NOSTATE
3206 else:
3207- vm_state = vm_instance.state
3208- vms_not_found_in_db.remove(name)
3209+ vm_power_state = vm_instance.state
3210
3211- if (db_instance['state_description'] in ['migrating', 'stopping']):
3212- # A situation which db record exists, but no instance"
3213- # sometimes occurs while live-migration at src compute,
3214- # this case should be ignored.
3215- LOG.debug(_("Ignoring %(name)s, as it's currently being "
3216- "migrated.") % locals())
3217+ if vm_power_state == db_power_state:
3218 continue
3219
3220- if vm_state != db_state:
3221- LOG.info(_("DB/VM state mismatch. Changing state from "
3222- "'%(db_state)s' to '%(vm_state)s'") % locals())
3223- self._update_state(context, db_instance['id'], vm_state)
3224-
3225- # NOTE(justinsb): We no longer auto-remove SHUTOFF instances
3226- # It's quite hard to get them back when we do.
3227-
3228- # Are there VMs not in the DB?
3229- for vm_not_found_in_db in vms_not_found_in_db:
3230- name = vm_not_found_in_db
3231-
3232- # We only care about instances that compute *should* know about
3233- if name.startswith("instance-"):
3234- # TODO(justinsb): What to do here? Adopt it? Shut it down?
3235- LOG.warning(_("Found VM not in DB: '%(name)s'. Ignoring")
3236- % locals())
3237+ self._instance_update(context,
3238+ db_instance["id"],
3239+ power_state=vm_power_state)
3240
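For reference, the reconciliation described in the docstring above reduces to a small loop. A self-contained sketch (the stub data and the update_fn callback are invented for illustration; NOSTATE and RUNNING stand in for nova.compute.power_state constants):

    NOSTATE, RUNNING = 0x00, 0x01

    def sync_power_states(vm_instances, db_instances, update_fn):
        # vm_instances: {name: power_state} as reported by the hypervisor.
        # db_instances: dicts with 'id', 'name' and 'power_state'.
        for db_instance in db_instances:
            # The hypervisor is authoritative; missing VMs get NOSTATE.
            vm_power_state = vm_instances.get(db_instance['name'], NOSTATE)
            if vm_power_state != db_instance['power_state']:
                update_fn(db_instance['id'], vm_power_state)

    updates = []
    sync_power_states({'instance-1': RUNNING},
                      [{'id': 1, 'name': 'instance-1', 'power_state': RUNNING},
                       {'id': 2, 'name': 'instance-2', 'power_state': RUNNING}],
                      lambda i, s: updates.append((i, s)))
    assert updates == [(2, NOSTATE)]  # instance-2 vanished from the hypervisor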
3241=== added file 'nova/compute/task_states.py'
3242--- nova/compute/task_states.py 1970-01-01 00:00:00 +0000
3243+++ nova/compute/task_states.py 2011-09-01 10:57:46 +0000
3244@@ -0,0 +1,59 @@
3245+# vim: tabstop=4 shiftwidth=4 softtabstop=4
3246+
3247+# Copyright 2010 OpenStack LLC.
3248+# All Rights Reserved.
3249+#
3250+# Licensed under the Apache License, Version 2.0 (the "License"); you may
3251+# not use this file except in compliance with the License. You may obtain
3252+# a copy of the License at
3253+#
3254+# http://www.apache.org/licenses/LICENSE-2.0
3255+#
3256+# Unless required by applicable law or agreed to in writing, software
3257+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
3258+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
3259+# License for the specific language governing permissions and limitations
3260+# under the License.
3261+
3262+"""Possible task states for instances.
3263+
3264+Compute instance task states represent what is happening to the instance at the
3265+current moment. These tasks can be generic, such as 'spawning', or specific,
3266+such as 'block_device_mapping'. These task states allow for a better view into
3267+what an instance is doing and should be displayed to users/administrators as
3268+necessary.
3269+
3270+"""
3271+
3272+SCHEDULING = 'scheduling'
3273+BLOCK_DEVICE_MAPPING = 'block_device_mapping'
3274+NETWORKING = 'networking'
3275+SPAWNING = 'spawning'
3276+
3277+IMAGE_SNAPSHOT = 'image_snapshot'
3278+IMAGE_BACKUP = 'image_backup'
3279+
3280+UPDATING_PASSWORD = 'updating_password'
3281+
3282+RESIZE_PREP = 'resize_prep'
3283+RESIZE_MIGRATING = 'resize_migrating'
3284+RESIZE_MIGRATED = 'resize_migrated'
3285+RESIZE_FINISH = 'resize_finish'
3286+RESIZE_REVERTING = 'resize_reverting'
3287+RESIZE_CONFIRMING = 'resize_confirming'
3288+RESIZE_VERIFY = 'resize_verify'
3289+
3290+REBUILDING = 'rebuilding'
3291+
3292+REBOOTING = 'rebooting'
3293+PAUSING = 'pausing'
3294+UNPAUSING = 'unpausing'
3295+SUSPENDING = 'suspending'
3296+RESUMING = 'resuming'
3297+
3298+RESCUING = 'rescuing'
3299+UNRESCUING = 'unrescuing'
3300+
3301+DELETING = 'deleting'
3302+STOPPING = 'stopping'
3303+STARTING = 'starting'
3304
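The manager methods earlier in this diff follow a consistent pattern with these constants: set a task state while work is in flight, then clear it (or hand off to the next state) in the final _instance_update. A hedged sketch, assuming the module above is importable and with update_fn standing in for ComputeManager._instance_update:

    from nova.compute import task_states
    from nova.compute import vm_states

    def finish_resize_states(update_fn, context, instance_id):
        update_fn(context, instance_id, task_state=task_states.RESIZE_FINISH)
        # ... driver.finish_migration() would run here ...
        update_fn(context, instance_id,
                  vm_state=vm_states.ACTIVE,
                  task_state=task_states.RESIZE_VERIFY)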
3305=== added file 'nova/compute/vm_states.py'
3306--- nova/compute/vm_states.py 1970-01-01 00:00:00 +0000
3307+++ nova/compute/vm_states.py 2011-09-01 10:57:46 +0000
3308@@ -0,0 +1,39 @@
3309+# vim: tabstop=4 shiftwidth=4 softtabstop=4
3310+
3311+# Copyright 2010 OpenStack LLC.
3312+# All Rights Reserved.
3313+#
3314+# Licensed under the Apache License, Version 2.0 (the "License"); you may
3315+# not use this file except in compliance with the License. You may obtain
3316+# a copy of the License at
3317+#
3318+# http://www.apache.org/licenses/LICENSE-2.0
3319+#
3320+# Unless required by applicable law or agreed to in writing, software
3321+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
3322+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
3323+# License for the specific language governing permissions and limitations
3324+# under the License.
3325+
3326+"""Possible vm states for instances.
3327+
3328+Compute instance vm states represent the state of an instance as it pertains to
3329+a user or administrator. When combined with task states (task_states.py), a
3330+better picture can be formed regarding the instance's health.
3331+
3332+"""
3333+
3334+ACTIVE = 'active'
3335+BUILDING = 'building'
3336+REBUILDING = 'rebuilding'
3337+
3338+PAUSED = 'paused'
3339+SUSPENDED = 'suspended'
3340+RESCUED = 'rescued'
3341+DELETED = 'deleted'
3342+STOPPED = 'stopped'
3343+
3344+MIGRATING = 'migrating'
3345+RESIZING = 'resizing'
3346+
3347+ERROR = 'error'
3348
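Since vm_state and task_state are meant to be read together, a hypothetical display helper illustrates the combination (this helper is not part of the branch):

    def describe_instance(vm_state, task_state):
        """Combine the steady state with any in-flight task."""
        if task_state is not None:
            return '%s (%s)' % (vm_state, task_state)
        return vm_state

    assert describe_instance('active', 'rebooting') == 'active (rebooting)'
    assert describe_instance('stopped', None) == 'stopped'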
3349=== modified file 'nova/context.py'
3350--- nova/context.py 2011-08-02 16:30:41 +0000
3351+++ nova/context.py 2011-09-01 10:57:46 +0000
3352@@ -38,7 +38,7 @@
3353 self.roles = roles or []
3354 self.is_admin = is_admin
3355 if self.is_admin is None:
3356- self.admin = 'admin' in self.roles
3357+ self.is_admin = 'admin' in [x.lower() for x in self.roles]
3358 self.read_deleted = read_deleted
3359 self.remote_address = remote_address
3360 if not timestamp:
3361
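The old line both assigned the result to the wrong attribute (self.admin) and matched case-sensitively; the new expression fixes both. A quick standalone check of the new matching behavior:

    roles = ['Admin']
    assert 'admin' not in roles                      # the old check missed it
    assert 'admin' in [x.lower() for x in roles]     # the new check matches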
3362=== modified file 'nova/db/api.py'
3363--- nova/db/api.py 2011-08-25 16:14:44 +0000
3364+++ nova/db/api.py 2011-09-01 10:57:46 +0000
3365@@ -49,7 +49,8 @@
3366 'Template string to be used to generate instance names')
3367 flags.DEFINE_string('snapshot_name_template', 'snapshot-%08x',
3368 'Template string to be used to generate snapshot names')
3369-
3370+flags.DEFINE_string('vsa_name_template', 'vsa-%08x',
3371+ 'Template string to be used to generate VSA names')
3372
3373 IMPL = utils.LazyPluggable(FLAGS['db_backend'],
3374 sqlalchemy='nova.db.sqlalchemy.api')
3375@@ -495,9 +496,20 @@
3376 return IMPL.instance_get_all_by_filters(context, filters)
3377
3378
3379-def instance_get_active_by_window(context, begin, end=None):
3380- """Get instances active during a certain time window."""
3381- return IMPL.instance_get_active_by_window(context, begin, end)
3382+def instance_get_active_by_window(context, begin, end=None, project_id=None):
3383+ """Get instances active during a certain time window.
3384+
3385+ Specifying a project_id will filter for a certain project."""
3386+ return IMPL.instance_get_active_by_window(context, begin, end, project_id)
3387+
3388+
3389+def instance_get_active_by_window_joined(context, begin, end=None,
3390+ project_id=None):
3391+ """Get instances and joins active during a certain time window.
3392+
3393+ Specifying a project_id will filter for a certain project."""
3394+ return IMPL.instance_get_active_by_window_joined(context, begin, end,
3395+ project_id)
3396
3397
3398 def instance_get_all_by_user(context, user_id):
3399@@ -1512,3 +1524,36 @@
3400 key/value pairs specified in the extra specs dict argument"""
3401 IMPL.volume_type_extra_specs_update_or_create(context, volume_type_id,
3402 extra_specs)
3403+
3404+
3405+####################
3406+
3407+
3408+def vsa_create(context, values):
3409+ """Creates Virtual Storage Array record."""
3410+ return IMPL.vsa_create(context, values)
3411+
3412+
3413+def vsa_update(context, vsa_id, values):
3414+ """Updates Virtual Storage Array record."""
3415+ return IMPL.vsa_update(context, vsa_id, values)
3416+
3417+
3418+def vsa_destroy(context, vsa_id):
3419+ """Deletes Virtual Storage Array record."""
3420+ return IMPL.vsa_destroy(context, vsa_id)
3421+
3422+
3423+def vsa_get(context, vsa_id):
3424+ """Get Virtual Storage Array record by ID."""
3425+ return IMPL.vsa_get(context, vsa_id)
3426+
3427+
3428+def vsa_get_all(context):
3429+ """Get all Virtual Storage Array records."""
3430+ return IMPL.vsa_get_all(context)
3431+
3432+
3433+def vsa_get_all_by_project(context, project_id):
3434+ """Get all Virtual Storage Array records by project ID."""
3435+ return IMPL.vsa_get_all_by_project(context, project_id)
3436
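Hypothetical usage of the extended window API (assumes an admin context `ctxt` and a populated database):

    import datetime
    from nova import db

    begin = datetime.datetime(2011, 8, 1)
    end = datetime.datetime(2011, 9, 1)
    # Instances launched before `begin` and not terminated until after
    # `end`, restricted to one tenant:
    instances = db.instance_get_active_by_window(ctxt, begin, end,
                                                 project_id='demo')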
3437=== modified file 'nova/db/sqlalchemy/api.py'
3438--- nova/db/sqlalchemy/api.py 2011-08-25 16:14:44 +0000
3439+++ nova/db/sqlalchemy/api.py 2011-09-01 10:57:46 +0000
3440@@ -28,6 +28,7 @@
3441 from nova import ipv6
3442 from nova import utils
3443 from nova import log as logging
3444+from nova.compute import vm_states
3445 from nova.db.sqlalchemy import models
3446 from nova.db.sqlalchemy.session import get_session
3447 from sqlalchemy import or_
3448@@ -1102,12 +1103,11 @@
3449 def instance_stop(context, instance_id):
3450 session = get_session()
3451 with session.begin():
3452- from nova.compute import power_state
3453 session.query(models.Instance).\
3454 filter_by(id=instance_id).\
3455 update({'host': None,
3456- 'state': power_state.SHUTOFF,
3457- 'state_description': 'stopped',
3458+ 'vm_state': vm_states.STOPPED,
3459+ 'task_state': None,
3460 'updated_at': literal_column('updated_at')})
3461 session.query(models.SecurityGroupInstanceAssociation).\
3462 filter_by(instance_id=instance_id).\
3463@@ -1266,7 +1266,7 @@
3464 # Filters for exact matches that we can do along with the SQL query...
3465 # For other filters that don't match this, we will do regexp matching
3466 exact_match_filter_names = ['project_id', 'user_id', 'image_ref',
3467- 'state', 'instance_type_id', 'deleted']
3468+ 'vm_state', 'instance_type_id', 'deleted']
3469
3470 query_filters = [key for key in filters.iterkeys()
3471 if key in exact_match_filter_names]
3472@@ -1306,21 +1306,40 @@
3473 return instances
3474
3475
3476+@require_context
3477+def instance_get_active_by_window(context, begin, end=None, project_id=None):
3478+    """Return instances that were continuously active over the window."""
3479+ session = get_session()
3480+ query = session.query(models.Instance).\
3481+ filter(models.Instance.launched_at < begin)
3482+ if end:
3483+ query = query.filter(or_(models.Instance.terminated_at == None,
3484+ models.Instance.terminated_at > end))
3485+ else:
3486+ query = query.filter(models.Instance.terminated_at == None)
3487+ if project_id:
3488+ query = query.filter_by(project_id=project_id)
3489+ return query.all()
3490+
3491+
3492 @require_admin_context
3493-def instance_get_active_by_window(context, begin, end=None):
3494- """Return instances that were continuously active over the given window"""
3495+def instance_get_active_by_window_joined(context, begin, end=None,
3496+ project_id=None):
3497+    """Return instances and joins continuously active over the window."""
3498 session = get_session()
3499 query = session.query(models.Instance).\
3500- options(joinedload_all('fixed_ips.floating_ips')).\
3501- options(joinedload('security_groups')).\
3502- options(joinedload_all('fixed_ips.network')).\
3503- options(joinedload('instance_type')).\
3504- filter(models.Instance.launched_at < begin)
3505+ options(joinedload_all('fixed_ips.floating_ips')).\
3506+ options(joinedload('security_groups')).\
3507+ options(joinedload_all('fixed_ips.network')).\
3508+ options(joinedload('instance_type')).\
3509+ filter(models.Instance.launched_at < begin)
3510 if end:
3511 query = query.filter(or_(models.Instance.terminated_at == None,
3512 models.Instance.terminated_at > end))
3513 else:
3514 query = query.filter(models.Instance.terminated_at == None)
3515+ if project_id:
3516+ query = query.filter_by(project_id=project_id)
3517 return query.all()
3518
3519
3520@@ -1484,18 +1503,6 @@
3521 return fixed_ip_refs[0].floating_ips[0]['address']
3522
3523
3524-@require_admin_context
3525-def instance_set_state(context, instance_id, state, description=None):
3526- # TODO(devcamcar): Move this out of models and into driver
3527- from nova.compute import power_state
3528- if not description:
3529- description = power_state.name(state)
3530- db.instance_update(context,
3531- instance_id,
3532- {'state': state,
3533- 'state_description': description})
3534-
3535-
3536 @require_context
3537 def instance_update(context, instance_id, values):
3538 session = get_session()
3539@@ -3837,3 +3844,105 @@
3540 "deleted": 0})
3541 spec_ref.save(session=session)
3542 return specs
3543+
3544+
3545+####################
3546+
3547+
3548+@require_admin_context
3549+def vsa_create(context, values):
3550+ """
3551+ Creates Virtual Storage Array record.
3552+ """
3553+ try:
3554+ vsa_ref = models.VirtualStorageArray()
3555+ vsa_ref.update(values)
3556+ vsa_ref.save()
3557+ except Exception, e:
3558+ raise exception.DBError(e)
3559+ return vsa_ref
3560+
3561+
3562+@require_admin_context
3563+def vsa_update(context, vsa_id, values):
3564+ """
3565+ Updates Virtual Storage Array record.
3566+ """
3567+ session = get_session()
3568+ with session.begin():
3569+ vsa_ref = vsa_get(context, vsa_id, session=session)
3570+ vsa_ref.update(values)
3571+ vsa_ref.save(session=session)
3572+ return vsa_ref
3573+
3574+
3575+@require_admin_context
3576+def vsa_destroy(context, vsa_id):
3577+ """
3578+ Deletes Virtual Storage Array record.
3579+ """
3580+ session = get_session()
3581+ with session.begin():
3582+ session.query(models.VirtualStorageArray).\
3583+ filter_by(id=vsa_id).\
3584+ update({'deleted': True,
3585+ 'deleted_at': utils.utcnow(),
3586+ 'updated_at': literal_column('updated_at')})
3587+
3588+
3589+@require_context
3590+def vsa_get(context, vsa_id, session=None):
3591+ """
3592+ Get Virtual Storage Array record by ID.
3593+ """
3594+ if not session:
3595+ session = get_session()
3596+ result = None
3597+
3598+ if is_admin_context(context):
3599+ result = session.query(models.VirtualStorageArray).\
3600+ options(joinedload('vsa_instance_type')).\
3601+ filter_by(id=vsa_id).\
3602+ filter_by(deleted=can_read_deleted(context)).\
3603+ first()
3604+ elif is_user_context(context):
3605+ result = session.query(models.VirtualStorageArray).\
3606+ options(joinedload('vsa_instance_type')).\
3607+ filter_by(project_id=context.project_id).\
3608+ filter_by(id=vsa_id).\
3609+ filter_by(deleted=False).\
3610+ first()
3611+ if not result:
3612+ raise exception.VirtualStorageArrayNotFound(id=vsa_id)
3613+
3614+ return result
3615+
3616+
3617+@require_admin_context
3618+def vsa_get_all(context):
3619+ """
3620+ Get all Virtual Storage Array records.
3621+ """
3622+ session = get_session()
3623+ return session.query(models.VirtualStorageArray).\
3624+ options(joinedload('vsa_instance_type')).\
3625+ filter_by(deleted=can_read_deleted(context)).\
3626+ all()
3627+
3628+
3629+@require_context
3630+def vsa_get_all_by_project(context, project_id):
3631+ """
3632+ Get all Virtual Storage Array records by project ID.
3633+ """
3634+ authorize_project_context(context, project_id)
3635+
3636+ session = get_session()
3637+ return session.query(models.VirtualStorageArray).\
3638+ options(joinedload('vsa_instance_type')).\
3639+ filter_by(project_id=project_id).\
3640+ filter_by(deleted=can_read_deleted(context)).\
3641+ all()
3642+
3643+
3644+####################
3645
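The "continuously active" predicate shared by both query variants above restates as a plain interval check; a self-contained sketch for reference:

    import datetime

    def active_over_window(launched_at, terminated_at, begin, end=None):
        # Mirrors the SQL: launched before the window opens, and not
        # terminated before it closes (NULL terminated_at == still alive).
        if launched_at is None or launched_at >= begin:
            return False
        if terminated_at is None:
            return True
        return end is not None and terminated_at > end

    begin = datetime.datetime(2011, 8, 1)
    end = datetime.datetime(2011, 9, 1)
    assert active_over_window(datetime.datetime(2011, 7, 1), None, begin, end)
    assert not active_over_window(datetime.datetime(2011, 8, 15), None,
                                  begin, end)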
3646=== added file 'nova/db/sqlalchemy/migrate_repo/versions/043_add_vsa_data.py'
3647--- nova/db/sqlalchemy/migrate_repo/versions/043_add_vsa_data.py 1970-01-01 00:00:00 +0000
3648+++ nova/db/sqlalchemy/migrate_repo/versions/043_add_vsa_data.py 2011-09-01 10:57:46 +0000
3649@@ -0,0 +1,75 @@
3650+# vim: tabstop=4 shiftwidth=4 softtabstop=4
3651+
3652+# Copyright (c) 2011 Zadara Storage Inc.
3653+# Copyright (c) 2011 OpenStack LLC.
3654+#
3655+# Licensed under the Apache License, Version 2.0 (the "License"); you may
3656+# not use this file except in compliance with the License. You may obtain
3657+# a copy of the License at
3658+#
3659+# http://www.apache.org/licenses/LICENSE-2.0
3660+#
3661+# Unless required by applicable law or agreed to in writing, software
3662+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
3663+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
3664+# License for the specific language governing permissions and limitations
3665+# under the License.
3666+
3667+from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table
3668+from sqlalchemy import Text, Boolean, ForeignKey
3669+
3670+from nova import log as logging
3671+
3672+meta = MetaData()
3673+
3674+#
3675+# New Tables
3676+#
3677+
3678+virtual_storage_arrays = Table('virtual_storage_arrays', meta,
3679+ Column('created_at', DateTime(timezone=False)),
3680+ Column('updated_at', DateTime(timezone=False)),
3681+ Column('deleted_at', DateTime(timezone=False)),
3682+ Column('deleted', Boolean(create_constraint=True, name=None)),
3683+ Column('id', Integer(), primary_key=True, nullable=False),
3684+ Column('display_name',
3685+ String(length=255, convert_unicode=False, assert_unicode=None,
3686+ unicode_error=None, _warn_on_bytestring=False)),
3687+ Column('display_description',
3688+ String(length=255, convert_unicode=False, assert_unicode=None,
3689+ unicode_error=None, _warn_on_bytestring=False)),
3690+ Column('project_id',
3691+ String(length=255, convert_unicode=False, assert_unicode=None,
3692+ unicode_error=None, _warn_on_bytestring=False)),
3693+ Column('availability_zone',
3694+ String(length=255, convert_unicode=False, assert_unicode=None,
3695+ unicode_error=None, _warn_on_bytestring=False)),
3696+ Column('instance_type_id', Integer(), nullable=False),
3697+ Column('image_ref',
3698+ String(length=255, convert_unicode=False, assert_unicode=None,
3699+ unicode_error=None, _warn_on_bytestring=False)),
3700+ Column('vc_count', Integer(), nullable=False),
3701+ Column('vol_count', Integer(), nullable=False),
3702+ Column('status',
3703+ String(length=255, convert_unicode=False, assert_unicode=None,
3704+ unicode_error=None, _warn_on_bytestring=False)),
3705+ )
3706+
3707+
3708+def upgrade(migrate_engine):
3709+ # Upgrade operations go here. Don't create your own engine;
3710+ # bind migrate_engine to your metadata
3711+ meta.bind = migrate_engine
3712+
3713+ try:
3714+ virtual_storage_arrays.create()
3715+ except Exception:
3716+        logging.info(repr(virtual_storage_arrays))
3717+ logging.exception('Exception while creating table')
3718+ raise
3719+
3720+
3721+def downgrade(migrate_engine):
3722+ meta.bind = migrate_engine
3723+
3724+ virtual_storage_arrays.drop()
3725
3726=== added file 'nova/db/sqlalchemy/migrate_repo/versions/044_update_instance_states.py'
3727--- nova/db/sqlalchemy/migrate_repo/versions/044_update_instance_states.py 1970-01-01 00:00:00 +0000
3728+++ nova/db/sqlalchemy/migrate_repo/versions/044_update_instance_states.py 2011-09-01 10:57:46 +0000
3729@@ -0,0 +1,138 @@
3730+# vim: tabstop=4 shiftwidth=4 softtabstop=4
3731+
3732+# Copyright 2010 OpenStack LLC.
3733+#
3734+# Licensed under the Apache License, Version 2.0 (the "License"); you may
3735+# not use this file except in compliance with the License. You may obtain
3736+# a copy of the License at
3737+#
3738+# http://www.apache.org/licenses/LICENSE-2.0
3739+#
3740+# Unless required by applicable law or agreed to in writing, software
3741+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
3742+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
3743+# License for the specific language governing permissions and limitations
3744+# under the License.
3745+
3746+import sqlalchemy
3747+from sqlalchemy import MetaData, Table, Column, String
3748+
3749+from nova.compute import task_states
3750+from nova.compute import vm_states
3751+
3752+
3753+meta = MetaData()
3754+
3755+
3756+c_task_state = Column('task_state',
3757+ String(length=255, convert_unicode=False,
3758+ assert_unicode=None, unicode_error=None,
3759+ _warn_on_bytestring=False),
3760+ nullable=True)
3761+
3762+
3763+_upgrade_translations = {
3764+ "stopping": {
3765+        "vm_state": vm_states.ACTIVE,
3766+ "task_state": task_states.STOPPING,
3767+ },
3768+ "stopped": {
3769+        "vm_state": vm_states.STOPPED,
3770+ "task_state": None,
3771+ },
3772+ "terminated": {
3773+        "vm_state": vm_states.DELETED,
3774+ "task_state": None,
3775+ },
3776+ "terminating": {
3777+        "vm_state": vm_states.ACTIVE,
3778+ "task_state": task_states.DELETING,
3779+ },
3780+ "running": {
3781+        "vm_state": vm_states.ACTIVE,
3782+ "task_state": None,
3783+ },
3784+ "scheduling": {
3785+        "vm_state": vm_states.BUILDING,
3786+ "task_state": task_states.SCHEDULING,
3787+ },
3788+ "migrating": {
3789+        "vm_state": vm_states.MIGRATING,
3790+ "task_state": None,
3791+ },
3792+ "pending": {
3793+        "vm_state": vm_states.BUILDING,
3794+ "task_state": task_states.SCHEDULING,
3795+ },
3796+}
3797+
3798+
3799+_downgrade_translations = {
3800+ vm_states.ACTIVE: {
3801+ None: "running",
3802+ task_states.DELETING: "terminating",
3803+ task_states.STOPPING: "stopping",
3804+ },
3805+ vm_states.BUILDING: {
3806+ None: "pending",
3807+ task_states.SCHEDULING: "scheduling",
3808+ },
3809+ vm_states.STOPPED: {
3810+ None: "stopped",
3811+ },
3812+ vm_states.REBUILDING: {
3813+ None: "pending",
3814+ },
3815+ vm_states.DELETED: {
3816+ None: "terminated",
3817+ },
3818+ vm_states.MIGRATING: {
3819+ None: "migrating",
3820+ },
3821+}
3822+
3823+
3824+def upgrade(migrate_engine):
3825+ meta.bind = migrate_engine
3826+
3827+ instance_table = Table('instances', meta, autoload=True,
3828+ autoload_with=migrate_engine)
3829+
3830+ c_state = instance_table.c.state
3831+ c_state.alter(name='power_state')
3832+
3833+ c_vm_state = instance_table.c.state_description
3834+ c_vm_state.alter(name='vm_state')
3835+
3836+ instance_table.create_column(c_task_state)
3837+
3838+ for old_state, values in _upgrade_translations.iteritems():
3839+ instance_table.update().\
3840+ values(**values).\
3841+ where(c_vm_state == old_state).\
3842+ execute()
3843+
3844+
3845+def downgrade(migrate_engine):
3846+ meta.bind = migrate_engine
3847+
3848+ instance_table = Table('instances', meta, autoload=True,
3849+ autoload_with=migrate_engine)
3850+
3851+ c_task_state = instance_table.c.task_state
3852+
3853+ c_state = instance_table.c.power_state
3854+ c_state.alter(name='state')
3855+
3856+ c_vm_state = instance_table.c.vm_state
3857+ c_vm_state.alter(name='state_description')
3858+
3859+ for old_vm_state, old_task_states in _downgrade_translations.iteritems():
3860+ for old_task_state, new_state_desc in old_task_states.iteritems():
3861+ instance_table.update().\
3862+ where(c_task_state == old_task_state).\
3863+ where(c_vm_state == old_vm_state).\
3864+                values(state_description=new_state_desc).\
3865+ execute()
3866+
3867+ instance_table.drop_column('task_state')
3868
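A worked example of the upgrade mapping: applying the translations to a pre-044 row. This is a standalone restatement; unmapped descriptions are assumed to pass through unchanged, which matches the migration simply leaving them alone:

    translations = {'terminating': ('active', 'deleting'),
                    'stopped': ('stopped', None)}

    def upgrade_row(row):
        # row uses the old schema: 'state' and 'state_description'.
        old = row['state_description']
        vm_state, task_state = translations.get(old, (old, None))
        return {'power_state': row['state'],
                'vm_state': vm_state,
                'task_state': task_state}

    assert upgrade_row({'state': 1, 'state_description': 'terminating'}) == \
        {'power_state': 1, 'vm_state': 'active', 'task_state': 'deleting'}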
3869=== modified file 'nova/db/sqlalchemy/migration.py'
3870--- nova/db/sqlalchemy/migration.py 2011-08-24 23:41:14 +0000
3871+++ nova/db/sqlalchemy/migration.py 2011-09-01 10:57:46 +0000
3872@@ -64,6 +64,7 @@
3873 'users', 'user_project_association',
3874 'user_project_role_association',
3875 'user_role_association',
3876+ 'virtual_storage_arrays',
3877 'volumes', 'volume_metadata',
3878 'volume_types', 'volume_type_extra_specs'):
3879 assert table in meta.tables
3880
3881=== modified file 'nova/db/sqlalchemy/models.py'
3882--- nova/db/sqlalchemy/models.py 2011-08-24 16:10:28 +0000
3883+++ nova/db/sqlalchemy/models.py 2011-09-01 10:57:46 +0000
3884@@ -193,8 +193,9 @@
3885 key_name = Column(String(255))
3886 key_data = Column(Text)
3887
3888- state = Column(Integer)
3889- state_description = Column(String(255))
3890+ power_state = Column(Integer)
3891+ vm_state = Column(String(255))
3892+ task_state = Column(String(255))
3893
3894 memory_mb = Column(Integer)
3895 vcpus = Column(Integer)
3896@@ -238,16 +239,31 @@
3897 access_ip_v4 = Column(String(255))
3898 access_ip_v6 = Column(String(255))
3899
3900- # TODO(vish): see Ewan's email about state improvements, probably
3901- # should be in a driver base class or some such
3902- # vmstate_state = running, halted, suspended, paused
3903- # power_state = what we have
3904- # task_state = transitory and may trigger power state transition
3905-
3906- #@validates('state')
3907- #def validate_state(self, key, state):
3908- # assert(state in ['nostate', 'running', 'blocked', 'paused',
3909- # 'shutdown', 'shutoff', 'crashed'])
3910+
3911+class VirtualStorageArray(BASE, NovaBase):
3912+ """
3913+ Represents a virtual storage array supplying block storage to instances.
3914+ """
3915+ __tablename__ = 'virtual_storage_arrays'
3916+
3917+ id = Column(Integer, primary_key=True, autoincrement=True)
3918+
3919+ @property
3920+ def name(self):
3921+ return FLAGS.vsa_name_template % self.id
3922+
3923+ # User editable field for display in user-facing UIs
3924+ display_name = Column(String(255))
3925+ display_description = Column(String(255))
3926+
3927+ project_id = Column(String(255))
3928+ availability_zone = Column(String(255))
3929+
3930+ instance_type_id = Column(Integer, ForeignKey('instance_types.id'))
3931+ image_ref = Column(String(255))
3932+ vc_count = Column(Integer, default=0) # number of requested VC instances
3933+ vol_count = Column(Integer, default=0) # total number of BE volumes
3934+ status = Column(String(255))
3935
3936
3937 class InstanceActions(BASE, NovaBase):
3938@@ -279,6 +295,12 @@
3939 primaryjoin='and_(Instance.instance_type_id == '
3940 'InstanceTypes.id)')
3941
3942+ vsas = relationship(VirtualStorageArray,
3943+ backref=backref('vsa_instance_type', uselist=False),
3944+ foreign_keys=id,
3945+ primaryjoin='and_(VirtualStorageArray.instance_type_id'
3946+ ' == InstanceTypes.id)')
3947+
3948
3949 class Volume(BASE, NovaBase):
3950 """Represents a block storage device that can be attached to a vm."""
3951@@ -848,7 +870,8 @@
3952 SecurityGroupInstanceAssociation, AuthToken, User,
3953 Project, Certificate, ConsolePool, Console, Zone,
3954 VolumeMetadata, VolumeTypes, VolumeTypeExtraSpecs,
3955- AgentBuild, InstanceMetadata, InstanceTypeExtraSpecs, Migration)
3956+ AgentBuild, InstanceMetadata, InstanceTypeExtraSpecs, Migration,
3957+ VirtualStorageArray)
3958 engine = create_engine(FLAGS.sql_connection, echo=False)
3959 for model in models:
3960 model.metadata.create_all(engine)
3961
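For reference, the name property renders the row id through FLAGS.vsa_name_template; with the default 'vsa-%08x' template added in nova/db/api.py above:

    assert 'vsa-%08x' % 1 == 'vsa-00000001'
    assert 'vsa-%08x' % 10 == 'vsa-0000000a'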
3962=== modified file 'nova/exception.py'
3963--- nova/exception.py 2011-08-24 16:10:28 +0000
3964+++ nova/exception.py 2011-09-01 10:57:46 +0000
3965@@ -61,7 +61,7 @@
3966 super(ApiError, self).__init__(outstr)
3967
3968
3969-class BuildInProgress(Error):
3970+class RebuildRequiresActiveInstance(Error):
3971 pass
3972
3973
3974@@ -533,6 +533,10 @@
3975 message = _("Zero floating ips available.")
3976
3977
3978+class FloatingIpAlreadyInUse(NovaException):
3979+ message = _("Floating ip %(address)s already in use by %(fixed_ip)s.")
3980+
3981+
3982 class NoFloatingIpsDefined(NotFound):
3983 message = _("Zero floating ips exist.")
3984
3985@@ -783,6 +787,18 @@
3986 message = _("Could not load paste app '%(name)s' from %(path)s")
3987
3988
3989+class VSANovaAccessParamNotFound(Invalid):
3990+ message = _("Nova access parameters were not specified.")
3991+
3992+
3993+class VirtualStorageArrayNotFound(NotFound):
3994+ message = _("Virtual Storage Array %(id)d could not be found.")
3995+
3996+
3997+class VirtualStorageArrayNotFoundByName(NotFound):
3998+ message = _("Virtual Storage Array %(name)s could not be found.")
3999+
4000+
4001 class CannotResizeToSameSize(NovaException):
4002 message = _("When resizing, instances must change size!")
4003
4004
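The new exception classes interpolate keyword arguments into their message templates, as elsewhere in nova/exception.py; a hedged usage sketch (assumes nova is importable):

    from nova import exception

    try:
        raise exception.FloatingIpAlreadyInUse(address='10.0.0.5',
                                               fixed_ip='192.168.0.3')
    except exception.FloatingIpAlreadyInUse, e:
        assert '10.0.0.5' in str(e) and '192.168.0.3' in str(e)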
4005=== modified file 'nova/flags.py'
4006--- nova/flags.py 2011-08-23 19:06:25 +0000
4007+++ nova/flags.py 2011-09-01 10:57:46 +0000
4008@@ -292,6 +292,7 @@
4009 in the form "http://127.0.0.1:8000"')
4010 DEFINE_string('ajax_console_proxy_port',
4011 8000, 'port that ajax_console_proxy binds')
4012+DEFINE_string('vsa_topic', 'vsa', 'the topic that nova-vsa service listens on')
4013 DEFINE_bool('verbose', False, 'show debug output')
4014 DEFINE_boolean('fake_rabbit', False, 'use a fake rabbit')
4015 DEFINE_bool('fake_network', False,
4016@@ -302,8 +303,12 @@
4017 DEFINE_string('rabbit_userid', 'guest', 'rabbit userid')
4018 DEFINE_string('rabbit_password', 'guest', 'rabbit password')
4019 DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
4020-DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval')
4021-DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts')
4022+DEFINE_integer('rabbit_retry_interval', 1,
4023+ 'rabbit connection retry interval to start')
4024+DEFINE_integer('rabbit_retry_backoff', 2,
4025+ 'rabbit connection retry backoff in seconds')
4026+DEFINE_integer('rabbit_max_retries', 0,
4027+ 'maximum rabbit connection attempts (0=try forever)')
4028 DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
4029 DEFINE_boolean('rabbit_durable_queues', False, 'use durable queues')
4030 DEFINE_list('enabled_apis', ['ec2', 'osapi'],
4031@@ -371,6 +376,17 @@
4032 'Manager for volume')
4033 DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
4034 'Manager for scheduler')
4035+DEFINE_string('vsa_manager', 'nova.vsa.manager.VsaManager',
4036+ 'Manager for vsa')
4037+DEFINE_string('vc_image_name', 'vc_image',
4038+ 'the VC image ID (for a VC image that exists in DB Glance)')
4039+# VSA constants and enums
4040+DEFINE_string('default_vsa_instance_type', 'm1.small',
4041+ 'default instance type for VSA instances')
4042+DEFINE_integer('max_vcs_in_vsa', 32,
4043+               'maximum VCs in a VSA')
4044+DEFINE_integer('vsa_part_size_gb', 100,
4045+ 'default partition size for shared capacity')
4046
4047 # The service to use for image search and retrieval
4048 DEFINE_string('image_service', 'nova.image.glance.GlanceImageService',
4049
4050=== modified file 'nova/image/glance.py'
4051--- nova/image/glance.py 2011-08-09 09:54:51 +0000
4052+++ nova/image/glance.py 2011-09-01 10:57:46 +0000
4053@@ -141,19 +141,30 @@
4054 """Paginate through results from glance server"""
4055 images = fetch_func(**kwargs)
4056
4057- for image in images:
4058- yield image
4059- else:
4060+ if not images:
4061 # break out of recursive loop to end pagination
4062 return
4063
4064+ for image in images:
4065+ yield image
4066+
4067 try:
4068 # attempt to advance the marker in order to fetch next page
4069 kwargs['marker'] = images[-1]['id']
4070 except KeyError:
4071 raise exception.ImagePaginationFailed()
4072
4073- self._fetch_images(fetch_func, **kwargs)
4074+ try:
4075+ kwargs['limit'] = kwargs['limit'] - len(images)
4076+ # break if we have reached a provided limit
4077+ if kwargs['limit'] <= 0:
4078+ return
4079+ except KeyError:
4080+ # ignore missing limit, just proceed without it
4081+ pass
4082+
4083+ for image in self._fetch_images(fetch_func, **kwargs):
4084+ yield image
4085
4086 def show(self, context, image_id):
4087 """Returns a dict with image data for the given opaque image id."""
4088
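The fixed pagination loop generalizes to: yield a page, advance the marker, decrement any limit, recurse. A self-contained sketch with a fake fetch function (the page data is invented for illustration):

    def fetch_pages(fetch_func, **kwargs):
        images = fetch_func(**kwargs)
        if not images:
            return
        for image in images:
            yield image
        kwargs['marker'] = images[-1]['id']   # resume after the last id
        if 'limit' in kwargs:
            kwargs['limit'] -= len(images)
            if kwargs['limit'] <= 0:
                return                        # caller's limit reached
        for image in fetch_pages(fetch_func, **kwargs):
            yield image

    pages = {None: [{'id': 1}, {'id': 2}], 2: [{'id': 3}], 3: []}
    fake_fetch = lambda marker=None, **kw: pages[marker]
    assert [i['id'] for i in fetch_pages(fake_fetch, limit=3)] == [1, 2, 3]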
4089=== modified file 'nova/ipv6/account_identifier.py'
4090--- nova/ipv6/account_identifier.py 2011-08-22 01:01:34 +0000
4091+++ nova/ipv6/account_identifier.py 2011-09-01 10:57:46 +0000
4092@@ -39,7 +39,8 @@
4093 except TypeError:
4094 raise TypeError(_('Bad prefix for to_global_ipv6: %s') % prefix)
4095 except NameError:
4096- raise TypeError(_('Bad project_id for to_global_ipv6: %s') % project_id)
4097+ raise TypeError(_('Bad project_id for to_global_ipv6: %s') %
4098+ project_id)
4099
4100
4101 def to_mac(ipv6_address):
4102
4103=== modified file 'nova/network/api.py'
4104--- nova/network/api.py 2011-07-29 21:36:30 +0000
4105+++ nova/network/api.py 2011-09-01 10:57:46 +0000
4106@@ -111,6 +111,12 @@
4107 '(%(project)s)') %
4108 {'address': floating_ip['address'],
4109 'project': context.project_id})
4110+
4111+        # If this address has been previously associated with a
4112+ # different instance, disassociate the floating_ip
4113+ if floating_ip['fixed_ip'] and floating_ip['fixed_ip'] is not fixed_ip:
4114+ self.disassociate_floating_ip(context, floating_ip['address'])
4115+
4116 # NOTE(vish): if we are multi_host, send to the instances host
4117 if fixed_ip['network']['multi_host']:
4118 host = fixed_ip['instance']['host']
4119
4120=== modified file 'nova/network/manager.py'
4121--- nova/network/manager.py 2011-08-23 14:21:07 +0000
4122+++ nova/network/manager.py 2011-09-01 10:57:46 +0000
4123@@ -280,6 +280,13 @@
4124
4125 def associate_floating_ip(self, context, floating_address, fixed_address):
4126 """Associates an floating ip to a fixed ip."""
4127+ floating_ip = self.db.floating_ip_get_by_address(context,
4128+ floating_address)
4129+ if floating_ip['fixed_ip']:
4130+ raise exception.FloatingIpAlreadyInUse(
4131+ address=floating_ip['address'],
4132+ fixed_ip=floating_ip['fixed_ip']['address'])
4133+
4134 self.db.floating_ip_fixed_ip_associate(context,
4135 floating_address,
4136 fixed_address)
4137@@ -484,17 +491,17 @@
4138 # TODO(tr3buchet) eventually "enabled" should be determined
4139 def ip_dict(ip):
4140 return {
4141- "ip": ip,
4142- "netmask": network["netmask"],
4143- "enabled": "1"}
4144+ 'ip': ip,
4145+ 'netmask': network['netmask'],
4146+ 'enabled': '1'}
4147
4148 def ip6_dict():
4149 return {
4150- "ip": ipv6.to_global(network['cidr_v6'],
4151+ 'ip': ipv6.to_global(network['cidr_v6'],
4152 vif['address'],
4153 network['project_id']),
4154- "netmask": network['netmask_v6'],
4155- "enabled": "1"}
4156+ 'netmask': network['netmask_v6'],
4157+ 'enabled': '1'}
4158 network_dict = {
4159 'bridge': network['bridge'],
4160 'id': network['id'],
4161
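Read together, the two floating-IP changes split responsibility: the manager now refuses a direct double-association, while the API layer disassociates first so callers can move an address. A sketch of the manager-side guard (ValueError stands in for FloatingIpAlreadyInUse to stay self-contained):

    def associate_floating_ip(floating_ip, fixed_address, db_associate):
        # Mirrors the manager check above.
        if floating_ip['fixed_ip']:
            raise ValueError('FloatingIpAlreadyInUse: %s -> %s' % (
                floating_ip['address'], floating_ip['fixed_ip']['address']))
        db_associate(floating_ip['address'], fixed_address)

    ip = {'address': '10.0.0.5', 'fixed_ip': None}
    associate_floating_ip(ip, '192.168.0.3',
                          lambda f, x: ip.update(fixed_ip={'address': x}))
    assert ip['fixed_ip']['address'] == '192.168.0.3'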
4162=== modified file 'nova/notifier/api.py'
4163--- nova/notifier/api.py 2011-08-23 16:46:49 +0000
4164+++ nova/notifier/api.py 2011-09-01 10:57:46 +0000
4165@@ -122,4 +122,5 @@
4166 driver.notify(msg)
4167 except Exception, e:
4168 LOG.exception(_("Problem '%(e)s' attempting to "
4169- "send to notification system." % locals()))
4170+ "send to notification system. Payload=%(payload)s" %
4171+ locals()))
4172
4173=== modified file 'nova/quota.py'
4174--- nova/quota.py 2011-08-17 07:41:17 +0000
4175+++ nova/quota.py 2011-09-01 10:57:46 +0000
4176@@ -116,8 +116,9 @@
4177 allowed_gigabytes = _get_request_allotment(requested_gigabytes,
4178 used_gigabytes,
4179 quota['gigabytes'])
4180- allowed_volumes = min(allowed_volumes,
4181- int(allowed_gigabytes // size))
4182+ if size != 0:
4183+ allowed_volumes = min(allowed_volumes,
4184+ int(allowed_gigabytes // size))
4185 return min(requested_volumes, allowed_volumes)
4186
4187
4188
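The guard avoids a ZeroDivisionError when a zero-sized volume is requested; a standalone restatement of the allotment arithmetic (function and argument names are illustrative):

    def allowed(requested_volumes, allowed_volumes, allowed_gigabytes, size):
        if size != 0:
            allowed_volumes = min(allowed_volumes,
                                  int(allowed_gigabytes // size))
        return min(requested_volumes, allowed_volumes)

    assert allowed(2, 10, 100, 0) == 2    # previously raised on size 0
    assert allowed(2, 10, 100, 60) == 1   # gigabyte quota still binds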
4189=== modified file 'nova/rpc/__init__.py'
4190--- nova/rpc/__init__.py 2011-07-29 19:08:59 +0000
4191+++ nova/rpc/__init__.py 2011-09-01 10:57:46 +0000
4192@@ -23,44 +23,35 @@
4193
4194 FLAGS = flags.FLAGS
4195 flags.DEFINE_string('rpc_backend',
4196- 'nova.rpc.amqp',
4197- "The messaging module to use, defaults to AMQP.")
4198-
4199-RPCIMPL = import_object(FLAGS.rpc_backend)
4200+ 'nova.rpc.impl_kombu',
4201+ "The messaging module to use, defaults to kombu.")
4202+
4203+_RPCIMPL = None
4204+
4205+
4206+def get_impl():
4207+ """Delay import of rpc_backend until FLAGS are loaded."""
4208+ global _RPCIMPL
4209+ if _RPCIMPL is None:
4210+ _RPCIMPL = import_object(FLAGS.rpc_backend)
4211+ return _RPCIMPL
4212
4213
4214 def create_connection(new=True):
4215- return RPCIMPL.Connection.instance(new=True)
4216-
4217-
4218-def create_consumer(conn, topic, proxy, fanout=False):
4219- if fanout:
4220- return RPCIMPL.FanoutAdapterConsumer(
4221- connection=conn,
4222- topic=topic,
4223- proxy=proxy)
4224- else:
4225- return RPCIMPL.TopicAdapterConsumer(
4226- connection=conn,
4227- topic=topic,
4228- proxy=proxy)
4229-
4230-
4231-def create_consumer_set(conn, consumers):
4232- return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
4233+ return get_impl().create_connection(new=new)
4234
4235
4236 def call(context, topic, msg):
4237- return RPCIMPL.call(context, topic, msg)
4238+ return get_impl().call(context, topic, msg)
4239
4240
4241 def cast(context, topic, msg):
4242- return RPCIMPL.cast(context, topic, msg)
4243+ return get_impl().cast(context, topic, msg)
4244
4245
4246 def fanout_cast(context, topic, msg):
4247- return RPCIMPL.fanout_cast(context, topic, msg)
4248+ return get_impl().fanout_cast(context, topic, msg)
4249
4250
4251 def multicall(context, topic, msg):
4252- return RPCIMPL.multicall(context, topic, msg)
4253+ return get_impl().multicall(context, topic, msg)
4254
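The deferred-import pattern above, in isolation: the backend module is resolved on first use, after flags have been parsed, and then memoized. A generic sketch:

    import importlib

    _backend = None

    def get_backend(name):
        """Load the named module on first use, then reuse it."""
        global _backend
        if _backend is None:
            _backend = importlib.import_module(name)
        return _backend

    assert get_backend('json') is get_backend('json')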
4255=== modified file 'nova/rpc/common.py'
4256--- nova/rpc/common.py 2011-07-29 19:08:59 +0000
4257+++ nova/rpc/common.py 2011-09-01 10:57:46 +0000
4258@@ -1,8 +1,14 @@
4259 from nova import exception
4260+from nova import flags
4261 from nova import log as logging
4262
4263 LOG = logging.getLogger('nova.rpc')
4264
4265+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
4266+ 'Size of RPC thread pool')
4267+flags.DEFINE_integer('rpc_conn_pool_size', 30,
4268+ 'Size of RPC connection pool')
4269+
4270
4271 class RemoteError(exception.Error):
4272 """Signifies that a remote class has raised an exception.
4273
4274=== renamed file 'nova/rpc/amqp.py' => 'nova/rpc/impl_carrot.py'
4275--- nova/rpc/amqp.py 2011-08-16 00:30:13 +0000
4276+++ nova/rpc/impl_carrot.py 2011-09-01 10:57:46 +0000
4277@@ -33,6 +33,7 @@
4278
4279 from carrot import connection as carrot_connection
4280 from carrot import messaging
4281+import eventlet
4282 from eventlet import greenpool
4283 from eventlet import pools
4284 from eventlet import queue
4285@@ -42,21 +43,22 @@
4286 from nova import exception
4287 from nova import fakerabbit
4288 from nova import flags
4289-from nova import log as logging
4290-from nova import utils
4291 from nova.rpc.common import RemoteError, LOG
4292
4293+# Needed for tests
4294+eventlet.monkey_patch()
4295
4296 FLAGS = flags.FLAGS
4297-flags.DEFINE_integer('rpc_thread_pool_size', 1024,
4298- 'Size of RPC thread pool')
4299-flags.DEFINE_integer('rpc_conn_pool_size', 30,
4300- 'Size of RPC connection pool')
4301
4302
4303 class Connection(carrot_connection.BrokerConnection):
4304 """Connection instance object."""
4305
4306+ def __init__(self, *args, **kwargs):
4307+ super(Connection, self).__init__(*args, **kwargs)
4308+ self._rpc_consumers = []
4309+ self._rpc_consumer_thread = None
4310+
4311 @classmethod
4312 def instance(cls, new=True):
4313 """Returns the instance."""
4314@@ -94,13 +96,63 @@
4315 pass
4316 return cls.instance()
4317
4318+ def close(self):
4319+ self.cancel_consumer_thread()
4320+ for consumer in self._rpc_consumers:
4321+ try:
4322+ consumer.close()
4323+ except Exception:
4324+ # ignore all errors
4325+ pass
4326+ self._rpc_consumers = []
4327+ super(Connection, self).close()
4328+
4329+ def consume_in_thread(self):
4330+        """Consume from all queues/consumers in a greenthread."""
4331+
4332+ consumer_set = ConsumerSet(connection=self,
4333+ consumer_list=self._rpc_consumers)
4334+
4335+ def _consumer_thread():
4336+ try:
4337+ consumer_set.wait()
4338+ except greenlet.GreenletExit:
4339+ return
4340+ if self._rpc_consumer_thread is None:
4341+ self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
4342+ return self._rpc_consumer_thread
4343+
4344+ def cancel_consumer_thread(self):
4345+ """Cancel a consumer thread"""
4346+ if self._rpc_consumer_thread is not None:
4347+ self._rpc_consumer_thread.kill()
4348+ try:
4349+ self._rpc_consumer_thread.wait()
4350+ except greenlet.GreenletExit:
4351+ pass
4352+ self._rpc_consumer_thread = None
4353+
4354+ def create_consumer(self, topic, proxy, fanout=False):
4355+ """Create a consumer that calls methods in the proxy"""
4356+ if fanout:
4357+ consumer = FanoutAdapterConsumer(
4358+ connection=self,
4359+ topic=topic,
4360+ proxy=proxy)
4361+ else:
4362+ consumer = TopicAdapterConsumer(
4363+ connection=self,
4364+ topic=topic,
4365+ proxy=proxy)
4366+ self._rpc_consumers.append(consumer)
4367+
4368
4369 class Pool(pools.Pool):
4370 """Class that implements a Pool of Connections."""
4371
4372 # TODO(comstud): Timeout connections not used in a while
4373 def create(self):
4374- LOG.debug('Creating new connection')
4375+ LOG.debug('Pool creating new connection')
4376 return Connection.instance(new=True)
4377
4378 # Create a ConnectionPool to use for RPC calls. We'll order the
4379@@ -119,25 +171,34 @@
4380 """
4381
4382 def __init__(self, *args, **kwargs):
4383- for i in xrange(FLAGS.rabbit_max_retries):
4384- if i > 0:
4385- time.sleep(FLAGS.rabbit_retry_interval)
4386+ max_retries = FLAGS.rabbit_max_retries
4387+ sleep_time = FLAGS.rabbit_retry_interval
4388+ tries = 0
4389+ while True:
4390+ tries += 1
4391+ if tries > 1:
4392+ time.sleep(sleep_time)
4393+ # backoff for next retry attempt.. if there is one
4394+ sleep_time += FLAGS.rabbit_retry_backoff
4395+ if sleep_time > 30:
4396+ sleep_time = 30
4397 try:
4398 super(Consumer, self).__init__(*args, **kwargs)
4399 self.failed_connection = False
4400 break
4401 except Exception as e: # Catching all because carrot sucks
4402+ self.failed_connection = True
4403+ if max_retries > 0 and tries == max_retries:
4404+ break
4405 fl_host = FLAGS.rabbit_host
4406 fl_port = FLAGS.rabbit_port
4407- fl_intv = FLAGS.rabbit_retry_interval
4408+ fl_intv = sleep_time
4409 LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
4410 ' unreachable: %(e)s. Trying again in %(fl_intv)d'
4411 ' seconds.') % locals())
4412- self.failed_connection = True
4413 if self.failed_connection:
4414 LOG.error(_('Unable to connect to AMQP server '
4415- 'after %d tries. Shutting down.'),
4416- FLAGS.rabbit_max_retries)
4417+ 'after %(tries)d tries. Shutting down.') % locals())
4418 sys.exit(1)
4419
4420 def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
4421@@ -166,12 +227,6 @@
4422 LOG.exception(_('Failed to fetch message from queue: %s' % e))
4423 self.failed_connection = True
4424
4425- def attach_to_eventlet(self):
4426- """Only needed for unit tests!"""
4427- timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
4428- timer.start(0.1)
4429- return timer
4430-
4431
4432 class AdapterConsumer(Consumer):
4433 """Calls methods on a proxy object based on method and args."""
4434@@ -242,7 +297,7 @@
4435 # NOTE(vish): this iterates through the generator
4436 list(rval)
4437 except Exception as e:
4438- logging.exception('Exception during message handling')
4439+ LOG.exception('Exception during message handling')
4440 if msg_id:
4441 msg_reply(msg_id, None, sys.exc_info())
4442 return
4443@@ -520,6 +575,11 @@
4444 yield result
4445
4446
4447+def create_connection(new=True):
4448+ """Create a connection"""
4449+ return Connection.instance(new=new)
4450+
4451+
4452 def call(context, topic, msg):
4453 """Sends a message on a topic and wait for a response."""
4454 rv = multicall(context, topic, msg)
4455
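Hypothetical service-side usage of the reworked connection API (create_connection / create_consumer / consume_in_thread); a sketch only, since it assumes a reachable AMQP broker and nova's flags already parsed:

    from nova import rpc

    class EchoProxy(object):
        def echo(self, context, msg):
            return msg

    conn = rpc.create_connection(new=True)
    conn.create_consumer('mytopic', EchoProxy(), fanout=False)
    thread = conn.consume_in_thread()  # greenthread servicing the queues
    # ... on shutdown:
    conn.close()                       # cancels the consumer thread too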
4456=== added file 'nova/rpc/impl_kombu.py'
4457--- nova/rpc/impl_kombu.py 1970-01-01 00:00:00 +0000
4458+++ nova/rpc/impl_kombu.py 2011-09-01 10:57:46 +0000
4459@@ -0,0 +1,781 @@
4460+# vim: tabstop=4 shiftwidth=4 softtabstop=4
4461+
4462+# Copyright 2011 OpenStack LLC
4463+#
4464+# Licensed under the Apache License, Version 2.0 (the "License"); you may
4465+# not use this file except in compliance with the License. You may obtain
4466+# a copy of the License at
4467+#
4468+# http://www.apache.org/licenses/LICENSE-2.0
4469+#
4470+# Unless required by applicable law or agreed to in writing, software
4471+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
4472+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
4473+# License for the specific language governing permissions and limitations
4474+# under the License.
4475+
4476+import kombu
4477+import kombu.entity
4478+import kombu.messaging
4479+import kombu.connection
4480+import itertools
4481+import sys
4482+import time
4483+import traceback
4484+import types
4485+import uuid
4486+
4487+import eventlet
4488+from eventlet import greenpool
4489+from eventlet import pools
4490+import greenlet
4491+
4492+from nova import context
4493+from nova import exception
4494+from nova import flags
4495+from nova.rpc.common import RemoteError, LOG
4496+
4497+# Needed for tests
4498+eventlet.monkey_patch()
4499+
4500+FLAGS = flags.FLAGS
4501+
4502+
4503+class ConsumerBase(object):
4504+ """Consumer base class."""
4505+
4506+ def __init__(self, channel, callback, tag, **kwargs):
4507+ """Declare a queue on an amqp channel.
4508+
4509+ 'channel' is the amqp channel to use
4510+ 'callback' is the callback to call when messages are received
4511+ 'tag' is a unique ID for the consumer on the channel
4512+
4513+ queue name, exchange name, and other kombu options are
4514+ passed in here as a dictionary.
4515+ """
4516+ self.callback = callback
4517+ self.tag = str(tag)
4518+ self.kwargs = kwargs
4519+ self.queue = None
4520+ self.reconnect(channel)
4521+
4522+ def reconnect(self, channel):
4523+ """Re-declare the queue after a rabbit reconnect"""
4524+ self.channel = channel
4525+ self.kwargs['channel'] = channel
4526+ self.queue = kombu.entity.Queue(**self.kwargs)
4527+ self.queue.declare()
4528+
4529+ def consume(self, *args, **kwargs):
4530+ """Actually declare the consumer on the amqp channel. This will
4531+ start the flow of messages from the queue. Using the
4532+ Connection.iterconsume() iterator will process the messages,
4533+ calling the appropriate callback.
4534+
4535+ If a callback is specified in kwargs, use that. Otherwise,
4536+ use the callback passed during __init__()
4537+
4538+        If kwargs['nowait'] is False (the default), this call will
4539+        block until the broker confirms the consumer registration.
4540+
4541+ Messages will automatically be acked if the callback doesn't
4542+ raise an exception
4543+ """
4544+
4545+ options = {'consumer_tag': self.tag}
4546+ options['nowait'] = kwargs.get('nowait', False)
4547+ callback = kwargs.get('callback', self.callback)
4548+ if not callback:
4549+ raise ValueError("No callback defined")
4550+
4551+ def _callback(raw_message):
4552+ message = self.channel.message_to_python(raw_message)
4553+ callback(message.payload)
4554+ message.ack()
4555+
4556+ self.queue.consume(*args, callback=_callback, **options)
4557+
4558+ def cancel(self):
4559+ """Cancel the consuming from the queue, if it has started"""
4560+ try:
4561+ self.queue.cancel(self.tag)
4562+ except KeyError, e:
4562+ # NOTE(comstud): Kludge to get around an amqplib bug
4564+ if str(e) != "u'%s'" % self.tag:
4565+ raise
4566+ self.queue = None
4567+
4568+
4569+class DirectConsumer(ConsumerBase):
4570+ """Queue/consumer class for 'direct'"""
4571+
4572+ def __init__(self, channel, msg_id, callback, tag, **kwargs):
4573+ """Init a 'direct' queue.
4574+
4575+ 'channel' is the amqp channel to use
4576+ 'msg_id' is the msg_id to listen on
4577+ 'callback' is the callback to call when messages are received
4578+ 'tag' is a unique ID for the consumer on the channel
4579+
4580+ Other kombu options may be passed
4581+ """
4582+ # Default options
4583+ options = {'durable': False,
4584+ 'auto_delete': True,
4585+ 'exclusive': True}
4586+ options.update(kwargs)
4587+ exchange = kombu.entity.Exchange(
4588+ name=msg_id,
4589+ type='direct',
4590+ durable=options['durable'],
4591+ auto_delete=options['auto_delete'])
4592+ super(DirectConsumer, self).__init__(
4593+ channel,
4594+ callback,
4595+ tag,
4596+ name=msg_id,
4597+ exchange=exchange,
4598+ routing_key=msg_id,
4599+ **options)
4600+
4601+
4602+class TopicConsumer(ConsumerBase):
4603+ """Consumer class for 'topic'"""
4604+
4605+ def __init__(self, channel, topic, callback, tag, **kwargs):
4606+ """Init a 'topic' queue.
4607+
4608+ 'channel' is the amqp channel to use
4609+ 'topic' is the topic to listen on
4610+ 'callback' is the callback to call when messages are received
4611+ 'tag' is a unique ID for the consumer on the channel
4612+
4613+ Other kombu options may be passed
4614+ """
4615+ # Default options
4616+ options = {'durable': FLAGS.rabbit_durable_queues,
4617+ 'auto_delete': False,
4618+ 'exclusive': False}
4619+ options.update(kwargs)
4620+ exchange = kombu.entity.Exchange(
4621+ name=FLAGS.control_exchange,
4622+ type='topic',
4623+ durable=options['durable'],
4624+ auto_delete=options['auto_delete'])
4625+ super(TopicConsumer, self).__init__(
4626+ channel,
4627+ callback,
4628+ tag,
4629+ name=topic,
4630+ exchange=exchange,
4631+ routing_key=topic,
4632+ **options)
4633+
4634+
4635+class FanoutConsumer(ConsumerBase):
4636+ """Consumer class for 'fanout'"""
4637+
4638+ def __init__(self, channel, topic, callback, tag, **kwargs):
4639+ """Init a 'fanout' queue.
4640+
4641+ 'channel' is the amqp channel to use
4642+ 'topic' is the topic to listen on
4643+ 'callback' is the callback to call when messages are received
4644+ 'tag' is a unique ID for the consumer on the channel
4645+
4646+ Other kombu options may be passed
4647+ """
4648+ unique = uuid.uuid4().hex
4649+ exchange_name = '%s_fanout' % topic
4650+ queue_name = '%s_fanout_%s' % (topic, unique)
4651+
4652+ # Default options
4653+ options = {'durable': False,
4654+ 'auto_delete': True,
4655+ 'exclusive': True}
4656+ options.update(kwargs)
4657+ exchange = kombu.entity.Exchange(
4658+ name=exchange_name,
4659+ type='fanout',
4660+ durable=options['durable'],
4661+ auto_delete=options['auto_delete'])
4662+ super(FanoutConsumer, self).__init__(
4663+ channel,
4664+ callback,
4665+ tag,
4666+ name=queue_name,
4667+ exchange=exchange,
4668+ routing_key=topic,
4669+ **options)
4670+
4671+
4672+class Publisher(object):
4673+ """Base Publisher class"""
4674+
4675+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
4676+ """Init the Publisher class with the exchange_name, routing_key,
4677+ and other options
4678+ """
4679+ self.exchange_name = exchange_name
4680+ self.routing_key = routing_key
4681+ self.kwargs = kwargs
4682+ self.reconnect(channel)
4683+
4684+ def reconnect(self, channel):
4685+ """Re-establish the Producer after a rabbit reconnection"""
4686+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
4687+ **self.kwargs)
4688+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
4689+ channel=channel, routing_key=self.routing_key)
4690+
4691+ def send(self, msg):
4692+ """Send a message"""
4693+ self.producer.publish(msg)
4694+
4695+
4696+class DirectPublisher(Publisher):
4697+ """Publisher class for 'direct'"""
4698+ def __init__(self, channel, msg_id, **kwargs):
4699+ """init a 'direct' publisher.
4700+
4701+ Kombu options may be passed as keyword args to override defaults
4702+ """
4703+
4704+ options = {'durable': False,
4705+ 'auto_delete': True,
4706+ 'exclusive': True}
4707+ options.update(kwargs)
4708+ super(DirectPublisher, self).__init__(channel,
4709+ msg_id,
4710+ msg_id,
4711+ type='direct',
4712+ **options)
4713+
4714+
4715+class TopicPublisher(Publisher):
4716+ """Publisher class for 'topic'"""
4717+ def __init__(self, channel, topic, **kwargs):
4718+ """init a 'topic' publisher.
4719+
4720+ Kombu options may be passed as keyword args to override defaults
4721+ """
4722+ options = {'durable': FLAGS.rabbit_durable_queues,
4723+ 'auto_delete': False,
4724+ 'exclusive': False}
4725+ options.update(kwargs)
4726+ super(TopicPublisher, self).__init__(channel,
4727+ FLAGS.control_exchange,
4728+ topic,
4729+ type='topic',
4730+ **options)
4731+
4732+
4733+class FanoutPublisher(Publisher):
4734+ """Publisher class for 'fanout'"""
4735+ def __init__(self, channel, topic, **kwargs):
4736+ """init a 'fanout' publisher.
4737+
4738+ Kombu options may be passed as keyword args to override defaults
4739+ """
4740+ options = {'durable': False,
4741+ 'auto_delete': True,
4742+ 'exclusive': True}
4743+ options.update(kwargs)
4744+ super(FanoutPublisher, self).__init__(channel,
4745+ '%s_fanout' % topic,
4746+ None,
4747+ type='fanout',
4748+ **options)
4749+
4750+
4751+class Connection(object):
4752+ """Connection object."""
4753+
4754+ def __init__(self):
4755+ self.consumers = []
4756+ self.consumer_thread = None
4757+ self.max_retries = FLAGS.rabbit_max_retries
4758+ # A max_retries value of 0 (or less) means retry forever
4759+ if self.max_retries <= 0:
4760+ self.max_retries = None
4761+ self.interval_start = FLAGS.rabbit_retry_interval
4762+ self.interval_stepping = FLAGS.rabbit_retry_backoff
4763+ # max retry-interval = 30 seconds
4764+ self.interval_max = 30
4765+ self.memory_transport = False
4766+
4767+ self.params = dict(hostname=FLAGS.rabbit_host,
4768+ port=FLAGS.rabbit_port,
4769+ userid=FLAGS.rabbit_userid,
4770+ password=FLAGS.rabbit_password,
4771+ virtual_host=FLAGS.rabbit_virtual_host)
4772+ if FLAGS.fake_rabbit:
4773+ self.params['transport'] = 'memory'
4774+ self.memory_transport = True
4775+ else:
4776+ self.memory_transport = False
4777+ self.connection = None
4778+ self.reconnect()
4779+
4780+ def reconnect(self):
4781+ """Handles reconnecting and re-estblishing queues"""
4782+ if self.connection:
4783+ try:
4784+ self.connection.close()
4785+ except self.connection.connection_errors:
4786+ pass
4787+ time.sleep(1)
4788+ self.connection = kombu.connection.BrokerConnection(**self.params)
4789+ if self.memory_transport:
4790+ # Kludge to speed up tests.
4791+ self.connection.transport.polling_interval = 0.0
4792+ self.consumer_num = itertools.count(1)
4793+
4794+ try:
4795+ self.connection.ensure_connection(errback=self.connect_error,
4796+ max_retries=self.max_retries,
4797+ interval_start=self.interval_start,
4798+ interval_step=self.interval_stepping,
4799+ interval_max=self.interval_max)
4800+ except self.connection.connection_errors, e:
4801+ # We should only get here if max_retries is set. We'll go
4802+ # ahead and exit in this case.
4803+ err_str = str(e)
4804+ max_retries = self.max_retries
4805+ LOG.error(_('Unable to connect to AMQP server '
4806+ 'after %(max_retries)d tries: %(err_str)s') % locals())
4807+ sys.exit(1)
4808+ LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
4809+ self.params)
4810+ self.channel = self.connection.channel()
4811+ # work around 'memory' transport bug in 1.1.3
4812+ if self.memory_transport:
4813+ self.channel._new_queue('ae.undeliver')
4814+ for consumer in self.consumers:
4815+ consumer.reconnect(self.channel)
4816+ if self.consumers:
4817+ LOG.debug(_("Re-established AMQP queues"))
4818+
4819+ def get_channel(self):
4820+ """Convenience call for bin/clear_rabbit_queues"""
4821+ return self.channel
4822+
4823+ def connect_error(self, exc, interval):
4824+ """Callback when there are connection re-tries by kombu"""
4825+ info = self.params.copy()
4826+ info['intv'] = interval
4827+ info['e'] = exc
4828+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is'
4829+ ' unreachable: %(e)s. Trying again in %(intv)d'
4830+ ' seconds.') % info)
4831+
4832+ def close(self):
4833+ """Close/release this connection"""
4834+ self.cancel_consumer_thread()
4835+ self.connection.release()
4836+ self.connection = None
4837+
4838+ def reset(self):
4839+ """Reset a connection so it can be used again"""
4840+ self.cancel_consumer_thread()
4841+ self.channel.close()
4842+ self.channel = self.connection.channel()
4843+ # work around 'memory' transport bug in 1.1.3
4844+ if self.memory_transport:
4845+ self.channel._new_queue('ae.undeliver')
4846+ self.consumers = []
4847+
4848+ def declare_consumer(self, consumer_cls, topic, callback):
4849+ """Create a Consumer using the class that was passed in and
4850+ add it to our list of consumers
4851+ """
4852+ consumer = consumer_cls(self.channel, topic, callback,
4853+ self.consumer_num.next())
4854+ self.consumers.append(consumer)
4855+ return consumer
4856+
4857+ def iterconsume(self, limit=None):
4858+ """Return an iterator that will consume from all queues/consumers"""
4859+ while True:
4860+ try:
4861+ queues_head = self.consumers[:-1]
4862+ queues_tail = self.consumers[-1]
4863+ for queue in queues_head:
4864+ queue.consume(nowait=True)
4865+ queues_tail.consume(nowait=False)
4866+
4867+ for iteration in itertools.count(0):
4868+ if limit and iteration >= limit:
4869+ raise StopIteration
4870+ yield self.connection.drain_events()
4871+ except self.connection.connection_errors, e:
4872+ LOG.exception(_('Failed to consume message from queue: '
4873+ '%s') % str(e))
4874+ self.reconnect()
4875+
4876+ def cancel_consumer_thread(self):
4877+ """Cancel a consumer thread"""
4878+ if self.consumer_thread is not None:
4879+ self.consumer_thread.kill()
4880+ try:
4881+ self.consumer_thread.wait()
4882+ except greenlet.GreenletExit:
4883+ pass
4884+ self.consumer_thread = None
4885+
4886+ def publisher_send(self, cls, topic, msg):
4887+ """Send to a publisher based on the publisher class"""
4888+ while True:
4889+ publisher = None
4890+ try:
4891+ publisher = cls(self.channel, topic)
4892+ publisher.send(msg)
4893+ return
4894+ except self.connection.connection_errors, e:
4895+ LOG.exception(_('Failed to publish message %s') % str(e))
4896+ try:
4897+ self.reconnect()
4898+ if publisher:
4899+ publisher.reconnect(self.channel)
4900+ except self.connection.connection_errors:
4901+ pass
4902+
4903+ def declare_direct_consumer(self, topic, callback):
4904+ """Create a 'direct' queue.
4905+ In nova's use, this is generally a msg_id queue used for
4906+ responses to call/multicall
4907+ """
4908+ self.declare_consumer(DirectConsumer, topic, callback)
4909+
4910+ def declare_topic_consumer(self, topic, callback=None):
4911+ """Create a 'topic' consumer."""
4912+ self.declare_consumer(TopicConsumer, topic, callback)
4913+
4914+ def declare_fanout_consumer(self, topic, callback):
4915+ """Create a 'fanout' consumer"""
4916+ self.declare_consumer(FanoutConsumer, topic, callback)
4917+
4918+ def direct_send(self, msg_id, msg):
4919+ """Send a 'direct' message"""
4920+ self.publisher_send(DirectPublisher, msg_id, msg)
4921+
4922+ def topic_send(self, topic, msg):
4923+ """Send a 'topic' message"""
4924+ self.publisher_send(TopicPublisher, topic, msg)
4925+
4926+ def fanout_send(self, topic, msg):
4927+ """Send a 'fanout' message"""
4928+ self.publisher_send(FanoutPublisher, topic, msg)
4929+
4930+ def consume(self, limit=None):
4931+ """Consume from all queues/consumers"""
4932+ it = self.iterconsume(limit=limit)
4933+ while True:
4934+ try:
4935+ it.next()
4936+ except StopIteration:
4937+ return
4938+
4939+ def consume_in_thread(self):
4940+ """Consumer from all queues/consumers in a greenthread"""
4941+ def _consumer_thread():
4942+ try:
4943+ self.consume()
4944+ except greenlet.GreenletExit:
4945+ return
4946+ if self.consumer_thread is None:
4947+ self.consumer_thread = eventlet.spawn(_consumer_thread)
4948+ return self.consumer_thread
4949+
4950+ def create_consumer(self, topic, proxy, fanout=False):
4951+ """Create a consumer that calls a method in a proxy object"""
4952+ if fanout:
4953+ self.declare_fanout_consumer(topic, ProxyCallback(proxy))
4954+ else:
4955+ self.declare_topic_consumer(topic, ProxyCallback(proxy))
4956+
4957+
4958+class Pool(pools.Pool):
4959+ """Class that implements a Pool of Connections."""
4960+
4961+ # TODO(comstud): Timeout connections not used in a while
4962+ def create(self):
4963+ LOG.debug(_('Pool creating new connection'))
4964+ return Connection()
4965+
4966+# Create a ConnectionPool to use for RPC calls. We'll order the
4967+# pool as a stack (LIFO), so that we can potentially loop through and
4968+ # time out old, unused connections at some point.
4969+ConnectionPool = Pool(
4970+ max_size=FLAGS.rpc_conn_pool_size,
4971+ order_as_stack=True)
4972+
4973+
4974+class ConnectionContext(object):
4975+ """The class that is actually returned to the caller of
4976+ create_connection(). This is a essentially a wrapper around
4977+ Connection that supports 'with' and can return a new Connection or
4978+ one from a pool. It will also catch when an instance of this class
4979+ is to be deleted so that we can return Connections to the pool on
4980+ exceptions and so forth without making the caller be responsible for
4981+ catching all exceptions and making sure to return a connection to
4982+ the pool.
4983+ """
4984+
4985+ def __init__(self, pooled=True):
4986+ """Create a new connection, or get one from the pool"""
4987+ self.connection = None
4988+ if pooled:
4989+ self.connection = ConnectionPool.get()
4990+ else:
4991+ self.connection = Connection()
4992+ self.pooled = pooled
4993+
4994+ def __enter__(self):
4995+ """with ConnectionContext() should return self"""
4996+ return self
4997+
4998+ def _done(self):
4999+ """If the connection came from a pool, clean it up and put it back.
5000+ If it did not come from a pool, close it.
The diff has been truncated for viewing.

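Although the diff is truncated, the visible portion of impl_kombu.py is enough to sketch a round-trip through its primitives. A hypothetical example, assuming the fake_rabbit flag is set so kombu's in-memory transport is used; the msg_id value is invented for illustration:

    from nova import flags
    from nova.rpc import impl_kombu

    flags.FLAGS.fake_rabbit = True      # use kombu's 'memory' transport

    received = []
    conn = impl_kombu.Connection()      # connects per FLAGS.rabbit_* settings
    conn.declare_direct_consumer('msg_id_1234', received.append)
    conn.direct_send('msg_id_1234', {'result': 42, 'failure': None})
    conn.consume(limit=1)               # drain exactly one message
    assert received == [{'result': 42, 'failure': None}]
    conn.close()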