Merge lp:~wgrant/launchpad/bulk-insert into lp:launchpad

Proposed by William Grant
Status: Merged
Approved by: Curtis Hovey
Approved revision: no longer in the source branch.
Merged at revision: 14851
Proposed branch: lp:~wgrant/launchpad/bulk-insert
Merge into: lp:launchpad
Diff against target: 212 lines (+139/-3)
2 files modified
lib/lp/services/database/bulk.py (+74/-2)
lib/lp/services/database/tests/test_bulk.py (+65/-1)
To merge this branch: bzr merge lp:~wgrant/launchpad/bulk-insert
Reviewer Review Type Date Requested Status
Curtis Hovey (community) code Needs Fixing
Review via email: mp+93930@code.launchpad.net

Commit message

[r=sinzui][no-qa] Introduce lp.services.database.bulk.create().

Description of the change

This branch adds lp.services.database.bulk.create(), a helper to insert multiple DB rows in a single statement.

It touches some of Storm's privates in inappropriate ways in order to handle References in a pretty (and typesafe) manner. Reference columns and values have to be flattened into (potentially compound) primary keys, and the methods to do that are private.

To post a comment you must log in.
Revision history for this message
Curtis Hovey (sinzui) wrote :

Change the assert on line 82. It should be explicitly raised if the method has an integrity issue. Is this an AssertionError? I read "The Storm columns to insert values into. Must be from a single class", which implies this is a ValueError. Maybe?
    if len(clses) != 1:
        raise ValueError('The Storm columns to insert values into must be from a single class.')

