Merge lp:~edoardo-serra/storm/select-for-update-support into lp:storm

Proposed by Edoardo Serra
Status: Needs review
Proposed branch: lp:~edoardo-serra/storm/select-for-update-support
Merge into: lp:storm
Diff against target: 510 lines (+188/-18)
6 files modified
storm/databases/sqlite.py (+4/-0)
storm/expr.py (+7/-3)
storm/store.py (+51/-14)
storm/zope/interfaces.py (+3/-0)
tests/expr.py (+13/-1)
tests/store/base.py (+110/-0)
To merge this branch: bzr merge lp:~edoardo-serra/storm/select-for-update-support
Reviewer            Review Type    Date Requested    Status
Storm Developers                                     Pending
Storm Developers                                     Pending
Review via email: mp+35112@code.launchpad.net

Description of the change

This branch implements support for SELECT ... FOR UPDATE.

Jamu Kakar (jkakar) wrote :

Edoardo, thanks for this branch! Before we spend time reviewing the
changes, have you signed the Canonical copyright assignment forms?
This process will need to be completed before we can accept changes.

Edoardo Serra (edoardo-serra) wrote :

Hi Jamu, sorry I didn't find any mention of this on the Wiki.

I just accepted the form via email.

Regards


James Henstridge (jamesh) wrote :

This isn't a full review, but here are a few initial comments:

1. If "SELECT FOR UPDATE" is incompatible with "GROUP BY", perhaps it would make sense to check for that in Select's compile function and raise ExprError there rather than trying to protect all the ResultSet entry points that could cause this problem.

2. You've added a comment asking whether it makes sense to remove the "FOR UPDATE" clause for ResultSet.is_empty(). If is_empty() returns True, then no rows would be locked if "FOR UPDATE" was left in. If it returns False, the program is likely to request those rows if it actually wants them locked. So I don't think the comment is necessary.

3. This doesn't need to go in your branch, but I wonder if it would be worth storing the fact that a row has been locked in its obj_info, so that Store.get(..., for_update=True) can avoid a query if the row has already been locked? This could be set in Store._load_object() and unset by Store.invalidate().
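
The idea in point 3 can be sketched roughly as follows. This is a toy model, not Storm code: `FakeConnection`, `ToyStore`, and the `"locked"` key are hypothetical names made up for illustration, and the sketch leaves out the revalidation of invalidated objects that the real Store.get() performs.

```python
class FakeConnection:
    """Hypothetical stand-in for a database connection; records the SQL it sees."""

    def __init__(self, rows):
        self.rows = rows          # primary key -> row dict
        self.statements = []      # every statement "executed" so far

    def select(self, key, for_update=False):
        sql = "SELECT id, title FROM foo WHERE id = %d" % key
        if for_update:
            sql += " FOR UPDATE"
        self.statements.append(sql)
        return self.rows.get(key)


class ToyStore:
    """Hypothetical store that remembers which cached rows it already locked."""

    def __init__(self, connection):
        self._connection = connection
        self._alive = {}          # primary key -> obj_info dict

    def get(self, key, for_update=False):
        obj_info = self._alive.get(key)
        # A cached object is enough unless a lock is requested and the row
        # has not been locked yet in this transaction.
        if obj_info is not None and (not for_update or obj_info.get("locked")):
            return obj_info["obj"]
        row = self._connection.select(key, for_update)
        if row is None:
            self._alive.pop(key, None)
            return None
        obj_info = self._alive.setdefault(key, {})
        obj_info["obj"] = row
        if for_update:
            obj_info["locked"] = True   # set when the row is loaded with a lock
        return row

    def invalidate(self):
        # Forget the lock state; the next locking get() must hit the database.
        for obj_info in self._alive.values():
            obj_info.pop("locked", None)


conn = FakeConnection({10: {"id": 10, "title": "Title 30"}})
store = ToyStore(conn)
store.get(10, for_update=True)   # issues SELECT ... FOR UPDATE
store.get(10, for_update=True)   # served from the cache, no second query
```

After `store.invalidate()`, the next `get(10, for_update=True)` would query again, since the sketch no longer assumes the lock is held.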

James Henstridge (jamesh) wrote :

One other point: it is probably worth using an API that can support "FOR SHARE" read locks in addition to "FOR UPDATE" write locks.
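
As a rough sketch of what such an API might look like, the boolean flag could become a lock mode. Everything below is hypothetical and self-contained: `Select`, `compile_select`, `ExprError`, and the `lock` parameter are simplified stand-ins, not Storm's classes. The sketch also puts the GROUP BY check in the compile step, as suggested in point 1 of the earlier comment. The clause spellings follow PostgreSQL; other backends may need different text.

```python
from dataclasses import dataclass
from typing import List, Optional


class ExprError(Exception):
    """Stand-in for an expression compilation error."""


# Lock mode -> SQL clause (PostgreSQL spelling, assumed for this sketch).
LOCK_CLAUSES = {"update": " FOR UPDATE", "share": " FOR SHARE"}


@dataclass
class Select:
    columns: List[str]
    table: str
    group_by: Optional[List[str]] = None
    lock: Optional[str] = None      # None, "update" or "share"


def compile_select(select):
    """Compile a toy Select, validating the lock mode in one place."""
    if select.lock is not None:
        if select.lock not in LOCK_CLAUSES:
            raise ExprError("Unknown lock mode: %r" % (select.lock,))
        if select.group_by:
            raise ExprError("Row locks can't be combined with GROUP BY")
    tokens = ["SELECT ", ", ".join(select.columns), " FROM ", select.table]
    if select.group_by:
        tokens.append(" GROUP BY " + ", ".join(select.group_by))
    if select.lock is not None:
        tokens.append(LOCK_CLAUSES[select.lock])
    return "".join(tokens)


write_lock = compile_select(Select(["id", "title"], "foo", lock="update"))
read_lock = compile_select(Select(["id", "title"], "foo", lock="share"))
```

Because the check lives in the compile step, every entry point that builds a statement gets the same validation without having to guard each ResultSet method separately.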

Unmerged revisions

380. By Edoardo Serra <eserra@barbera>

SELECT ... FOR UPDATE implemented in Store.get

379. By Edoardo Serra <eserra@barbera>

Unit tests

378. By Edoardo Serra <eserra@barbera>

SELECT ... FOR UPDATE is not compatible with GROUP BY

377. By Edoardo Serra <eserra@barbera>

SQLite does not support SELECT ... FOR UPDATE

376. By Edoardo Serra <eserra@barbera>

Starting implementation of SELECT ... FOR UPDATE

Preview Diff

=== modified file 'storm/databases/sqlite.py'
--- storm/databases/sqlite.py 2010-06-14 11:52:25 +0000
+++ storm/databases/sqlite.py 2010-09-10 14:44:22 +0000
@@ -49,6 +49,10 @@
 def compile_select_sqlite(compile, select, state):
     if select.offset is not Undef and select.limit is Undef:
         select.limit = sys.maxint
+    # SQLite does not support SELECT ... FOR UPDATE
+    # Can we just ignore it?
+    if select.for_update:
+        select.for_update = False
     statement = compile_select(compile, select, state)
     if state.context is SELECT:
         # SQLite breaks with (SELECT ...) UNION (SELECT ...), so we

=== modified file 'storm/expr.py'
--- storm/expr.py 2009-11-02 11:11:20 +0000
+++ storm/expr.py 2010-09-10 14:44:22 +0000
@@ -636,12 +636,13 @@

 class Select(Expr):
     __slots__ = ("columns", "where", "tables", "default_tables", "order_by",
-                 "group_by", "limit", "offset", "distinct", "having")
+                 "group_by", "limit", "offset", "distinct", "having",
+                 'for_update')

     def __init__(self, columns, where=Undef,
                  tables=Undef, default_tables=Undef,
-                 order_by=Undef, group_by=Undef,
-                 limit=Undef, offset=Undef, distinct=False, having=Undef):
+                 order_by=Undef, group_by=Undef, limit=Undef,
+                 offset=Undef, distinct=False, having=Undef, for_update=False):
         self.columns = columns
         self.where = where
         self.tables = tables
@@ -652,6 +653,7 @@
         self.offset = offset
         self.distinct = distinct
         self.having = having
+        self.for_update = for_update

 @compile.when(Select)
 def compile_select(compile, select, state):
@@ -680,6 +682,8 @@
         tokens.append(" LIMIT %d" % select.limit)
     if select.offset is not Undef:
         tokens.append(" OFFSET %d" % select.offset)
+    if select.for_update:
+        tokens.append(" FOR UPDATE")
     if has_tables(state, select):
         state.context = TABLE
         state.push("parameters", [])

=== modified file 'storm/store.py'
--- storm/store.py 2010-09-01 15:00:42 +0000
+++ storm/store.py 2010-09-10 14:44:22 +0000
@@ -137,13 +137,16 @@
         self.invalidate()
         self._connection.rollback()

-    def get(self, cls, key):
+    def get(self, cls, key, for_update=False):
         """Get object of type cls with the given primary key from the database.

         If the object is alive the database won't be touched.

+        If the FOR UPDATE clause is used, the query is always executed
+
         @param cls: Class of the object to be retrieved.
         @param key: Primary key of object. May be a tuple for composed keys.
+        @param for_update: Indicates wether using the clause FOR UPDATE

         @return: The object found with the given primary key, or None
             if no object is found.
@@ -165,20 +168,22 @@
                 variable = column.variable_factory(value=variable)
             primary_vars.append(variable)

-        primary_values = tuple(var.get(to_db=True) for var in primary_vars)
-        obj_info = self._alive.get((cls_info.cls, primary_values))
-        if obj_info is not None:
-            if obj_info.get("invalidated"):
-                try:
-                    self._validate_alive(obj_info)
-                except LostObjectError:
-                    return None
-            return self._get_object(obj_info)
+        if not for_update:
+            primary_values = tuple(var.get(to_db=True) for var in primary_vars)
+            obj_info = self._alive.get((cls_info.cls, primary_values))
+            if obj_info is not None:
+                if obj_info.get("invalidated"):
+                    try:
+                        self._validate_alive(obj_info)
+                    except LostObjectError:
+                        return None
+                return self._get_object(obj_info)

         where = compare_columns(cls_info.primary_key, primary_vars)

         select = Select(cls_info.columns, where,
-                        default_tables=cls_info.table, limit=1)
+                        default_tables=cls_info.table, limit=1,
+                        for_update=for_update)

         result = self._connection.execute(select)
         values = result.get_one()
@@ -921,6 +926,7 @@
         self._distinct = False
         self._group_by = Undef
         self._having = Undef
+        self._for_update = False

     def copy(self):
         """Return a copy of this ResultSet object, with the same configuration.
@@ -933,7 +939,7 @@
             result_set._select = copy(self._select)
         return result_set

-    def config(self, distinct=None, offset=None, limit=None):
+    def config(self, distinct=None, offset=None, limit=None, for_update=None):
         """Configure this result object in-place. All parameters are optional.

         @param distinct: Boolean enabling/disabling usage of the DISTINCT
@@ -952,6 +958,8 @@
             self._offset = offset
         if limit is not None:
             self._limit = limit
+        if for_update is not None:
+            self._for_update = for_update
         return self

     def _get_select(self):
@@ -967,7 +975,7 @@
         return Select(columns, self._where, self._tables, default_tables,
                       self._order_by, offset=self._offset, limit=self._limit,
                       distinct=self._distinct, group_by=self._group_by,
-                      having=self._having)
+                      having=self._having, for_update=self._for_update)

     def _load_objects(self, result, values):
         return self._find_spec.load_objects(self._store, result, values)
@@ -1054,6 +1062,9 @@
         subselect = self._get_select()
         subselect.limit = 1
         subselect.order_by = Undef
+        # Should we really strip FOR UPDATE clause or should we raise a
+        # FeatureError?
+        subselect.for_update = False
         select = Select(1, tables=Alias(subselect, "_tmp"), limit=1)
         result = self._store._connection.execute(select)
         return (not result.get_one())
@@ -1162,6 +1173,15 @@
         self._order_by = args or Undef
         return self

+    def for_update(self):
+        """Acquire an exclusive row-level lock on the rows in the ResultSet
+        """
+        if self._group_by is not Undef:
+            raise FeatureError("SELECT ... FOR UPDATE isn't supported with a "
+                               " GROUP BY clause ")
+        self._for_update = True
+        return self
+
     def remove(self):
         """Remove all rows represented by this ResultSet from the database.

@@ -1193,6 +1213,9 @@
         if self._select is not Undef:
             raise FeatureError("Grouping isn't supported with "
                                "set expressions (unions, etc)")
+        if self._for_update:
+            raise FeatureError("Grouping isn't supported with "
+                               "FOR UPDATE clause")

         find_spec = FindSpec(expr)
         columns, dummy = find_spec.get_columns_and_tables()
@@ -1216,6 +1239,9 @@
         if self._group_by is not Undef:
             raise FeatureError("Single aggregates aren't supported after a "
                                " GROUP BY clause ")
+        if self._for_update:
+            raise FeatureError("Single aggregates aren't supported with"
+                               " FOR UPDATE clause ")
         columns, default_tables = self._find_spec.get_columns_and_tables()
         if (self._select is Undef and not self._distinct and
             self._offset is Undef and self._limit is Undef):
@@ -1471,6 +1497,8 @@
         """
         if isinstance(other, EmptyResultSet):
             return self
+        if self._for_update or other._for_update:
+            raise FeatureError("SELECT FOR UPDATE is not allowed with UNION")
         return self._set_expr(Union, other, all)

     def difference(self, other, all=False):
@@ -1480,6 +1508,8 @@
         """
         if isinstance(other, EmptyResultSet):
             return self
+        if self._for_update or other._for_update:
+            raise FeatureError("SELECT FOR UPDATE is not allowed with EXCEPT")
         return self._set_expr(Except, other, all)

     def intersection(self, other, all=False):
@@ -1489,6 +1519,9 @@
         """
         if isinstance(other, EmptyResultSet):
             return other
+        if self._for_update or other._for_update:
+            raise FeatureError("SELECT FOR UPDATE is not allowed with "
+                               "INTERSECT")
         return self._set_expr(Intersect, other, all)


@@ -1516,7 +1549,7 @@
         result = EmptyResultSet(self._order_by)
         return result

-    def config(self, distinct=None, offset=None, limit=None):
+    def config(self, distinct=None, offset=None, limit=None, for_update=None):
         pass

     def __iter__(self):
@@ -1552,6 +1585,10 @@
         self._order_by = True
         return self

+    def for_update(self):
+        self._for_update = True
+        return self
+
     def remove(self):
         return 0


=== modified file 'storm/zope/interfaces.py'
--- storm/zope/interfaces.py 2008-11-27 09:37:56 +0000
+++ storm/zope/interfaces.py 2010-09-10 14:44:22 +0000
@@ -93,6 +93,9 @@
     def order_by(*args):
         """Order the result set based on expressions in C{args}."""

+    def for_update():
+        """Acquire an exclusive row-level lock on the rows in the ResultSet"""
+
     def count(column=Undef, distinct=False):
         """Returns the number of rows in the result set.


=== modified file 'tests/expr.py'
--- tests/expr.py 2009-11-02 11:11:20 +0000
+++ tests/expr.py 2010-09-10 14:44:22 +0000
@@ -65,9 +65,11 @@
         self.assertEquals(expr.limit, Undef)
         self.assertEquals(expr.offset, Undef)
         self.assertEquals(expr.distinct, False)
+        self.assertEquals(expr.having, Undef)
+        self.assertEquals(expr.for_update, False)

     def test_select_constructor(self):
-        objects = [object() for i in range(9)]
+        objects = [object() for i in range(11)]
         expr = Select(*objects)
         self.assertEquals(expr.columns, objects[0])
         self.assertEquals(expr.where, objects[1])
@@ -78,6 +80,8 @@
         self.assertEquals(expr.limit, objects[6])
         self.assertEquals(expr.offset, objects[7])
         self.assertEquals(expr.distinct, objects[8])
+        self.assertEquals(expr.having, objects[9])
+        self.assertEquals(expr.for_update, objects[10])

     def test_insert_default(self):
         expr = Insert(None)
@@ -657,6 +661,14 @@
                           'SELECT DISTINCT column1, column2 FROM "table 1"')
         self.assertEquals(state.parameters, [])

+    def test_select_for_update(self):
+        expr = Select([column1, column2], Undef, [table1], for_update=True)
+        state = State()
+        statement = compile(expr, state)
+        self.assertEquals(statement,
+                          'SELECT column1, column2 FROM "table 1" FOR UPDATE')
+        self.assertEquals(state.parameters, [])
+
     def test_select_where(self):
         expr = Select([column1, Func1()],
                       Func1(),

=== modified file 'tests/store/base.py'
--- tests/store/base.py 2010-09-01 09:02:41 +0000
+++ tests/store/base.py 2010-09-10 14:44:22 +0000
@@ -321,6 +321,18 @@
         foo = self.store.get(Foo, 40)
         self.assertEquals(foo, None)

+    def test_get_for_update(self):
+        stream = StringIO()
+        self.addCleanup(debug, False)
+        debug(True, stream)
+
+        foo = self.store.get(Foo, 10, for_update=True)
+        self.assertEquals(foo.id, 10)
+        self.assertEquals(foo.title, "Title 30")
+        if self.__class__.__name__.startswith("SQLite"):
+            return
+        self.assertIn("FOR UPDATE", stream.getvalue())
+
     def test_get_cached(self):
         foo = self.store.get(Foo, 10)
         self.assertTrue(self.store.get(Foo, 10) is foo)
@@ -332,6 +344,13 @@
         self.store.get(Foo, 10)
         self.store._connection = connection

+    def test_wb_get_cached_for_update_needa_connection(self):
+        foo = self.store.get(Foo, 10)
+        connection = self.store._connection
+        self.store._connection = None
+        self.assertRaises(AttributeError, self.store.get, Foo, 10, for_update=True)
+        self.store._connection = connection
+
     def test_cache_cleanup(self):
         # Disable the cache, which holds strong references.
         self.get_cache(self.store).set_size(0)
@@ -569,6 +588,21 @@
         self.assertEqual(True, result.is_empty())
         self.assertNotIn("ORDER BY", stream.getvalue())

+    def test_is_empty_strips_for_update(self):
+        """
+        L{ResultSet.is_empty} strips the C{FOR UPDATE} clause, if one is
+        present, since it's not supported in subqueries and thre is no reason
+        to acquire a row-level lock while testing if a ResultSet is empty
+        """
+        stream = StringIO()
+        self.addCleanup(debug, False)
+        debug(True, stream)
+
+        result = self.store.find(Foo, Foo.id == 300).for_update()
+        result.order_by(Foo.id)
+        self.assertEqual(True, result.is_empty())
+        self.assertNotIn("FOR UPDATE", stream.getvalue())
+
     def test_is_empty_with_composed_key(self):
         result = self.store.find(Link, foo_id=300, bar_id=3000)
         self.assertEquals(result.is_empty(), True)
@@ -592,6 +626,23 @@
                           (30, "Title 10"),
                          ])

+    def test_find_for_update(self):
+        stream = StringIO()
+        self.addCleanup(debug, False)
+        debug(True, stream)
+
+        result = self.store.find(Foo).for_update()
+        lst = [(foo.id, foo.title) for foo in result]
+        lst.sort()
+        self.assertEquals(lst, [
+                          (10, "Title 30"),
+                          (20, "Title 20"),
+                          (30, "Title 10"),
+                         ])
+        if self.__class__.__name__.startswith("SQLite"):
+            return
+        self.assertIn("FOR UPDATE", stream.getvalue())
+
     def test_find_from_cache(self):
         foo = self.store.get(Foo, 10)
         self.assertTrue(self.store.find(Foo, id=10).one() is foo)
@@ -940,6 +991,11 @@
     def test_find_count(self):
         self.assertEquals(self.store.find(Foo).count(), 3)

+    def test_find_count_for_update(self):
+        result = self.store.find(Foo)
+        result.for_update()
+        self.assertRaises(FeatureError, result.count, 3)
+
     def test_find_count_after_slice(self):
         """
         When we slice a ResultSet obtained after a set operation (like union),
@@ -1007,6 +1063,11 @@
     def test_find_max(self):
         self.assertEquals(self.store.find(Foo).max(Foo.id), 30)

+    def test_find_max_for_update(self):
+        result = self.store.find(Foo)
+        result.for_update()
+        self.assertRaises(FeatureError, result.max, Foo.id)
+
     def test_find_max_expr(self):
         self.assertEquals(self.store.find(Foo).max(Foo.id + 1), 31)

@@ -1028,6 +1089,11 @@
     def test_find_min(self):
         self.assertEquals(self.store.find(Foo).min(Foo.id), 10)

+    def test_find_min_for_update(self):
+        result = self.store.find(Foo)
+        result.for_update()
+        self.assertRaises(FeatureError, result.min, Foo.id)
+
     def test_find_min_expr(self):
         self.assertEquals(self.store.find(Foo).min(Foo.id - 1), 9)

@@ -1049,6 +1115,11 @@
     def test_find_avg(self):
         self.assertEquals(self.store.find(Foo).avg(Foo.id), 20)

+    def test_find_avg_for_update(self):
+        result = self.store.find(Foo)
+        result.for_update()
+        self.assertRaises(FeatureError, result.avg, Foo.id)
+
     def test_find_avg_expr(self):
         self.assertEquals(self.store.find(Foo).avg(Foo.id + 10), 30)

@@ -1062,6 +1133,11 @@
     def test_find_sum(self):
         self.assertEquals(self.store.find(Foo).sum(Foo.id), 60)

+    def test_find_sum_for_update(self):
+        result = self.store.find(Foo)
+        result.for_update()
+        self.assertRaises(FeatureError, result.sum, Foo.id)
+
     def test_find_sum_expr(self):
         self.assertEquals(self.store.find(Foo).sum(Foo.id * 2), 120)

@@ -1546,6 +1622,11 @@
         result = list(result)
         self.assertEquals(result, [(2L, 2L), (2L, 2L), (2L, 3L), (3L, 6L)])

+    def test_find_group_by_for_update(self):
+        result = self.store.find((Count(FooValue.id), Sum(FooValue.value1)))
+        result.group_by(FooValue.value2)
+        self.assertRaises(FeatureError, result.for_update)
+
     def test_find_group_by_table(self):
         result = self.store.find(
             (Sum(FooValue.value2), Foo), Foo.id == FooValue.foo_id)
@@ -2271,6 +2352,13 @@
         self.assertEquals(foo.id, 20)
         self.assertEquals(foo.title, "Title 20")

+    def test_sub_select_for_update(self):
+        foo = self.store.find(Foo, Foo.id == Select(SQL("20"))
+                              ).for_update().one()
+        self.assertTrue(foo)
+        self.assertEquals(foo.id, 20)
+        self.assertEquals(foo.title, "Title 20")
+
     def test_cache_has_improper_object(self):
         foo = self.store.get(Foo, 20)
         self.store.remove(foo)
@@ -5472,6 +5560,12 @@
                           (30, "Title 10"),
                          ])

+    def test_result_union_for_update(self):
+        result1 = self.store.find(Foo, id=30).for_update()
+        result2 = self.store.find(Foo, id=10)
+        self.assertRaises(FeatureError, result1.union, result2)
+        self.assertRaises(FeatureError, result2.union, result1)
+
     def test_result_union_duplicated(self):
         result1 = self.store.find(Foo, id=30)
         result2 = self.store.find(Foo, id=30)
@@ -5544,6 +5638,12 @@
                           (30, "Title 10"),
                          ])

+    def test_result_difference_for_update(self):
+        result1 = self.store.find(Foo).for_update()
+        result2 = self.store.find(Foo, id=10)
+        self.assertRaises(FeatureError, result1.difference, result2)
+        self.assertRaises(FeatureError, result2.difference, result1)
+
     def test_result_difference_with_empty(self):
         if self.__class__.__name__.startswith("MySQL"):
             return
@@ -5605,6 +5705,12 @@
                           (30, "Title 10"),
                          ])

+    def test_result_intersection_for_update(self):
+        result1 = self.store.find(Foo).for_update()
+        result2 = self.store.find(Foo, Foo.id.is_in((10, 30)))
+        self.assertRaises(FeatureError, result1.intersection, result2)
+        self.assertRaises(FeatureError, result2.intersection, result1)
+
     def test_result_intersection_with_empty(self):
         if self.__class__.__name__.startswith("MySQL"):
             return
@@ -6048,6 +6154,10 @@
         self.assertEquals(self.result.order_by(Foo.title), self.result)
         self.assertEquals(self.empty.order_by(Foo.title), self.empty)

+    def test_for_update(self):
+        self.assertEquals(self.result.for_update(), self.result)
+        self.assertEquals(self.empty.for_update(), self.empty)
+
     def test_remove(self):
         self.assertEquals(self.result.remove(), 0)
         self.assertEquals(self.empty.remove(), 0)
