Merge lp:~allenap/launchpad/storm-bulk-reload-bug-572211 into lp:launchpad

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: no longer in the source branch.
Merged at revision: 10892
Proposed branch: lp:~allenap/launchpad/storm-bulk-reload-bug-572211
Merge into: lp:launchpad
Diff against target: 227 lines (+218/-0)
2 files modified
lib/lp/services/database/bulk.py (+66/-0)
lib/lp/services/database/tests/test_bulk.py (+152/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/storm-bulk-reload-bug-572211
Reviewer | Review Type | Date Requested | Status
Abel Deuring (community) | code | | Approve
Review via email: mp+24492@code.launchpad.net

Commit message

New function, lp.services.database.bulk.reload(), to efficiently reload database objects.

Description of the change

Adds a new module, lp.services.database.bulk, that contains a single public function, reload(). Given an arbitrary list of Storm model objects, this attempts to reload them from the database in the most efficient way it can. The model objects can be a mix of types, and loaded from any number of different stores, but they must not have compound primary keys. See the linked bug for the rationale behind this new function.

To post a comment you must log in.
Revision history for this message
Abel Deuring (adeuring) wrote :

I really like this branch. Thanks!

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'lib/lp/services/database/bulk.py'
--- lib/lp/services/database/bulk.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/database/bulk.py 2010-04-30 15:26:06 +0000
@@ -0,0 +1,66 @@
1# Copyright 2010 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).
3
4"""Optimized bulk operations against the database."""
5
6__metaclass__ = type
7__all__ = [
8 'reload',
9 ]
10
11
12from collections import defaultdict
13
14from zope.security.proxy import removeSecurityProxy
15
16from storm.base import Storm
17from storm.expr import In
18from storm.info import get_cls_info
19from storm.store import Store
20
21
def collate(things, key):
    """Collate the given objects according to a key function.

    Generates (common-key-value, list-of-things) tuples, like groupby,
    except that the given objects do not need to be sorted.

    :param things: An iterable of arbitrary objects.
    :param key: A one-argument callable returning a hashable grouping
        key for each object.
    :return: An iterator of (key-value, list-of-things) tuples. Within
        each list the things keep the order in which they were given.
    """
    collection = defaultdict(list)
    for thing in things:
        collection[key(thing)].append(thing)
    # dict.iteritems() exists only on Python 2; iter(items()) yields the
    # same pairs on both Python 2 and Python 3.
    return iter(collection.items())
32
33
def get_type(thing):
    """Return the type of `thing`, looking through any security proxy.

    When `thing` is wrapped by a security proxy, the type reported is
    that of the wrapped object rather than of the proxy itself.

    :param thing: Any object, security-proxied or not.
    :return: The type of the unwrapped object.
    """
    unproxied = removeSecurityProxy(thing)
    return type(unproxied)
41
42
def gen_reload_queries(objects):
    """Prepare queries to reload the given objects.

    The objects are grouped first by type (as reported by `get_type`)
    and then by the store that `Store.of` reports for them. One query is
    generated for each (type, store) group, selecting on the primary
    key values of the objects in that group.

    :param objects: An iterable of Storm model objects.
    :return: A generator yielding the result of `store.find` for each
        group. Validation happens lazily, as the generator is consumed.
    :raises AssertionError: If a group's type is not a subclass of
        `Storm`, or if that type has a compound primary key.
    """
    # Distinct names are used for each level of grouping so that the
    # `objects` argument is never rebound by a loop target.
    for object_type, typed_objects in collate(objects, get_type):
        if not issubclass(object_type, Storm):
            raise AssertionError(
                "Cannot load objects of type %s: %r" % (
                    object_type.__name__, typed_objects))
        primary_key = get_cls_info(object_type).primary_key
        if len(primary_key) != 1:
            raise AssertionError(
                "Compound primary keys are not supported: %s." %
                object_type.__name__)
        primary_key_column = primary_key[0]
        primary_key_column_getter = primary_key_column.__get__
        for store, stored_objects in collate(typed_objects, Store.of):
            # Build a concrete list; map() only returns one on Python 2.
            primary_keys = [
                primary_key_column_getter(stored_object)
                for stored_object in stored_objects]
            condition = In(primary_key_column, primary_keys)
            yield store.find(object_type, condition)
61
62
def reload(objects):
    """Bulk-reload the given objects.

    Every query produced by `gen_reload_queries` for `objects` is run
    to completion.

    :param objects: An iterable of Storm model objects, as accepted by
        `gen_reload_queries`.
    """
    for reload_query in gen_reload_queries(objects):
        # The rows are not needed by the caller; each result set is
        # consumed in full and the resulting list is thrown away.
        list(reload_query)
067
=== added file 'lib/lp/services/database/tests/test_bulk.py'
--- lib/lp/services/database/tests/test_bulk.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/database/tests/test_bulk.py 2010-04-30 15:26:06 +0000
@@ -0,0 +1,152 @@
1# Copyright 2010 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).
3
4"""Test the bulk database functions."""
5
6__metaclass__ = type
7
8import unittest
9
10import transaction
11
12import zope.security.checker
13import zope.security.proxy
14
15from storm.info import get_obj_info
16
17from canonical.launchpad.interfaces.lpstorm import (
18 IMasterStore, ISlaveStore, IStore)
19from canonical.testing import LaunchpadZopelessLayer
20
21from lp.bugs.model.bug import BugAffectsPerson
22from lp.services.database import bulk
23from lp.testing import TestCase, TestCaseWithFactory
24
25
def object_is_key(thing):
    """Identity key function: every object serves as its own key."""
    return thing
27
28
class TestBasicFunctions(TestCase):
    """Tests for `bulk.collate` and `bulk.get_type`."""

    def test_collate_empty_list(self):
        # Collating nothing produces no groups.
        self.assertEqual([], list(bulk.collate([], object_is_key)))

    def test_collate_when_object_is_key(self):
        # Equal objects end up in the same group. Results are sorted
        # before comparison where more than one group is expected.
        self.assertEqual(
            [(1, [1])],
            list(bulk.collate([1], object_is_key)))
        self.assertEqual(
            [(1, [1]), (2, [2, 2])],
            sorted(bulk.collate([1, 2, 2], object_is_key)))

    def test_collate_with_key_function(self):
        # Objects are grouped by the value the key function returns
        # for them; here, by string length.
        self.assertEqual(
            [(4, ['fred', 'joss']), (6, ['barney'])],
            sorted(bulk.collate(['fred', 'barney', 'joss'], len)))

    def test_get_type(self):
        # For an unproxied object, get_type() returns its type.
        self.assertEqual(object, bulk.get_type(object()))

    def test_get_type_with_proxied_object(self):
        # For a security-proxied object, get_type() returns the type
        # of the wrapped object.
        proxied_object = zope.security.proxy.Proxy(
            'fred', zope.security.checker.Checker({}))
        self.assertEqual(str, bulk.get_type(proxied_object))
54
55
class TestLoaders(TestCaseWithFactory):
    """Tests for `bulk.gen_reload_queries` and `bulk.reload`."""

    layer = LaunchpadZopelessLayer

    def test_gen_reload_queries_with_empty_list(self):
        # No objects means no queries.
        self.assertEqual([], list(bulk.gen_reload_queries([])))

    def test_gen_reload_queries_with_single_object(self):
        # gen_reload_queries() should generate a single query for a
        # single object.
        db_objects = [self.factory.makeSourcePackageName()]
        db_queries = list(bulk.gen_reload_queries(db_objects))
        self.assertEqual(1, len(db_queries))
        db_query = db_queries[0]
        self.assertEqual(db_objects, list(db_query))

    def test_gen_reload_queries_with_multiple_similar_objects(self):
        # gen_reload_queries() should generate a single query to load
        # multiple objects of the same type.
        db_objects = set(
            self.factory.makeSourcePackageName() for i in range(5))
        db_queries = list(bulk.gen_reload_queries(db_objects))
        self.assertEqual(1, len(db_queries))
        db_query = db_queries[0]
        self.assertEqual(db_objects, set(db_query))

    def test_gen_reload_queries_with_mixed_objects(self):
        # gen_reload_queries() should return one query for each
        # distinct object type in the given objects.
        db_objects = set(
            self.factory.makeSourcePackageName() for i in range(5))
        db_objects.update(
            self.factory.makeComponent() for i in range(5))
        db_queries = list(bulk.gen_reload_queries(db_objects))
        self.assertEqual(2, len(db_queries))
        db_objects_loaded = set()
        for db_query in db_queries:
            objects = set(db_query)
            # None of these objects should have been loaded before.
            self.assertEqual(
                set(), objects.intersection(db_objects_loaded))
            db_objects_loaded.update(objects)
        self.assertEqual(db_objects, db_objects_loaded)

    def test_gen_reload_queries_with_mixed_stores(self):
        # gen_reload_queries() returns one query for each distinct
        # store even for the same object type.
        db_object = self.factory.makeComponent()
        db_object_type = bulk.get_type(db_object)
        # Commit so the database object is available in both master
        # and slave stores.
        transaction.commit()
        db_objects = set(
            (IMasterStore(db_object).get(db_object_type, db_object.id),
             ISlaveStore(db_object).get(db_object_type, db_object.id)))
        db_queries = list(bulk.gen_reload_queries(db_objects))
        self.assertEqual(2, len(db_queries))
        db_objects_loaded = set()
        for db_query in db_queries:
            objects = set(db_query)
            # None of these objects should have been loaded before.
            self.assertEqual(
                set(), objects.intersection(db_objects_loaded))
            db_objects_loaded.update(objects)
        self.assertEqual(db_objects, db_objects_loaded)

    def test_gen_reload_queries_with_non_Storm_objects(self):
        # gen_reload_queries() does not like non-Storm objects. The
        # error is only raised once the generator is consumed, hence
        # the call through list().
        self.assertRaisesWithContent(
            AssertionError,
            "Cannot load objects of type str: ['fred']",
            list, bulk.gen_reload_queries(['fred']))

    def test_gen_reload_queries_with_compound_primary_keys(self):
        # gen_reload_queries() does not like compound primary keys.
        db_queries = bulk.gen_reload_queries([BugAffectsPerson()])
        self.assertRaisesWithContent(
            AssertionError,
            'Compound primary keys are not supported: BugAffectsPerson.',
            list, db_queries)

    def test_load(self):
        # reload() loads the given objects using queries generated by
        # gen_reload_queries().
        db_object = self.factory.makeComponent()
        db_object_naked = zope.security.proxy.removeSecurityProxy(db_object)
        db_object_info = get_obj_info(db_object_naked)
        IStore(db_object).flush()
        self.assertEqual(None, db_object_info.get('invalidated'))
        # Invalidate the object, then check that reload() clears the
        # 'invalidated' marker again.
        IStore(db_object).invalidate(db_object)
        self.assertEqual(True, db_object_info.get('invalidated'))
        bulk.reload([db_object])
        self.assertEqual(None, db_object_info.get('invalidated'))
149
150
def test_suite():
    """Return a suite of the tests the default loader finds in this module."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromName(__name__)