Merge lp:~cerberus/nova/xs_migrations into lp:~hudson-openstack/nova/trunk

Proposed by Matt Dietz
Status: Merged
Approved by: Matt Dietz
Approved revision: 662
Merged at revision: 762
Proposed branch: lp:~cerberus/nova/xs_migrations
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 1458 lines (+948/-67)
20 files modified
nova/api/openstack/servers.py (+51/-3)
nova/compute/api.py (+43/-0)
nova/compute/manager.py (+106/-0)
nova/db/api.py (+28/-1)
nova/db/sqlalchemy/api.py (+56/-0)
nova/db/sqlalchemy/migrate_repo/versions/009_add_instance_migrations.py (+61/-0)
nova/db/sqlalchemy/migration.py (+1/-1)
nova/db/sqlalchemy/models.py (+13/-1)
nova/rpc.py (+1/-1)
nova/tests/api/openstack/common.py (+35/-0)
nova/tests/api/openstack/test_servers.py (+101/-1)
nova/tests/test_compute.py (+23/-1)
nova/tests/test_xenapi.py (+38/-0)
nova/tests/xenapi/stubs.py (+42/-0)
nova/virt/fake.py (+31/-0)
nova/virt/xenapi/fake.py (+3/-0)
nova/virt/xenapi/vm_utils.py (+58/-27)
nova/virt/xenapi/vmops.py (+121/-29)
nova/virt/xenapi_conn.py (+19/-2)
plugins/xenserver/xenapi/etc/xapi.d/plugins/migration (+117/-0)
To merge this branch: bzr merge lp:~cerberus/nova/xs_migrations
Reviewer Review Type Date Requested Status
Rick Harris (community) Approve
Josh Kearney (community) Approve
Review via email: mp+50404@code.launchpad.net

Description of the change

Implementation of XenServer migrations. There are several places for optimization, but I based the current implementation on the chance scheduler just to be safe. Beyond that, a few features are missing, such as ensuring the IP address is transferred along with the migrated instance; this will be added in a subsequent patch. Finally, everything is implemented through the OpenStack API resize hooks, but actual resizing of the instance RAM and hard drive space is not yet implemented.
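
For reference, the whole flow hangs off the servers action endpoint; the new tests drive it with requests of this shape (flavorId 3 is just the value the tests happen to use), and confirmResize/revertResize are posted the same way with null bodies:

    POST /v1.0/servers/1/action
    {"resize": {"flavorId": 3}}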

Revision history for this message
Rick Harris (rconradharris) wrote :

== Meta ==
First of all, really nice job. There's a lot of code here, so I am going to review this in passes, starting from the bottom (code I'm a bit more familiar with).

It looks like you and I are sharing quite a bit of utility code for working with VHDs, executing commands via subprocess, etc. We should definitely get together and see if we can create a useful library we can both share. Perhaps this can be phase 2 of both of our patches.

== Review ==

1397 +def get_sr_path(session):
1398 + sr_ref = find_sr(session)
1399 +
1400 + if sr_ref is None:
1401 + raise Exception('Cannot find SR to read VDI from')
1402 +
1403 + sr_rec = session.xenapi.SR.get_record(sr_ref)
1404 + sr_uuid = sr_rec["uuid"]
1405 + sr_path = os.path.join(FILE_SR_PATH, sr_uuid)
1406 + return sr_path
1407 +
1408 +def find_sr(session):
1409 + host = get_this_host(session)
1410 + srs = session.xenapi.SR.get_all()
1411 + for sr in srs:
1412 + sr_rec = session.xenapi.SR.get_record(sr)
1413 + if not ('i18n-key' in sr_rec['other_config'] and
1414 + sr_rec['other_config']['i18n-key'] == 'local-storage'):
1415 + continue
1416 + for pbd in sr_rec['PBDs']:
1417 + pbd_rec = session.xenapi.PBD.get_record(pbd)
1418 + if pbd_rec['host'] == host:
1419 + return sr
1420 + return None

This code already exists in `vm_utils`. Rather than duplicate it here, it might be better to pass `sr_path` in as an argument to your plugin functions. (This is the approach I took with xs-unified-images.)

This also has the advantage that I added a FLAG for the SR's base path (/var/run/sr-mount), so it's only specified in one place now.
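
As the preview diff below shows, the branch does end up passing sr_path into the migration plugin's calls:

    params = {'host': dest,
              'vdi_uuid': base_copy_uuid,
              'instance_id': instance.id,
              'sr_path': VMHelper.get_sr_path(self._session)}

    task = self._session.async_call_plugin('migration', 'transfer_vhd',
                                           {'params': pickle.dumps(params)})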

1491 + rsync_args = ['nohup', RSYNC, '-av', '--progress', '-e', ssh_cmd,
1492 + source_path, dest_path]

Optional, but you can use the `shlex` module to convert a normal command string into the args list. Makes it a little easier to read/deal with.
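
For example (a sketch; the placeholder values stand in for what the plugin builds):

    import shlex

    ssh_cmd = 'ssh -o StrictHostKeyChecking=no'   # placeholder
    source_path = '/tmp/source/'                  # placeholder
    dest_path = 'root@dest:/tmp/dest/'            # placeholder

    # shlex.split turns a readable command string into the argv list
    # subprocess expects, handling the quoting around -e for us.
    rsync_args = shlex.split(
        "nohup /usr/bin/rsync -av --progress -e '%s' %s %s"
        % (ssh_cmd, source_path, dest_path))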

1499 + if rsync_proc.returncode != 0:
1500 + raise Exception("Unexpected VHD transfer failure")

I notice that we're both using subprocess to execute commands and then trying to handle non-zero exit codes appropriately. I wrote a method called `assert_subprocess_success` in xs-unified-images. It might be worth it to refactor this (and perhaps other similar utility code) to a file we can both share.
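
Roughly this shape (a sketch only; the actual helper in xs-unified-images may differ):

    import subprocess

    def assert_subprocess_success(args, error_msg):
        """Run a command, raising error_msg if it exits non-zero."""
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise Exception("%s (rc=%s): %s"
                            % (error_msg, proc.returncode, err))
        return out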

1462 + subprocess.call([VHD_UTIL, 'modify', '-n', new_cow_path, '-p',
1463 + new_base_copy_path])

Another candidate for a function we can share. Perhaps we need a `link_vhds` method (this exists in xs-unified-images) inside this utility file. Is pluginlib_nova the right place for this?
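
Something like (a sketch built from the vhd-util call above; the vhd-util path is an assumption):

    import subprocess

    VHD_UTIL = '/usr/sbin/vhd-util'  # assumed location

    def link_vhds(child_path, parent_path):
        # Re-parent the child (COW) VHD onto the new base copy.
        subprocess.call([VHD_UTIL, 'modify', '-n', child_path,
                         '-p', parent_path])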

1434 + sr_temp_path = "%s/images/" % sr_path

The pattern you're using here of creating a temp_path is similar to what I'm doing (again :). We can definitely re-use some code here. `_make_staging_area` and `cleanup_staging_area` seem like candidates for utility file.
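
E.g. something of this shape (a sketch, not the xs-unified-images code):

    import shutil
    import tempfile

    def make_staging_area(sr_path):
        # Staging inside the SR keeps the VHDs on the same filesystem,
        # so moving them into place later is a cheap rename.
        return tempfile.mkdtemp(dir=sr_path)

    def cleanup_staging_area(staging_path):
        shutil.rmtree(staging_path, ignore_errors=True)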

1389 +SSH_HOSTS = '/root/.ssh/known_hosts'
1390 +DEVNULL = '/dev/null'
1391 +KEYSCAN = '/usr/bin/ssh-keyscan'
1392 +RSYNC = '/usr/bin/rsync'

Lots of these are things that don't look likely to be tweaked and don't...

review: Needs Fixing
Revision history for this message
Jay Pipes (jaypipes) wrote :

Hi! Please set the merge proposal status to Work In Progress while you work on any changes, and then set back to Needs Review when you've pushed changes. This way, reviewers get notified. Thanks, Matt!

Revision history for this message
Matt Dietz (cerberus) wrote :

Rick: I'm pretty sure we need the default_sr_scan idea because the instance in question doesn't actually exist on the destination machine during the step in which we link the VHDs. It might be possible to refactor, but with the way the VM spawn code works, I don't think it would be trivial.

Revision history for this message
Rick Harris (rconradharris) wrote :

> 957 + # TODO(mdietz): replace this with the flag once unified-images merges
> 958 + return '/var/run/sr-mount/%s' % cls.get_sr(session, sr_label)

xs-unified is merged now. :-)

> 968 + @classmethod
> 969 + def scan_sr(cls, session, instance_id=None, sr_ref=None):
> 970 + """Scans the SR specified by sr_ref"""
> 971 + if sr_ref:
> 972 + LOG.debug(_("Re-scanning SR %s"), sr_ref)
> 973 + task = session.call_xenapi('Async.SR.scan', sr_ref)
> 974 + session.wait_for_task(instance_id, task)

Not sure I understand what's going on here. It looks like sr_ref isn't
required; if it's not passed, this is a no-op, and if it is passed, it scans
the sr_ref.

I think scan_sr should take sr_ref as a positional, required argument and then
always scan the desired SR. Since `wait_for_task` takes an instance_id, that
needs to be required as well.
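
I.e., something like (a sketch; the wait_for_task arg order matches the revision under review):

    @classmethod
    def scan_sr(cls, session, instance_id, sr_ref):
        """Scans the SR specified by sr_ref"""
        LOG.debug(_("Re-scanning SR %s"), sr_ref)
        task = session.call_xenapi('Async.SR.scan', sr_ref)
        session.wait_for_task(instance_id, task)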

> I'm pretty sure we need the default_sr_scan idea because the instance in question doesn't actually exist on the destination machine

Hmm, I'm not sure I understand here. Is this because of the instance_id
argument?

> 948 + def get_sr(cls, session, sr_label='slices'):

Since multiple SRs can exist on an XS machine, I think it might be better to
call it `get_default_sr` for now to clarify that this is just one of many and
in some way a 'special' SR.

> 1293 - 'x-image-meta-type': 'vhd'
> 1294 - }
> 1295 + 'x-image-meta-type': 'vhd', }

Not sure about this change. Seems messy to leave that spurious comma in there
(even though it's syntactically valid).

> 1224 + task = None

Line is unnecessary since `task` is guaranteed to be defined by the if/else block.

> 1192 + 'new_cow_uuid': new_cow_uuid,

Should probably be using one space here, 'key': 'value'.

PEP8:
    - More than one space around an assignment (or other) operator to
      align it with another.

      Yes:

          x = 1
          y = 2
          long_variable = 3

      No:

          x             = 1
          y             = 2
          long_variable = 3

> 1093 + with self._get_snapshot(instance) as snapshot:

I'm mixed on this: context managers are cool and all, but in this case,
creating the Snapshot class just to make a context manager seems like
overkill.

I like the idea of having a Snapshot class, but I'd prefer for it to take on
much more of the functionality to create/destroy/upload snapshots (basically
OO-ify more of the code). Thoughts?

1071 + #Have a look at the VDI and see if it has a PV kernel

Femto-nit: needs a space between # and the first letter of the comment

> 104 + raise exception.Error(_("No finished migrations found for \
> 105 + instance"))

Should this be exception.NotFound()?
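
As the preview diff below shows, it did end up as exception.NotFound:

    raise exception.NotFound(_("No finished migrations found for "
                               "instance"))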

Revision history for this message
Matt Dietz (cerberus) wrote :

Wow, good feedback

Regarding the scan_default_sr concept, yes, I'm referring to the instance_id -- specifically, the lack of an actual instance to infer the SR from or to associate with wait_for_task when rescanning the SR.

I agree there would be a lot of value in a dedicated snapshot class. I'm hesitant to create one because it would likely be a substantial patch to get right (since we would want to implement it abstractly with virt-specific calls). Could we agree to a refactor bug/blueprint down the road that makes better sense of this? If you'd like, I can back out the context manager change. I was simply trying to avoid having to wrap the snapshot deletion in a try/finally, which seemed uglier than what I did.
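
For reference, the try/finally shape in question, as snapshot() reads in the preview diff below:

    template_vm_ref = None
    try:
        template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
        # call plugin to ship snapshot off to glance
        VMHelper.upload_image(
            self._session, instance.id, template_vdi_uuids, image_id)
    finally:
        if template_vm_ref:
            self._destroy(instance, template_vm_ref,
                          shutdown=False, destroy_kernel_ramdisk=False)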

> 1295 + 'x-image-meta-type': 'vhd', }

Regarding this, I did it for two reasons: it saves me from stupid syntax errors when I add something to the dict, and there's a lower chance that PEP8 is going to find extra whitespace at the end of the line. I could really go either way on that, though.

I'll get on making the rest of the changes soon. Thanks!

Revision history for this message
Rick Harris (rconradharris) wrote :

> Regarding the scan_default_sr concept, yes, I'm referring to the instance_id

I'm sure there is a better way here. In an ideal world, I'd say we pair and try to come up with a solution to this, but I think we're *both* pressed for time at the moment by other BPs. So maybe we just throw a 'FIXME(sirp): refactor scan_default_sr' in there, and come back to this.

I have this feeling we're starting to build up some technical debt in the XenAPI virt-layer, so, I'm hoping to do some cleanup refactorings in the coming weeks (OO-ification, etc).

> If you'd like, I can back out the context manager change.

I'd vote we back out the change, but I'm just one person. I'm curious what other reviewers think about this.

> It saves me from stupid syntax errors when I add something to the list, and there's a lower chance that PEP8 is going to find extra whitespace at the end of the line

Yeah, I've seen this done in other code bases for exactly this reason. Personally, I don't like the practice, but that's just personal preference, certainly not something that should be decreed.

I think my real concern is that this isn't a Nova-wide convention. One of the strengths of our current code base, IMHO, is that we have a pretty consistent style, and that greatly enhances its readability. Having some dicts use the trailing comma and some not is a small increase in entropy, and that's a (small) loss.

If you think this is a good-style convention, I'd recommend that you propose this as a style-guideline and we debate it on the ML.

Revision history for this message
Josh Kearney (jk0) wrote :

A couple of things I noticed -- the current migration number is at 008, so you'll want to move yours to 009. I'd also merge trunk and re-push, since there have been some changes to vmops. _shutdown() will merge nicely with your change. There's also a _start() method now, but your _power_on() handles exceptions better, so I'd rename your method to _start() (since _start() is called in several places in vmops).

Revision history for this message
Josh Kearney (jk0) wrote :

Just a couple things:

930 + template_vm_ref = session.wait_for_task(instance_id, task)
974 + session.wait_for_task(instance_id, task)
1158 + self._session.wait_for_task(instance.id, task)
1170 + self._session.wait_for_task(instance.id, task)
1198 + self._session.wait_for_task(instance.id, task)

wait_for_task() now takes the `task` as the first arg and `id` as the second (so id can be None by default). This is likely why your unit tests were failing. You'll want to reverse the args on the lines above (wait_for_task(task, instance.id)) -- you may also want to double-check for others that I may have missed in your branch.
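
That is, on each of those lines:

    # before (fails):
    template_vm_ref = session.wait_for_task(instance_id, task)
    # after:
    template_vm_ref = session.wait_for_task(task, instance_id)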

Other than that, I'd probably get rid of the extra space padding around your docstrings """[ ]Lorem Ipsum[ ]""", but that's mostly just a personal preference. The reason I like doing that is that otherwise the extra spaces have to be stripped when you call `__doc__`.
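
I.e.:

    def padded():
        """ Lorem Ipsum """       # padded.__doc__ == ' Lorem Ipsum '

    def unpadded():
        """Lorem Ipsum"""         # unpadded.__doc__ == 'Lorem Ipsum'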

review: Needs Fixing
Revision history for this message
Josh Kearney (jk0) wrote :

Looks great!

review: Approve
Revision history for this message
Rick Harris (rconradharris) wrote :

Code looks solid.

I think there is some room to refactor both the 'xs-unified' and 'xs-migration' branches to use some common code (quite a bit I think).

That said, getting some test failures (re-iterating here to capture this): http://paste.openstack.org/show/810/

review: Needs Fixing
Revision history for this message
Matt Dietz (cerberus) wrote :

Rick: I fixed the tests in question.

Revision history for this message
Rick Harris (rconradharris) wrote :

lgtm

review: Approve

Preview Diff

1=== modified file 'nova/api/openstack/servers.py'
2--- nova/api/openstack/servers.py 2011-03-01 16:53:19 +0000
3+++ nova/api/openstack/servers.py 2011-03-07 17:23:15 +0000
4@@ -203,10 +203,58 @@
5 return exc.HTTPNoContent()
6
7 def action(self, req, id):
8- """ Multi-purpose method used to reboot, rebuild, and
9- resize a server """
10+ """Multi-purpose method used to reboot, rebuild, or
11+ resize a server"""
12+
13+ actions = {
14+ 'reboot': self._action_reboot,
15+ 'resize': self._action_resize,
16+ 'confirmResize': self._action_confirm_resize,
17+ 'revertResize': self._action_revert_resize,
18+ 'rebuild': self._action_rebuild,
19+ }
20+
21 input_dict = self._deserialize(req.body, req)
22- #TODO(sandy): rebuild/resize not supported.
23+ for key in actions.keys():
24+ if key in input_dict:
25+ return actions[key](input_dict, req, id)
26+ return faults.Fault(exc.HTTPNotImplemented())
27+
28+ def _action_confirm_resize(self, input_dict, req, id):
29+ try:
30+ self.compute_api.confirm_resize(req.environ['nova.context'], id)
31+ except Exception, e:
32+ LOG.exception(_("Error in confirm-resize %s"), e)
33+ return faults.Fault(exc.HTTPBadRequest())
34+ return exc.HTTPNoContent()
35+
36+ def _action_revert_resize(self, input_dict, req, id):
37+ try:
38+ self.compute_api.revert_resize(req.environ['nova.context'], id)
39+ except Exception, e:
40+ LOG.exception(_("Error in revert-resize %s"), e)
41+ return faults.Fault(exc.HTTPBadRequest())
42+ return exc.HTTPAccepted()
43+
44+ def _action_rebuild(self, input_dict, req, id):
45+ return faults.Fault(exc.HTTPNotImplemented())
46+
47+ def _action_resize(self, input_dict, req, id):
48+ """ Resizes a given instance to the flavor size requested """
49+ try:
50+ if 'resize' in input_dict and 'flavorId' in input_dict['resize']:
51+ flavor_id = input_dict['resize']['flavorId']
52+ self.compute_api.resize(req.environ['nova.context'], id,
53+ flavor_id)
54+ else:
55+ LOG.exception(_("Missing arguments for resize"))
56+ return faults.Fault(exc.HTTPUnprocessableEntity())
57+ except Exception, e:
58+ LOG.exception(_("Error in resize %s"), e)
59+ return faults.Fault(exc.HTTPBadRequest())
60+ return faults.Fault(exc.HTTPAccepted())
61+
62+ def _action_reboot(self, input_dict, req, id):
63 try:
64 reboot_type = input_dict['reboot']['type']
65 except Exception:
66
67=== modified file 'nova/compute/api.py'
68--- nova/compute/api.py 2011-03-03 18:21:54 +0000
69+++ nova/compute/api.py 2011-03-07 17:23:15 +0000
70@@ -404,6 +404,10 @@
71 kwargs = {'method': method, 'args': params}
72 return rpc.call(context, queue, kwargs)
73
74+ def _cast_scheduler_message(self, context, args):
75+ """Generic handler for RPC calls to the scheduler"""
76+ rpc.cast(context, FLAGS.scheduler_topic, args)
77+
78 def snapshot(self, context, instance_id, name):
79 """Snapshot the given instance.
80
81@@ -420,6 +424,45 @@
82 """Reboot the given instance."""
83 self._cast_compute_message('reboot_instance', context, instance_id)
84
85+ def revert_resize(self, context, instance_id):
86+ """Reverts a resize, deleting the 'new' instance in the process"""
87+ context = context.elevated()
88+ migration_ref = self.db.migration_get_by_instance_and_status(context,
89+ instance_id, 'finished')
90+ if not migration_ref:
91+ raise exception.NotFound(_("No finished migrations found for "
92+ "instance"))
93+
94+ params = {'migration_id': migration_ref['id']}
95+ self._cast_compute_message('revert_resize', context, instance_id,
96+ migration_ref['dest_compute'], params=params)
97+
98+ def confirm_resize(self, context, instance_id):
99+ """Confirms a migration/resize, deleting the 'old' instance in the
100+ process."""
101+ context = context.elevated()
102+ migration_ref = self.db.migration_get_by_instance_and_status(context,
103+ instance_id, 'finished')
104+ if not migration_ref:
105+ raise exception.NotFound(_("No finished migrations found for "
106+ "instance"))
107+ instance_ref = self.db.instance_get(context, instance_id)
108+ params = {'migration_id': migration_ref['id']}
109+ self._cast_compute_message('confirm_resize', context, instance_id,
110+ migration_ref['source_compute'], params=params)
111+
112+ self.db.migration_update(context, migration_id,
113+ {'status': 'confirmed'})
114+ self.db.instance_update(context, instance_id,
115+ {'host': migration_ref['dest_compute'], })
116+
117+ def resize(self, context, instance_id, flavor):
118+ """Resize a running instance."""
119+ self._cast_scheduler_message(context,
120+ {"method": "prep_resize",
121+ "args": {"topic": FLAGS.compute_topic,
122+ "instance_id": instance_id, }},)
123+
124 def pause(self, context, instance_id):
125 """Pause the given instance."""
126 self._cast_compute_message('pause_instance', context, instance_id)
127
128=== modified file 'nova/compute/manager.py'
129--- nova/compute/manager.py 2011-03-01 19:26:31 +0000
130+++ nova/compute/manager.py 2011-03-07 17:23:15 +0000
131@@ -413,6 +413,112 @@
132
133 @exception.wrap_exception
134 @checks_instance_lock
135+ def confirm_resize(self, context, instance_id, migration_id):
136+ """Destroys the source instance"""
137+ context = context.elevated()
138+ instance_ref = self.db.instance_get(context, instance_id)
139+ migration_ref = self.db.migration_get(context, migration_id)
140+ self.driver.destroy(instance_ref)
141+
142+ @exception.wrap_exception
143+ @checks_instance_lock
144+ def revert_resize(self, context, instance_id, migration_id):
145+ """Destroys the new instance on the destination machine,
146+ reverts the model changes, and powers on the old
147+ instance on the source machine"""
148+ instance_ref = self.db.instance_get(context, instance_id)
149+ migration_ref = self.db.migration_get(context, migration_id)
150+
151+ #TODO(mdietz): we may want to split these into separate methods.
152+ if migration_ref['source_compute'] == FLAGS.host:
153+ self.driver._start(instance_ref)
154+ self.db.migration_update(context, migration_id,
155+ {'status': 'reverted'})
156+ else:
157+ self.driver.destroy(instance_ref)
158+ topic = self.db.queue_get_for(context, FLAGS.compute_topic,
159+ instance_ref['host'])
160+ rpc.cast(context, topic,
161+ {'method': 'revert_resize',
162+ 'args': {
163+ 'migration_id': migration_ref['id'],
164+ 'instance_id': instance_id, },
165+ })
166+
167+ @exception.wrap_exception
168+ @checks_instance_lock
169+ def prep_resize(self, context, instance_id):
170+ """Initiates the process of moving a running instance to another
171+ host, possibly changing the RAM and disk size in the process"""
172+ context = context.elevated()
173+ instance_ref = self.db.instance_get(context, instance_id)
174+ if instance_ref['host'] == FLAGS.host:
175+ raise exception.Error(_(
176+ 'Migration error: destination same as source!'))
177+
178+ migration_ref = self.db.migration_create(context,
179+ {'instance_id': instance_id,
180+ 'source_compute': instance_ref['host'],
181+ 'dest_compute': FLAGS.host,
182+ 'dest_host': self.driver.get_host_ip_addr(),
183+ 'status': 'pre-migrating'})
184+ LOG.audit(_('instance %s: migrating to '), instance_id,
185+ context=context)
186+ topic = self.db.queue_get_for(context, FLAGS.compute_topic,
187+ instance_ref['host'])
188+ rpc.cast(context, topic,
189+ {'method': 'resize_instance',
190+ 'args': {
191+ 'migration_id': migration_ref['id'],
192+ 'instance_id': instance_id, },
193+ })
194+
195+ @exception.wrap_exception
196+ @checks_instance_lock
197+ def resize_instance(self, context, instance_id, migration_id):
198+ """Starts the migration of a running instance to another host"""
199+ migration_ref = self.db.migration_get(context, migration_id)
200+ instance_ref = self.db.instance_get(context, instance_id)
201+ self.db.migration_update(context, migration_id,
202+ {'status': 'migrating', })
203+
204+ disk_info = self.driver.migrate_disk_and_power_off(instance_ref,
205+ migration_ref['dest_host'])
206+ self.db.migration_update(context, migration_id,
207+ {'status': 'post-migrating', })
208+
209+ #TODO(mdietz): This is where we would update the VM record
210+ #after resizing
211+ service = self.db.service_get_by_host_and_topic(context,
212+ migration_ref['dest_compute'], FLAGS.compute_topic)
213+ topic = self.db.queue_get_for(context, FLAGS.compute_topic,
214+ migration_ref['dest_compute'])
215+ rpc.cast(context, topic,
216+ {'method': 'finish_resize',
217+ 'args': {
218+ 'migration_id': migration_id,
219+ 'instance_id': instance_id,
220+ 'disk_info': disk_info, },
221+ })
222+
223+ @exception.wrap_exception
224+ @checks_instance_lock
225+ def finish_resize(self, context, instance_id, migration_id, disk_info):
226+ """Completes the migration process by setting up the newly transferred
227+ disk and turning on the instance on its new host machine"""
228+ migration_ref = self.db.migration_get(context, migration_id)
229+ instance_ref = self.db.instance_get(context,
230+ migration_ref['instance_id'])
231+
232+ # this may get passed into the following spawn instead
233+ new_disk_info = self.driver.attach_disk(instance_ref, disk_info)
234+ self.driver.spawn(instance_ref, disk=new_disk_info)
235+
236+ self.db.migration_update(context, migration_id,
237+ {'status': 'finished', })
238+
239+ @exception.wrap_exception
240+ @checks_instance_lock
241 def pause_instance(self, context, instance_id):
242 """Pause an instance on this server."""
243 context = context.elevated()
244
245=== modified file 'nova/db/api.py'
246--- nova/db/api.py 2011-02-28 19:44:17 +0000
247+++ nova/db/api.py 2011-03-07 17:23:15 +0000
248@@ -80,10 +80,15 @@
249
250
251 def service_get(context, service_id):
252- """Get an service or raise if it does not exist."""
253+ """Get a service or raise if it does not exist."""
254 return IMPL.service_get(context, service_id)
255
256
257+def service_get_by_host_and_topic(context, host, topic):
258+ """Get a service by host it's on and topic it listens to"""
259+ return IMPL.service_get_by_host_and_topic(context, host, topic)
260+
261+
262 def service_get_all(context, disabled=False):
263 """Get all services."""
264 return IMPL.service_get_all(context, disabled)
265@@ -254,6 +259,28 @@
266
267 ####################
268
269+def migration_update(context, id, values):
270+ """Update a migration instance"""
271+ return IMPL.migration_update(context, id, values)
272+
273+
274+def migration_create(context, values):
275+ """Create a migration record"""
276+ return IMPL.migration_create(context, values)
277+
278+
279+def migration_get(context, migration_id):
280+ """Finds a migration by the id"""
281+ return IMPL.migration_get(context, migration_id)
282+
283+
284+def migration_get_by_instance_and_status(context, instance_id, status):
285+ """Finds a migration by the instance id its migrating"""
286+ return IMPL.migration_get_by_instance_and_status(context, instance_id,
287+ status)
288+
289+####################
290+
291
292 def fixed_ip_associate(context, address, instance_id):
293 """Associate fixed ip to instance.
294
295=== modified file 'nova/db/sqlalchemy/api.py'
296--- nova/db/sqlalchemy/api.py 2011-03-03 00:37:02 +0000
297+++ nova/db/sqlalchemy/api.py 2011-03-07 17:23:15 +0000
298@@ -155,6 +155,17 @@
299
300
301 @require_admin_context
302+def service_get_by_host_and_topic(context, host, topic):
303+ session = get_session()
304+ return session.query(models.Service).\
305+ filter_by(deleted=False).\
306+ filter_by(disabled=False).\
307+ filter_by(host=host).\
308+ filter_by(topic=topic).\
309+ first()
310+
311+
312+@require_admin_context
313 def service_get_all_by_host(context, host):
314 session = get_session()
315 return session.query(models.Service).\
316@@ -1972,6 +1983,51 @@
317 all()
318
319
320+###################
321+
322+
323+@require_admin_context
324+def migration_create(context, values):
325+ migration = models.Migration()
326+ migration.update(values)
327+ migration.save()
328+ return migration
329+
330+
331+@require_admin_context
332+def migration_update(context, id, values):
333+ session = get_session()
334+ with session.begin():
335+ migration = migration_get(context, id, session=session)
336+ migration.update(values)
337+ migration.save(session=session)
338+ return migration
339+
340+
341+@require_admin_context
342+def migration_get(context, id, session=None):
343+ if not session:
344+ session = get_session()
345+ result = session.query(models.Migration).\
346+ filter_by(id=id).first()
347+ if not result:
348+ raise exception.NotFound(_("No migration found with id %s")
349+ % migration_id)
350+ return result
351+
352+
353+@require_admin_context
354+def migration_get_by_instance_and_status(context, instance_id, status):
355+ session = get_session()
356+ result = session.query(models.Migration).\
357+ filter_by(instance_id=instance_id).\
358+ filter_by(status=status).first()
359+ if not result:
360+ raise exception.NotFound(_("No migration found with instance id %s")
361+ % migration_id)
362+ return result
363+
364+
365 ##################
366
367
368
369=== added file 'nova/db/sqlalchemy/migrate_repo/versions/009_add_instance_migrations.py'
370--- nova/db/sqlalchemy/migrate_repo/versions/009_add_instance_migrations.py 1970-01-01 00:00:00 +0000
371+++ nova/db/sqlalchemy/migrate_repo/versions/009_add_instance_migrations.py 2011-03-07 17:23:15 +0000
372@@ -0,0 +1,61 @@
373+# vim: tabstop=4 shiftwidth=4 softtabstop=4
374+
375+# Copyright 2010 OpenStack LLC.
376+# All Rights Reserved.
377+#
378+# Licensed under the Apache License, Version 2.0 (the "License"); you may
379+# not use this file except in compliance with the License. You may obtain
380+# a copy of the License at
381+#
382+# http://www.apache.org/licenses/LICENSE-2.0
383+#
384+# Unless required by applicable law or agreed to in writing, software
385+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
386+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
387+# License for the specific language governing permissions and limitations
388+# under the License.from sqlalchemy import *
389+
390+from sqlalchemy import *
391+from migrate import *
392+
393+from nova import log as logging
394+
395+
396+meta = MetaData()
397+
398+# Just for the ForeignKey and column creation to succeed, these are not the
399+# actual definitions of instances or services.
400+instances = Table('instances', meta,
401+ Column('id', Integer(), primary_key=True, nullable=False),
402+ )
403+
404+#
405+# New Tables
406+#
407+
408+migrations = Table('migrations', meta,
409+ Column('created_at', DateTime(timezone=False)),
410+ Column('updated_at', DateTime(timezone=False)),
411+ Column('deleted_at', DateTime(timezone=False)),
412+ Column('deleted', Boolean(create_constraint=True, name=None)),
413+ Column('id', Integer(), primary_key=True, nullable=False),
414+ Column('source_compute', String(255)),
415+ Column('dest_compute', String(255)),
416+ Column('dest_host', String(255)),
417+ Column('instance_id', Integer, ForeignKey('instances.id'),
418+ nullable=True),
419+ Column('status', String(255)),
420+ )
421+
422+
423+def upgrade(migrate_engine):
424+ # Upgrade operations go here. Don't create your own engine;
425+ # bind migrate_engine to your metadata
426+ meta.bind = migrate_engine
427+ for table in (migrations, ):
428+ try:
429+ table.create()
430+ except Exception:
431+ logging.info(repr(table))
432+ logging.exception('Exception while creating table')
433+ raise
434
435=== modified file 'nova/db/sqlalchemy/migration.py'
436--- nova/db/sqlalchemy/migration.py 2011-02-17 21:39:03 +0000
437+++ nova/db/sqlalchemy/migration.py 2011-03-07 17:23:15 +0000
438@@ -60,7 +60,7 @@
439 'key_pairs', 'networks', 'projects', 'quotas',
440 'security_group_instance_association',
441 'security_group_rules', 'security_groups',
442- 'services',
443+ 'services', 'migrations',
444 'users', 'user_project_association',
445 'user_project_role_association',
446 'user_role_association',
447
448=== modified file 'nova/db/sqlalchemy/models.py'
449--- nova/db/sqlalchemy/models.py 2011-03-03 01:50:48 +0000
450+++ nova/db/sqlalchemy/models.py 2011-03-07 17:23:15 +0000
451@@ -389,6 +389,18 @@
452 public_key = Column(Text)
453
454
455+class Migration(BASE, NovaBase):
456+ """Represents a running host-to-host migration."""
457+ __tablename__ = 'migrations'
458+ id = Column(Integer, primary_key=True, nullable=False)
459+ source_compute = Column(String(255))
460+ dest_compute = Column(String(255))
461+ dest_host = Column(String(255))
462+ instance_id = Column(Integer, ForeignKey('instances.id'), nullable=True)
463+ #TODO(_cerberus_): enum
464+ status = Column(String(255))
465+
466+
467 class Network(BASE, NovaBase):
468 """Represents a network."""
469 __tablename__ = 'networks'
470@@ -598,7 +610,7 @@
471 Network, SecurityGroup, SecurityGroupIngressRule,
472 SecurityGroupInstanceAssociation, AuthToken, User,
473 Project, Certificate, ConsolePool, Console, Zone,
474- InstanceMetadata)
475+ InstanceMetadata, Migration)
476 engine = create_engine(FLAGS.sql_connection, echo=False)
477 for model in models:
478 model.metadata.create_all(engine)
479
480=== modified file 'nova/rpc.py'
481--- nova/rpc.py 2011-03-25 05:30:36 +0000
482+++ nova/rpc.py 2011-03-07 17:23:15 +0000
483@@ -123,7 +123,7 @@
484 LOG.error(_("Reconnected to queue"))
485 self.failed_connection = False
486 # NOTE(vish): This is catching all errors because we really don't
487- # exceptions to be logged 10 times a second if some
488+ # want exceptions to be logged 10 times a second if some
489 # persistent failure occurs.
490 except Exception: # pylint: disable-msg=W0703
491 if not self.failed_connection:
492
493=== added file 'nova/tests/api/openstack/common.py'
494--- nova/tests/api/openstack/common.py 1970-01-01 00:00:00 +0000
495+++ nova/tests/api/openstack/common.py 2011-03-07 17:23:15 +0000
496@@ -0,0 +1,35 @@
497+# vim: tabstop=4 shiftwidth=4 softtabstop=4
498+
499+# Copyright 2011 OpenStack LLC.
500+# All Rights Reserved.
501+#
502+# Licensed under the Apache License, Version 2.0 (the "License"); you may
503+# not use this file except in compliance with the License. You may obtain
504+# a copy of the License at
505+#
506+# http://www.apache.org/licenses/LICENSE-2.0
507+#
508+# Unless required by applicable law or agreed to in writing, software
509+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
510+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
511+# License for the specific language governing permissions and limitations
512+# under the License.
513+
514+import json
515+
516+import webob
517+
518+
519+def webob_factory(url):
520+ """Factory for removing duplicate webob code from tests"""
521+
522+ base_url = url
523+
524+ def web_request(url, method=None, body=None):
525+ req = webob.Request.blank("%s%s" % (base_url, url))
526+ if method:
527+ req.method = method
528+ if body:
529+ req.body = json.dumps(body)
530+ return req
531+ return web_request
532
533=== modified file 'nova/tests/api/openstack/test_servers.py'
534--- nova/tests/api/openstack/test_servers.py 2011-02-28 17:03:07 +0000
535+++ nova/tests/api/openstack/test_servers.py 2011-03-07 17:23:15 +0000
536@@ -1,6 +1,6 @@
537 # vim: tabstop=4 shiftwidth=4 softtabstop=4
538
539-# Copyright 2010 OpenStack LLC.
540+# Copyright 2010-2011 OpenStack LLC.
541 # All Rights Reserved.
542 #
543 # Licensed under the Apache License, Version 2.0 (the "License"); you may
544@@ -26,10 +26,12 @@
545 from nova import test
546 import nova.api.openstack
547 from nova.api.openstack import servers
548+import nova.compute.api
549 import nova.db.api
550 from nova.db.sqlalchemy.models import Instance
551 from nova.db.sqlalchemy.models import InstanceMetadata
552 import nova.rpc
553+from nova.tests.api.openstack import common
554 from nova.tests.api.openstack import fakes
555
556
557@@ -144,6 +146,8 @@
558 self.stubs.Set(nova.compute.API, "get_actions", fake_compute_api)
559 self.allow_admin = FLAGS.allow_admin_api
560
561+ self.webreq = common.webob_factory('/v1.0/servers')
562+
563 def tearDown(self):
564 self.stubs.UnsetAll()
565 FLAGS.allow_admin_api = self.allow_admin
566@@ -465,3 +469,99 @@
567 res = req.get_response(fakes.wsgi_app())
568 self.assertEqual(res.status, '202 Accepted')
569 self.assertEqual(self.server_delete_called, True)
570+
571+ def test_resize_server(self):
572+ req = self.webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
573+
574+ self.resize_called = False
575+
576+ def resize_mock(*args):
577+ self.resize_called = True
578+
579+ self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
580+
581+ res = req.get_response(fakes.wsgi_app())
582+ self.assertEqual(res.status_int, 202)
583+ self.assertEqual(self.resize_called, True)
584+
585+ def test_resize_bad_flavor_fails(self):
586+ req = self.webreq('/1/action', 'POST', dict(resize=dict(derp=3)))
587+
588+ self.resize_called = False
589+
590+ def resize_mock(*args):
591+ self.resize_called = True
592+
593+ self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
594+
595+ res = req.get_response(fakes.wsgi_app())
596+ self.assertEqual(res.status_int, 422)
597+ self.assertEqual(self.resize_called, False)
598+
599+ def test_resize_raises_fails(self):
600+ req = self.webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
601+
602+ def resize_mock(*args):
603+ raise Exception('hurr durr')
604+
605+ self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
606+
607+ res = req.get_response(fakes.wsgi_app())
608+ self.assertEqual(res.status_int, 400)
609+
610+ def test_confirm_resize_server(self):
611+ req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
612+
613+ self.resize_called = False
614+
615+ def confirm_resize_mock(*args):
616+ self.resize_called = True
617+
618+ self.stubs.Set(nova.compute.api.API, 'confirm_resize',
619+ confirm_resize_mock)
620+
621+ res = req.get_response(fakes.wsgi_app())
622+ self.assertEqual(res.status_int, 204)
623+ self.assertEqual(self.resize_called, True)
624+
625+ def test_confirm_resize_server_fails(self):
626+ req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
627+
628+ def confirm_resize_mock(*args):
629+ raise Exception('hurr durr')
630+
631+ self.stubs.Set(nova.compute.api.API, 'confirm_resize',
632+ confirm_resize_mock)
633+
634+ res = req.get_response(fakes.wsgi_app())
635+ self.assertEqual(res.status_int, 400)
636+
637+ def test_revert_resize_server(self):
638+ req = self.webreq('/1/action', 'POST', dict(revertResize=None))
639+
640+ self.resize_called = False
641+
642+ def revert_resize_mock(*args):
643+ self.resize_called = True
644+
645+ self.stubs.Set(nova.compute.api.API, 'revert_resize',
646+ revert_resize_mock)
647+
648+ res = req.get_response(fakes.wsgi_app())
649+ self.assertEqual(res.status_int, 202)
650+ self.assertEqual(self.resize_called, True)
651+
652+ def test_revert_resize_server_fails(self):
653+ req = self.webreq('/1/action', 'POST', dict(revertResize=None))
654+
655+ def revert_resize_mock(*args):
656+ raise Exception('hurr durr')
657+
658+ self.stubs.Set(nova.compute.api.API, 'revert_resize',
659+ revert_resize_mock)
660+
661+ res = req.get_response(fakes.wsgi_app())
662+ self.assertEqual(res.status_int, 400)
663+
664+if __name__ == "__main__":
665+ unittest.main()
666
667=== modified file 'nova/tests/test_compute.py'
668--- nova/tests/test_compute.py 2011-02-28 17:25:14 +0000
669+++ nova/tests/test_compute.py 2011-03-07 17:23:15 +0000
670@@ -57,7 +57,7 @@
671 self.manager.delete_project(self.project)
672 super(ComputeTestCase, self).tearDown()
673
674- def _create_instance(self):
675+ def _create_instance(self, params={}):
676 """Create a test instance"""
677 inst = {}
678 inst['image_id'] = 'ami-test'
679@@ -68,6 +68,7 @@
680 inst['instance_type'] = 'm1.tiny'
681 inst['mac_address'] = utils.generate_mac()
682 inst['ami_launch_index'] = 0
683+ inst.update(params)
684 return db.instance_create(self.context, inst)['id']
685
686 def _create_group(self):
687@@ -268,9 +269,30 @@
688
689 self.compute.terminate_instance(self.context, instance_id)
690
691+ def test_resize_instance(self):
692+ """Ensure instance can be migrated/resized"""
693+ instance_id = self._create_instance()
694+ context = self.context.elevated()
695+ self.compute.run_instance(self.context, instance_id)
696+ db.instance_update(self.context, instance_id, {'host': 'foo'})
697+ self.compute.prep_resize(context, instance_id)
698+ migration_ref = db.migration_get_by_instance_and_status(context,
699+ instance_id, 'pre-migrating')
700+ self.compute.resize_instance(context, instance_id,
701+ migration_ref['id'])
702+ self.compute.terminate_instance(context, instance_id)
703+
704 def test_get_by_flavor_id(self):
705 type = instance_types.get_by_flavor_id(1)
706 self.assertEqual(type, 'm1.tiny')
707
708+ def test_resize_same_source_fails(self):
709+ """Ensure instance fails to migrate when source and destination are
710+ the same host"""
711+ instance_id = self._create_instance()
712+ self.compute.run_instance(self.context, instance_id)
713+ self.assertRaises(exception.Error, self.compute.prep_resize,
714+ self.context, instance_id)
715+ self.compute.terminate_instance(self.context, instance_id)
716 type = instance_types.get_by_flavor_id("1")
717 self.assertEqual(type, 'm1.tiny')
718
719=== modified file 'nova/tests/test_xenapi.py'
720--- nova/tests/test_xenapi.py 2011-03-01 00:28:46 +0000
721+++ nova/tests/test_xenapi.py 2011-03-07 17:23:15 +0000
722@@ -346,6 +346,44 @@
723 super(XenAPIDiffieHellmanTestCase, self).tearDown()
724
725
726+class XenAPIMigrateInstance(test.TestCase):
727+ """
728+ Unit test for verifying migration-related actions
729+ """
730+
731+ def setUp(self):
732+ super(XenAPIMigrateInstance, self).setUp()
733+ self.stubs = stubout.StubOutForTesting()
734+ FLAGS.target_host = '127.0.0.1'
735+ FLAGS.xenapi_connection_url = 'test_url'
736+ FLAGS.xenapi_connection_password = 'test_pass'
737+ db_fakes.stub_out_db_instance_api(self.stubs)
738+ stubs.stub_out_get_target(self.stubs)
739+ xenapi_fake.reset()
740+ self.values = {'name': 1, 'id': 1,
741+ 'project_id': 'fake',
742+ 'user_id': 'fake',
743+ 'image_id': 1,
744+ 'kernel_id': 2,
745+ 'ramdisk_id': 3,
746+ 'instance_type': 'm1.large',
747+ 'mac_address': 'aa:bb:cc:dd:ee:ff',
748+ }
749+ stubs.stub_out_migration_methods(self.stubs)
750+
751+ def test_migrate_disk_and_power_off(self):
752+ instance = db.instance_create(self.values)
753+ stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
754+ conn = xenapi_conn.get_connection(False)
755+ conn.migrate_disk_and_power_off(instance, '127.0.0.1')
756+
757+ def test_attach_disk(self):
758+ instance = db.instance_create(self.values)
759+ stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
760+ conn = xenapi_conn.get_connection(False)
761+ conn.attach_disk(instance, {'base_copy': 'hurr', 'cow': 'durr'})
762+
763+
764 class XenAPIDetermineDiskImageTestCase(test.TestCase):
765 """
766 Unit tests for code that detects the ImageType
767
768=== modified file 'nova/tests/xenapi/stubs.py'
769--- nova/tests/xenapi/stubs.py 2011-03-01 16:53:19 +0000
770+++ nova/tests/xenapi/stubs.py 2011-03-07 17:23:15 +0000
771@@ -20,6 +20,7 @@
772 from nova.virt.xenapi import fake
773 from nova.virt.xenapi import volume_utils
774 from nova.virt.xenapi import vm_utils
775+from nova.virt.xenapi import vmops
776
777
778 def stubout_instance_snapshot(stubs):
779@@ -217,3 +218,44 @@
780
781 def SR_forget(self, _1, ref):
782 pass
783+
784+
785+class FakeSessionForMigrationTests(fake.SessionBase):
786+ """Stubs out a XenAPISession for Migration tests"""
787+ def __init__(self, uri):
788+ super(FakeSessionForMigrationTests, self).__init__(uri)
789+
790+
791+def stub_out_migration_methods(stubs):
792+ def fake_get_snapshot(self, instance):
793+ return 'foo', 'bar'
794+
795+ @classmethod
796+ def fake_get_vdi(cls, session, vm_ref):
797+ vdi_ref = fake.create_vdi(name_label='derp', read_only=False,
798+ sr_ref='herp', sharable=False)
799+ vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
800+ return vdi_ref, {'uuid': vdi_rec['uuid'], }
801+
802+ def fake_shutdown(self, inst, vm, method='clean'):
803+ pass
804+
805+ @classmethod
806+ def fake_sr(cls, session, *args):
807+ pass
808+
809+ @classmethod
810+ def fake_get_sr_path(cls, *args):
811+ return "fake"
812+
813+ def fake_destroy(*args, **kwargs):
814+ pass
815+
816+ stubs.Set(vmops.VMOps, '_destroy', fake_destroy)
817+ stubs.Set(vm_utils.VMHelper, 'scan_default_sr', fake_sr)
818+ stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
819+ stubs.Set(vmops.VMOps, '_get_snapshot', fake_get_snapshot)
820+ stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
821+ stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
822+ stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
823+ stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)
824
825=== modified file 'nova/virt/fake.py'
826--- nova/virt/fake.py 2011-02-23 07:21:01 +0000
827+++ nova/virt/fake.py 2011-03-07 17:23:15 +0000
828@@ -139,6 +139,24 @@
829 """
830 pass
831
832+ def get_host_ip_addr(self):
833+ """
834+ Retrieves the IP address of the dom0
835+ """
836+ pass
837+
838+ def resize(self, instance, flavor):
839+ """
840+ Resizes/Migrates the specified instance.
841+
842+ The flavor parameter determines whether or not the instance RAM and
843+ disk space are modified, and if so, to what size.
844+
845+ The work will be done asynchronously. This function returns a task
846+ that allows the caller to detect when it is complete.
847+ """
848+ pass
849+
850 def set_admin_password(self, instance, new_pass):
851 """
852 Set the root password on the specified instance.
853@@ -179,6 +197,19 @@
854 """
855 pass
856
857+ def migrate_disk_and_power_off(self, instance, dest):
858+ """
859+ Transfers the disk of a running instance in multiple phases, turning
860+ off the instance before the end.
861+ """
862+ pass
863+
864+ def attach_disk(self, instance, disk_info):
865+ """
866+ Attaches the disk to an instance given the metadata disk_info
867+ """
868+ pass
869+
870 def pause(self, instance, callback):
871 """
872 Pause the specified instance.
873
874=== modified file 'nova/virt/xenapi/fake.py'
875--- nova/virt/xenapi/fake.py 2011-02-21 17:24:26 +0000
876+++ nova/virt/xenapi/fake.py 2011-03-07 17:23:15 +0000
877@@ -290,6 +290,9 @@
878 #Always return 12GB available
879 return 12 * 1024 * 1024 * 1024
880
881+ def host_call_plugin(*args):
882+ return 'herp'
883+
884 def xenapi_request(self, methodname, params):
885 if methodname.startswith('login'):
886 self._login(methodname, params)
887
888=== modified file 'nova/virt/xenapi/vm_utils.py'
889--- nova/virt/xenapi/vm_utils.py 2011-03-03 01:50:48 +0000
890+++ nova/virt/xenapi/vm_utils.py 2011-03-07 17:23:15 +0000
891@@ -253,16 +253,31 @@
892 return vdi_ref
893
894 @classmethod
895+ def get_vdi_for_vm_safely(cls, session, vm_ref):
896+ vdi_refs = VMHelper.lookup_vm_vdis(session, vm_ref)
897+ if vdi_refs is None:
898+ raise Exception(_("No VDIs found for VM %s") % vm_ref)
899+ else:
900+ num_vdis = len(vdi_refs)
901+ if num_vdis != 1:
902+ raise Exception(
903+ _("Unexpected number of VDIs (%(num_vdis)s) found"
904+ " for VM %(vm_ref)s") % locals())
905+
906+ vdi_ref = vdi_refs[0]
907+ vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
908+ return vdi_ref, vdi_rec
909+
910+ @classmethod
911 def create_snapshot(cls, session, instance_id, vm_ref, label):
912- """ Creates Snapshot (Template) VM, Snapshot VBD, Snapshot VDI,
913- Snapshot VHD
914- """
915+ """Creates Snapshot (Template) VM, Snapshot VBD, Snapshot VDI,
916+ Snapshot VHD"""
917 #TODO(sirp): Add quiesce and VSS locking support when Windows support
918 # is added
919 LOG.debug(_("Snapshotting VM %(vm_ref)s with label '%(label)s'...")
920 % locals())
921
922- vm_vdi_ref, vm_vdi_rec = get_vdi_for_vm_safely(session, vm_ref)
923+ vm_vdi_ref, vm_vdi_rec = cls.get_vdi_for_vm_safely(session, vm_ref)
924 vm_vdi_uuid = vm_vdi_rec["uuid"]
925 sr_ref = vm_vdi_rec["SR"]
926
927@@ -270,7 +285,8 @@
928
929 task = session.call_xenapi('Async.VM.snapshot', vm_ref, label)
930 template_vm_ref = session.wait_for_task(task, instance_id)
931- template_vdi_rec = get_vdi_for_vm_safely(session, template_vm_ref)[1]
932+ template_vdi_rec = cls.get_vdi_for_vm_safely(session,
933+ template_vm_ref)[1]
934 template_vdi_uuid = template_vdi_rec["uuid"]
935
936 LOG.debug(_('Created snapshot %(template_vm_ref)s from'
937@@ -285,6 +301,24 @@
938 return template_vm_ref, template_vdi_uuids
939
940 @classmethod
941+ def get_sr(cls, session, sr_label='slices'):
942+ """Finds the SR named by the given name label and returns
943+ the UUID"""
944+ return session.call_xenapi('SR.get_by_name_label', sr_label)[0]
945+
946+ @classmethod
947+ def get_sr_path(cls, session):
948+ """Return the path to our storage repository
949+
950+ This is used when we're dealing with VHDs directly, either by taking
951+ snapshots or by restoring an image in the DISK_VHD format.
952+ """
953+ sr_ref = safe_find_sr(session)
954+ sr_rec = session.get_xenapi().SR.get_record(sr_ref)
955+ sr_uuid = sr_rec["uuid"]
956+ return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)
957+
958+ @classmethod
959 def upload_image(cls, session, instance_id, vdi_uuids, image_id):
960 """ Requests that the Glance plugin bundle the specified VDIs and
961 push them into Glance using the specified human-friendly name.
962@@ -298,7 +332,7 @@
963 'image_id': image_id,
964 'glance_host': FLAGS.glance_host,
965 'glance_port': FLAGS.glance_port,
966- 'sr_path': get_sr_path(session)}
967+ 'sr_path': cls.get_sr_path(session)}
968
969 kwargs = {'params': pickle.dumps(params)}
970 task = session.async_call_plugin('glance', 'upload_vhd', kwargs)
971@@ -341,13 +375,13 @@
972 'glance_host': FLAGS.glance_host,
973 'glance_port': FLAGS.glance_port,
974 'uuid_stack': uuid_stack,
975- 'sr_path': get_sr_path(session)}
976+ 'sr_path': cls.get_sr_path(session)}
977
978 kwargs = {'params': pickle.dumps(params)}
979 task = session.async_call_plugin('glance', 'download_vhd', kwargs)
980 vdi_uuid = session.wait_for_task(task, instance_id)
981
982- scan_sr(session, instance_id, sr_ref)
983+ cls.scan_sr(session, instance_id, sr_ref)
984
985 # Set the name-label to ease debugging
986 vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
987@@ -609,6 +643,21 @@
988 except cls.XenAPI.Failure as e:
989 return {"Unable to retrieve diagnostics": e}
990
991+ @classmethod
992+ def scan_sr(cls, session, instance_id=None, sr_ref=None):
993+ """Scans the SR specified by sr_ref"""
994+ if sr_ref:
995+ LOG.debug(_("Re-scanning SR %s"), sr_ref)
996+ task = session.call_xenapi('Async.SR.scan', sr_ref)
997+ session.wait_for_task(task, instance_id)
998+
999+ @classmethod
1000+ def scan_default_sr(cls, session):
1001+ """Looks for the system default SR and triggers a re-scan"""
1002+ #FIXME(sirp/mdietz): refactor scan_default_sr in there
1003+ sr_ref = cls.get_sr(session)
1004+ session.call_xenapi('SR.scan', sr_ref)
1005+
1006
1007 def get_rrd(host, uuid):
1008 """Return the VM RRD XML as a string"""
1009@@ -651,12 +700,6 @@
1010 return None
1011
1012
1013-def scan_sr(session, instance_id, sr_ref):
1014- LOG.debug(_("Re-scanning SR %s"), sr_ref)
1015- task = session.call_xenapi('Async.SR.scan', sr_ref)
1016- session.wait_for_task(task, instance_id)
1017-
1018-
1019 def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref,
1020 original_parent_uuid):
1021 """ Spin until the parent VHD is coalesced into its parent VHD
1022@@ -681,7 +724,7 @@
1023 " %(max_attempts)d), giving up...") % locals())
1024 raise exception.Error(msg)
1025
1026- scan_sr(session, instance_id, sr_ref)
1027+ VMHelper.scan_sr(session, instance_id, sr_ref)
1028 parent_uuid = get_vhd_parent_uuid(session, vdi_ref)
1029 if original_parent_uuid and (parent_uuid != original_parent_uuid):
1030 LOG.debug(_("Parent %(parent_uuid)s doesn't match original parent"
1031@@ -738,18 +781,6 @@
1032 return None
1033
1034
1035-def get_sr_path(session):
1036- """Return the path to our storage repository
1037-
1038- This is used when we're dealing with VHDs directly, either by taking
1039- snapshots or by restoring an image in the DISK_VHD format.
1040- """
1041- sr_ref = safe_find_sr(session)
1042- sr_rec = session.get_xenapi().SR.get_record(sr_ref)
1043- sr_uuid = sr_rec["uuid"]
1044- return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)
1045-
1046-
1047 def remap_vbd_dev(dev):
1048 """Return the appropriate location for a plugged-in VBD device
1049
1050
1051=== modified file 'nova/virt/xenapi/vmops.py'
1052--- nova/virt/xenapi/vmops.py 2011-03-02 23:37:31 +0000
1053+++ nova/virt/xenapi/vmops.py 2011-03-07 17:23:15 +0000
1054@@ -22,6 +22,7 @@
1055 import json
1056 import M2Crypto
1057 import os
1058+import pickle
1059 import subprocess
1060 import tempfile
1061 import uuid
1062@@ -61,7 +62,17 @@
1063 vms.append(rec["name_label"])
1064 return vms
1065
1066- def spawn(self, instance):
1067+ def _start(self, instance, vm_ref=None):
1068+ """Power on a VM instance"""
1069+ if not vm_ref:
1070+ vm_ref = VMHelper.lookup(self._session, instance.name)
1071+ if vm_ref is None:
1072+ raise exception(_('Attempted to power on non-existent instance'
1073+ ' bad instance id %s') % instance.id)
1074+ LOG.debug(_("Starting instance %s"), instance.name)
1075+ self._session.call_xenapi('VM.start', vm_ref, False, False)
1076+
1077+ def spawn(self, instance, disk):
1078 """Create VM instance"""
1079 instance_name = instance.name
1080 vm = VMHelper.lookup(self._session, instance_name)
1081@@ -81,16 +92,22 @@
1082 user = AuthManager().get_user(instance.user_id)
1083 project = AuthManager().get_project(instance.project_id)
1084
1085- disk_image_type = VMHelper.determine_disk_image_type(instance)
1086-
1087- vdi_uuid = VMHelper.fetch_image(self._session, instance.id,
1088- instance.image_id, user, project, disk_image_type)
1089-
1090- vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
1091-
1092- pv_kernel = False
1093+ vdi_ref = kernel = ramdisk = pv_kernel = None
1094+
1095+ # Are we building from a pre-existing disk?
1096+ if not disk:
1097+ #if kernel is not present we must download a raw disk
1098+
1099+ disk_image_type = VMHelper.determine_disk_image_type(instance)
1100+ vdi_uuid = VMHelper.fetch_image(self._session, instance.id,
1101+ instance.image_id, user, project, disk_image_type)
1102+ vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
1103+
1104+ else:
1105+ vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', disk)
1106+
1107 if disk_image_type == ImageType.DISK_RAW:
1108- #Have a look at the VDI and see if it has a PV kernel
1109+ # Have a look at the VDI and see if it has a PV kernel
1110 pv_kernel = VMHelper.lookup_image(self._session, instance.id,
1111 vdi_ref)
1112 elif disk_image_type == ImageType.DISK_VHD:
1113@@ -98,19 +115,18 @@
1114 # configurable as Windows will use HVM.
1115 pv_kernel = True
1116
1117- kernel = None
1118 if instance.kernel_id:
1119 kernel = VMHelper.fetch_image(self._session, instance.id,
1120 instance.kernel_id, user, project, ImageType.KERNEL_RAMDISK)
1121
1122- ramdisk = None
1123 if instance.ramdisk_id:
1124 ramdisk = VMHelper.fetch_image(self._session, instance.id,
1125 instance.ramdisk_id, user, project, ImageType.KERNEL_RAMDISK)
1126
1127 vm_ref = VMHelper.create_vm(self._session,
1128 instance, kernel, ramdisk, pv_kernel)
1129- VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
1130+ VMHelper.create_vbd(session=self._session, vm_ref=vm_ref,
1131+ vdi_ref=vdi_ref, userdevice=0, bootable=True)
1132
1133 # inject_network_info and create vifs
1134 networks = self.inject_network_info(instance)
1135@@ -217,7 +233,7 @@
1136 "start")
1137
1138 def snapshot(self, instance, image_id):
1139- """ Create snapshot from a running VM instance
1140+ """Create snapshot from a running VM instance
1141
1142 :param instance: instance to be snapshotted
1143 :param image_id: id of image to upload to
1144@@ -238,7 +254,20 @@
1145 that will bundle the VHDs together and then push the bundle into
1146 Glance.
1147 """
1148-
1149+ template_vm_ref = None
1150+ try:
1151+ template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
1152+ # call plugin to ship snapshot off to glance
1153+ VMHelper.upload_image(
1154+ self._session, instance.id, template_vdi_uuids, image_id)
1155+ finally:
1156+ if template_vm_ref:
1157+ self._destroy(instance, template_vm_ref,
1158+ shutdown=False, destroy_kernel_ramdisk=False)
1159+
1160+ logging.debug(_("Finished snapshot and upload for VM %s"), instance)
1161+
1162+ def _get_snapshot(self, instance):
1163 #TODO(sirp): Add quiesce and VSS locking support when Windows support
1164 # is added
1165
1166@@ -249,20 +278,89 @@
1167 try:
1168 template_vm_ref, template_vdi_uuids = VMHelper.create_snapshot(
1169 self._session, instance.id, vm_ref, label)
1170+ return template_vm_ref, template_vdi_uuids
1171 except self.XenAPI.Failure, exc:
1172 logging.error(_("Unable to Snapshot %(vm_ref)s: %(exc)s")
1173 % locals())
1174 return
1175
1176+ def migrate_disk_and_power_off(self, instance, dest):
1177+ """Copies a VHD from one host machine to another
1178+
1179+ :param instance: the instance that owns the VHD in question
1180+ :param dest: the destination host machine
1181+ :param disk_type: values are 'primary' or 'cow'
1182+ """
1183+ vm_ref = VMHelper.lookup(self._session, instance.name)
1184+
1185+ # The primary VDI becomes the COW after the snapshot, and we can
1186+ # identify it via the VBD. The base copy is the parent_uuid returned
1187+ # from the snapshot creation
1188+
1189+ base_copy_uuid = cow_uuid = None
1190+ template_vdi_uuids = template_vm_ref = None
1191 try:
1192- # call plugin to ship snapshot off to glance
1193- VMHelper.upload_image(
1194- self._session, instance.id, template_vdi_uuids, image_id)
1195+ # transfer the base copy
1196+ template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
1197+ base_copy_uuid = template_vdi_uuids[1]
1198+ vdi_ref, vm_vdi_rec = \
1199+ VMHelper.get_vdi_for_vm_safely(self._session, vm_ref)
1200+ cow_uuid = vm_vdi_rec['uuid']
1201+
1202+ params = {'host': dest,
1203+ 'vdi_uuid': base_copy_uuid,
1204+ 'instance_id': instance.id,
1205+ 'sr_path': VMHelper.get_sr_path(self._session)}
1206+
1207+ task = self._session.async_call_plugin('migration', 'transfer_vhd',
1208+ {'params': pickle.dumps(params)})
1209+ self._session.wait_for_task(task, instance.id)
1210+
1211+ # Now power down the instance and transfer the COW VHD
1212+ self._shutdown(instance, vm_ref, method='clean')
1213+
1214+ params = {'host': dest,
1215+ 'vdi_uuid': cow_uuid,
1216+ 'instance_id': instance.id,
1217+ 'sr_path': VMHelper.get_sr_path(self._session), }
1218+
1219+ task = self._session.async_call_plugin('migration', 'transfer_vhd',
1220+ {'params': pickle.dumps(params)})
1221+ self._session.wait_for_task(task, instance.id)
1222+
1223 finally:
1224- self._destroy(instance, template_vm_ref, shutdown=False,
1225- destroy_kernel_ramdisk=False)
1226-
1227- logging.debug(_("Finished snapshot and upload for VM %s"), instance)
1228+ if template_vm_ref:
1229+ self._destroy(instance, template_vm_ref,
1230+ shutdown=False, destroy_kernel_ramdisk=False)
1231+
1232+ # TODO(mdietz): we could also consider renaming these to something
1233+ # sensible so we don't need to blindly pass around dictionaries
1234+ return {'base_copy': base_copy_uuid, 'cow': cow_uuid}
1235+
1236+ def attach_disk(self, instance, disk_info):
1237+ """Links the base copy VHD to the COW via the XAPI plugin"""
1238+ vm_ref = VMHelper.lookup(self._session, instance.name)
1239+ new_base_copy_uuid = str(uuid.uuid4())
1240+ new_cow_uuid = str(uuid.uuid4())
1241+ params = {'instance_id': instance.id,
1242+ 'old_base_copy_uuid': disk_info['base_copy'],
1243+ 'old_cow_uuid': disk_info['cow'],
1244+ 'new_base_copy_uuid': new_base_copy_uuid,
1245+ 'new_cow_uuid': new_cow_uuid,
1246+ 'sr_path': VMHelper.get_sr_path(self._session), }
1247+
1248+ task = self._session.async_call_plugin('migration',
1249+ 'move_vhds_into_sr', {'params': pickle.dumps(params)})
1250+ self._session.wait_for_task(task, instance.id)
1251+
1252+ # Now we rescan the SR so we find the VHDs
1253+ VMHelper.scan_default_sr(self._session)
1254+
1255+ return new_cow_uuid
1256+
1257+ def resize(self, instance, flavor):
1258+        """Resize a running instance by changing its RAM and disk size"""
1259+ raise NotImplementedError()
1260
1261 def reboot(self, instance):
1262 """Reboot VM instance"""
1263@@ -308,11 +406,6 @@
1264 raise RuntimeError(resp_dict['message'])
1265 return resp_dict['message']
1266
1267- def _start(self, instance, vm):
1268- """Start an instance"""
1269- task = self._session.call_xenapi("Async.VM.start", vm, False, False)
1270- self._session.wait_for_task(task, instance.id)
1271-
1272 def inject_file(self, instance, b64_path, b64_contents):
1273 """Write a file to the VM instance. The path to which it is to be
1274 written and the contents of the file need to be supplied; both should
1275@@ -355,8 +448,7 @@
1276 if hard:
1277 task = self._session.call_xenapi("Async.VM.hard_shutdown", vm)
1278 else:
1279- task = self._session.call_xenapi("Async.VM.clean_shutdown", vm)
1280-
1281+ task = self._session.call_xenapi('Async.VM.clean_shutdown', vm)
1282 self._session.wait_for_task(task, instance.id)
1283 except self.XenAPI.Failure, exc:
1284 LOG.exception(exc)
1285
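The migrate_disk_and_power_off/attach_disk pair above splits the migration into a copy-while-running phase and a copy-after-shutdown phase: the immutable base copy moves while the instance is still up, so downtime is roughly the time needed to ship the much smaller COW layer. A rough sketch of how a consumer of the driver could chain the two calls across hosts — do_migrate, source_conn and dest_conn are illustrative names, not the compute-manager code in this branch:

    def do_migrate(source_conn, dest_conn, instance, dest_host):
        # Phase 1 (source host): snapshot the VM, rsync the base copy
        # while the instance keeps running, then shut it down cleanly
        # and rsync the now-quiescent COW on top.
        disk_info = source_conn.migrate_disk_and_power_off(instance, dest_host)
        # disk_info is {'base_copy': <uuid>, 'cow': <uuid>}

        # Phase 2 (destination host): re-link the COW to its base copy,
        # move both VHDs into the SR, and rescan so XAPI picks them up.
        new_cow_uuid = dest_conn.attach_disk(instance, disk_info)

        # The instance can then be started from the migrated disk via
        # the new spawn(instance, disk=...) hook.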
1286=== modified file 'nova/virt/xenapi_conn.py'
1287--- nova/virt/xenapi_conn.py 2011-03-01 16:53:19 +0000
1288+++ nova/virt/xenapi_conn.py 2011-03-07 17:23:15 +0000
1289@@ -154,14 +154,18 @@
1290 """List VM instances"""
1291 return self._vmops.list_instances()
1292
1293- def spawn(self, instance):
1294+ def spawn(self, instance, disk=None):
1295 """Create VM instance"""
1296- self._vmops.spawn(instance)
1297+ self._vmops.spawn(instance, disk)
1298
1299 def snapshot(self, instance, image_id):
1300 """ Create snapshot from a running VM instance """
1301 self._vmops.snapshot(instance, image_id)
1302
1303+ def resize(self, instance, flavor):
1304+ """Resize a VM instance"""
1305+ raise NotImplementedError()
1306+
1307 def reboot(self, instance):
1308 """Reboot VM instance"""
1309 self._vmops.reboot(instance)
1310@@ -188,6 +192,15 @@
1311 """Unpause paused VM instance"""
1312 self._vmops.unpause(instance, callback)
1313
1314+ def migrate_disk_and_power_off(self, instance, dest):
1315+        """Transfers the VHD of a running instance to another host, then shuts
1316+        off the instance and copies over the COW disk"""
1317+ return self._vmops.migrate_disk_and_power_off(instance, dest)
1318+
1319+ def attach_disk(self, instance, disk_info):
1320+ """Moves the copied VDIs into the SR"""
1321+ return self._vmops.attach_disk(instance, disk_info)
1322+
1323 def suspend(self, instance, callback):
1324 """suspend the specified instance"""
1325 self._vmops.suspend(instance, callback)
1326@@ -228,6 +241,10 @@
1327 """Return link to instance's ajax console"""
1328 return self._vmops.get_ajax_console(instance)
1329
1330+ def get_host_ip_addr(self):
1331+ xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
1332+ return xs_url.netloc
1333+
1334 def attach_volume(self, instance_name, device_path, mountpoint):
1335 """Attach volume storage to VM instance"""
1336 return self._volumeops.attach_volume(instance_name,
1337
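get_host_ip_addr relies on urlparse exposing the host[:port] portion of a URL as netloc. A quick illustration with made-up addresses — note that a port in xenapi_connection_url would be carried along, which matters since the value ends up as the rsync/ssh destination in the migration plugin:

    >>> import urlparse
    >>> urlparse.urlparse('http://192.168.1.10').netloc
    '192.168.1.10'
    >>> urlparse.urlparse('http://192.168.1.10:80').netloc
    '192.168.1.10:80'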
1338=== added file 'plugins/xenserver/xenapi/etc/xapi.d/plugins/migration'
1339--- plugins/xenserver/xenapi/etc/xapi.d/plugins/migration 1970-01-01 00:00:00 +0000
1340+++ plugins/xenserver/xenapi/etc/xapi.d/plugins/migration 2011-03-07 17:23:15 +0000
1341@@ -0,0 +1,117 @@
1342+#!/usr/bin/env python
1343+
1344+# Copyright 2010 OpenStack LLC.
1345+# All Rights Reserved.
1346+#
1347+# Licensed under the Apache License, Version 2.0 (the "License"); you may
1348+# not use this file except in compliance with the License. You may obtain
1349+# a copy of the License at
1350+#
1351+# http://www.apache.org/licenses/LICENSE-2.0
1352+#
1353+# Unless required by applicable law or agreed to in writing, software
1354+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
1355+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1356+# License for the specific language governing permissions and limitations
1357+# under the License.
1358+
1359+"""
1360+XenAPI Plugin for transferring data between host nodes
1361+"""
1362+
1363+import os
1364+import os.path
1365+import pickle
1366+import shlex
1367+import shutil
1368+import subprocess
1368+
1369+import XenAPIPlugin
1370+
1371+from pluginlib_nova import *
1372+configure_logging('migration')
1373+
1374+
1375+def move_vhds_into_sr(session, args):
1376+ """Moves the VHDs from their copied location to the SR"""
1377+ params = pickle.loads(exists(args, 'params'))
1378+ instance_id = params['instance_id']
1379+
1380+ old_base_copy_uuid = params['old_base_copy_uuid']
1381+ old_cow_uuid = params['old_cow_uuid']
1382+
1383+ new_base_copy_uuid = params['new_base_copy_uuid']
1384+ new_cow_uuid = params['new_cow_uuid']
1385+
1386+ sr_path = params['sr_path']
1387+ sr_temp_path = "%s/images/" % sr_path
1388+
1389+ # Discover the copied VHDs locally, and then set up paths to copy
1390+ # them to under the SR
1391+    source_image_path = "/images/instance%d" % instance_id
1392+ source_base_copy_path = "%s/%s.vhd" % (source_image_path,
1393+ old_base_copy_uuid)
1394+ source_cow_path = "%s/%s.vhd" % (source_image_path, old_cow_uuid)
1395+
1396+ temp_vhd_path = "%s/instance%d/" % (sr_temp_path, instance_id)
1397+ new_base_copy_path = "%s/%s.vhd" % (temp_vhd_path, new_base_copy_uuid)
1398+ new_cow_path = "%s/%s.vhd" % (temp_vhd_path, new_cow_uuid)
1399+
1400+ logging.debug('Creating temporary SR path %s' % temp_vhd_path)
1401+ os.makedirs(temp_vhd_path)
1402+
1403+ logging.debug('Moving %s into %s' % (source_base_copy_path, temp_vhd_path))
1404+ shutil.move(source_base_copy_path, new_base_copy_path)
1405+
1406+ logging.debug('Moving %s into %s' % (source_cow_path, temp_vhd_path))
1407+ shutil.move(source_cow_path, new_cow_path)
1408+
1409+ logging.debug('Cleaning up %s' % source_image_path)
1410+ os.rmdir(source_image_path)
1411+
1412+ # Link the COW to the base copy
1413+ logging.debug('Attaching COW to the base copy %s -> %s' %
1414+ (new_cow_path, new_base_copy_path))
1415+ subprocess.call(shlex.split('/usr/sbin/vhd-util modify -n %s -p %s' %
1416+ (new_cow_path, new_base_copy_path)))
1417+ logging.debug('Moving VHDs into SR %s' % sr_path)
1418+    shutil.move(new_base_copy_path, sr_path)
1419+    shutil.move(new_cow_path, sr_path)
1420+
1421+ logging.debug('Cleaning up temporary SR path %s' % temp_vhd_path)
1422+ os.rmdir(temp_vhd_path)
1423+ return ""
1424+
1425+
1426+def transfer_vhd(session, args):
1427+ """Rsyncs a VHD to an adjacent host"""
1428+ params = pickle.loads(exists(args, 'params'))
1429+ instance_id = params['instance_id']
1430+ host = params['host']
1431+ vdi_uuid = params['vdi_uuid']
1432+ sr_path = params['sr_path']
1433+ vhd_path = "%s.vhd" % vdi_uuid
1434+
1435+ source_path = "%s/%s" % (sr_path, vhd_path)
1436+    dest_path = '%s:/images/instance%d/' % (host, instance_id)
1437+
1438+ logging.debug("Preparing to transmit %s to %s" % (source_path,
1439+ dest_path))
1440+
1441+ ssh_cmd = 'ssh -o StrictHostKeyChecking=no'
1442+
1443+    rsync_args = shlex.split('nohup /usr/bin/rsync -av --progress -e "%s" %s %s'
1444+                % (ssh_cmd, source_path, dest_path))
1445+
1446+    logging.debug('rsync %s' % ' '.join(rsync_args))
1447+
1448+ rsync_proc = subprocess.Popen(rsync_args, stdout=subprocess.PIPE)
1449+ logging.debug('Rsync output: \n %s' % rsync_proc.communicate()[0])
1450+ logging.debug('Rsync return: %d' % rsync_proc.returncode)
1451+ if rsync_proc.returncode != 0:
1452+ raise Exception("Unexpected VHD transfer failure")
1453+ return ""
1454+
1455+
1456+if __name__ == '__main__':
1457+ XenAPIPlugin.dispatch({'transfer_vhd': transfer_vhd,
1458+ 'move_vhds_into_sr': move_vhds_into_sr, })
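As a sanity check on the quoting in transfer_vhd: wrapping the ssh command in double quotes makes shlex.split hand rsync a single -e argument, whereas unquoted the -o StrictHostKeyChecking=no tokens would leak into rsync's own argument list. Paths and UUIDs below are made up:

    >>> import shlex
    >>> shlex.split('nohup /usr/bin/rsync -av --progress -e "%s" %s %s'
    ...             % ('ssh -o StrictHostKeyChecking=no',
    ...                '/var/run/sr-mount/SR_UUID/BASE_UUID.vhd',
    ...                'dest_host:/images/instance42/'))
    ['nohup', '/usr/bin/rsync', '-av', '--progress', '-e',
     'ssh -o StrictHostKeyChecking=no',
     '/var/run/sr-mount/SR_UUID/BASE_UUID.vhd',
     'dest_host:/images/instance42/']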