Merge lp:~verterok/ubuntuone-client/file_shelf-is-fast into lp:ubuntuone-client
- file_shelf-is-fast
- Merge into trunk
Status: | Merged | ||||||||
---|---|---|---|---|---|---|---|---|---|
Approved by: | dobey | ||||||||
Approved revision: | 91 | ||||||||
Merged at revision: | not available | ||||||||
Proposed branch: | lp:~verterok/ubuntuone-client/file_shelf-is-fast | ||||||||
Merge into: | lp:ubuntuone-client | ||||||||
Diff against target: | None lines | ||||||||
To merge this branch: | bzr merge lp:~verterok/ubuntuone-client/file_shelf-is-fast | ||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
dobey (community) | Approve | ||
Lucio Torre (community) | Approve | ||
Review via email: mp+8963@code.launchpad.net |
Commit message
[r=lucio.torre, r=dobey] minimize the times fileshelf touch the disk to do a key lookup. Also adds a new LRUCache for the Fileshelf
Description of the change
Guillermo Gonzalez (verterok) wrote : | # |
Lucio Torre (lucio.torre) wrote : | # |
try:
except KeyError:
This part of test_hit_
anyhow, big +1. i like that we now show more stats on log marks.
- 91. By Guillermo Gonzalez
-
fix lint warning
dobey (dobey) : | # |
Guillermo Gonzalez (verterok) wrote : | # |
> try:
> self.shelf[
> except KeyError:
> self.assertEqua
>
> This part of test_hit_
> somewhere else? dont you need to add an 'else' block that raises an error?
good point!
the cache is clean, as the shelf was initialized before starting the test, but I'll add an else clause just to make the test more complete.
>
> anyhow, big +1. i like that we now show more stats on log marks.
Thanks!
- 92. By Guillermo Gonzalez
-
add else clause to cache.misses test
Preview Diff
1 | === modified file 'tests/syncdaemon/test_fileshelf.py' |
2 | --- tests/syncdaemon/test_fileshelf.py 2009-07-02 20:22:51 +0000 |
3 | +++ tests/syncdaemon/test_fileshelf.py 2009-07-17 20:31:26 +0000 |
4 | @@ -15,57 +15,59 @@ |
5 | # You should have received a copy of the GNU General Public License along |
6 | # with this program. If not, see <http://www.gnu.org/licenses/>. |
7 | |
8 | -""" Test disk cache persistent shelf """ |
9 | +""" Test file based persistent shelf """ |
10 | |
11 | import os |
12 | import hashlib |
13 | import unittest |
14 | import shutil |
15 | |
16 | -from ubuntuone.syncdaemon.file_shelf import FileShelf |
17 | - |
18 | -TESTS_DIR = os.path.join(os.getcwd(), "tmp") |
19 | - |
20 | -class TestFileShelf(unittest.TestCase): |
21 | +from ubuntuone.syncdaemon.file_shelf import ( |
22 | + FileShelf, |
23 | + CachedFileShelf, |
24 | + LRUCache, |
25 | + CacheInconsistencyError, |
26 | +) |
27 | +from contrib.testing import testcase |
28 | + |
29 | + |
30 | +class TestFileShelf(testcase.BaseTwistedTestCase): |
31 | """ Test the FileShelf """ |
32 | + fileshelf_class = FileShelf |
33 | |
34 | def setUp(self): |
35 | """ Sets up a test. """ |
36 | - try: |
37 | - os.mkdir(TESTS_DIR) |
38 | - except OSError: |
39 | - # already there, remove it to clean and create again |
40 | - shutil.rmtree(TESTS_DIR) |
41 | - os.mkdir(TESTS_DIR) |
42 | - self.path = os.path.join(TESTS_DIR, 'shelf') |
43 | - self.shelf = FileShelf(self.path) |
44 | + testcase.BaseTwistedTestCase.setUp(self) |
45 | + self.path = self.mktemp('shelf') |
46 | + self.shelf = self.fileshelf_class(self.path) |
47 | |
48 | def tearDown(self): |
49 | '''Clean up the tests.''' |
50 | - shutil.rmtree(TESTS_DIR) |
51 | + shutil.rmtree(self.path) |
52 | + testcase.BaseTwistedTestCase.tearDown(self) |
53 | |
54 | def test_bad_depth(self): |
55 | """ test that the shelf reject invalid depth at creation time """ |
56 | - self.assertRaises(ValueError, FileShelf, TESTS_DIR, depth=-1) |
57 | + self.assertRaises(ValueError, self.fileshelf_class, self.path, depth=-1) |
58 | |
59 | def test_bad_path(self): |
60 | """ test that the shelf removes the previous shelve file and create a |
61 | directory for the new file based shelf at creation time. |
62 | """ |
63 | - path = os.path.join(TESTS_DIR, 'shelf_file') |
64 | + path = os.path.join(self.path, 'shelf_file') |
65 | open(path, 'w').close() |
66 | - FileShelf(path) |
67 | + self.fileshelf_class(path) |
68 | self.assertTrue(os.path.isdir(path)) |
69 | |
70 | def test_different_depth_sizes(self): |
71 | """ test the basic operations (delitem, getitem, setitem) with |
72 | depths between 0 and len(hashlib.sha1().hexdigest()) |
73 | """ |
74 | - base_path = os.path.join(TESTS_DIR, 'shelf_depth-') |
75 | + base_path = os.path.join(self.path, 'shelf_depth-') |
76 | sha1 = hashlib.sha1() |
77 | for idx in xrange(0, len(sha1.hexdigest())): |
78 | path = base_path + str(idx) |
79 | - shelf = FileShelf(path, depth=idx) |
80 | + shelf = self.fileshelf_class(path, depth=idx) |
81 | key = sha1.hexdigest() |
82 | # test __setitem__ |
83 | shelf[key] = 'foo' |
84 | @@ -85,8 +87,8 @@ |
85 | |
86 | def test_contains(self): |
87 | """ test that it behaves with the 'in' """ |
88 | - path = os.path.join(TESTS_DIR, 'shelf_depth') |
89 | - shelf = FileShelf(path) |
90 | + path = os.path.join(self.path, 'shelf_depth') |
91 | + shelf = self.fileshelf_class(path) |
92 | shelf["foo"] = "bar" |
93 | self.assertTrue("foo" in shelf) |
94 | self.assertFalse("baz" in shelf) |
95 | @@ -95,8 +97,8 @@ |
96 | |
97 | def test_pop(self): |
98 | """ test that it behaves with the .pop() """ |
99 | - path = os.path.join(TESTS_DIR, 'shelf_depth') |
100 | - shelf = FileShelf(path) |
101 | + path = os.path.join(self.path, 'shelf_depth') |
102 | + shelf = self.fileshelf_class(path) |
103 | shelf["foo"] = "bar" |
104 | self.assertEqual("bar", shelf.pop("foo")) |
105 | self.assertFalse("foo" in shelf) |
106 | @@ -106,8 +108,8 @@ |
107 | |
108 | def test_get(self): |
109 | """ test that it behaves with the .get(key, default) """ |
110 | - path = os.path.join(TESTS_DIR, 'shelf_get') |
111 | - shelf = FileShelf(path) |
112 | + path = os.path.join(self.path, 'shelf_get') |
113 | + shelf = self.fileshelf_class(path) |
114 | shelf["foo"] = "bar" |
115 | self.assertEquals('bar', shelf.get('foo')) |
116 | self.assertEquals('bar', shelf.get('foo', None)) |
117 | @@ -116,8 +118,8 @@ |
118 | |
119 | def test_items(self): |
120 | """ test that it behaves with the .items() """ |
121 | - path = os.path.join(TESTS_DIR, 'shelf_get') |
122 | - shelf = FileShelf(path) |
123 | + path = os.path.join(self.path, 'shelf_get') |
124 | + shelf = self.fileshelf_class(path) |
125 | shelf["foo"] = "bar" |
126 | # k, v are temp variables, pylint: disable-msg=W0631 |
127 | self.assertEquals([('foo', 'bar')], |
128 | @@ -127,6 +129,131 @@ |
129 | [(k, v) for k, v in shelf.items()]) |
130 | |
131 | |
132 | +class CachedFileShelfTests(TestFileShelf): |
133 | + """TestFileShelf tests but using CachedFileShelf""" |
134 | + fileshelf_class = CachedFileShelf |
135 | + |
136 | + def test_hit_miss_properties(self): |
137 | + """test the cache hits/misses properties""" |
138 | + # yes, the statement has some effect, pylint: disable-msg=W0104 |
139 | + try: |
140 | + self.shelf['missingkey'] |
141 | + except KeyError: |
142 | + self.assertEquals(self.shelf.cache_misses, 1) |
143 | + self.shelf['realkey'] = 'realvalue' |
144 | + self.shelf['realkey'] |
145 | + self.shelf['realkey'] |
146 | + self.assertEquals(self.shelf.cache_hits, 1) |
147 | + |
148 | + |
149 | +class LRUCacheTests(unittest.TestCase): |
150 | + '''Test the LRUCache class''' |
151 | + # I'm a unittest of a class, I need to access protected members |
152 | + # pylint: disable-msg=W0212 |
153 | + |
154 | + def test_setitem(self): |
155 | + """test __delitem__ method""" |
156 | + cache = LRUCache(100, 4) |
157 | + # set some data in the cache |
158 | + values = [('key'+str(i), i) for i in range(100)] |
159 | + for i, j in values: |
160 | + cache[i] = j |
161 | + self.assertEquals(len(cache._queue), len(values)) |
162 | + self.assertEquals(len(cache._cache), len(values)) |
163 | + |
164 | + def test_getitem(self): |
165 | + """test __delitem__ method""" |
166 | + cache = LRUCache(100, 4) |
167 | + # set some data in the cache |
168 | + values = [('key'+str(i), i) for i in range(100)] |
169 | + for i, j in values: |
170 | + cache[i] = j |
171 | + self.assertEquals(len(cache._queue), len(values)) |
172 | + self.assertEquals(len(cache._cache), len(values)) |
173 | + # compare all the items with the values |
174 | + for i, j in values: |
175 | + self.assertEquals(cache[i], j) |
176 | + |
177 | + def test_delitem(self): |
178 | + """test __delitem__ method""" |
179 | + cache = LRUCache(100, 4) |
180 | + values = [('key'+str(i), i) for i in range(100)] |
181 | + for i, j in values: |
182 | + cache[i] = j |
183 | + self.assertEquals(len(cache._queue), len(values)) |
184 | + self.assertEquals(len(cache._cache), len(values)) |
185 | + for item in cache._cache.copy(): |
186 | + del cache[item] |
187 | + self.assertEquals(len(cache._queue), 0) |
188 | + self.assertEquals(len(cache._cache), 0) |
189 | + |
190 | + def test_update(self): |
191 | + """test the update method, chacking the refcount and queue.""" |
192 | + cache = LRUCache(100, 4) |
193 | + cache.update('key1') |
194 | + self.assertEquals(len(cache._queue), 1) |
195 | + self.assertEquals(len(cache._refcount), 1) |
196 | + self.assertEquals(cache._refcount['key1'], 1) |
197 | + cache.update('key1') |
198 | + self.assertEquals(len(cache._queue), 2) |
199 | + self.assertEquals(len(cache._refcount), 1) |
200 | + self.assertEquals(cache._refcount['key1'], 2) |
201 | + |
202 | + def test_purge(self): |
203 | + """Test the queue compact and cache purge""" |
204 | + cache = LRUCache(100, 4) |
205 | + values = [('key'+str(i), j) for i in range(50) for j in range(8)] |
206 | + for i, j in values: |
207 | + cache[i] = j |
208 | + # we hit the limit |
209 | + self.assertEquals(len(cache._queue), 400) |
210 | + self.assertEquals(len(cache._cache), 50) |
211 | + # insert key1 item to compact the queue |
212 | + cache[values[0][0]] = values[0][1] |
213 | + self.assertEquals(len(cache._queue), 50) |
214 | + self.assertEquals(len(cache._cache), 50) |
215 | + |
216 | + # now with a key not present in the cache |
217 | + cache = LRUCache(100, 4) |
218 | + for i, j in values: |
219 | + cache[i] = j |
220 | + # we hit the limit |
221 | + self.assertEquals(len(cache._queue), 400) |
222 | + self.assertEquals(len(cache._cache), 50) |
223 | + # insert key1 item to compact the queue |
224 | + cache['non-present-key'] = 'some value' |
225 | + self.assertEquals(len(cache._queue), 51) |
226 | + self.assertEquals(len(cache._cache), 51) |
227 | + |
228 | + def test_statistics(self): |
229 | + """Tests if the cache correclty keeps track of misses and hits.""" |
230 | + cache = LRUCache(100, 4) |
231 | + # set some data in the cache |
232 | + values = [('key'+str(i), i) for i in range(10)] |
233 | + for i, j in values: |
234 | + cache[i] = j |
235 | + self.assertEquals(len(cache._queue), len(values)) |
236 | + self.assertEquals(len(cache._cache), len(values)) |
237 | + # compare 4 items with the values |
238 | + for i, j in values[:4]: |
239 | + self.assertEquals(cache[i], j) |
240 | + # check the hits value |
241 | + self.assertEquals(cache.hits, 4) |
242 | + # try to get items not present in the cache |
243 | + for i, j in values[5:10]: |
244 | + self.assertRaises(KeyError, cache.__getitem__, i*10) |
245 | + self.assertEquals(cache.misses, 5) |
246 | + |
247 | + def test_inconsistency(self): |
248 | + """Test that the consistency checking works as expected""" |
249 | + cache = LRUCache(2, 1) |
250 | + cache['foo'] = 'bar' |
251 | + # add it again to the _queue to force a inconsistency |
252 | + cache._queue.append('foo') |
253 | + self.assertRaises(CacheInconsistencyError, |
254 | + cache.__setitem__, 'bar', 'foo') |
255 | + |
256 | + |
257 | def test_suite(): |
258 | # pylint: disable-msg=C0111 |
259 | return unittest.TestLoader().loadTestsFromName(__name__) |
260 | |
261 | === modified file 'ubuntuone/syncdaemon/file_shelf.py' |
262 | --- ubuntuone/syncdaemon/file_shelf.py 2009-07-15 07:55:08 +0000 |
263 | +++ ubuntuone/syncdaemon/file_shelf.py 2009-07-17 20:37:43 +0000 |
264 | @@ -22,6 +22,10 @@ |
265 | from UserDict import DictMixin |
266 | import cPickle |
267 | import os |
268 | +import stat |
269 | +import errno |
270 | +from collections import deque |
271 | + |
272 | |
273 | class FileShelf(object, DictMixin): |
274 | """ File based shelf. |
275 | @@ -46,10 +50,19 @@ |
276 | """ check if the path isn't a file and in case it don't exists, |
277 | creates it |
278 | """ |
279 | - if os.path.exists(path) and os.path.isfile(path): |
280 | - os.unlink(path) |
281 | - if not os.path.exists(path): |
282 | - os.makedirs(path) |
283 | + try: |
284 | + stat_result = os.stat(path) |
285 | + # is a regular file? |
286 | + if stat.S_ISREG(stat_result.st_mode): |
287 | + os.unlink(path) |
288 | + os.makedirs(path) |
289 | + # else, the dir is already there |
290 | + except OSError, e: |
291 | + if e.errno == errno.ENOENT: |
292 | + # the file or dir don't exist |
293 | + os.makedirs(path) |
294 | + else: |
295 | + raise |
296 | |
297 | def key_file(self, key): |
298 | """ get the real key used by the storage from a key """ |
299 | @@ -62,9 +75,7 @@ |
300 | raise ValueError("The key (%r) needs to be longer!" % key) |
301 | |
302 | letters = [key[i] for i in xrange(0, self._depth)] |
303 | - path = os.path.join(self._path, *letters) |
304 | - self._check_and_create_dirs(path) |
305 | - return os.path.join(path, key) |
306 | + return os.path.join(os.path.join(self._path, *letters), key) |
307 | |
308 | def has_key(self, key): |
309 | """ True if the the shelf has the key """ |
310 | @@ -105,7 +116,9 @@ |
311 | |
312 | def __setitem__(self, key, value): |
313 | """ setitem backed by the file storage """ |
314 | - with open(self.key_file(key), "wb") as fh: |
315 | + path = self.key_file(key) |
316 | + self._check_and_create_dirs(os.path.dirname(path)) |
317 | + with open(path, "wb") as fh: |
318 | cPickle.dump(value, fh, protocol=2) |
319 | fh.flush() |
320 | os.fsync(fh.fileno()) |
321 | @@ -124,3 +137,143 @@ |
322 | counter += 1 |
323 | return counter |
324 | |
325 | + |
326 | +class CachedFileShelf(FileShelf): |
327 | + """A extension of FileShelf that uses a cache of 1500 items""" |
328 | + |
329 | + def __init__(self, *args, **kwargs): |
330 | + """Create the instance""" |
331 | + super(CachedFileShelf, self).__init__(*args, **kwargs) |
332 | + # XXX: the size of the cache and the compact threshold needs to be |
333 | + # tweaked once we get more statistics from real usage |
334 | + self._cache = LRUCache(1500, 4) |
335 | + |
336 | + @property |
337 | + def cache_misses(self): |
338 | + """proterty to access the internal cache misses""" |
339 | + return self._cache.misses |
340 | + |
341 | + @property |
342 | + def cache_hits(self): |
343 | + """proterty to access the internal cache hits""" |
344 | + return self._cache.hits |
345 | + |
346 | + def __getitem__(self, key): |
347 | + """ getitem backed by the file storage """ |
348 | + try: |
349 | + return self._cache[key] |
350 | + except KeyError: |
351 | + value = super(CachedFileShelf, self).__getitem__(key) |
352 | + # add it to the cache |
353 | + self._cache[key] = value |
354 | + return value |
355 | + |
356 | + def __setitem__(self, key, value): |
357 | + """ setitem backed by the file storage """ |
358 | + super(CachedFileShelf, self).__setitem__(key, value) |
359 | + if key in self._cache: |
360 | + self._cache[key] = value |
361 | + |
362 | + def __delitem__(self, key): |
363 | + """ delitem backed by the file storage """ |
364 | + super(CachedFileShelf, self).__delitem__(key) |
365 | + if key in self._cache: |
366 | + del self._cache[key] |
367 | + |
368 | + |
369 | +class LRUCache(object): |
370 | + """A least-recently-used|updated cache with maximum size. |
371 | + |
372 | + The object(s) added to the cache must be hashable. |
373 | + Cache performance statistics stored in self.hits and self.misses. |
374 | + |
375 | + Based on recipe #252524 by Raymond Hettinger |
376 | + """ |
377 | + def __init__(self, maxsize, compact_threshold=4): |
378 | + """Create the instance. |
379 | + @param maxsize: |
380 | + @param compact_threshold: |
381 | + """ |
382 | + self._maxsize = maxsize |
383 | + self._compact_threshold = compact_threshold |
384 | + self._cache = {} # mapping of args to results |
385 | + self._queue = deque() # order that keys have been accessed |
386 | + self._refcount = {} # number of times each key is in the access queue |
387 | + self.hits = 0 |
388 | + self.misses = 0 |
389 | + |
390 | + def __getitem__(self, key): |
391 | + """return the item from the cache or raise KeyError.""" |
392 | + try: |
393 | + result = self._cache[key] |
394 | + self.hits += 1 |
395 | + except KeyError: |
396 | + # get the value and increase misses |
397 | + self.misses += 1 |
398 | + raise |
399 | + else: |
400 | + self.update(key) |
401 | + self.purge() |
402 | + return result |
403 | + |
404 | + def __setitem__(self, key, value): |
405 | + """set the key, value in the cache""" |
406 | + self._cache[key] = value |
407 | + self.update(key) |
408 | + self.purge() |
409 | + |
410 | + def __delitem__(self, key): |
411 | + """removes the key, value from the cache""" |
412 | + del self._cache[key] |
413 | + self._refcount[key] -= 1 |
414 | + self._queue.remove(key) |
415 | + self.purge() |
416 | + |
417 | + def __contains__(self, key): |
418 | + """returns True if key is in the cache""" |
419 | + return key in self._cache |
420 | + |
421 | + def update(self, key): |
422 | + """Update the least recently used|updated and refcount""" |
423 | + self._queue.append(key) |
424 | + self._refcount[key] = self._refcount.get(key, 0) + 1 |
425 | + |
426 | + def purge(self): |
427 | + """Purge least recently accessed cache contents and periodically |
428 | + compact the queue by duplicate keys. |
429 | + """ |
430 | + while len(self._cache) > self._maxsize: |
431 | + k = self._queue.popleft() |
432 | + self._refcount[k] -= 1 |
433 | + if not self._refcount[k]: |
434 | + if k in self._cache: |
435 | + del self._cache[k] |
436 | + if k in self._refcount: |
437 | + del self._refcount[k] |
438 | + |
439 | + # Periodically compact the queue by duplicate keys |
440 | + queue_len = len(self._queue) |
441 | + if queue_len > self._maxsize * self._compact_threshold: |
442 | + for _ in xrange(queue_len): |
443 | + k = self._queue.popleft() |
444 | + if self._refcount[k] == 1: |
445 | + self._queue.append(k) |
446 | + else: |
447 | + self._refcount[k] -= 1 |
448 | + if not (len(self._queue) == len(self._cache) \ |
449 | + == len(self._refcount) \ |
450 | + == sum(self._refcount.itervalues())): |
451 | + # create a custom exception for this error |
452 | + raise CacheInconsistencyError(len(self._queue), |
453 | + len(self._cache), |
454 | + len(self._refcount), |
455 | + sum(self._refcount.itervalues())) |
456 | + |
457 | + |
458 | +class CacheInconsistencyError(Exception): |
459 | + """Exception representing a inconsistency in the cache""" |
460 | + |
461 | + def __str__(self): |
462 | + return "Inconsistency in the cache: queue: %d cache: %d refcount: %d" \ |
463 | + " sum(refcount.values): %d" % self.args |
464 | + |
465 | |
466 | === modified file 'ubuntuone/syncdaemon/filesystem_manager.py' |
467 | --- ubuntuone/syncdaemon/filesystem_manager.py 2009-07-14 13:37:22 +0000 |
468 | +++ ubuntuone/syncdaemon/filesystem_manager.py 2009-07-16 22:52:12 +0000 |
469 | @@ -187,7 +187,7 @@ |
470 | raise TypeError("data_dir should be a string instead of %s" % \ |
471 | type(data_dir)) |
472 | fsmdir = os.path.join(data_dir, 'fsm') |
473 | - self.fs = file_shelf.FileShelf(fsmdir) |
474 | + self.fs = file_shelf.CachedFileShelf(fsmdir) |
475 | self.shares = {} |
476 | self.vm = vm |
477 | |
478 | |
479 | === modified file 'ubuntuone/syncdaemon/main.py' |
480 | --- ubuntuone/syncdaemon/main.py 2009-07-14 05:49:49 +0000 |
481 | +++ ubuntuone/syncdaemon/main.py 2009-07-17 20:31:26 +0000 |
482 | @@ -94,10 +94,12 @@ |
483 | def log_mark(self): |
484 | """ log a "mark" that includes the current AQ state and queue size""" |
485 | self.logger.info("%s %s (state: %s; queues: metadata: %d; content: %d;" |
486 | - " hash: %d) %s" % ('-'*4, 'MARK', self.state.name, |
487 | + " hash: %d, fsm-cache: hit=%d miss=%d) %s" % ('-'*4, 'MARK', self.state.name, |
488 | len(self.action_q.meta_queue), |
489 | len(self.action_q.content_queue), |
490 | - len(self.hash_q), '-'*4)) |
491 | + len(self.hash_q), |
492 | + self.fs.fs.cache_hits, |
493 | + self.fs.fs.cache_misses, '-'*4)) |
494 | |
495 | def wait_for_nirvana(self, last_event_interval=0.5): |
496 | """Get a deferred that will fire when there are no more |
This branch minimize the times fileshelf touch the disk to do a key lookup. Also includes a branch new LRUCache for the Filshelf this also should reduce the time we are hitting disk, as e.g: sync is issuing a lot of consecutive request to the file shelf.