Merge lp:~soren/nova/consolidate-locking into lp:~hudson-openstack/nova/trunk

Proposed by Soren Hansen
Status: Merged
Approved by: Soren Hansen
Approved revision: 856
Merged at revision: 856
Proposed branch: lp:~soren/nova/consolidate-locking
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 305 lines (+134/-51)
5 files modified
nova/network/linux_net.py (+16/-25)
nova/tests/test_misc.py (+39/-4)
nova/tests/test_virt.py (+0/-2)
nova/utils.py (+67/-7)
nova/virt/libvirt_conn.py (+12/-13)
To merge this branch: bzr merge lp:~soren/nova/consolidate-locking
Reviewer Review Type Date Requested Status
Rick Harris (community) Approve
Vish Ishaya (community) Approve
Review via email: mp+54364@code.launchpad.net

Description of the change

Move all types of locking into utils.synchronize decorator.

Convert all uses of semaphores to use this decorator.

To post a comment you must log in.
Revision history for this message
Vish Ishaya (vishvananda) wrote :

Tests run just fine and seem complete to me. Nice to have a clean decorator implementation for locking. Nice job! lgtm

review: Approve
Revision history for this message
Rick Harris (rconradharris) wrote :

This looks really good-- great work Soren.

Brownie points for:

* Useful inline comments
* Added test
* YAGNI on the added complexity of a race-free sync (think it was probably wise to punt on that for now)

review: Approve
Revision history for this message
Soren Hansen (soren) wrote :

2011/3/23 Rick Harris <email address hidden>
> * YAGNI on the added complexity of a race-free sync (think it was probably wise to punt on that for now)

I actually started working on it, but when I realised the hilarity of
implementing an rwlock that worked with Eventlet in an effort to solve
a problem that specifically does not exist under Eventlet, I stopped
:)

Revision history for this message
OpenStack Infra (hudson-openstack) wrote :
Download full text (37.1 KiB)

The attempt to merge lp:~soren/nova/consolidate-locking into lp:nova failed. Below is the output from the failed tests.

AccountsTest
    test_account_create OK
    test_account_delete OK
    test_account_update OK
    test_get_account OK
AdminAPITest
    test_admin_disabled OK
    test_admin_enabled OK
APITest
    test_exceptions_are_converted_to_faults OK
Test
    test_authorize_token OK
    test_authorize_user OK
    test_bad_token OK
    test_bad_user_bad_key OK
    test_bad_user_good_key OK
    test_no_user OK
    test_token_expiry OK
TestFunctional
    test_token_doesnotexist OK
    test_token_expiry OK
TestLimiter
    test_authorize_token OK
LimiterTest
    test_limiter_custom_max_limit OK
    test_limiter_limit_and_offset OK
    test_limiter_limit_medium OK
    test_limiter_limit_over_max OK
    test_limiter_limit_zero OK
    test_limiter_negative_limit OK
    test_limiter_negative_offset OK
    test_limiter_nothing OK
    test_limiter_offset_bad OK
    test_limiter_offset_blank OK
    test_limiter_offset_medium OK
    test_limiter_offset_over_max OK
    test_limiter_offset_zero OK
TestFaults
    test_fault_parts OK
    test_raise OK
    test_retry_header OK
FlavorsTest
    test_get_flavor_by_id OK
    test_get_flavor_list OK
    test_get_flavor_list_detail OK
GlanceImageServiceTest
    test_create OK
    test_create_and_show_non_existing_image OK
    test_delete OK
    test_update OK
ImageControllerWithGlanceServiceTest
    test_get_image_details OK
    test_get_image_index OK
    test_show_image OK
LocalImageServiceTest
    test_create...

Revision history for this message
Soren Hansen (soren) wrote :

