Merge lp:~allenap/launchpad/storm-bulk-reload-bug-572211 into lp:launchpad

Proposed by Gavin Panella on 2010-04-30
Status: Merged
Approved by: Gavin Panella on 2010-05-06
Approved revision: no longer in the source branch.
Merged at revision: 10892
Proposed branch: lp:~allenap/launchpad/storm-bulk-reload-bug-572211
Merge into: lp:launchpad
Diff against target: 227 lines (+218/-0)
2 files modified
lib/lp/services/database/bulk.py (+66/-0)
lib/lp/services/database/tests/test_bulk.py (+152/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/storm-bulk-reload-bug-572211
Reviewer Review Type Date Requested Status
Abel Deuring (community) code 2010-04-30 Approve on 2010-04-30
Review via email: mp+24492@code.launchpad.net

Commit message

New function, lp.services.database.bulk.reload(), to efficiently reload database objects.

Description of the change

Adds a new module, lp.services.database.bulk, that contains a single public function, reload(). Given an arbitrary list of Storm model objects, this attempts to reload them from the database in the most efficient way it can. The model objects can be a mix of types, and loaded from any number of different stores, but they must not have compound primary keys. See the linked bug for the rationale behind this new function.

To post a comment you must log in.
Abel Deuring (adeuring) wrote :

I really like this branch. Thanks!

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'lib/lp/services/database/bulk.py'
2--- lib/lp/services/database/bulk.py 1970-01-01 00:00:00 +0000
3+++ lib/lp/services/database/bulk.py 2010-04-30 15:26:06 +0000
4@@ -0,0 +1,66 @@
5+# Copyright 2010 Canonical Ltd. This software is licensed under the
6+# GNU Affero General Public License version 3 (see the file LICENSE).
7+
8+"""Optimized bulk operations against the database."""
9+
10+__metaclass__ = type
11+__all__ = [
12+ 'reload',
13+ ]
14+
15+
16+from collections import defaultdict
17+
18+from zope.security.proxy import removeSecurityProxy
19+
20+from storm.base import Storm
21+from storm.expr import In
22+from storm.info import get_cls_info
23+from storm.store import Store
24+
25+
26+def collate(things, key):
27+ """Collate the given objects according to a key function.
28+
29+ Generates (common-key-value, list-of-things) tuples, like groupby,
30+ except that the given objects do not need to be sorted.
31+ """
32+ collection = defaultdict(list)
33+ for thing in things:
34+ collection[key(thing)].append(thing)
35+ return collection.iteritems()
36+
37+
38+def get_type(thing):
39+ """Return the type of the given object.
40+
41+ If the given object is wrapped by a security proxy, the type
42+ returned is that of the wrapped object.
43+ """
44+ return type(removeSecurityProxy(thing))
45+
46+
47+def gen_reload_queries(objects):
48+ """Prepare queries to reload the given objects."""
49+ for object_type, objects in collate(objects, get_type):
50+ if not issubclass(object_type, Storm):
51+ raise AssertionError(
52+ "Cannot load objects of type %s: %r" % (
53+ object_type.__name__, objects))
54+ primary_key = get_cls_info(object_type).primary_key
55+ if len(primary_key) != 1:
56+ raise AssertionError(
57+ "Compound primary keys are not supported: %s." %
58+ object_type.__name__)
59+ primary_key_column = primary_key[0]
60+ primary_key_column_getter = primary_key_column.__get__
61+ for store, objects in collate(objects, Store.of):
62+ primary_keys = map(primary_key_column_getter, objects)
63+ condition = In(primary_key_column, primary_keys)
64+ yield store.find(object_type, condition)
65+
66+
67+def reload(objects):
68+ """Reload a large number of objects efficiently."""
69+ for query in gen_reload_queries(objects):
70+ list(query)
71
72=== added file 'lib/lp/services/database/tests/test_bulk.py'
73--- lib/lp/services/database/tests/test_bulk.py 1970-01-01 00:00:00 +0000
74+++ lib/lp/services/database/tests/test_bulk.py 2010-04-30 15:26:06 +0000
75@@ -0,0 +1,152 @@
76+# Copyright 2010 Canonical Ltd. This software is licensed under the
77+# GNU Affero General Public License version 3 (see the file LICENSE).
78+
79+"""Test the bulk database functions."""
80+
81+__metaclass__ = type
82+
83+import unittest
84+
85+import transaction
86+
87+import zope.security.checker
88+import zope.security.proxy
89+
90+from storm.info import get_obj_info
91+
92+from canonical.launchpad.interfaces.lpstorm import (
93+ IMasterStore, ISlaveStore, IStore)
94+from canonical.testing import LaunchpadZopelessLayer
95+
96+from lp.bugs.model.bug import BugAffectsPerson
97+from lp.services.database import bulk
98+from lp.testing import TestCase, TestCaseWithFactory
99+
100+
101+object_is_key = lambda thing: thing
102+
103+
104+class TestBasicFunctions(TestCase):
105+
106+ def test_collate_empty_list(self):
107+ self.failUnlessEqual([], list(bulk.collate([], object_is_key)))
108+
109+ def test_collate_when_object_is_key(self):
110+ self.failUnlessEqual(
111+ [(1, [1])],
112+ list(bulk.collate([1], object_is_key)))
113+ self.failUnlessEqual(
114+ [(1, [1]), (2, [2, 2])],
115+ sorted(bulk.collate([1, 2, 2], object_is_key)))
116+
117+ def test_collate_with_key_function(self):
118+ self.failUnlessEqual(
119+ [(4, ['fred', 'joss']), (6, ['barney'])],
120+ sorted(bulk.collate(['fred', 'barney', 'joss'], len)))
121+
122+ def test_get_type(self):
123+ self.failUnlessEqual(object, bulk.get_type(object()))
124+
125+ def test_get_type_with_proxied_object(self):
126+ proxied_object = zope.security.proxy.Proxy(
127+ 'fred', zope.security.checker.Checker({}))
128+ self.failUnlessEqual(str, bulk.get_type(proxied_object))
129+
130+
131+class TestLoaders(TestCaseWithFactory):
132+
133+ layer = LaunchpadZopelessLayer
134+
135+ def test_gen_reload_queries_with_empty_list(self):
136+ self.failUnlessEqual([], list(bulk.gen_reload_queries([])))
137+
138+ def test_gen_reload_queries_with_single_object(self):
139+ # gen_reload_queries() should generate a single query for a
140+ # single object.
141+ db_objects = [self.factory.makeSourcePackageName()]
142+ db_queries = list(bulk.gen_reload_queries(db_objects))
143+ self.failUnlessEqual(1, len(db_queries))
144+ db_query = db_queries[0]
145+ self.failUnlessEqual(db_objects, list(db_query))
146+
147+ def test_gen_reload_queries_with_multiple_similar_objects(self):
148+ # gen_reload_queries() should generate a single query to load
149+ # multiple objects of the same type.
150+ db_objects = set(
151+ self.factory.makeSourcePackageName() for i in range(5))
152+ db_queries = list(bulk.gen_reload_queries(db_objects))
153+ self.failUnlessEqual(1, len(db_queries))
154+ db_query = db_queries[0]
155+ self.failUnlessEqual(db_objects, set(db_query))
156+
157+ def test_gen_reload_queries_with_mixed_objects(self):
158+ # gen_reload_queries() should return one query for each
159+ # distinct object type in the given objects.
160+ db_objects = set(
161+ self.factory.makeSourcePackageName() for i in range(5))
162+ db_objects.update(
163+ self.factory.makeComponent() for i in range(5))
164+ db_queries = list(bulk.gen_reload_queries(db_objects))
165+ self.failUnlessEqual(2, len(db_queries))
166+ db_objects_loaded = set()
167+ for db_query in db_queries:
168+ objects = set(db_query)
169+ # None of these objects should have been loaded before.
170+ self.failUnlessEqual(
171+ set(), objects.intersection(db_objects_loaded))
172+ db_objects_loaded.update(objects)
173+ self.failUnlessEqual(db_objects, db_objects_loaded)
174+
175+ def test_gen_reload_queries_with_mixed_stores(self):
176+ # gen_reload_queries() returns one query for each distinct
177+ # store even for the same object type.
178+ db_object = self.factory.makeComponent()
179+ db_object_type = bulk.get_type(db_object)
180+ # Commit so the database object is available in both master
181+ # and slave stores.
182+ transaction.commit()
183+ db_objects = set(
184+ (IMasterStore(db_object).get(db_object_type, db_object.id),
185+ ISlaveStore(db_object).get(db_object_type, db_object.id)))
186+ db_queries = list(bulk.gen_reload_queries(db_objects))
187+ self.failUnlessEqual(2, len(db_queries))
188+ db_objects_loaded = set()
189+ for db_query in db_queries:
190+ objects = set(db_query)
191+ # None of these objects should have been loaded before.
192+ self.failUnlessEqual(
193+ set(), objects.intersection(db_objects_loaded))
194+ db_objects_loaded.update(objects)
195+ self.failUnlessEqual(db_objects, db_objects_loaded)
196+
197+ def test_gen_reload_queries_with_non_Storm_objects(self):
198+ # gen_reload_queries() does not like non-Storm objects.
199+ self.assertRaisesWithContent(
200+ AssertionError,
201+ "Cannot load objects of type str: ['fred']",
202+ list, bulk.gen_reload_queries(['fred']))
203+
204+ def test_gen_reload_queries_with_compound_primary_keys(self):
205+ # gen_reload_queries() does not like compound primary keys.
206+ db_queries = bulk.gen_reload_queries([BugAffectsPerson()])
207+ self.assertRaisesWithContent(
208+ AssertionError,
209+ 'Compound primary keys are not supported: BugAffectsPerson.',
210+ list, db_queries)
211+
212+ def test_load(self):
213+ # load() loads the given objects using queries generated by
214+ # gen_reload_queries().
215+ db_object = self.factory.makeComponent()
216+ db_object_naked = zope.security.proxy.removeSecurityProxy(db_object)
217+ db_object_info = get_obj_info(db_object_naked)
218+ IStore(db_object).flush()
219+ self.failUnlessEqual(None, db_object_info.get('invalidated'))
220+ IStore(db_object).invalidate(db_object)
221+ self.failUnlessEqual(True, db_object_info.get('invalidated'))
222+ bulk.reload([db_object])
223+ self.failUnlessEqual(None, db_object_info.get('invalidated'))
224+
225+
226+def test_suite():
227+ return unittest.TestLoader().loadTestsFromName(__name__)