Merge lp:~mwhudson/loggerhead/concurrency-thoughts into lp:loggerhead

Proposed by Michael Hudson-Doyle
Status: Merged
Merged at revision: not available
Proposed branch: lp:~mwhudson/loggerhead/concurrency-thoughts
Merge into: lp:loggerhead
Diff against target: not available
To merge this branch: bzr merge lp:~mwhudson/loggerhead/concurrency-thoughts
Reviewer            Review Type    Date Requested    Status
Martin Albisetti                                      Approve
Review via email: mp+4687@code.launchpad.net
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote:

Hi.

Currently, all accesses to the filechanges.sql cache db are serialized. I'd like to experiment on Launchpad with having one database for all branches, so this serialization won't really do.

This branch switches to a more optimistic approach to concurrency -- if, by the time you're trying to add the data to the database, someone else has already added it, oh well. The extremely paranoid locking was left over from the days when we used bsddb; sqlite should be much safer!

I also delete some of the locking infrastructure that is no longer used.
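
In outline, the new scheme looks something like this (a minimal sketch of the pattern, not the branch's actual code -- 'open_cache' and 'add_change' are made-up names; the real change is in the diff below):

    import os
    import tempfile
    from sqlite3 import dbapi2

    def open_cache(filename):
        # Create the table in a temporary file and rename it into place;
        # rename is atomic, so concurrent openers see either no database
        # at all or a fully initialised one.
        if not os.path.exists(filename):
            fd, path = tempfile.mkstemp(dir=os.path.dirname(filename) or '.')
            os.close(fd)
            con = dbapi2.connect(path)
            con.execute(
                "create table RevisionData "
                "(revid binary primary key, data binary)")
            con.commit()
            con.close()
            os.rename(path, filename)
        return dbapi2.connect(filename)

    def add_change(connection, revid, data):
        # Optimistic insert: if another thread or process has already
        # cached this revision, the primary key constraint fires and we
        # just carry on, assuming the other writer stored the same value.
        try:
            connection.execute(
                "insert into RevisionData (revid, data) values (?, ?)",
                (revid, data))
            connection.commit()
        except dbapi2.IntegrityError:
            pass

The worst case under a race is that two writers serialize the same revision twice and the loser throws its copy away, which is cheap compared to taking a cross-process lock on every access.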

Revision history for this message
Martin Albisetti (beuno) wrote:

Looks good

review: Approve

Preview Diff

=== modified file 'loggerhead/changecache.py'
--- loggerhead/changecache.py	2009-03-18 20:57:19 +0000
+++ loggerhead/changecache.py	2009-03-19 21:01:11 +0000
@@ -17,9 +17,10 @@
 #

 """
-a cache for chewed-up "change" data structures, which are basically just a
-different way of storing a revision. the cache improves lookup times 10x
-over bazaar's xml revision structure, though, so currently still worth doing.
+a cache for chewed-up 'file change' data structures, which are basically just
+a different way of storing a revision delta. the cache improves lookup times
+10x over bazaar's xml revision structure, though, so currently still worth
+doing.

 once a revision is committed in bazaar, it never changes, so once we have
 cached a change, it's good forever.
@@ -27,36 +28,40 @@

 import cPickle
 import os
-
-from loggerhead import util
-from loggerhead.lockfile import LockFile
-
-with_lock = util.with_lock('_lock', 'ChangeCache')
+import tempfile

 try:
     from sqlite3 import dbapi2
 except ImportError:
     from pysqlite2 import dbapi2

+# We take an optimistic approach to concurrency here: we might do work twice
+# in the case of races, but not crash or corrupt data.

 class FakeShelf(object):

     def __init__(self, filename):
         create_table = not os.path.exists(filename)
+        if create_table:
+            # To avoid races around creating the database, we create the db in
+            # a temporary file and rename it into the ultimate location.
+            fd, path = tempfile.mkstemp(dir=os.path.dirname(filename))
+            self._create_table(path)
+            os.rename(path, filename)
         self.connection = dbapi2.connect(filename)
         self.cursor = self.connection.cursor()
-        if create_table:
-            self._create_table()

-    def _create_table(self):
-        self.cursor.execute(
+    def _create_table(self, filename):
+        con = dbapi2.connect(filename)
+        cur = con.cursor()
+        cur.execute(
             "create table RevisionData "
             "(revid binary primary key, data binary)")
-        self.connection.commit()
+        con.commit()
+        con.close()

     def _serialize(self, obj):
-        r = dbapi2.Binary(cPickle.dumps(obj, protocol=2))
-        return r
+        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))

     def _unserialize(self, data):
         return cPickle.loads(str(data))
@@ -71,10 +76,15 @@
         return self._unserialize(filechange[0])

     def add(self, revid, object):
-        self.cursor.execute(
-            "insert into revisiondata (revid, data) values (?, ?)",
-            (revid, self._serialize(object)))
-        self.connection.commit()
+        try:
+            self.cursor.execute(
+                "insert into revisiondata (revid, data) values (?, ?)",
+                (revid, self._serialize(object)))
+            self.connection.commit()
+        except dbapi2.IntegrityError:
+            # If another thread or process attempted to set the same key, we
+            # assume it set it to the same value and carry on with our day.
+            pass


 class FileChangeCache(object):
@@ -87,11 +97,6 @@

         self._changes_filename = os.path.join(cache_path, 'filechanges.sql')

-        # use a lockfile since the cache folder could be shared across
-        # different processes.
-        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))
-
-    @with_lock
     def get_file_changes(self, entry):
         cache = FakeShelf(self._changes_filename)
         changes = cache.get(entry.revid)

=== removed file 'loggerhead/lockfile.py'
--- loggerhead/lockfile.py	2008-10-24 02:26:05 +0000
+++ loggerhead/lockfile.py	1970-01-01 00:00:00 +0000
@@ -1,84 +0,0 @@
-#
-# Copyright (C) 2006 Robey Pointer <robey@lag.net>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import os
-import threading
-import time
-
-from loggerhead import util
-
-
-with_lock = util.with_lock('_tlock', 'LockFile')
-
-MAX_STALE_TIME = 5 * 60
-
-
-class LockFile (object):
-    """
-    simple lockfile implementation that mimics the API of threading.Lock, so
-    it can be used interchangably. it's actually a reentrant lock, so the
-    lock may be acquired multiple times by the same thread, as long as it's
-    released an equal number of times. unlike threading.Lock, this lock can
-    be used across processes.
-
-    this uses os.open(O_CREAT|O_EXCL), which apparently works even on windows,
-    but will not work over NFS, if anyone still uses that. so don't put the
-    cache folder on an NFS server...
-    """
-
-    def __init__(self, filename):
-        self._filename = filename
-        # thread lock to maintain internal consistency
-        self._tlock = threading.Lock()
-        self._count = 0
-        if os.path.exists(filename):
-            # remove stale locks left over from a previous run
-            if time.time() - os.stat(filename).st_mtime > MAX_STALE_TIME:
-                os.remove(filename)
-
-    @with_lock
-    def _try_acquire(self):
-        if self._count > 0:
-            self._count += 1
-            return True
-        try:
-            fd = os.open(self._filename,
-                         os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0600)
-            os.close(fd)
-            self._count += 1
-            return True
-        except OSError:
-            return False
-
-    def acquire(self):
-        # try over and over, sleeping on exponential backoff with
-        # an upper limit of about 5 seconds
-        pause = 0.1
-        #max_pause = 5.0
-        max_pause = 0.1
-        while True:
-            if self._try_acquire():
-                return
-            time.sleep(pause)
-            pause = min(pause * 2.0, max_pause)
-
-    @with_lock
-    def release(self):
-        self._count -= 1
-        if self._count == 0:
-            os.remove(self._filename)

=== modified file 'loggerhead/util.py'
--- loggerhead/util.py	2009-03-06 09:57:54 +0000
+++ loggerhead/util.py	2009-03-19 20:31:49 +0000
@@ -449,25 +449,6 @@
     return new_decorator


-# common threading-lock decorator
-
-
-def with_lock(lockname, debug_name=None):
-    if debug_name is None:
-        debug_name = lockname
-
-    @decorator
-    def _decorator(unbound):
-
-        def locked(self, *args, **kw):
-            getattr(self, lockname).acquire()
-            try:
-                return unbound(self, *args, **kw)
-            finally:
-                getattr(self, lockname).release()
-        return locked
-    return _decorator
-

 @decorator
 def lsprof(f):