review: Needs Fixing (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/database/bulk.py'
2--- lib/lp/services/database/bulk.py 2012-02-20 07:11:22 +0000
3+++ lib/lp/services/database/bulk.py 2012-02-21 22:17:21 +0000
4@@ -5,6 +5,7 @@
5
6 __metaclass__ = type
7 __all__ = [
8+ 'create',
9 'load',
10 'load_referencing',
11 'load_related',
12@@ -13,14 +14,24 @@
13
14
15 from collections import defaultdict
16+from itertools import chain
17 from functools import partial
18-from operator import attrgetter
19+from operator import (
20+ attrgetter,
21+ itemgetter,
22+ )
23
24+from storm.databases.postgres import Returning
25 from storm.expr import (
26 And,
27+ Insert,
28 Or,
29 )
30-from storm.info import get_cls_info
31+from storm.info import (
32+ get_cls_info,
33+ get_obj_info,
34+ )
35+from storm.references import Reference
36 from storm.store import Store
37 from zope.security.proxy import removeSecurityProxy
38
39@@ -153,3 +164,64 @@
40 for owning_object in owning_objects:
41 keys.update(map(partial(getattr, owning_object), foreign_keys))
42 return load(object_type, keys)
43+
44+
45+def _dbify_value(col, val):
46+ """Convert a value into a form that Storm can compile directly."""
47+ if isinstance(col, Reference):
48+ # References are mainly meant to be used as descriptors, so we
49+ # have to perform a bit of evil here to turn the (potentially
50+ # None) value into a sequence of primary key values.
51+ if val is None:
52+ return (None,) * len(col._relation._get_local_columns(col._cls))
53+ else:
54+ return col._relation.get_remote_variables(
55+ get_obj_info(val).get_obj())
56+ else:
57+ return (col.variable_factory(value=val),)
58+
59+
60+def _dbify_column(col):
61+ """Convert a column into a form that Storm can compile directly."""
62+ if isinstance(col, Reference):
63+ # References are mainly meant to be used as descriptors, so we
64+ # have to perform a bit of evil here to turn the column into
65+ # a sequence of primary key columns.
66+ return col._relation._get_local_columns(col._cls)
67+ else:
68+ return (col,)
69+
70+
71+def create(columns, values):
72+ """Create a large number of objects efficiently.
73+
74+ :param columns: The Storm columns to insert values into. Must be from a
75+ single class.
76+ :param values: A list of lists of values for the columns.
77+ :return: A list of the created objects.
78+ """
79+ # Flatten Reference faux-columns into their primary keys.
80+ db_cols = list(chain.from_iterable(map(_dbify_column, columns)))
81+ clses = set(col.cls for col in db_cols)
82+ if len(clses) != 1:
83+ raise ValueError(
84+ "The Storm columns to insert values into must be from a single "
85+ "class.")
86+ [cls] = clses
87+ primary_key = get_cls_info(cls).primary_key
88+
89+ # Mangle our value list into compilable values. Normal columns just
90+ # get passed through the variable factory, while References get
91+ # squashed into primary key variables.
92+ db_values = [
93+ list(chain.from_iterable(
94+ _dbify_value(col, val) for col, val in zip(columns, value)))
95+ for value in values]
96+
97+ result = IStore(cls).execute(
98+ Returning(Insert(
99+ db_cols, expr=db_values, primary_columns=primary_key)))
100+ if len(primary_key) == 1:
101+ return load(cls, map(itemgetter(0), result))
102+ else:
103+ return load(cls, result)
104
105=== modified file 'lib/lp/services/database/tests/test_bulk.py'
106--- lib/lp/services/database/tests/test_bulk.py 2012-02-20 07:11:22 +0000
107+++ lib/lp/services/database/tests/test_bulk.py 2012-02-21 22:17:21 +0000
108@@ -5,16 +5,26 @@
109
110 __metaclass__ = type
111
112+import datetime
113+
114+from pytz import UTC
115 from storm.exceptions import ClassInfoError
116 from storm.info import get_obj_info
117 from storm.store import Store
118+from testtools.matchers import Equals
119 import transaction
120 from zope.security import (
121 checker,
122 proxy,
123 )
124
125+from lp.bugs.enum import BugNotificationLevel
126 from lp.bugs.model.bug import BugAffectsPerson
127+from lp.bugs.model.bugsubscription import BugSubscription
128+from lp.code.model.branchjob import (
129+ BranchJob,
130+ BranchJobType,
131+ )
132 from lp.code.model.branchsubscription import BranchSubscription
133 from lp.registry.model.person import Person
134 from lp.services.database import bulk
135@@ -27,12 +37,15 @@
136 FeatureFlag,
137 getFeatureStore,
138 )
139+from lp.services.job.model.job import Job
140 from lp.soyuz.model.component import Component
141 from lp.testing import (
142+ StormStatementRecorder,
143 TestCase,
144 TestCaseWithFactory,
145 )
146 from lp.testing.layers import DatabaseFunctionalLayer
147+from lp.testing.matchers import HasQueryCount
148
149
150 object_is_key = lambda thing: thing
151@@ -219,9 +232,60 @@
152 self.factory.makeBranch(),
153 self.factory.makeBranch(),
154 ]
155- expected = set(list(owned_objects[0].subscriptions) +
156+ expected = set(list(owned_objects[0].subscriptions) +
157 list(owned_objects[1].subscriptions))
158 self.assertNotEqual(0, len(expected))
159 self.assertEqual(expected,
160 set(bulk.load_referencing(BranchSubscription, owned_objects,
161 ['branchID'])))
162+
163+
164+class TestCreate(TestCaseWithFactory):
165+
166+ layer = DatabaseFunctionalLayer
167+
168+ def test_references_and_enums(self):
169+ # create() correctly compiles plain types, enums and references.
170+ bug = self.factory.makeBug()
171+ people = [self.factory.makePerson() for i in range(5)]
172+
173+ wanted = [
174+ (bug, person, person, datetime.datetime.now(UTC),
175+ BugNotificationLevel.LIFECYCLE)
176+ for person in people]
177+
178+ with StormStatementRecorder() as recorder:
179+ subs = bulk.create(
180+ (BugSubscription.bug, BugSubscription.person,
181+ BugSubscription.subscribed_by, BugSubscription.date_created,
182+ BugSubscription.bug_notification_level),
183+ wanted)
184+
185+ self.assertThat(recorder, HasQueryCount(Equals(2)))
186+ self.assertContentEqual(
187+ wanted,
188+ ((sub.bug, sub.person, sub.subscribed_by, sub.date_created,
189+ sub.bug_notification_level) for sub in subs))
190+
191+ def test_null_reference(self):
192+ # create() handles None as a Reference value.
193+ job = Job()
194+ IStore(Job).add(job)
195+ wanted = [(None, job, BranchJobType.RECLAIM_BRANCH_SPACE)]
196+ [branchjob] = bulk.create(
197+ (BranchJob.branch, BranchJob.job, BranchJob.job_type),
198+ wanted)
199+ self.assertEqual(
200+ wanted, [(branchjob.branch, branchjob.job, branchjob.job_type)])
201+
202+ def test_fails_on_multiple_classes(self):
203+ # create() only inserts into columns on a single class.
204+ self.assertRaises(
205+ ValueError,
206+ bulk.create, (BugSubscription.bug, BranchSubscription.branch), [])
207+
208+ def test_fails_on_reference_mismatch(self):
209+ # create() handles Reference columns in a typesafe manner.
210+ self.assertRaisesWithContent(
211+ RuntimeError, "Property used in an unknown class",
212+ bulk.create, (BugSubscription.bug,), [[self.factory.makeBranch()]])