Merge lp:~soren/nova/consolidate-locking into lp:~hudson-openstack/nova/trunk
- consolidate-locking
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Soren Hansen | ||||
Approved revision: | 856 | ||||
Merged at revision: | 856 | ||||
Proposed branch: | lp:~soren/nova/consolidate-locking | ||||
Merge into: | lp:~hudson-openstack/nova/trunk | ||||
Diff against target: |
305 lines (+134/-51) 5 files modified
nova/network/linux_net.py (+16/-25) nova/tests/test_misc.py (+39/-4) nova/tests/test_virt.py (+0/-2) nova/utils.py (+67/-7) nova/virt/libvirt_conn.py (+12/-13) |
||||
To merge this branch: | bzr merge lp:~soren/nova/consolidate-locking | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Rick Harris (community) | Approve | ||
Vish Ishaya (community) | Approve | ||
Review via email: mp+54364@code.launchpad.net |
Commit message
Description of the change
Move all types of locking into the utils.synchronized decorator.
Convert all uses of semaphores to use this decorator.
Rick Harris (rconradharris) wrote : | # |
This looks really good-- great work Soren.
Brownie points for:
* Useful inline comments
* Added test
* YAGNI on the added complexity of a race-free sync (think it was probably wise to punt on that for now)
Soren Hansen (soren) wrote : | # |
2011/3/23 Rick Harris <email address hidden>
> * YAGNI on the added complexity of a race-free sync (think it was probably wise to punt on that for now)
I actually started working on it, but when I realised the hilarity of
implementing an rwlock that worked with Eventlet in an effort to solve
a problem that specifically does not exist under Eventlet, I stopped
:)
OpenStack Infra (hudson-openstack) wrote : | # |
The attempt to merge lp:~soren/nova/consolidate-locking into lp:nova failed. Below is the output from the failed tests.
AccountsTest
test_
test_
test_
test_
AdminAPITest
test_
test_
APITest
test_
Test
test_
test_
test_bad_token OK
test_
test_
test_no_user OK
test_
TestFunctional
test_
test_
TestLimiter
test_
LimiterTest
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
test_
TestFaults
test_
test_raise OK
test_
FlavorsTest
test_
test_
test_
GlanceImageServ
test_create OK
test_
test_delete OK
test_update OK
ImageController
test_
test_
test_show_image OK
LocalImageServi
test_create...
Soren Hansen (soren) wrote : | # |
/me retries
Preview Diff
1 | === modified file 'nova/network/linux_net.py' |
2 | --- nova/network/linux_net.py 2011-03-23 11:21:20 +0000 |
3 | +++ nova/network/linux_net.py 2011-03-23 23:52:30 +0000 |
4 | @@ -21,8 +21,6 @@ |
5 | import os |
6 | import calendar |
7 | |
8 | -from eventlet import semaphore |
9 | - |
10 | from nova import db |
11 | from nova import exception |
12 | from nova import flags |
13 | @@ -272,37 +270,30 @@ |
14 | self.ipv4['nat'].add_chain('floating-snat') |
15 | self.ipv4['nat'].add_rule('snat', '-j $floating-snat') |
16 | |
17 | - self.semaphore = semaphore.Semaphore() |
18 | - |
19 | - @utils.synchronized('iptables') |
20 | + @utils.synchronized('iptables', external=True) |
21 | def apply(self): |
22 | """Apply the current in-memory set of iptables rules |
23 | |
24 | This will blow away any rules left over from previous runs of the |
25 | same component of Nova, and replace them with our current set of |
26 | rules. This happens atomically, thanks to iptables-restore. |
27 | - |
28 | - We wrap the call in a semaphore lock, so that we don't race with |
29 | - ourselves. In the event of a race with another component running |
30 | - an iptables-* command at the same time, we retry up to 5 times. |
31 | """ |
32 | - with self.semaphore: |
33 | - s = [('iptables', self.ipv4)] |
34 | - if FLAGS.use_ipv6: |
35 | - s += [('ip6tables', self.ipv6)] |
36 | + s = [('iptables', self.ipv4)] |
37 | + if FLAGS.use_ipv6: |
38 | + s += [('ip6tables', self.ipv6)] |
39 | |
40 | - for cmd, tables in s: |
41 | - for table in tables: |
42 | - current_table, _ = self.execute('sudo', |
43 | - '%s-save' % (cmd,), |
44 | - '-t', '%s' % (table,), |
45 | - attempts=5) |
46 | - current_lines = current_table.split('\n') |
47 | - new_filter = self._modify_rules(current_lines, |
48 | - tables[table]) |
49 | - self.execute('sudo', '%s-restore' % (cmd,), |
50 | - process_input='\n'.join(new_filter), |
51 | - attempts=5) |
52 | + for cmd, tables in s: |
53 | + for table in tables: |
54 | + current_table, _ = self.execute('sudo', |
55 | + '%s-save' % (cmd,), |
56 | + '-t', '%s' % (table,), |
57 | + attempts=5) |
58 | + current_lines = current_table.split('\n') |
59 | + new_filter = self._modify_rules(current_lines, |
60 | + tables[table]) |
61 | + self.execute('sudo', '%s-restore' % (cmd,), |
62 | + process_input='\n'.join(new_filter), |
63 | + attempts=5) |
64 | |
65 | def _modify_rules(self, current_lines, table, binary=None): |
66 | unwrapped_chains = table.unwrapped_chains |
67 | |
68 | === modified file 'nova/tests/test_misc.py' |
69 | --- nova/tests/test_misc.py 2011-03-11 08:54:08 +0000 |
70 | +++ nova/tests/test_misc.py 2011-03-23 23:52:30 +0000 |
71 | @@ -18,8 +18,12 @@ |
72 | import os |
73 | import select |
74 | |
75 | +from eventlet import greenpool |
76 | +from eventlet import greenthread |
77 | + |
78 | from nova import test |
79 | -from nova.utils import parse_mailmap, str_dict_replace, synchronized |
80 | +from nova import utils |
81 | +from nova.utils import parse_mailmap, str_dict_replace |
82 | |
83 | |
84 | class ProjectTestCase(test.TestCase): |
85 | @@ -63,7 +67,7 @@ |
86 | |
87 | class LockTestCase(test.TestCase): |
88 | def test_synchronized_wrapped_function_metadata(self): |
89 | - @synchronized('whatever') |
90 | + @utils.synchronized('whatever') |
91 | def foo(): |
92 | """Bar""" |
93 | pass |
94 | @@ -72,11 +76,42 @@ |
95 | self.assertEquals(foo.__name__, 'foo', "Wrapped function's name " |
96 | "got mangled") |
97 | |
98 | - def test_synchronized(self): |
99 | + def test_synchronized_internally(self): |
100 | + """We can lock across multiple green threads""" |
101 | + saved_sem_num = len(utils._semaphores) |
102 | + seen_threads = list() |
103 | + |
104 | + @utils.synchronized('testlock2', external=False) |
105 | + def f(id): |
106 | + for x in range(10): |
107 | + seen_threads.append(id) |
108 | + greenthread.sleep(0) |
109 | + |
110 | + threads = [] |
111 | + pool = greenpool.GreenPool(10) |
112 | + for i in range(10): |
113 | + threads.append(pool.spawn(f, i)) |
114 | + |
115 | + for thread in threads: |
116 | + thread.wait() |
117 | + |
118 | + self.assertEquals(len(seen_threads), 100) |
119 | + # Looking at the seen threads, split it into chunks of 10, and verify |
120 | + # that the last 9 match the first in each chunk. |
121 | + for i in range(10): |
122 | + for j in range(9): |
123 | + self.assertEquals(seen_threads[i * 10], |
124 | + seen_threads[i * 10 + 1 + j]) |
125 | + |
126 | + self.assertEqual(saved_sem_num, len(utils._semaphores), |
127 | + "Semaphore leak detected") |
128 | + |
129 | + def test_synchronized_externally(self): |
130 | + """We can lock across multiple processes""" |
131 | rpipe1, wpipe1 = os.pipe() |
132 | rpipe2, wpipe2 = os.pipe() |
133 | |
134 | - @synchronized('testlock') |
135 | + @utils.synchronized('testlock1', external=True) |
136 | def f(rpipe, wpipe): |
137 | try: |
138 | os.write(wpipe, "foo") |
139 | |
140 | === modified file 'nova/tests/test_virt.py' |
141 | --- nova/tests/test_virt.py 2011-03-14 17:59:41 +0000 |
142 | +++ nova/tests/test_virt.py 2011-03-23 23:52:30 +0000 |
143 | @@ -77,13 +77,11 @@ |
144 | eventlet.sleep(0) |
145 | try: |
146 | self.assertFalse(done2.ready()) |
147 | - self.assertTrue('fname' in conn._image_sems) |
148 | finally: |
149 | wait1.send() |
150 | done1.wait() |
151 | eventlet.sleep(0) |
152 | self.assertTrue(done2.ready()) |
153 | - self.assertFalse('fname' in conn._image_sems) |
154 | |
155 | def test_different_fname_concurrency(self): |
156 | """Ensures that two different fname caches are concurrent""" |
157 | |
158 | === modified file 'nova/utils.py' |
159 | --- nova/utils.py 2011-03-17 21:11:58 +0000 |
160 | +++ nova/utils.py 2011-03-23 23:52:30 +0000 |
161 | @@ -41,6 +41,7 @@ |
162 | |
163 | from eventlet import event |
164 | from eventlet import greenthread |
165 | +from eventlet import semaphore |
166 | from eventlet.green import subprocess |
167 | |
168 | from nova import exception |
169 | @@ -531,17 +532,76 @@ |
170 | return json.loads(s) |
171 | |
172 | |
173 | -def synchronized(name): |
174 | +_semaphores = {} |
175 | + |
176 | + |
177 | +class _NoopContextManager(object): |
178 | + def __enter__(self): |
179 | + pass |
180 | + |
181 | + def __exit__(self, exc_type, exc_val, exc_tb): |
182 | + pass |
183 | + |
184 | + |
185 | +def synchronized(name, external=False): |
186 | + """Synchronization decorator |
187 | + |
188 | + Decorating a method like so: |
189 | + @synchronized('mylock') |
190 | + def foo(self, *args): |
191 | + ... |
192 | + |
193 | + ensures that only one thread will execute the bar method at a time. |
194 | + |
195 | + Different methods can share the same lock: |
196 | + @synchronized('mylock') |
197 | + def foo(self, *args): |
198 | + ... |
199 | + |
200 | + @synchronized('mylock') |
201 | + def bar(self, *args): |
202 | + ... |
203 | + |
204 | + This way only one of either foo or bar can be executing at a time. |
205 | + |
206 | + The external keyword argument denotes whether this lock should work across |
207 | + multiple processes. This means that if two different workers both run a |
208 | + a method decorated with @synchronized('mylock', external=True), only one |
209 | + of them will execute at a time. |
210 | + """ |
211 | + |
212 | def wrap(f): |
213 | @functools.wraps(f) |
214 | def inner(*args, **kwargs): |
215 | - LOG.debug(_("Attempting to grab %(lock)s for method " |
216 | - "%(method)s..." % {"lock": name, |
217 | + # NOTE(soren): If we ever go natively threaded, this will be racy. |
218 | + # See http://stackoverflow.com/questions/5390569/dyn\ |
219 | + # amically-allocating-and-destroying-mutexes |
220 | + if name not in _semaphores: |
221 | + _semaphores[name] = semaphore.Semaphore() |
222 | + sem = _semaphores[name] |
223 | + LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method ' |
224 | + '"%(method)s"...' % {"lock": name, |
225 | "method": f.__name__})) |
226 | - lock = lockfile.FileLock(os.path.join(FLAGS.lock_path, |
227 | - 'nova-%s.lock' % name)) |
228 | - with lock: |
229 | - return f(*args, **kwargs) |
230 | + with sem: |
231 | + if external: |
232 | + LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' |
233 | + 'method "%(method)s"...' % |
234 | + {"lock": name, "method": f.__name__})) |
235 | + lock_file_path = os.path.join(FLAGS.lock_path, |
236 | + 'nova-%s.lock' % name) |
237 | + lock = lockfile.FileLock(lock_file_path) |
238 | + else: |
239 | + lock = _NoopContextManager() |
240 | + |
241 | + with lock: |
242 | + retval = f(*args, **kwargs) |
243 | + |
244 | + # If no-one else is waiting for it, delete it. |
245 | + # See note about possible raciness above. |
246 | + if not sem.balance < 1: |
247 | + del _semaphores[name] |
248 | + |
249 | + return retval |
250 | return inner |
251 | return wrap |
252 | |
253 | |
254 | === modified file 'nova/virt/libvirt_conn.py' |
255 | --- nova/virt/libvirt_conn.py 2011-03-23 05:29:32 +0000 |
256 | +++ nova/virt/libvirt_conn.py 2011-03-23 23:52:30 +0000 |
257 | @@ -48,7 +48,7 @@ |
258 | |
259 | from eventlet import greenthread |
260 | from eventlet import tpool |
261 | -from eventlet import semaphore |
262 | + |
263 | import IPy |
264 | |
265 | from nova import context |
266 | @@ -556,13 +556,12 @@ |
267 | os.mkdir(base_dir) |
268 | base = os.path.join(base_dir, fname) |
269 | |
270 | - if fname not in LibvirtConnection._image_sems: |
271 | - LibvirtConnection._image_sems[fname] = semaphore.Semaphore() |
272 | - with LibvirtConnection._image_sems[fname]: |
273 | + @utils.synchronized(fname) |
274 | + def call_if_not_exists(base, fn, *args, **kwargs): |
275 | if not os.path.exists(base): |
276 | fn(target=base, *args, **kwargs) |
277 | - if not LibvirtConnection._image_sems[fname].locked(): |
278 | - del LibvirtConnection._image_sems[fname] |
279 | + |
280 | + call_if_not_exists(base, fn, *args, **kwargs) |
281 | |
282 | if cow: |
283 | utils.execute('qemu-img', 'create', '-f', 'qcow2', '-o', |
284 | @@ -1780,15 +1779,15 @@ |
285 | pass |
286 | |
287 | def refresh_security_group_rules(self, security_group): |
288 | - # We use the semaphore to make sure noone applies the rule set |
289 | - # after we've yanked the existing rules but before we've put in |
290 | - # the new ones. |
291 | - with self.iptables.semaphore: |
292 | - for instance in self.instances.values(): |
293 | - self.remove_filters_for_instance(instance) |
294 | - self.add_filters_for_instance(instance) |
295 | + self.do_refresh_security_group_rules(security_group) |
296 | self.iptables.apply() |
297 | |
298 | + @utils.synchronized('iptables', external=True) |
299 | + def do_refresh_security_group_rules(self, security_group): |
300 | + for instance in self.instances.values(): |
301 | + self.remove_filters_for_instance(instance) |
302 | + self.add_filters_for_instance(instance) |
303 | + |
304 | def _security_group_chain_name(self, security_group_id): |
305 | return 'nova-sg-%s' % (security_group_id,) |
306 |
Tests run just fine and seem complete to me. Nice to have a clean decorator implementation for locking. Nice job! lgtm