Merge lp:~allenap/postgresfixture/more-locking into lp:~lazr-developers/postgresfixture/trunk

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: 9
Merged at revision: 5
Proposed branch: lp:~allenap/postgresfixture/more-locking
Merge into: lp:~lazr-developers/postgresfixture/trunk
Diff against target: 346 lines (+118/-33)
6 files modified
postgresfixture/cluster.py (+43/-2)
postgresfixture/clusterfixture.py (+6/-6)
postgresfixture/main.py (+4/-4)
postgresfixture/tests/test_cluster.py (+49/-5)
postgresfixture/tests/test_clusterfixture.py (+11/-11)
postgresfixture/tests/test_main.py (+5/-5)
To merge this branch: bzr merge lp:~allenap/postgresfixture/more-locking
Reviewer Review Type Date Requested Status
Raphaël Badin (community) Approve
Review via email: mp+106907@code.launchpad.net

Commit message

Lock, using lockf, around critical sections during cluster set-up and tear-down.

To post a comment you must log in.
Revision history for this message
Raphaël Badin (rvb) wrote :

Looks good.

rvba -> allenap: could you tell me why you did s/locks/shares/ in the postgresfixture branch?
allenap -> rvba: That change, from lock to shares, is because of the addition of the "critical section" lock (the lockf based one). The shares lock is better named now because it reflects that each acquired lock on it represents a share in the resource.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'postgresfixture/cluster.py'
2--- postgresfixture/cluster.py 2012-05-14 13:37:38 +0000
3+++ postgresfixture/cluster.py 2012-05-22 22:41:20 +0000
4@@ -14,7 +14,16 @@
5 "Cluster",
6 ]
7
8-from contextlib import closing
9+from contextlib import (
10+ closing,
11+ contextmanager,
12+ )
13+from fcntl import (
14+ LOCK_EX,
15+ LOCK_UN,
16+ lockf,
17+ )
18+from functools import wraps
19 from os import (
20 devnull,
21 environ,
22@@ -46,12 +55,40 @@
23 return path.pathsep.join(exe_path)
24
25
26+def locked(method):
27+ """Execute the decorated method with its instance's lock held."""
28+ @wraps(method)
29+ def wrapper(self, *args, **kwargs):
30+ with self.lock:
31+ return method(self, *args, **kwargs)
32+ return wrapper
33+
34+
35 class Cluster:
36 """Represents a PostgreSQL cluster, running or not."""
37
38 def __init__(self, datadir):
39 self.datadir = path.abspath(datadir)
40-
41+ self.lockfile = path.join(
42+ path.dirname(self.datadir),
43+ ".%s.lock" % path.basename(self.datadir))
44+
45+ @property
46+ @contextmanager
47+ def lock(self):
48+ """Context that takes out a lock for critical sections.
49+
50+ The lock is meant to be short-lived, to avoid race conditions. As
51+ such, acquiring this lock will block.
52+ """
53+ with open(self.lockfile, "ab") as fd:
54+ lockf(fd, LOCK_EX)
55+ try:
56+ yield
57+ finally:
58+ lockf(fd, LOCK_UN)
59+
60+ @locked
61 def execute(self, *command, **options):
62 """Execute a command with an environment suitable for this cluster."""
63 env = options.pop("env", environ).copy()
64@@ -92,6 +129,7 @@
65 else:
66 return True
67
68+ @locked
69 def create(self):
70 """Create this cluster, if it does not exist."""
71 if not self.exists:
72@@ -99,6 +137,7 @@
73 makedirs(self.datadir)
74 self.execute("pg_ctl", "init", "-s", "-o", "-E utf8 -A trust")
75
76+ @locked
77 def start(self):
78 """Start this cluster, if it's not already started."""
79 if not self.running:
80@@ -149,6 +188,7 @@
81 with closing(conn.cursor()) as cur:
82 cur.execute("DROP DATABASE %s" % name)
83
84+ @locked
85 def stop(self):
86 """Stop this cluster, if started."""
87 if self.running:
88@@ -157,6 +197,7 @@
89 # -m <mode> -- shutdown mode.
90 self.execute("pg_ctl", "stop", "-s", "-w", "-m", "fast")
91
92+ @locked
93 def destroy(self):
94 """Destroy this cluster, if it exists.
95
96
97=== modified file 'postgresfixture/clusterfixture.py'
98--- postgresfixture/clusterfixture.py 2012-05-14 13:37:38 +0000
99+++ postgresfixture/clusterfixture.py 2012-05-22 22:41:20 +0000
100@@ -99,8 +99,8 @@
101 """
102 super(ClusterFixture, self).__init__(datadir)
103 self.preserve = preserve
104- self.lock = ProcessSemaphore(
105- path.join(self.datadir, "locks"))
106+ self.shares = ProcessSemaphore(
107+ path.join(self.datadir, "shares"))
108
109 def setUp(self):
110 super(ClusterFixture, self).setUp()
111@@ -112,8 +112,8 @@
112 self.create()
113 self.addCleanup(self.stop)
114 self.start()
115- self.addCleanup(self.lock.release)
116- self.lock.acquire()
117+ self.addCleanup(self.shares.release)
118+ self.shares.acquire()
119
120 def createdb(self, name):
121 """Create the named database if it does not exist already.
122@@ -133,10 +133,10 @@
123
124 def stop(self):
125 """Stop the cluster, but only if there are no other users."""
126- if not self.lock.locked:
127+ if not self.shares.locked:
128 super(ClusterFixture, self).stop()
129
130 def destroy(self):
131 """Destroy the cluster, but only if there are no other users."""
132- if not self.lock.locked:
133+ if not self.shares.locked:
134 super(ClusterFixture, self).destroy()
135
136=== modified file 'postgresfixture/main.py'
137--- postgresfixture/main.py 2012-05-22 14:53:09 +0000
138+++ postgresfixture/main.py 2012-05-22 22:41:20 +0000
139@@ -67,9 +67,9 @@
140 action_stop(cluster, arguments)
141 cluster.destroy()
142 if cluster.exists:
143- if cluster.lock.locked:
144+ if cluster.shares.locked:
145 message = "%s: cluster is %s" % (
146- cluster.datadir, locked_by_description(cluster.lock))
147+ cluster.datadir, locked_by_description(cluster.shares))
148 else:
149 message = "%s: cluster could not be removed." % cluster.datadir
150 error(message)
151@@ -121,9 +121,9 @@
152 """Stop the cluster."""
153 cluster.stop()
154 if cluster.running:
155- if cluster.lock.locked:
156+ if cluster.shares.locked:
157 message = "%s: cluster is %s" % (
158- cluster.datadir, locked_by_description(cluster.lock))
159+ cluster.datadir, locked_by_description(cluster.shares))
160 else:
161 message = "%s: cluster is still running." % cluster.datadir
162 error(message)
163
164=== modified file 'postgresfixture/tests/test_cluster.py'
165--- postgresfixture/tests/test_cluster.py 2012-05-14 15:01:01 +0000
166+++ postgresfixture/tests/test_cluster.py 2012-05-22 22:41:20 +0000
167@@ -17,7 +17,13 @@
168 getpid,
169 path,
170 )
171-from subprocess import CalledProcessError
172+from subprocess import (
173+ CalledProcessError,
174+ PIPE,
175+ Popen,
176+ )
177+import sys
178+from textwrap import dedent
179
180 import postgresfixture.cluster
181 from postgresfixture.cluster import (
182@@ -27,6 +33,7 @@
183 )
184 from postgresfixture.main import repr_pid
185 from postgresfixture.testing import TestCase
186+from testtools.content import text_content
187 from testtools.matchers import (
188 DirExists,
189 FileExists,
190@@ -62,9 +69,46 @@
191 def test_init(self):
192 # The datadir passed into the Cluster constructor is resolved to an
193 # absolute path.
194- datadir = path.join(self.make_dir(), "locks")
195+ tmpdir = self.make_dir()
196+ datadir = path.join(tmpdir, "somewhere")
197 cluster = self.make(path.relpath(datadir))
198 self.assertEqual(datadir, cluster.datadir)
199+ # The lock file is in the parent directory of the data directory.
200+ self.assertEqual(
201+ path.join(tmpdir, ".somewhere.lock"),
202+ cluster.lockfile)
203+
204+ def test_lock(self):
205+ # To test the lock - based on lockf - we take the lock locally then
206+ # check if it appears locked from a separate process.
207+ cluster = self.make(self.make_dir())
208+ script = dedent("""\
209+ from errno import EAGAIN
210+ from fcntl import LOCK_EX, LOCK_NB, lockf
211+ with open(%r, "ab") as fd:
212+ try:
213+ lockf(fd, LOCK_EX | LOCK_NB)
214+ except IOError, error:
215+ if error.errno != EAGAIN:
216+ raise
217+ else:
218+ raise AssertionError("Not locked")
219+ """) % cluster.lockfile
220+ with cluster.lock:
221+ process = Popen(
222+ sys.executable, stdin=PIPE,
223+ stdout=PIPE, stderr=PIPE)
224+ stdout, stderr = process.communicate(script)
225+ self.addDetail("stdout", text_content(stdout))
226+ self.addDetail("stderr", text_content(stderr))
227+ self.assertEqual(0, process.returncode)
228+
229+ def test_lock_is_reentrant(self):
230+ # The lock can be acquired several times.
231+ cluster = self.make(self.make_dir())
232+ with cluster.lock:
233+ with cluster.lock:
234+ pass # We get here okay.
235
236 def patch_check_call(self, returncode=0):
237 calls = []
238@@ -112,19 +156,19 @@
239
240 def test_running(self):
241 calls = self.patch_check_call(returncode=0)
242- cluster = self.make("/some/where")
243+ cluster = self.make(self.make_dir())
244 self.assertTrue(cluster.running)
245 [(command, options)] = calls
246 self.assertEqual(("pg_ctl", "status"), command)
247
248 def test_running_not(self):
249 self.patch_check_call(returncode=1)
250- cluster = self.make("/some/where")
251+ cluster = self.make(self.make_dir())
252 self.assertFalse(cluster.running)
253
254 def test_running_error(self):
255 self.patch_check_call(returncode=2)
256- cluster = self.make("/some/where")
257+ cluster = self.make(self.make_dir())
258 self.assertRaises(
259 CalledProcessError, getattr, cluster, "running")
260
261
262=== modified file 'postgresfixture/tests/test_clusterfixture.py'
263--- postgresfixture/tests/test_clusterfixture.py 2012-05-21 10:34:58 +0000
264+++ postgresfixture/tests/test_clusterfixture.py 2012-05-22 22:41:20 +0000
265@@ -68,10 +68,10 @@
266 def test_init_fixture(self):
267 fixture = self.make("/some/where")
268 self.assertEqual(False, fixture.preserve)
269- self.assertIsInstance(fixture.lock, ProcessSemaphore)
270+ self.assertIsInstance(fixture.shares, ProcessSemaphore)
271 self.assertEqual(
272- path.join(fixture.datadir, "locks"),
273- fixture.lock.lockdir)
274+ path.join(fixture.datadir, "shares"),
275+ fixture.shares.lockdir)
276
277 def test_createdb_no_preserve(self):
278 fixture = self.make(self.make_dir(), preserve=False)
279@@ -108,26 +108,26 @@
280 fixture.dropdb("diekrupps")
281 # The test is that we arrive here without error.
282
283- def test_stop_locked(self):
284- # The cluster is not stopped if a lock is held.
285+ def test_stop_share_locked(self):
286+ # The cluster is not stopped if a shared lock is held.
287 fixture = self.make(self.make_dir())
288 self.addCleanup(fixture.stop)
289 fixture.start()
290- fixture.lock.acquire()
291+ fixture.shares.acquire()
292 fixture.stop()
293 self.assertTrue(fixture.running)
294- fixture.lock.release()
295+ fixture.shares.release()
296 fixture.stop()
297 self.assertFalse(fixture.running)
298
299- def test_destroy_locked(self):
300- # The cluster is not destroyed if a lock is held.
301+ def test_destroy_share_locked(self):
302+ # The cluster is not destroyed if a shared lock is held.
303 fixture = self.make(self.make_dir())
304 fixture.create()
305- fixture.lock.acquire()
306+ fixture.shares.acquire()
307 fixture.destroy()
308 self.assertTrue(fixture.exists)
309- fixture.lock.release()
310+ fixture.shares.release()
311 fixture.destroy()
312 self.assertFalse(fixture.exists)
313
314
315=== modified file 'postgresfixture/tests/test_main.py'
316--- postgresfixture/tests/test_main.py 2012-05-22 14:53:09 +0000
317+++ postgresfixture/tests/test_main.py 2012-05-22 22:41:20 +0000
318@@ -137,12 +137,12 @@
319 self.assertFalse(cluster.running)
320 self.assertTrue(cluster.exists)
321
322- def test_stop_when_locked(self):
323+ def test_stop_when_share_locked(self):
324 cluster = ClusterFixture(self.make_dir())
325 self.addCleanup(cluster.stop)
326 cluster.start()
327- self.addCleanup(cluster.lock.release)
328- cluster.lock.acquire()
329+ self.addCleanup(cluster.shares.release)
330+ cluster.shares.acquire()
331 self.patch(sys, "stderr", StringIO())
332 error = self.assertRaises(
333 SystemExit, main.action_stop, cluster,
334@@ -161,10 +161,10 @@
335 self.assertFalse(cluster.running)
336 self.assertFalse(cluster.exists)
337
338- def test_destroy_when_locked(self):
339+ def test_destroy_when_share_locked(self):
340 cluster = ClusterFixture(self.make_dir())
341 cluster.create()
342- cluster.lock.acquire()
343+ cluster.shares.acquire()
344 self.patch(sys, "stderr", StringIO())
345 error = self.assertRaises(
346 SystemExit, main.action_destroy, cluster,

Subscribers

People subscribed via source and target branches

to all changes: