Merge lp:~soren/nova/consolidate-locking into lp:~hudson-openstack/nova/trunk
- consolidate-locking
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Soren Hansen | ||||
Approved revision: | 856 | ||||
Merged at revision: | 856 | ||||
Proposed branch: | lp:~soren/nova/consolidate-locking | ||||
Merge into: | lp:~hudson-openstack/nova/trunk | ||||
Diff against target: |
305 lines (+134/-51) 5 files modified
nova/network/linux_net.py (+16/-25) nova/tests/test_misc.py (+39/-4) nova/tests/test_virt.py (+0/-2) nova/utils.py (+67/-7) nova/virt/libvirt_conn.py (+12/-13) |
||||
To merge this branch: | bzr merge lp:~soren/nova/consolidate-locking | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Rick Harris (community) | Approve | ||
Vish Ishaya (community) | Approve | ||
Review via email: mp+54364@code.launchpad.net |
Commit message
Description of the change
Move all types of locking into the utils.synchronized decorator.
Convert all uses of semaphores to use this decorator.
Rick Harris (rconradharris) wrote : | # |
This looks really good-- great work Soren.
Brownie points for:
* Useful inline comments
* Added test
* YAGNI on the added complexity of a race-free sync (think it was probably wise to punt on that for now)
Soren Hansen (soren) wrote : | # |
2011/3/23 Rick Harris <email address hidden>
> * YAGNI on the added complexity of a race-free sync (think it was probably wise to punt on that for now)
I actually started working on it, but when I realised the hilarity of
implementing an rwlock that worked with Eventlet in an effort to solve
a problem that specifically does not exist under Eventlet, I stopped
:)
OpenStack Infra (hudson-openstack) wrote : | # |
The attempt to merge lp:~soren/nova/consolidate-locking into lp:nova failed. Below is the output from the failed tests.
AccountsTest
test_
test_
test_
test_
AdminAPITest
test_
test_
APITest
test_
Test
test_
test_
test_bad_token OK
test_
test_
test_no_user OK
test_
TestFunctional
test_
test_
TestLimiter
test_
LimiterTest
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
TestFaults
test_
test_raise OK
test_
FlavorsTest
test_
test_
test_
GlanceImageServ
test_create OK
test_
test_delete OK
test_update OK
ImageController
test_
test_
test_show_image OK
LocalImageServi
test_create...
Soren Hansen (soren) wrote : | # |
/me retries
Preview Diff
1 | === modified file 'nova/network/linux_net.py' | |||
2 | --- nova/network/linux_net.py 2011-03-23 11:21:20 +0000 | |||
3 | +++ nova/network/linux_net.py 2011-03-23 23:52:30 +0000 | |||
4 | @@ -21,8 +21,6 @@ | |||
5 | 21 | import os | 21 | import os |
6 | 22 | import calendar | 22 | import calendar |
7 | 23 | 23 | ||
8 | 24 | from eventlet import semaphore | ||
9 | 25 | |||
10 | 26 | from nova import db | 24 | from nova import db |
11 | 27 | from nova import exception | 25 | from nova import exception |
12 | 28 | from nova import flags | 26 | from nova import flags |
13 | @@ -272,37 +270,30 @@ | |||
14 | 272 | self.ipv4['nat'].add_chain('floating-snat') | 270 | self.ipv4['nat'].add_chain('floating-snat') |
15 | 273 | self.ipv4['nat'].add_rule('snat', '-j $floating-snat') | 271 | self.ipv4['nat'].add_rule('snat', '-j $floating-snat') |
16 | 274 | 272 | ||
20 | 275 | self.semaphore = semaphore.Semaphore() | 273 | @utils.synchronized('iptables', external=True) |
18 | 276 | |||
19 | 277 | @utils.synchronized('iptables') | ||
21 | 278 | def apply(self): | 274 | def apply(self): |
22 | 279 | """Apply the current in-memory set of iptables rules | 275 | """Apply the current in-memory set of iptables rules |
23 | 280 | 276 | ||
24 | 281 | This will blow away any rules left over from previous runs of the | 277 | This will blow away any rules left over from previous runs of the |
25 | 282 | same component of Nova, and replace them with our current set of | 278 | same component of Nova, and replace them with our current set of |
26 | 283 | rules. This happens atomically, thanks to iptables-restore. | 279 | rules. This happens atomically, thanks to iptables-restore. |
27 | 284 | |||
28 | 285 | We wrap the call in a semaphore lock, so that we don't race with | ||
29 | 286 | ourselves. In the event of a race with another component running | ||
30 | 287 | an iptables-* command at the same time, we retry up to 5 times. | ||
31 | 288 | """ | 280 | """ |
36 | 289 | with self.semaphore: | 281 | s = [('iptables', self.ipv4)] |
37 | 290 | s = [('iptables', self.ipv4)] | 282 | if FLAGS.use_ipv6: |
38 | 291 | if FLAGS.use_ipv6: | 283 | s += [('ip6tables', self.ipv6)] |
35 | 292 | s += [('ip6tables', self.ipv6)] | ||
39 | 293 | 284 | ||
52 | 294 | for cmd, tables in s: | 285 | for cmd, tables in s: |
53 | 295 | for table in tables: | 286 | for table in tables: |
54 | 296 | current_table, _ = self.execute('sudo', | 287 | current_table, _ = self.execute('sudo', |
55 | 297 | '%s-save' % (cmd,), | 288 | '%s-save' % (cmd,), |
56 | 298 | '-t', '%s' % (table,), | 289 | '-t', '%s' % (table,), |
57 | 299 | attempts=5) | 290 | attempts=5) |
58 | 300 | current_lines = current_table.split('\n') | 291 | current_lines = current_table.split('\n') |
59 | 301 | new_filter = self._modify_rules(current_lines, | 292 | new_filter = self._modify_rules(current_lines, |
60 | 302 | tables[table]) | 293 | tables[table]) |
61 | 303 | self.execute('sudo', '%s-restore' % (cmd,), | 294 | self.execute('sudo', '%s-restore' % (cmd,), |
62 | 304 | process_input='\n'.join(new_filter), | 295 | process_input='\n'.join(new_filter), |
63 | 305 | attempts=5) | 296 | attempts=5) |
64 | 306 | 297 | ||
65 | 307 | def _modify_rules(self, current_lines, table, binary=None): | 298 | def _modify_rules(self, current_lines, table, binary=None): |
66 | 308 | unwrapped_chains = table.unwrapped_chains | 299 | unwrapped_chains = table.unwrapped_chains |
67 | 309 | 300 | ||
68 | === modified file 'nova/tests/test_misc.py' | |||
69 | --- nova/tests/test_misc.py 2011-03-11 08:54:08 +0000 | |||
70 | +++ nova/tests/test_misc.py 2011-03-23 23:52:30 +0000 | |||
71 | @@ -18,8 +18,12 @@ | |||
72 | 18 | import os | 18 | import os |
73 | 19 | import select | 19 | import select |
74 | 20 | 20 | ||
75 | 21 | from eventlet import greenpool | ||
76 | 22 | from eventlet import greenthread | ||
77 | 23 | |||
78 | 21 | from nova import test | 24 | from nova import test |
80 | 22 | from nova.utils import parse_mailmap, str_dict_replace, synchronized | 25 | from nova import utils |
81 | 26 | from nova.utils import parse_mailmap, str_dict_replace | ||
82 | 23 | 27 | ||
83 | 24 | 28 | ||
84 | 25 | class ProjectTestCase(test.TestCase): | 29 | class ProjectTestCase(test.TestCase): |
85 | @@ -63,7 +67,7 @@ | |||
86 | 63 | 67 | ||
87 | 64 | class LockTestCase(test.TestCase): | 68 | class LockTestCase(test.TestCase): |
88 | 65 | def test_synchronized_wrapped_function_metadata(self): | 69 | def test_synchronized_wrapped_function_metadata(self): |
90 | 66 | @synchronized('whatever') | 70 | @utils.synchronized('whatever') |
91 | 67 | def foo(): | 71 | def foo(): |
92 | 68 | """Bar""" | 72 | """Bar""" |
93 | 69 | pass | 73 | pass |
94 | @@ -72,11 +76,42 @@ | |||
95 | 72 | self.assertEquals(foo.__name__, 'foo', "Wrapped function's name " | 76 | self.assertEquals(foo.__name__, 'foo', "Wrapped function's name " |
96 | 73 | "got mangled") | 77 | "got mangled") |
97 | 74 | 78 | ||
99 | 75 | def test_synchronized(self): | 79 | def test_synchronized_internally(self): |
100 | 80 | """We can lock across multiple green threads""" | ||
101 | 81 | saved_sem_num = len(utils._semaphores) | ||
102 | 82 | seen_threads = list() | ||
103 | 83 | |||
104 | 84 | @utils.synchronized('testlock2', external=False) | ||
105 | 85 | def f(id): | ||
106 | 86 | for x in range(10): | ||
107 | 87 | seen_threads.append(id) | ||
108 | 88 | greenthread.sleep(0) | ||
109 | 89 | |||
110 | 90 | threads = [] | ||
111 | 91 | pool = greenpool.GreenPool(10) | ||
112 | 92 | for i in range(10): | ||
113 | 93 | threads.append(pool.spawn(f, i)) | ||
114 | 94 | |||
115 | 95 | for thread in threads: | ||
116 | 96 | thread.wait() | ||
117 | 97 | |||
118 | 98 | self.assertEquals(len(seen_threads), 100) | ||
119 | 99 | # Looking at the seen threads, split it into chunks of 10, and verify | ||
120 | 100 | # that the last 9 match the first in each chunk. | ||
121 | 101 | for i in range(10): | ||
122 | 102 | for j in range(9): | ||
123 | 103 | self.assertEquals(seen_threads[i * 10], | ||
124 | 104 | seen_threads[i * 10 + 1 + j]) | ||
125 | 105 | |||
126 | 106 | self.assertEqual(saved_sem_num, len(utils._semaphores), | ||
127 | 107 | "Semaphore leak detected") | ||
128 | 108 | |||
129 | 109 | def test_synchronized_externally(self): | ||
130 | 110 | """We can lock across multiple processes""" | ||
131 | 76 | rpipe1, wpipe1 = os.pipe() | 111 | rpipe1, wpipe1 = os.pipe() |
132 | 77 | rpipe2, wpipe2 = os.pipe() | 112 | rpipe2, wpipe2 = os.pipe() |
133 | 78 | 113 | ||
135 | 79 | @synchronized('testlock') | 114 | @utils.synchronized('testlock1', external=True) |
136 | 80 | def f(rpipe, wpipe): | 115 | def f(rpipe, wpipe): |
137 | 81 | try: | 116 | try: |
138 | 82 | os.write(wpipe, "foo") | 117 | os.write(wpipe, "foo") |
139 | 83 | 118 | ||
140 | === modified file 'nova/tests/test_virt.py' | |||
141 | --- nova/tests/test_virt.py 2011-03-14 17:59:41 +0000 | |||
142 | +++ nova/tests/test_virt.py 2011-03-23 23:52:30 +0000 | |||
143 | @@ -77,13 +77,11 @@ | |||
144 | 77 | eventlet.sleep(0) | 77 | eventlet.sleep(0) |
145 | 78 | try: | 78 | try: |
146 | 79 | self.assertFalse(done2.ready()) | 79 | self.assertFalse(done2.ready()) |
147 | 80 | self.assertTrue('fname' in conn._image_sems) | ||
148 | 81 | finally: | 80 | finally: |
149 | 82 | wait1.send() | 81 | wait1.send() |
150 | 83 | done1.wait() | 82 | done1.wait() |
151 | 84 | eventlet.sleep(0) | 83 | eventlet.sleep(0) |
152 | 85 | self.assertTrue(done2.ready()) | 84 | self.assertTrue(done2.ready()) |
153 | 86 | self.assertFalse('fname' in conn._image_sems) | ||
154 | 87 | 85 | ||
155 | 88 | def test_different_fname_concurrency(self): | 86 | def test_different_fname_concurrency(self): |
156 | 89 | """Ensures that two different fname caches are concurrent""" | 87 | """Ensures that two different fname caches are concurrent""" |
157 | 90 | 88 | ||
158 | === modified file 'nova/utils.py' | |||
159 | --- nova/utils.py 2011-03-17 21:11:58 +0000 | |||
160 | +++ nova/utils.py 2011-03-23 23:52:30 +0000 | |||
161 | @@ -41,6 +41,7 @@ | |||
162 | 41 | 41 | ||
163 | 42 | from eventlet import event | 42 | from eventlet import event |
164 | 43 | from eventlet import greenthread | 43 | from eventlet import greenthread |
165 | 44 | from eventlet import semaphore | ||
166 | 44 | from eventlet.green import subprocess | 45 | from eventlet.green import subprocess |
167 | 45 | 46 | ||
168 | 46 | from nova import exception | 47 | from nova import exception |
169 | @@ -531,17 +532,76 @@ | |||
170 | 531 | return json.loads(s) | 532 | return json.loads(s) |
171 | 532 | 533 | ||
172 | 533 | 534 | ||
174 | 534 | def synchronized(name): | 535 | _semaphores = {} |
175 | 536 | |||
176 | 537 | |||
177 | 538 | class _NoopContextManager(object): | ||
178 | 539 | def __enter__(self): | ||
179 | 540 | pass | ||
180 | 541 | |||
181 | 542 | def __exit__(self, exc_type, exc_val, exc_tb): | ||
182 | 543 | pass | ||
183 | 544 | |||
184 | 545 | |||
185 | 546 | def synchronized(name, external=False): | ||
186 | 547 | """Synchronization decorator | ||
187 | 548 | |||
188 | 549 | Decorating a method like so: | ||
189 | 550 | @synchronized('mylock') | ||
190 | 551 | def foo(self, *args): | ||
191 | 552 | ... | ||
192 | 553 | |||
193 | 554 | ensures that only one thread will execute the bar method at a time. | ||
194 | 555 | |||
195 | 556 | Different methods can share the same lock: | ||
196 | 557 | @synchronized('mylock') | ||
197 | 558 | def foo(self, *args): | ||
198 | 559 | ... | ||
199 | 560 | |||
200 | 561 | @synchronized('mylock') | ||
201 | 562 | def bar(self, *args): | ||
202 | 563 | ... | ||
203 | 564 | |||
204 | 565 | This way only one of either foo or bar can be executing at a time. | ||
205 | 566 | |||
206 | 567 | The external keyword argument denotes whether this lock should work across | ||
207 | 568 | multiple processes. This means that if two different workers both run a | ||
208 | 569 | a method decorated with @synchronized('mylock', external=True), only one | ||
209 | 570 | of them will execute at a time. | ||
210 | 571 | """ | ||
211 | 572 | |||
212 | 535 | def wrap(f): | 573 | def wrap(f): |
213 | 536 | @functools.wraps(f) | 574 | @functools.wraps(f) |
214 | 537 | def inner(*args, **kwargs): | 575 | def inner(*args, **kwargs): |
217 | 538 | LOG.debug(_("Attempting to grab %(lock)s for method " | 576 | # NOTE(soren): If we ever go natively threaded, this will be racy. |
218 | 539 | "%(method)s..." % {"lock": name, | 577 | # See http://stackoverflow.com/questions/5390569/dyn\ |
219 | 578 | # amically-allocating-and-destroying-mutexes | ||
220 | 579 | if name not in _semaphores: | ||
221 | 580 | _semaphores[name] = semaphore.Semaphore() | ||
222 | 581 | sem = _semaphores[name] | ||
223 | 582 | LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method ' | ||
224 | 583 | '"%(method)s"...' % {"lock": name, | ||
225 | 540 | "method": f.__name__})) | 584 | "method": f.__name__})) |
230 | 541 | lock = lockfile.FileLock(os.path.join(FLAGS.lock_path, | 585 | with sem: |
231 | 542 | 'nova-%s.lock' % name)) | 586 | if external: |
232 | 543 | with lock: | 587 | LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' |
233 | 544 | return f(*args, **kwargs) | 588 | 'method "%(method)s"...' % |
234 | 589 | {"lock": name, "method": f.__name__})) | ||
235 | 590 | lock_file_path = os.path.join(FLAGS.lock_path, | ||
236 | 591 | 'nova-%s.lock' % name) | ||
237 | 592 | lock = lockfile.FileLock(lock_file_path) | ||
238 | 593 | else: | ||
239 | 594 | lock = _NoopContextManager() | ||
240 | 595 | |||
241 | 596 | with lock: | ||
242 | 597 | retval = f(*args, **kwargs) | ||
243 | 598 | |||
244 | 599 | # If no-one else is waiting for it, delete it. | ||
245 | 600 | # See note about possible raciness above. | ||
246 | 601 | if not sem.balance < 1: | ||
247 | 602 | del _semaphores[name] | ||
248 | 603 | |||
249 | 604 | return retval | ||
250 | 545 | return inner | 605 | return inner |
251 | 546 | return wrap | 606 | return wrap |
252 | 547 | 607 | ||
253 | 548 | 608 | ||
254 | === modified file 'nova/virt/libvirt_conn.py' | |||
255 | --- nova/virt/libvirt_conn.py 2011-03-23 05:29:32 +0000 | |||
256 | +++ nova/virt/libvirt_conn.py 2011-03-23 23:52:30 +0000 | |||
257 | @@ -48,7 +48,7 @@ | |||
258 | 48 | 48 | ||
259 | 49 | from eventlet import greenthread | 49 | from eventlet import greenthread |
260 | 50 | from eventlet import tpool | 50 | from eventlet import tpool |
262 | 51 | from eventlet import semaphore | 51 | |
263 | 52 | import IPy | 52 | import IPy |
264 | 53 | 53 | ||
265 | 54 | from nova import context | 54 | from nova import context |
266 | @@ -556,13 +556,12 @@ | |||
267 | 556 | os.mkdir(base_dir) | 556 | os.mkdir(base_dir) |
268 | 557 | base = os.path.join(base_dir, fname) | 557 | base = os.path.join(base_dir, fname) |
269 | 558 | 558 | ||
273 | 559 | if fname not in LibvirtConnection._image_sems: | 559 | @utils.synchronized(fname) |
274 | 560 | LibvirtConnection._image_sems[fname] = semaphore.Semaphore() | 560 | def call_if_not_exists(base, fn, *args, **kwargs): |
272 | 561 | with LibvirtConnection._image_sems[fname]: | ||
275 | 562 | if not os.path.exists(base): | 561 | if not os.path.exists(base): |
276 | 563 | fn(target=base, *args, **kwargs) | 562 | fn(target=base, *args, **kwargs) |
279 | 564 | if not LibvirtConnection._image_sems[fname].locked(): | 563 | |
280 | 565 | del LibvirtConnection._image_sems[fname] | 564 | call_if_not_exists(base, fn, *args, **kwargs) |
281 | 566 | 565 | ||
282 | 567 | if cow: | 566 | if cow: |
283 | 568 | utils.execute('qemu-img', 'create', '-f', 'qcow2', '-o', | 567 | utils.execute('qemu-img', 'create', '-f', 'qcow2', '-o', |
284 | @@ -1780,15 +1779,15 @@ | |||
285 | 1780 | pass | 1779 | pass |
286 | 1781 | 1780 | ||
287 | 1782 | def refresh_security_group_rules(self, security_group): | 1781 | def refresh_security_group_rules(self, security_group): |
295 | 1783 | # We use the semaphore to make sure noone applies the rule set | 1782 | self.do_refresh_security_group_rules(security_group) |
289 | 1784 | # after we've yanked the existing rules but before we've put in | ||
290 | 1785 | # the new ones. | ||
291 | 1786 | with self.iptables.semaphore: | ||
292 | 1787 | for instance in self.instances.values(): | ||
293 | 1788 | self.remove_filters_for_instance(instance) | ||
294 | 1789 | self.add_filters_for_instance(instance) | ||
296 | 1790 | self.iptables.apply() | 1783 | self.iptables.apply() |
297 | 1791 | 1784 | ||
298 | 1785 | @utils.synchronized('iptables', external=True) | ||
299 | 1786 | def do_refresh_security_group_rules(self, security_group): | ||
300 | 1787 | for instance in self.instances.values(): | ||
301 | 1788 | self.remove_filters_for_instance(instance) | ||
302 | 1789 | self.add_filters_for_instance(instance) | ||
303 | 1790 | |||
304 | 1792 | def _security_group_chain_name(self, security_group_id): | 1791 | def _security_group_chain_name(self, security_group_id): |
305 | 1793 | return 'nova-sg-%s' % (security_group_id,) | 1792 | return 'nova-sg-%s' % (security_group_id,) |
306 | 1794 | 1793 |
Tests run just fine and seem complete to me. Nice to have a clean decorator implementation for locking. Nice job! lgtm