Merge lp:~allenap/maas/retry-integrity-errors into lp:~maas-committers/maas/trunk
- retry-integrity-errors
- Merge into trunk
Proposed by
Gavin Panella
Status: | Merged |
---|---|
Approved by: | Gavin Panella |
Approved revision: | no longer in the source branch. |
Merged at revision: | 5136 |
Proposed branch: | lp:~allenap/maas/retry-integrity-errors |
Merge into: | lp:~maas-committers/maas/trunk |
Diff against target: |
667 lines (+400/-27) 3 files modified
src/maasserver/testing/testcase.py (+162/-10) src/maasserver/utils/orm.py (+93/-7) src/maasserver/utils/tests/test_orm.py (+145/-10) |
To merge this branch: | bzr merge lp:~allenap/maas/retry-integrity-errors |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Blake Rouse (community) | Approve | ||
Review via email: mp+297673@code.launchpad.net |
Commit message
Retry transactions when they fail with unique violation errors.
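This also makes explicit retrying via request_transaction_retry() raise a dedicated RetryTransaction exception instead of a simulated serialization failure; too many explicit retries now raise TooManyRetries.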
Description of the change
To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote : | # |
Thanks!
Revision history for this message
Gavin Panella (allenap) wrote : | # |
Because of the potential for fallout at this late time in the release cycle I'm running this through CI before landing.
Revision history for this message
Gavin Panella (allenap) wrote : | # |
All good in CI.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'src/maasserver/testing/testcase.py' | |||
2 | --- src/maasserver/testing/testcase.py 2016-05-24 12:51:54 +0000 | |||
3 | +++ src/maasserver/testing/testcase.py 2016-06-17 14:52:22 +0000 | |||
4 | @@ -9,9 +9,10 @@ | |||
5 | 9 | 'SeleniumTestCase', | 9 | 'SeleniumTestCase', |
6 | 10 | 'SerializationFailureTestCase', | 10 | 'SerializationFailureTestCase', |
7 | 11 | 'TestWithoutCrochetMixin', | 11 | 'TestWithoutCrochetMixin', |
8 | 12 | 'UniqueViolationTestCase', | ||
9 | 12 | ] | 13 | ] |
10 | 13 | 14 | ||
12 | 14 | from contextlib import closing | 15 | from itertools import count |
13 | 15 | import socketserver | 16 | import socketserver |
14 | 16 | import sys | 17 | import sys |
15 | 17 | import threading | 18 | import threading |
16 | @@ -28,13 +29,19 @@ | |||
17 | 28 | connection, | 29 | connection, |
18 | 29 | transaction, | 30 | transaction, |
19 | 30 | ) | 31 | ) |
21 | 31 | from django.db.utils import OperationalError | 32 | from django.db.utils import ( |
22 | 33 | IntegrityError, | ||
23 | 34 | OperationalError, | ||
24 | 35 | ) | ||
25 | 32 | from fixtures import Fixture | 36 | from fixtures import Fixture |
26 | 33 | from maasserver.fields import register_mac_type | 37 | from maasserver.fields import register_mac_type |
27 | 34 | from maasserver.testing.factory import factory | 38 | from maasserver.testing.factory import factory |
28 | 35 | from maasserver.testing.orm import PostCommitHooksTestMixin | 39 | from maasserver.testing.orm import PostCommitHooksTestMixin |
29 | 36 | from maasserver.testing.testclient import MAASSensibleClient | 40 | from maasserver.testing.testclient import MAASSensibleClient |
31 | 37 | from maasserver.utils.orm import is_serialization_failure | 41 | from maasserver.utils.orm import ( |
32 | 42 | is_serialization_failure, | ||
33 | 43 | is_unique_violation, | ||
34 | 44 | ) | ||
35 | 38 | from maastesting.djangotestcase import ( | 45 | from maastesting.djangotestcase import ( |
36 | 39 | DjangoTestCase, | 46 | DjangoTestCase, |
37 | 40 | DjangoTransactionTestCase, | 47 | DjangoTransactionTestCase, |
38 | @@ -234,11 +241,11 @@ | |||
39 | 234 | DjangoTransactionTestCase, PostCommitHooksTestMixin): | 241 | DjangoTransactionTestCase, PostCommitHooksTestMixin): |
40 | 235 | 242 | ||
41 | 236 | def create_stest_table(self): | 243 | def create_stest_table(self): |
43 | 237 | with closing(connection.cursor()) as cursor: | 244 | with connection.cursor() as cursor: |
44 | 238 | cursor.execute("CREATE TABLE IF NOT EXISTS stest (a INTEGER)") | 245 | cursor.execute("CREATE TABLE IF NOT EXISTS stest (a INTEGER)") |
45 | 239 | 246 | ||
46 | 240 | def drop_stest_table(self): | 247 | def drop_stest_table(self): |
48 | 241 | with closing(connection.cursor()) as cursor: | 248 | with connection.cursor() as cursor: |
49 | 242 | cursor.execute("DROP TABLE IF EXISTS stest") | 249 | cursor.execute("DROP TABLE IF EXISTS stest") |
50 | 243 | 250 | ||
51 | 244 | def setUp(self): | 251 | def setUp(self): |
52 | @@ -247,7 +254,7 @@ | |||
53 | 247 | # Put something into the stest table upon which to trigger a | 254 | # Put something into the stest table upon which to trigger a |
54 | 248 | # serialization failure. | 255 | # serialization failure. |
55 | 249 | with transaction.atomic(): | 256 | with transaction.atomic(): |
57 | 250 | with closing(connection.cursor()) as cursor: | 257 | with connection.cursor() as cursor: |
58 | 251 | cursor.execute("INSERT INTO stest VALUES (1)") | 258 | cursor.execute("INSERT INTO stest VALUES (1)") |
59 | 252 | 259 | ||
60 | 253 | def tearDown(self): | 260 | def tearDown(self): |
61 | @@ -258,7 +265,7 @@ | |||
62 | 258 | """Trigger an honest, from the database, serialization failure.""" | 265 | """Trigger an honest, from the database, serialization failure.""" |
63 | 259 | # Helper to switch the transaction to SERIALIZABLE. | 266 | # Helper to switch the transaction to SERIALIZABLE. |
64 | 260 | def set_serializable(): | 267 | def set_serializable(): |
66 | 261 | with closing(connection.cursor()) as cursor: | 268 | with connection.cursor() as cursor: |
67 | 262 | cursor.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") | 269 | cursor.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") |
68 | 263 | 270 | ||
69 | 264 | # Perform a conflicting update. This must run in a separate thread. It | 271 | # Perform a conflicting update. This must run in a separate thread. It |
70 | @@ -269,7 +276,7 @@ | |||
71 | 269 | def do_conflicting_update(): | 276 | def do_conflicting_update(): |
72 | 270 | try: | 277 | try: |
73 | 271 | with transaction.atomic(): | 278 | with transaction.atomic(): |
75 | 272 | with closing(connection.cursor()) as cursor: | 279 | with connection.cursor() as cursor: |
76 | 273 | cursor.execute("UPDATE stest SET a = 2") | 280 | cursor.execute("UPDATE stest SET a = 2") |
77 | 274 | finally: | 281 | finally: |
78 | 275 | close_old_connections() | 282 | close_old_connections() |
79 | @@ -278,7 +285,7 @@ | |||
80 | 278 | # Fetch something first. This ensures that we're inside the | 285 | # Fetch something first. This ensures that we're inside the |
81 | 279 | # transaction, and that the database has a reference point for | 286 | # transaction, and that the database has a reference point for |
82 | 280 | # calculating serialization failures. | 287 | # calculating serialization failures. |
84 | 281 | with closing(connection.cursor()) as cursor: | 288 | with connection.cursor() as cursor: |
85 | 282 | cursor.execute("SELECT * FROM stest") | 289 | cursor.execute("SELECT * FROM stest") |
86 | 283 | cursor.fetchall() | 290 | cursor.fetchall() |
87 | 284 | 291 | ||
88 | @@ -290,7 +297,7 @@ | |||
89 | 290 | # Updating the same rows as do_conflicting_update() did will | 297 | # Updating the same rows as do_conflicting_update() did will |
90 | 291 | # trigger a serialization failure. We have to check the __cause__ | 298 | # trigger a serialization failure. We have to check the __cause__ |
91 | 292 | # to confirm the failure type as reported by PostgreSQL. | 299 | # to confirm the failure type as reported by PostgreSQL. |
93 | 293 | with closing(connection.cursor()) as cursor: | 300 | with connection.cursor() as cursor: |
94 | 294 | cursor.execute("UPDATE stest SET a = 4") | 301 | cursor.execute("UPDATE stest SET a = 4") |
95 | 295 | 302 | ||
96 | 296 | if connection.in_atomic_block: | 303 | if connection.in_atomic_block: |
97 | @@ -312,3 +319,148 @@ | |||
98 | 312 | return sys.exc_info() | 319 | return sys.exc_info() |
99 | 313 | else: | 320 | else: |
100 | 314 | raise | 321 | raise |
101 | 322 | |||
102 | 323 | |||
103 | 324 | class UniqueViolationTestCase( | ||
104 | 325 | DjangoTransactionTestCase, PostCommitHooksTestMixin): | ||
105 | 326 | |||
106 | 327 | def create_uvtest_table(self): | ||
107 | 328 | with connection.cursor() as cursor: | ||
108 | 329 | cursor.execute("DROP TABLE IF EXISTS uvtest") | ||
109 | 330 | cursor.execute("CREATE TABLE uvtest (a INTEGER PRIMARY KEY)") | ||
110 | 331 | |||
111 | 332 | def drop_uvtest_table(self): | ||
112 | 333 | with connection.cursor() as cursor: | ||
113 | 334 | cursor.execute("DROP TABLE IF EXISTS uvtest") | ||
114 | 335 | |||
115 | 336 | def setUp(self): | ||
116 | 337 | super(UniqueViolationTestCase, self).setUp() | ||
117 | 338 | self.conflicting_values = count(1) | ||
118 | 339 | self.create_uvtest_table() | ||
119 | 340 | |||
120 | 341 | def tearDown(self): | ||
121 | 342 | super(UniqueViolationTestCase, self).tearDown() | ||
122 | 343 | self.drop_uvtest_table() | ||
123 | 344 | |||
124 | 345 | def cause_unique_violation(self): | ||
125 | 346 | """Trigger an honest, from the database, unique violation. | ||
126 | 347 | |||
127 | 348 | This may appear needlessly elaborate, but it's for a good reason. | ||
128 | 349 | Indexes in PostgreSQL are a bit weird; they don't fully support MVCC | ||
129 | 350 | so it's possible for situations like the following: | ||
130 | 351 | |||
131 | 352 | CREATE TABLE foo (id SERIAL PRIMARY KEY); | ||
132 | 353 | -- Session A: | ||
133 | 354 | BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; | ||
134 | 355 | INSERT INTO foo (id) VALUES (1); | ||
135 | 356 | -- Session B: | ||
136 | 357 | BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; | ||
137 | 358 | SELECT id FROM foo; -- Nothing. | ||
138 | 359 | INSERT INTO foo (id) VALUES (1); -- Hangs. | ||
139 | 360 | -- Session A: | ||
140 | 361 | COMMIT; | ||
141 | 362 | -- Session B: | ||
142 | 363 | ERROR: duplicate key value violates unique constraint "..." | ||
143 | 364 | DETAIL: Key (id)=(1) already exists. | ||
144 | 365 | |||
145 | 366 | Two things to note: | ||
146 | 367 | |||
147 | 368 | 1. Session B hangs when there's a potential conflict on id's index. | ||
148 | 369 | |||
149 | 370 | 2. Session B fails with a duplicate key error. | ||
150 | 371 | |||
151 | 372 | Both differ from expectations: | ||
152 | 373 | |||
153 | 374 | 1. I would expect the transaction to continue optimistically and | ||
154 | 375 | only fail if session A commits. | ||
155 | 376 | |||
156 | 377 | 2. I would expect a serialisation failure instead. | ||
157 | 378 | |||
158 | 379 | This method jumps through hoops to reproduce the situation above so | ||
159 | 380 | that we're testing against PostgreSQL's exact behaviour as of today, | ||
160 | 381 | not the behaviour that we observed at a single moment in time. | ||
161 | 382 | PostgreSQL may change its behaviour in later versions and this test | ||
162 | 383 | ought to tell us about it. | ||
163 | 384 | |||
164 | 385 | """ | ||
165 | 386 | # Helper to switch the transaction to REPEATABLE READ. | ||
166 | 387 | def set_repeatable_read(): | ||
167 | 388 | with connection.cursor() as cursor: | ||
168 | 389 | cursor.execute( | ||
169 | 390 | "SET TRANSACTION ISOLATION LEVEL " | ||
170 | 391 | "REPEATABLE READ") | ||
171 | 392 | |||
172 | 393 | # Both threads / database sessions will attempt to insert this. | ||
173 | 394 | conflicting_value = next(self.conflicting_values) | ||
174 | 395 | |||
175 | 396 | # Perform a conflicting insert. This must run in a separate thread. It | ||
176 | 397 | # also must begin after the beginning of the transaction in which we | ||
177 | 398 | # will trigger a unique violation AND commit before that other | ||
178 | 399 | # transaction commits. This doesn't need to run with any special | ||
179 | 400 | # isolation; it just needs to be in a transaction. | ||
180 | 401 | def do_conflicting_insert(): | ||
181 | 402 | try: | ||
182 | 403 | with transaction.atomic(): | ||
183 | 404 | with connection.cursor() as cursor: | ||
184 | 405 | cursor.execute( | ||
185 | 406 | "INSERT INTO uvtest VALUES (%s)", | ||
186 | 407 | [conflicting_value]) | ||
187 | 408 | finally: | ||
188 | 409 | close_old_connections() | ||
189 | 410 | |||
190 | 411 | def trigger_unique_violation(): | ||
191 | 412 | # Fetch something first. This ensures that we're inside the | ||
192 | 413 | # transaction, and so the database has a reference point for | ||
193 | 414 | # repeatable reads. | ||
194 | 415 | with connection.cursor() as cursor: | ||
195 | 416 | cursor.execute( | ||
196 | 417 | "SELECT 1 FROM uvtest WHERE a = %s", | ||
197 | 418 | [conflicting_value]) | ||
198 | 419 | self.assertIsNone(cursor.fetchone(), ( | ||
199 | 420 | "We've seen through PostgreSQL impenetrable transaction " | ||
200 | 421 | "isolation — or so we once thought — to witness a " | ||
201 | 422 | "conflicting value from another database session. " | ||
202 | 423 | "Needless to say, this requires investigation.")) | ||
203 | 424 | |||
204 | 425 | # Run do_conflicting_insert() in a separate thread and wait for it | ||
205 | 426 | # to commit and return. | ||
206 | 427 | thread = threading.Thread(target=do_conflicting_insert) | ||
207 | 428 | thread.start() | ||
208 | 429 | thread.join() | ||
209 | 430 | |||
210 | 431 | # Still no sign of that conflicting value from here. | ||
211 | 432 | with connection.cursor() as cursor: | ||
212 | 433 | cursor.execute( | ||
213 | 434 | "SELECT 1 FROM uvtest WHERE a = %s", | ||
214 | 435 | [conflicting_value]) | ||
215 | 436 | self.assertIsNone(cursor.fetchone(), ( | ||
216 | 437 | "PostgreSQL, once thought of highly in transactional " | ||
217 | 438 | "circles, has dropped its kimono and disgraced itself " | ||
218 | 439 | "with its wanton exhibition of conflicting values from " | ||
219 | 440 | "another's session.")) | ||
220 | 441 | |||
221 | 442 | # Inserting the same row will trigger a unique violation. | ||
222 | 443 | with connection.cursor() as cursor: | ||
223 | 444 | cursor.execute( | ||
224 | 445 | "INSERT INTO uvtest VALUES (%s)", | ||
225 | 446 | [conflicting_value]) | ||
226 | 447 | |||
227 | 448 | if connection.in_atomic_block: | ||
228 | 449 | # We're already in a transaction. | ||
229 | 450 | set_repeatable_read() | ||
230 | 451 | trigger_unique_violation() | ||
231 | 452 | else: | ||
232 | 453 | # Start a transaction in this thread. | ||
233 | 454 | with transaction.atomic(): | ||
234 | 455 | set_repeatable_read() | ||
235 | 456 | trigger_unique_violation() | ||
236 | 457 | |||
237 | 458 | def capture_unique_violation(self): | ||
238 | 459 | """Trigger a unique violation, return its ``exc_info`` tuple.""" | ||
239 | 460 | try: | ||
240 | 461 | self.cause_unique_violation() | ||
241 | 462 | except IntegrityError as e: | ||
242 | 463 | if is_unique_violation(e): | ||
243 | 464 | return sys.exc_info() | ||
244 | 465 | else: | ||
245 | 466 | raise | ||
246 | 315 | 467 | ||
247 | === modified file 'src/maasserver/utils/orm.py' | |||
248 | --- src/maasserver/utils/orm.py 2016-03-28 13:54:47 +0000 | |||
249 | +++ src/maasserver/utils/orm.py 2016-06-17 14:52:22 +0000 | |||
250 | @@ -16,8 +16,10 @@ | |||
251 | 16 | 'is_deadlock_failure', | 16 | 'is_deadlock_failure', |
252 | 17 | 'is_retryable_failure', | 17 | 'is_retryable_failure', |
253 | 18 | 'is_serialization_failure', | 18 | 'is_serialization_failure', |
254 | 19 | 'is_unique_violation', | ||
255 | 19 | 'make_deadlock_failure', | 20 | 'make_deadlock_failure', |
256 | 20 | 'make_serialization_failure', | 21 | 'make_serialization_failure', |
257 | 22 | 'make_unique_violation', | ||
258 | 21 | 'post_commit', | 23 | 'post_commit', |
259 | 22 | 'post_commit_do', | 24 | 'post_commit_do', |
260 | 23 | 'psql_array', | 25 | 'psql_array', |
261 | @@ -54,7 +56,11 @@ | |||
262 | 54 | ) | 56 | ) |
263 | 55 | from django.db.models import Q | 57 | from django.db.models import Q |
264 | 56 | from django.db.transaction import TransactionManagementError | 58 | from django.db.transaction import TransactionManagementError |
266 | 57 | from django.db.utils import OperationalError | 59 | from django.db.utils import ( |
267 | 60 | DatabaseError, | ||
268 | 61 | IntegrityError, | ||
269 | 62 | OperationalError, | ||
270 | 63 | ) | ||
271 | 58 | from django.http import Http404 | 64 | from django.http import Http404 |
272 | 59 | from maasserver.exceptions import ( | 65 | from maasserver.exceptions import ( |
273 | 60 | MAASAPIBadRequest, | 66 | MAASAPIBadRequest, |
274 | @@ -72,6 +78,7 @@ | |||
275 | 72 | from psycopg2.errorcodes import ( | 78 | from psycopg2.errorcodes import ( |
276 | 73 | DEADLOCK_DETECTED, | 79 | DEADLOCK_DETECTED, |
277 | 74 | SERIALIZATION_FAILURE, | 80 | SERIALIZATION_FAILURE, |
278 | 81 | UNIQUE_VIOLATION, | ||
279 | 75 | ) | 82 | ) |
280 | 76 | from twisted.internet.defer import Deferred | 83 | from twisted.internet.defer import Deferred |
281 | 77 | 84 | ||
282 | @@ -273,6 +280,68 @@ | |||
283 | 273 | return get_psycopg2_deadlock_exception(exception) is not None | 280 | return get_psycopg2_deadlock_exception(exception) is not None |
284 | 274 | 281 | ||
285 | 275 | 282 | ||
286 | 283 | def get_psycopg2_unique_violation_exception(exception): | ||
287 | 284 | """Return the root-cause if `exception` is a unique violation. | ||
288 | 285 | |||
289 | 286 | PostgreSQL sets a specific error code, "23505", when a transaction breaks | ||
290 | 287 | because of a unique violation. | ||
291 | 288 | |||
292 | 289 | :return: The underlying `psycopg2.Error` if it's a unique violation, or | ||
293 | 290 | `None` if there isn't one. | ||
294 | 291 | """ | ||
295 | 292 | exception = get_psycopg2_exception(exception) | ||
296 | 293 | if exception is None: | ||
297 | 294 | return None | ||
298 | 295 | elif exception.pgcode == UNIQUE_VIOLATION: | ||
299 | 296 | return exception | ||
300 | 297 | else: | ||
301 | 298 | return None | ||
302 | 299 | |||
303 | 300 | |||
304 | 301 | def is_unique_violation(exception): | ||
305 | 302 | """Does `exception` represent a unique violation? | ||
306 | 303 | |||
307 | 304 | PostgreSQL sets a specific error code, "23505", when a transaction breaks | ||
308 | 305 | because of a unique violation. | ||
309 | 306 | """ | ||
310 | 307 | return get_psycopg2_unique_violation_exception(exception) is not None | ||
311 | 308 | |||
312 | 309 | |||
313 | 310 | class UniqueViolation(psycopg2.IntegrityError): | ||
314 | 311 | """Explicit unique violation. | ||
315 | 312 | |||
316 | 313 | A real unique violation, arising out of psycopg2 (and thus signalled from | ||
317 | 314 | the database) would *NOT* be an instance of this class. However, it is not | ||
318 | 315 | obvious how to create a `psycopg2.IntegrityError` with ``pgcode`` set to | ||
319 | 316 | `UNIQUE_VIOLATION` without subclassing. I suspect only the C interface can | ||
320 | 317 | do that. | ||
321 | 318 | """ | ||
322 | 319 | pgcode = UNIQUE_VIOLATION | ||
323 | 320 | |||
324 | 321 | |||
325 | 322 | def make_unique_violation(): | ||
326 | 323 | """Make a unique violation exception. | ||
327 | 324 | |||
328 | 325 | Artificially construct an exception that resembles what Django's ORM would | ||
329 | 326 | raise when PostgreSQL fails a transaction because of a unique violation. | ||
330 | 327 | |||
331 | 328 | :returns: an instance of :py:class:`IntegrityError` that will pass the | ||
332 | 329 | `is_unique_violation` predicate. | ||
333 | 330 | """ | ||
334 | 331 | exception = IntegrityError() | ||
335 | 332 | exception.__cause__ = UniqueViolation() | ||
336 | 333 | assert is_unique_violation(exception) | ||
337 | 334 | return exception | ||
338 | 335 | |||
339 | 336 | |||
340 | 337 | class RetryTransaction(BaseException): | ||
341 | 338 | """An explicit request that the transaction be retried.""" | ||
342 | 339 | |||
343 | 340 | |||
344 | 341 | class TooManyRetries(Exception): | ||
345 | 342 | """A transaction retry has been requested too many times.""" | ||
346 | 343 | |||
347 | 344 | |||
348 | 276 | def request_transaction_retry(): | 345 | def request_transaction_retry(): |
349 | 277 | """Raise a serialization exception. | 346 | """Raise a serialization exception. |
350 | 278 | 347 | ||
351 | @@ -280,15 +349,24 @@ | |||
352 | 280 | this, and then retrying the transaction, though it may choose to re-raise | 349 | this, and then retrying the transaction, though it may choose to re-raise |
353 | 281 | the error if too many retries have already been attempted. | 350 | the error if too many retries have already been attempted. |
354 | 282 | 351 | ||
356 | 283 | :raises OperationalError: | 352 | :raise RetryTransaction: |
357 | 284 | """ | 353 | """ |
359 | 285 | raise make_serialization_failure() | 354 | raise RetryTransaction() |
360 | 286 | 355 | ||
361 | 287 | 356 | ||
362 | 288 | def is_retryable_failure(exception): | 357 | def is_retryable_failure(exception): |
364 | 289 | """Does `exception` represent a serialization or deadlock failure?""" | 358 | """Does `exception` represent a retryable failure? |
365 | 359 | |||
366 | 360 | This does NOT include requested retries, i.e. `RetryTransaction`. | ||
367 | 361 | |||
368 | 362 | :param exception: An instance of :class:`DatabaseError` or one of its | ||
369 | 363 | subclasses. | ||
370 | 364 | """ | ||
371 | 290 | return ( | 365 | return ( |
373 | 291 | is_serialization_failure(exception) or is_deadlock_failure(exception)) | 366 | is_serialization_failure(exception) or |
374 | 367 | is_deadlock_failure(exception) or | ||
375 | 368 | is_unique_violation(exception) | ||
376 | 369 | ) | ||
377 | 292 | 370 | ||
378 | 293 | 371 | ||
379 | 294 | def gen_retry_intervals(base=0.01, rate=2.5, maximum=10.0): | 372 | def gen_retry_intervals(base=0.01, rate=2.5, maximum=10.0): |
380 | @@ -341,14 +419,22 @@ | |||
381 | 341 | for _ in range(9): | 419 | for _ in range(9): |
382 | 342 | try: | 420 | try: |
383 | 343 | return func(*args, **kwargs) | 421 | return func(*args, **kwargs) |
385 | 344 | except OperationalError as error: | 422 | except RetryTransaction: |
386 | 423 | reset() # Which may do nothing. | ||
387 | 424 | sleep(next(intervals)) | ||
388 | 425 | except DatabaseError as error: | ||
389 | 345 | if is_retryable_failure(error): | 426 | if is_retryable_failure(error): |
390 | 346 | reset() # Which may do nothing. | 427 | reset() # Which may do nothing. |
391 | 347 | sleep(next(intervals)) | 428 | sleep(next(intervals)) |
392 | 348 | else: | 429 | else: |
393 | 349 | raise | 430 | raise |
394 | 350 | else: | 431 | else: |
396 | 351 | return func(*args, **kwargs) | 432 | try: |
397 | 433 | return func(*args, **kwargs) | ||
398 | 434 | except RetryTransaction: | ||
399 | 435 | raise TooManyRetries( | ||
400 | 436 | "This transaction has already been attempted " | ||
401 | 437 | "multiple times; giving up.") | ||
402 | 352 | return retrier | 438 | return retrier |
403 | 353 | 439 | ||
404 | 354 | 440 | ||
405 | 355 | 441 | ||
406 | === modified file 'src/maasserver/utils/tests/test_orm.py' | |||
407 | --- src/maasserver/utils/tests/test_orm.py 2016-05-12 19:07:37 +0000 | |||
408 | +++ src/maasserver/utils/tests/test_orm.py 2016-06-17 14:52:22 +0000 | |||
409 | @@ -26,12 +26,16 @@ | |||
410 | 26 | ) | 26 | ) |
411 | 27 | from django.db.backends.base.base import BaseDatabaseWrapper | 27 | from django.db.backends.base.base import BaseDatabaseWrapper |
412 | 28 | from django.db.transaction import TransactionManagementError | 28 | from django.db.transaction import TransactionManagementError |
414 | 29 | from django.db.utils import OperationalError | 29 | from django.db.utils import ( |
415 | 30 | IntegrityError, | ||
416 | 31 | OperationalError, | ||
417 | 32 | ) | ||
418 | 30 | from maasserver.models import Node | 33 | from maasserver.models import Node |
419 | 31 | from maasserver.testing.testcase import ( | 34 | from maasserver.testing.testcase import ( |
420 | 32 | MAASServerTestCase, | 35 | MAASServerTestCase, |
421 | 33 | MAASTransactionServerTestCase, | 36 | MAASTransactionServerTestCase, |
422 | 34 | SerializationFailureTestCase, | 37 | SerializationFailureTestCase, |
423 | 38 | UniqueViolationTestCase, | ||
424 | 35 | ) | 39 | ) |
425 | 36 | from maasserver.utils import orm | 40 | from maasserver.utils import orm |
426 | 37 | from maasserver.utils.orm import ( | 41 | from maasserver.utils.orm import ( |
427 | @@ -46,11 +50,12 @@ | |||
428 | 46 | get_psycopg2_deadlock_exception, | 50 | get_psycopg2_deadlock_exception, |
429 | 47 | get_psycopg2_exception, | 51 | get_psycopg2_exception, |
430 | 48 | get_psycopg2_serialization_exception, | 52 | get_psycopg2_serialization_exception, |
431 | 53 | get_psycopg2_unique_violation_exception, | ||
432 | 49 | in_transaction, | 54 | in_transaction, |
433 | 50 | is_deadlock_failure, | 55 | is_deadlock_failure, |
434 | 51 | is_retryable_failure, | 56 | is_retryable_failure, |
435 | 52 | is_serialization_failure, | 57 | is_serialization_failure, |
437 | 53 | make_serialization_failure, | 58 | is_unique_violation, |
438 | 54 | post_commit, | 59 | post_commit, |
439 | 55 | post_commit_do, | 60 | post_commit_do, |
440 | 56 | post_commit_hooks, | 61 | post_commit_hooks, |
441 | @@ -81,6 +86,7 @@ | |||
442 | 81 | from psycopg2.errorcodes import ( | 86 | from psycopg2.errorcodes import ( |
443 | 82 | DEADLOCK_DETECTED, | 87 | DEADLOCK_DETECTED, |
444 | 83 | SERIALIZATION_FAILURE, | 88 | SERIALIZATION_FAILURE, |
445 | 89 | UNIQUE_VIOLATION, | ||
446 | 84 | ) | 90 | ) |
447 | 85 | from testtools import ExpectedException | 91 | from testtools import ExpectedException |
448 | 86 | from testtools.matchers import ( | 92 | from testtools.matchers import ( |
449 | @@ -221,6 +227,16 @@ | |||
450 | 221 | SERIALIZATION_FAILURE, error.__cause__.pgcode) | 227 | SERIALIZATION_FAILURE, error.__cause__.pgcode) |
451 | 222 | 228 | ||
452 | 223 | 229 | ||
453 | 230 | class TestUniqueViolation(UniqueViolationTestCase): | ||
454 | 231 | """Detecting UNIQUE_VIOLATION failures.""" | ||
455 | 232 | |||
456 | 233 | def test_unique_violation_detectable_via_error_cause(self): | ||
457 | 234 | error = self.assertRaises( | ||
458 | 235 | IntegrityError, self.cause_unique_violation) | ||
459 | 236 | self.assertEqual( | ||
460 | 237 | UNIQUE_VIOLATION, error.__cause__.pgcode) | ||
461 | 238 | |||
462 | 239 | |||
463 | 224 | class TestGetPsycopg2Exception(MAASTestCase): | 240 | class TestGetPsycopg2Exception(MAASTestCase): |
464 | 225 | """Tests for `get_psycopg2_exception`.""" | 241 | """Tests for `get_psycopg2_exception`.""" |
465 | 226 | 242 | ||
466 | @@ -281,6 +297,25 @@ | |||
467 | 281 | get_psycopg2_deadlock_exception(exception)) | 297 | get_psycopg2_deadlock_exception(exception)) |
468 | 282 | 298 | ||
469 | 283 | 299 | ||
470 | 300 | class TestGetPsycopg2UniqueViolationException(MAASTestCase): | ||
471 | 301 | """Tests for `get_psycopg2_unique_violation_exception`.""" | ||
472 | 302 | |||
473 | 303 | def test__returns_None_for_plain_psycopg2_error(self): | ||
474 | 304 | exception = psycopg2.Error() | ||
475 | 305 | self.assertIsNone(get_psycopg2_unique_violation_exception(exception)) | ||
476 | 306 | |||
477 | 307 | def test__returns_None_for_other_error(self): | ||
478 | 308 | exception = factory.make_exception() | ||
479 | 309 | self.assertIsNone(get_psycopg2_unique_violation_exception(exception)) | ||
480 | 310 | |||
481 | 311 | def test__returns_psycopg2_error_root_cause(self): | ||
482 | 312 | exception = Exception() | ||
483 | 313 | exception.__cause__ = orm.UniqueViolation() | ||
484 | 314 | self.assertIs( | ||
485 | 315 | exception.__cause__, | ||
486 | 316 | get_psycopg2_unique_violation_exception(exception)) | ||
487 | 317 | |||
488 | 318 | |||
489 | 284 | class TestIsSerializationFailure(SerializationFailureTestCase): | 319 | class TestIsSerializationFailure(SerializationFailureTestCase): |
490 | 285 | """Tests relating to MAAS's use of SERIALIZABLE isolation.""" | 320 | """Tests relating to MAAS's use of SERIALIZABLE isolation.""" |
491 | 286 | 321 | ||
492 | @@ -340,6 +375,36 @@ | |||
493 | 340 | self.assertFalse(is_deadlock_failure(error)) | 375 | self.assertFalse(is_deadlock_failure(error)) |
494 | 341 | 376 | ||
495 | 342 | 377 | ||
496 | 378 | class TestIsUniqueViolation(UniqueViolationTestCase): | ||
497 | 379 | """Tests relating to MAAS's identification of unique violations.""" | ||
498 | 380 | |||
499 | 381 | def test_detects_integrity_error_with_matching_cause(self): | ||
500 | 382 | error = self.assertRaises( | ||
501 | 383 | IntegrityError, self.cause_unique_violation) | ||
502 | 384 | self.assertTrue(is_unique_violation(error)) | ||
503 | 385 | |||
504 | 386 | def test_rejects_integrity_error_without_matching_cause(self): | ||
505 | 387 | error = IntegrityError() | ||
506 | 388 | cause = self.patch(error, "__cause__", Exception()) | ||
507 | 389 | cause.pgcode = factory.make_name("pgcode") | ||
508 | 390 | self.assertFalse(is_unique_violation(error)) | ||
509 | 391 | |||
510 | 392 | def test_rejects_integrity_error_with_unrelated_cause(self): | ||
511 | 393 | error = IntegrityError() | ||
512 | 394 | error.__cause__ = Exception() | ||
513 | 395 | self.assertFalse(is_unique_violation(error)) | ||
514 | 396 | |||
515 | 397 | def test_rejects_integrity_error_without_cause(self): | ||
516 | 398 | error = IntegrityError() | ||
517 | 399 | self.assertFalse(is_unique_violation(error)) | ||
518 | 400 | |||
519 | 401 | def test_rejects_non_integrity_error_with_matching_cause(self): | ||
520 | 402 | error = factory.make_exception() | ||
521 | 403 | cause = self.patch(error, "__cause__", Exception()) | ||
522 | 404 | cause.pgcode = UNIQUE_VIOLATION | ||
523 | 405 | self.assertFalse(is_unique_violation(error)) | ||
524 | 406 | |||
525 | 407 | |||
526 | 343 | class TestIsRetryableFailure(MAASTestCase): | 408 | class TestIsRetryableFailure(MAASTestCase): |
527 | 344 | """Tests relating to MAAS's use of catching retryable failures.""" | 409 | """Tests relating to MAAS's use of catching retryable failures.""" |
528 | 345 | 410 | ||
529 | @@ -351,33 +416,58 @@ | |||
530 | 351 | error = orm.make_deadlock_failure() | 416 | error = orm.make_deadlock_failure() |
531 | 352 | self.assertTrue(is_retryable_failure(error)) | 417 | self.assertTrue(is_retryable_failure(error)) |
532 | 353 | 418 | ||
533 | 419 | def test_detects_unique_violation(self): | ||
534 | 420 | error = orm.make_unique_violation() | ||
535 | 421 | self.assertTrue(is_retryable_failure(error)) | ||
536 | 422 | |||
537 | 354 | def test_rejects_operational_error_without_matching_cause(self): | 423 | def test_rejects_operational_error_without_matching_cause(self): |
538 | 355 | error = OperationalError() | 424 | error = OperationalError() |
539 | 356 | cause = self.patch(error, "__cause__", Exception()) | 425 | cause = self.patch(error, "__cause__", Exception()) |
540 | 357 | cause.pgcode = factory.make_name("pgcode") | 426 | cause.pgcode = factory.make_name("pgcode") |
541 | 358 | self.assertFalse(is_retryable_failure(error)) | 427 | self.assertFalse(is_retryable_failure(error)) |
542 | 359 | 428 | ||
543 | 429 | def test_rejects_integrity_error_without_matching_cause(self): | ||
544 | 430 | error = IntegrityError() | ||
545 | 431 | cause = self.patch(error, "__cause__", Exception()) | ||
546 | 432 | cause.pgcode = factory.make_name("pgcode") | ||
547 | 433 | self.assertFalse(is_retryable_failure(error)) | ||
548 | 434 | |||
549 | 360 | def test_rejects_operational_error_with_unrelated_cause(self): | 435 | def test_rejects_operational_error_with_unrelated_cause(self): |
550 | 361 | error = OperationalError() | 436 | error = OperationalError() |
551 | 362 | error.__cause__ = Exception() | 437 | error.__cause__ = Exception() |
552 | 363 | self.assertFalse(is_retryable_failure(error)) | 438 | self.assertFalse(is_retryable_failure(error)) |
553 | 364 | 439 | ||
554 | 440 | def test_rejects_integrity_error_with_unrelated_cause(self): | ||
555 | 441 | error = IntegrityError() | ||
556 | 442 | error.__cause__ = Exception() | ||
557 | 443 | self.assertFalse(is_retryable_failure(error)) | ||
558 | 444 | |||
559 | 365 | def test_rejects_operational_error_without_cause(self): | 445 | def test_rejects_operational_error_without_cause(self): |
560 | 366 | error = OperationalError() | 446 | error = OperationalError() |
561 | 367 | self.assertFalse(is_retryable_failure(error)) | 447 | self.assertFalse(is_retryable_failure(error)) |
562 | 368 | 448 | ||
564 | 369 | def test_rejects_non_operational_error_with_cause_serialization(self): | 449 | def test_rejects_integrity_error_without_cause(self): |
565 | 450 | error = IntegrityError() | ||
566 | 451 | self.assertFalse(is_retryable_failure(error)) | ||
567 | 452 | |||
568 | 453 | def test_rejects_non_database_error_with_cause_serialization(self): | ||
569 | 370 | error = factory.make_exception() | 454 | error = factory.make_exception() |
570 | 371 | cause = self.patch(error, "__cause__", Exception()) | 455 | cause = self.patch(error, "__cause__", Exception()) |
571 | 372 | cause.pgcode = SERIALIZATION_FAILURE | 456 | cause.pgcode = SERIALIZATION_FAILURE |
572 | 373 | self.assertFalse(is_retryable_failure(error)) | 457 | self.assertFalse(is_retryable_failure(error)) |
573 | 374 | 458 | ||
575 | 375 | def test_rejects_non_operational_error_with_cause_deadlock(self): | 459 | def test_rejects_non_database_error_with_cause_deadlock(self): |
576 | 376 | error = factory.make_exception() | 460 | error = factory.make_exception() |
577 | 377 | cause = self.patch(error, "__cause__", Exception()) | 461 | cause = self.patch(error, "__cause__", Exception()) |
578 | 378 | cause.pgcode = DEADLOCK_DETECTED | 462 | cause.pgcode = DEADLOCK_DETECTED |
579 | 379 | self.assertFalse(is_retryable_failure(error)) | 463 | self.assertFalse(is_retryable_failure(error)) |
580 | 380 | 464 | ||
581 | 465 | def test_rejects_non_database_error_with_cause_unique_violation(self): | ||
582 | 466 | error = factory.make_exception() | ||
583 | 467 | cause = self.patch(error, "__cause__", Exception()) | ||
584 | 468 | cause.pgcode = UNIQUE_VIOLATION | ||
585 | 469 | self.assertFalse(is_retryable_failure(error)) | ||
586 | 470 | |||
587 | 381 | 471 | ||
588 | 382 | class TestRetryOnRetryableFailure(SerializationFailureTestCase): | 472 | class TestRetryOnRetryableFailure(SerializationFailureTestCase): |
589 | 383 | 473 | ||
590 | @@ -418,6 +508,36 @@ | |||
591 | 418 | self.assertEqual(sentinel.result, function_wrapped()) | 508 | self.assertEqual(sentinel.result, function_wrapped()) |
592 | 419 | self.assertThat(function, MockCallsMatch(call(), call())) | 509 | self.assertThat(function, MockCallsMatch(call(), call())) |
593 | 420 | 510 | ||
594 | 511 | def test_retries_on_unique_violation(self): | ||
595 | 512 | function = self.make_mock_function() | ||
596 | 513 | function.side_effect = orm.make_unique_violation() | ||
597 | 514 | function_wrapped = retry_on_retryable_failure(function) | ||
598 | 515 | self.assertRaises(IntegrityError, function_wrapped) | ||
599 | 516 | expected_calls = [call()] * 10 | ||
600 | 517 | self.assertThat(function, MockCallsMatch(*expected_calls)) | ||
601 | 518 | |||
602 | 519 | def test_retries_on_unique_violation_until_successful(self): | ||
603 | 520 | function = self.make_mock_function() | ||
604 | 521 | function.side_effect = [orm.make_unique_violation(), sentinel.result] | ||
605 | 522 | function_wrapped = retry_on_retryable_failure(function) | ||
606 | 523 | self.assertEqual(sentinel.result, function_wrapped()) | ||
607 | 524 | self.assertThat(function, MockCallsMatch(call(), call())) | ||
608 | 525 | |||
609 | 526 | def test_retries_on_retry_transaction(self): | ||
610 | 527 | function = self.make_mock_function() | ||
611 | 528 | function.side_effect = orm.RetryTransaction() | ||
612 | 529 | function_wrapped = retry_on_retryable_failure(function) | ||
613 | 530 | self.assertRaises(orm.TooManyRetries, function_wrapped) | ||
614 | 531 | expected_calls = [call()] * 10 | ||
615 | 532 | self.assertThat(function, MockCallsMatch(*expected_calls)) | ||
616 | 533 | |||
617 | 534 | def test_retries_on_retry_transaction_until_successful(self): | ||
618 | 535 | function = self.make_mock_function() | ||
619 | 536 | function.side_effect = [orm.RetryTransaction(), sentinel.result] | ||
620 | 537 | function_wrapped = retry_on_retryable_failure(function) | ||
621 | 538 | self.assertEqual(sentinel.result, function_wrapped()) | ||
622 | 539 | self.assertThat(function, MockCallsMatch(call(), call())) | ||
623 | 540 | |||
624 | 421 | def test_passes_args_to_wrapped_function(self): | 541 | def test_passes_args_to_wrapped_function(self): |
625 | 422 | function = lambda a, b: (a, b) | 542 | function = lambda a, b: (a, b) |
626 | 423 | function_wrapped = retry_on_retryable_failure(function) | 543 | function_wrapped = retry_on_retryable_failure(function) |
627 | @@ -450,19 +570,34 @@ | |||
628 | 450 | """Tests for `make_serialization_failure`.""" | 570 | """Tests for `make_serialization_failure`.""" |
629 | 451 | 571 | ||
630 | 452 | def test__makes_a_serialization_failure(self): | 572 | def test__makes_a_serialization_failure(self): |
632 | 453 | exception = make_serialization_failure() | 573 | exception = orm.make_serialization_failure() |
633 | 454 | self.assertThat(exception, MatchesPredicate( | 574 | self.assertThat(exception, MatchesPredicate( |
634 | 455 | is_serialization_failure, "%r is not a serialization failure.")) | 575 | is_serialization_failure, "%r is not a serialization failure.")) |
635 | 456 | 576 | ||
636 | 457 | 577 | ||
637 | 578 | class TestMakeDeadlockFailure(MAASTestCase): | ||
638 | 579 | """Tests for `make_deadlock_failure`.""" | ||
639 | 580 | |||
640 | 581 | def test__makes_a_deadlock_failure(self): | ||
641 | 582 | exception = orm.make_deadlock_failure() | ||
642 | 583 | self.assertThat(exception, MatchesPredicate( | ||
643 | 584 | is_deadlock_failure, "%r is not a deadlock failure.")) | ||
644 | 585 | |||
645 | 586 | |||
646 | 587 | class TestMakeUniqueViolation(MAASTestCase): | ||
647 | 588 | """Tests for `make_unique_violation`.""" | ||
648 | 589 | |||
649 | 590 | def test__makes_a_unique_violation(self): | ||
650 | 591 | exception = orm.make_unique_violation() | ||
651 | 592 | self.assertThat(exception, MatchesPredicate( | ||
652 | 593 | is_unique_violation, "%r is not a unique violation.")) | ||
653 | 594 | |||
654 | 595 | |||
655 | 458 | class TestRequestTransactionRetry(MAASTestCase): | 596 | class TestRequestTransactionRetry(MAASTestCase): |
656 | 459 | """Tests for `request_transaction_retry`.""" | 597 | """Tests for `request_transaction_retry`.""" |
657 | 460 | 598 | ||
663 | 461 | def test__raises_a_serialization_failure(self): | 599 | def test__raises_a_retry_transaction_exception(self): |
664 | 462 | exception = self.assertRaises( | 600 | self.assertRaises(orm.RetryTransaction, request_transaction_retry) |
660 | 463 | OperationalError, request_transaction_retry) | ||
661 | 464 | self.assertThat(exception, MatchesPredicate( | ||
662 | 465 | is_serialization_failure, "%r is not a serialization failure.")) | ||
665 | 466 | 601 | ||
666 | 467 | 602 | ||
667 | 468 | class TestGenRetryIntervals(MAASTestCase): | 603 | class TestGenRetryIntervals(MAASTestCase): |
Looks really good. The test case for creating the exception is awesome. Just some comments and 1 question.