/me retries

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'nova/network/linux_net.py'
--- nova/network/linux_net.py 2011-03-23 11:21:20 +0000
+++ nova/network/linux_net.py 2011-03-23 23:52:30 +0000
@@ -21,8 +21,6 @@
21import os21import os
22import calendar22import calendar
2323
24from eventlet import semaphore
25
26from nova import db24from nova import db
27from nova import exception25from nova import exception
28from nova import flags26from nova import flags
@@ -272,37 +270,30 @@
272 self.ipv4['nat'].add_chain('floating-snat')270 self.ipv4['nat'].add_chain('floating-snat')
273 self.ipv4['nat'].add_rule('snat', '-j $floating-snat')271 self.ipv4['nat'].add_rule('snat', '-j $floating-snat')
274272
275 self.semaphore = semaphore.Semaphore()273 @utils.synchronized('iptables', external=True)
276
277 @utils.synchronized('iptables')
278 def apply(self):274 def apply(self):
279 """Apply the current in-memory set of iptables rules275 """Apply the current in-memory set of iptables rules
280276
281 This will blow away any rules left over from previous runs of the277 This will blow away any rules left over from previous runs of the
282 same component of Nova, and replace them with our current set of278 same component of Nova, and replace them with our current set of
283 rules. This happens atomically, thanks to iptables-restore.279 rules. This happens atomically, thanks to iptables-restore.
284
285 We wrap the call in a semaphore lock, so that we don't race with
286 ourselves. In the event of a race with another component running
287 an iptables-* command at the same time, we retry up to 5 times.
288 """280 """
289 with self.semaphore:281 s = [('iptables', self.ipv4)]
290 s = [('iptables', self.ipv4)]282 if FLAGS.use_ipv6:
291 if FLAGS.use_ipv6:283 s += [('ip6tables', self.ipv6)]
292 s += [('ip6tables', self.ipv6)]
293284
294 for cmd, tables in s:285 for cmd, tables in s:
295 for table in tables:286 for table in tables:
296 current_table, _ = self.execute('sudo',287 current_table, _ = self.execute('sudo',
297 '%s-save' % (cmd,),288 '%s-save' % (cmd,),
298 '-t', '%s' % (table,),289 '-t', '%s' % (table,),
299 attempts=5)290 attempts=5)
300 current_lines = current_table.split('\n')291 current_lines = current_table.split('\n')
301 new_filter = self._modify_rules(current_lines,292 new_filter = self._modify_rules(current_lines,
302 tables[table])293 tables[table])
303 self.execute('sudo', '%s-restore' % (cmd,),294 self.execute('sudo', '%s-restore' % (cmd,),
304 process_input='\n'.join(new_filter),295 process_input='\n'.join(new_filter),
305 attempts=5)296 attempts=5)
306297
307 def _modify_rules(self, current_lines, table, binary=None):298 def _modify_rules(self, current_lines, table, binary=None):
308 unwrapped_chains = table.unwrapped_chains299 unwrapped_chains = table.unwrapped_chains
309300
=== modified file 'nova/tests/test_misc.py'
--- nova/tests/test_misc.py 2011-03-11 08:54:08 +0000
+++ nova/tests/test_misc.py 2011-03-23 23:52:30 +0000
@@ -18,8 +18,12 @@
18import os18import os
19import select19import select
2020
21from eventlet import greenpool
22from eventlet import greenthread
23
21from nova import test24from nova import test
22from nova.utils import parse_mailmap, str_dict_replace, synchronized25from nova import utils
26from nova.utils import parse_mailmap, str_dict_replace
2327
2428
25class ProjectTestCase(test.TestCase):29class ProjectTestCase(test.TestCase):
@@ -63,7 +67,7 @@
6367
64class LockTestCase(test.TestCase):68class LockTestCase(test.TestCase):
65 def test_synchronized_wrapped_function_metadata(self):69 def test_synchronized_wrapped_function_metadata(self):
66 @synchronized('whatever')70 @utils.synchronized('whatever')
67 def foo():71 def foo():
68 """Bar"""72 """Bar"""
69 pass73 pass
@@ -72,11 +76,42 @@
72 self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "76 self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
73 "got mangled")77 "got mangled")
7478
75 def test_synchronized(self):79 def test_synchronized_internally(self):
80 """We can lock across multiple green threads"""
81 saved_sem_num = len(utils._semaphores)
82 seen_threads = list()
83
84 @utils.synchronized('testlock2', external=False)
85 def f(id):
86 for x in range(10):
87 seen_threads.append(id)
88 greenthread.sleep(0)
89
90 threads = []
91 pool = greenpool.GreenPool(10)
92 for i in range(10):
93 threads.append(pool.spawn(f, i))
94
95 for thread in threads:
96 thread.wait()
97
98 self.assertEquals(len(seen_threads), 100)
99 # Looking at the seen threads, split it into chunks of 10, and verify
100 # that the last 9 match the first in each chunk.
101 for i in range(10):
102 for j in range(9):
103 self.assertEquals(seen_threads[i * 10],
104 seen_threads[i * 10 + 1 + j])
105
106 self.assertEqual(saved_sem_num, len(utils._semaphores),
107 "Semaphore leak detected")
108
109 def test_synchronized_externally(self):
110 """We can lock across multiple processes"""
76 rpipe1, wpipe1 = os.pipe()111 rpipe1, wpipe1 = os.pipe()
77 rpipe2, wpipe2 = os.pipe()112 rpipe2, wpipe2 = os.pipe()
78113
79 @synchronized('testlock')114 @utils.synchronized('testlock1', external=True)
80 def f(rpipe, wpipe):115 def f(rpipe, wpipe):
81 try:116 try:
82 os.write(wpipe, "foo")117 os.write(wpipe, "foo")
83118
=== modified file 'nova/tests/test_virt.py'
--- nova/tests/test_virt.py 2011-03-14 17:59:41 +0000
+++ nova/tests/test_virt.py 2011-03-23 23:52:30 +0000
@@ -77,13 +77,11 @@
77 eventlet.sleep(0)77 eventlet.sleep(0)
78 try:78 try:
79 self.assertFalse(done2.ready())79 self.assertFalse(done2.ready())
80 self.assertTrue('fname' in conn._image_sems)
81 finally:80 finally:
82 wait1.send()81 wait1.send()
83 done1.wait()82 done1.wait()
84 eventlet.sleep(0)83 eventlet.sleep(0)
85 self.assertTrue(done2.ready())84 self.assertTrue(done2.ready())
86 self.assertFalse('fname' in conn._image_sems)
8785
88 def test_different_fname_concurrency(self):86 def test_different_fname_concurrency(self):
89 """Ensures that two different fname caches are concurrent"""87 """Ensures that two different fname caches are concurrent"""
9088
=== modified file 'nova/utils.py'
--- nova/utils.py 2011-03-17 21:11:58 +0000
+++ nova/utils.py 2011-03-23 23:52:30 +0000
@@ -41,6 +41,7 @@
4141
42from eventlet import event42from eventlet import event
43from eventlet import greenthread43from eventlet import greenthread
44from eventlet import semaphore
44from eventlet.green import subprocess45from eventlet.green import subprocess
4546
46from nova import exception47from nova import exception
@@ -531,17 +532,76 @@
531 return json.loads(s)532 return json.loads(s)
532533
533534
534def synchronized(name):535_semaphores = {}
536
537
538class _NoopContextManager(object):
539 def __enter__(self):
540 pass
541
542 def __exit__(self, exc_type, exc_val, exc_tb):
543 pass
544
545
546def synchronized(name, external=False):
547 """Synchronization decorator
548
549 Decorating a method like so:
550 @synchronized('mylock')
551 def foo(self, *args):
552 ...
553
554 ensures that only one thread will execute the foo method at a time.
555
556 Different methods can share the same lock:
557 @synchronized('mylock')
558 def foo(self, *args):
559 ...
560
561 @synchronized('mylock')
562 def bar(self, *args):
563 ...
564
565 This way only one of either foo or bar can be executing at a time.
566
567 The external keyword argument denotes whether this lock should work across
568 multiple processes. This means that if two different workers both run a
569 method decorated with @synchronized('mylock', external=True), only one
570 of them will execute at a time.
571 """
572
535 def wrap(f):573 def wrap(f):
536 @functools.wraps(f)574 @functools.wraps(f)
537 def inner(*args, **kwargs):575 def inner(*args, **kwargs):
538 LOG.debug(_("Attempting to grab %(lock)s for method "576 # NOTE(soren): If we ever go natively threaded, this will be racy.
539 "%(method)s..." % {"lock": name,577 # See http://stackoverflow.com/questions/5390569/dyn\
578 # amically-allocating-and-destroying-mutexes
579 if name not in _semaphores:
580 _semaphores[name] = semaphore.Semaphore()
581 sem = _semaphores[name]
582 LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method '
583 '"%(method)s"...' % {"lock": name,
540 "method": f.__name__}))584 "method": f.__name__}))
541 lock = lockfile.FileLock(os.path.join(FLAGS.lock_path,585 with sem:
542 'nova-%s.lock' % name))586 if external:
543 with lock:587 LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
544 return f(*args, **kwargs)588 'method "%(method)s"...' %
589 {"lock": name, "method": f.__name__}))
590 lock_file_path = os.path.join(FLAGS.lock_path,
591 'nova-%s.lock' % name)
592 lock = lockfile.FileLock(lock_file_path)
593 else:
594 lock = _NoopContextManager()
595
596 with lock:
597 retval = f(*args, **kwargs)
598
599 # If no-one else is waiting for it, delete it.
600 # See note about possible raciness above.
601 if not sem.balance < 1:
602 del _semaphores[name]
603
604 return retval
545 return inner605 return inner
546 return wrap606 return wrap
547607
548608
=== modified file 'nova/virt/libvirt_conn.py'
--- nova/virt/libvirt_conn.py 2011-03-23 05:29:32 +0000
+++ nova/virt/libvirt_conn.py 2011-03-23 23:52:30 +0000
@@ -48,7 +48,7 @@
4848
49from eventlet import greenthread49from eventlet import greenthread
50from eventlet import tpool50from eventlet import tpool
51from eventlet import semaphore51
52import IPy52import IPy
5353
54from nova import context54from nova import context
@@ -556,13 +556,12 @@
556 os.mkdir(base_dir)556 os.mkdir(base_dir)
557 base = os.path.join(base_dir, fname)557 base = os.path.join(base_dir, fname)
558558
559 if fname not in LibvirtConnection._image_sems:559 @utils.synchronized(fname)
560 LibvirtConnection._image_sems[fname] = semaphore.Semaphore()560 def call_if_not_exists(base, fn, *args, **kwargs):
561 with LibvirtConnection._image_sems[fname]:
562 if not os.path.exists(base):561 if not os.path.exists(base):
563 fn(target=base, *args, **kwargs)562 fn(target=base, *args, **kwargs)
564 if not LibvirtConnection._image_sems[fname].locked():563
565 del LibvirtConnection._image_sems[fname]564 call_if_not_exists(base, fn, *args, **kwargs)
566565
567 if cow:566 if cow:
568 utils.execute('qemu-img', 'create', '-f', 'qcow2', '-o',567 utils.execute('qemu-img', 'create', '-f', 'qcow2', '-o',
@@ -1780,15 +1779,15 @@
1780 pass1779 pass
17811780
1782 def refresh_security_group_rules(self, security_group):1781 def refresh_security_group_rules(self, security_group):
1783 # We use the semaphore to make sure noone applies the rule set1782 self.do_refresh_security_group_rules(security_group)
1784 # after we've yanked the existing rules but before we've put in
1785 # the new ones.
1786 with self.iptables.semaphore:
1787 for instance in self.instances.values():
1788 self.remove_filters_for_instance(instance)
1789 self.add_filters_for_instance(instance)
1790 self.iptables.apply()1783 self.iptables.apply()
17911784
1785 @utils.synchronized('iptables', external=True)
1786 def do_refresh_security_group_rules(self, security_group):
1787 for instance in self.instances.values():
1788 self.remove_filters_for_instance(instance)
1789 self.add_filters_for_instance(instance)
1790
1792 def _security_group_chain_name(self, security_group_id):1791 def _security_group_chain_name(self, security_group_id):
1793 return 'nova-sg-%s' % (security_group_id,)1792 return 'nova-sg-%s' % (security_group_id,)
17941793