Merge lp:~pbeaman/akiban-persistit/fix-1032701-interrupts-leave-latches into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 349
Merged at revision: 349
Proposed branch: lp:~pbeaman/akiban-persistit/fix-1032701-interrupts-leave-latches
Merge into: lp:akiban-persistit
Diff against target: 180 lines (+63/-23)
4 files modified
src/main/java/com/persistit/BufferPool.java (+13/-2)
src/main/java/com/persistit/Transaction.java (+10/-1)
src/main/java/com/persistit/TransactionIndex.java (+25/-19)
src/test/java/com/persistit/TransactionTest2.java (+15/-1)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-1032701-interrupts-leave-latches
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+118253@code.launchpad.net

Description of the change

Fixes bug 1032701 - Interrupt causes Thread to exit without releasing claims. This bug was caused by an interrupt occurring at a dangerous place in the code. Specifically, in the BufferPool#get(...) method, the try/finally around the actual reading of a page from disk contained a call to BufferPool#invalidate(Buffer). In the event the read failed (in the tested case, due to an interrupt) and then a second interrupt occurred in the invalidate method, the subsequent call to release() never happened.

This branch fixes this by ensuring that the invalidate(Buffer) method cannot throw a PersistitInterruptedException. Because this is asserted by a change in the signature of the invalidate(Buffer) method, and because injecting timing conditions to test this is a bit awkward, there is no unit test for this. Also, there is no proof that no other code path can cause a similar condition. However, I carefully reviewed all the finally{} blocks that could have the same effect, and have also verified that the test which originally detected the failure no longer fails.

Note that the invalidate() method polls the detach(Buffer) method, which is necessary to avoid deadlock. That it does so is not a change; the only change is that this loop is no longer interruptible.

Also changed are a couple of other items noticed while debugging this:

If a commit() fails due to an I/O failure or an interrupt, the intended action is to abort the transaction. That was being done correctly, but the _rollbackPending flag was not set, causing a spurious warning from the end() method such as:

[TransactionThread_344] WARNING Transaction neither committed nor rolled back Transaction_100000347 depth=1 status=<ts=3,480,654 tc=ABORTED mvv=1> owner=TransactionThread_344

(That the TransactionStatus is ABORTED is inconsistent with the warning.) The _rollbackPending and _rollbackCompleted flags are now managed more correctly.

Also, an attempt to begin a nested transaction within an aborted transaction context now correctly issues an IllegalStateException. New unit tests verify this.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Explanation and change make sense. Other fixes are good, too.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/main/java/com/persistit/BufferPool.java'
--- src/main/java/com/persistit/BufferPool.java 2012-08-02 04:45:28 +0000
+++ src/main/java/com/persistit/BufferPool.java 2012-08-04 19:33:20 +0000
@@ -613,11 +613,22 @@
613 return result;613 return result;
614 }614 }
615615
616 private void invalidate(Buffer buffer) throws PersistitInterruptedException {616 private void invalidate(Buffer buffer) {
617 Debug.$assert0.t(buffer.isValid() && buffer.isMine());617 Debug.$assert0.t(buffer.isValid() && buffer.isMine());
618618
619 while (!detach(buffer)) {619 while (!detach(buffer)) {
620 Util.spinSleep();620 //
621 // Spin until detach succeeds. Note: this method must not throw an Exception
622 // because it is called in at at critical time when cleanup must be done.
623 // It is not possible to lock the hash bucket here due to possible deadlock.
624 // However, the likelihood of a lengthy live-lock is infinitesimal so polling
625 // is acceptable.
626 //
627 try {
628 Thread.sleep(1);
629 } catch (InterruptedException ie) {
630 // ignore
631 }
621 }632 }
622 buffer.clearValid();633 buffer.clearValid();
623 buffer.clearDirty();634 buffer.clearDirty();
624635
=== modified file 'src/main/java/com/persistit/Transaction.java'
--- src/main/java/com/persistit/Transaction.java 2012-08-02 04:45:28 +0000
+++ src/main/java/com/persistit/Transaction.java 2012-08-04 19:33:20 +0000
@@ -586,11 +586,15 @@
586 if (_commitCompleted) {586 if (_commitCompleted) {
587 throw new IllegalStateException("Attempt to begin a committed transaction " + this);587 throw new IllegalStateException("Attempt to begin a committed transaction " + this);
588 }588 }
589 if (_rollbackPending) {
590 throw new IllegalStateException("Attempt to begin a transaction with pending rollback" + this);
591 }
589 if (_nestedDepth == 0) {592 if (_nestedDepth == 0) {
590 flushTransactionBuffer(false);593 flushTransactionBuffer(false);
591 try {594 try {
592 _transactionStatus = _persistit.getTransactionIndex().registerTransaction();595 _transactionStatus = _persistit.getTransactionIndex().registerTransaction();
593 } catch (InterruptedException e) {596 } catch (InterruptedException e) {
597 _rollbackCompleted = true;
594 throw new PersistitInterruptedException(e);598 throw new PersistitInterruptedException(e);
595 }599 }
596 _rollbackPending = false;600 _rollbackPending = false;
@@ -609,11 +613,15 @@
609 if (_commitCompleted) {613 if (_commitCompleted) {
610 throw new IllegalStateException("Attempt to begin a committed transaction " + this);614 throw new IllegalStateException("Attempt to begin a committed transaction " + this);
611 }615 }
616 if (_rollbackPending) {
617 throw new IllegalStateException("Attmpet to begin a transaction with pending rollback" + this);
618 }
612 if (_nestedDepth == 0) {619 if (_nestedDepth == 0) {
613 flushTransactionBuffer(false);620 flushTransactionBuffer(false);
614 try {621 try {
615 _transactionStatus = _persistit.getTransactionIndex().registerCheckpointTransaction();622 _transactionStatus = _persistit.getTransactionIndex().registerCheckpointTransaction();
616 } catch (InterruptedException e) {623 } catch (InterruptedException e) {
624 _rollbackCompleted = true;
617 throw new PersistitInterruptedException(e);625 throw new PersistitInterruptedException(e);
618 }626 }
619 _rollbackPending = false;627 _rollbackPending = false;
@@ -854,10 +862,11 @@
854 policy == CommitPolicy.GROUP ? _persistit.getTransactionCommitStallTime() : 0);862 policy == CommitPolicy.GROUP ? _persistit.getTransactionCommitStallTime() : 0);
855 committed = true;863 committed = true;
856 } finally {864 } finally {
865
857 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,866 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,
858 committed ? _commitTimestamp : TransactionStatus.ABORTED);867 committed ? _commitTimestamp : TransactionStatus.ABORTED);
859 _commitCompleted = committed;868 _commitCompleted = committed;
860 _rollbackCompleted = !committed;869 _rollbackPending = _rollbackCompleted = !committed;
861 }870 }
862 }871 }
863 }872 }
864873
=== modified file 'src/main/java/com/persistit/TransactionIndex.java'
--- src/main/java/com/persistit/TransactionIndex.java 2012-08-02 04:45:28 +0000
+++ src/main/java/com/persistit/TransactionIndex.java 2012-08-04 19:33:20 +0000
@@ -1073,8 +1073,9 @@
1073 }1073 }
1074 bucket.checkpointAccumulatorSnapshots(timestamp);1074 bucket.checkpointAccumulatorSnapshots(timestamp);
1075 for (final Accumulator accumulator : accumulators) {1075 for (final Accumulator accumulator : accumulators) {
1076 accumulator.setCheckpointValueAndTimestamp(accumulator.applyValue(accumulator1076 accumulator.setCheckpointValueAndTimestamp(
1077 .getCheckpointValue(), accumulator.getCheckpointTemp()), timestamp);1077 accumulator.applyValue(accumulator.getCheckpointValue(),
1078 accumulator.getCheckpointTemp()), timestamp);
1078 }1079 }
1079 } catch (RetryException e) {1080 } catch (RetryException e) {
1080 again = true;1081 again = true;
@@ -1086,11 +1087,12 @@
1086 }1087 }
10871088
1088 /**1089 /**
1089 * Create and return a brand new delta associated with the given1090 * Create and return a brand new delta associated with the given status.
1090 * status. Note that it is completely uninitialized and always1091 * Note that it is completely uninitialized and always allocated from the
1091 * allocated from the bucket.1092 * bucket.
1092 *1093 *
1093 * @param status Status to add the delta to.1094 * @param status
1095 * Status to add the delta to.
1094 * @return The new Delta.1096 * @return The new Delta.
1095 */1097 */
1096 Delta addDelta(final TransactionStatus status) {1098 Delta addDelta(final TransactionStatus status) {
@@ -1107,18 +1109,22 @@
1107 }1109 }
11081110
1109 /**1111 /**
1110 * Create, or combine, new delta information for the given status.1112 * Create, or combine, new delta information for the given status. This
1111 * This method attempts to find a compatible delta1113 * method attempts to find a compatible delta (see
1112 * (see {@link Delta#canMerge(Accumulator, int)}) to combine with1114 * {@link Delta#canMerge(Accumulator, int)}) to combine with before
1113 * before allocating a new one. If one is not found,1115 * allocating a new one. If one is not found,
1114 * {@link #addDelta(TransactionStatus)} is called and initialized1116 * {@link #addDelta(TransactionStatus)} is called and initialized before
1115 * before returning.1117 * returning.
1116 *1118 *
1117 * @param status Status to add, or combine, delta to.1119 * @param status
1118 * @param accumulator Accumulator being modified.1120 * Status to add, or combine, delta to.
1119 * @param step Step value of modification.1121 * @param accumulator
1120 * @param value The value to add or combine.1122 * Accumulator being modified.
1121 *1123 * @param step
1124 * Step value of modification.
1125 * @param value
1126 * The value to add or combine.
1127 *
1122 * @return Delta that was created or modified.1128 * @return Delta that was created or modified.
1123 */1129 */
1124 Delta addOrCombineDelta(TransactionStatus status, Accumulator accumulator, int step, long value) {1130 Delta addOrCombineDelta(TransactionStatus status, Accumulator accumulator, int step, long value) {
11251131
=== modified file 'src/test/java/com/persistit/TransactionTest2.java'
--- src/test/java/com/persistit/TransactionTest2.java 2012-08-02 04:45:28 +0000
+++ src/test/java/com/persistit/TransactionTest2.java 2012-08-04 19:33:20 +0000
@@ -48,7 +48,7 @@
4848
49 final static CommitPolicy policy = CommitPolicy.SOFT;49 final static CommitPolicy policy = CommitPolicy.SOFT;
5050
51 final static long TIMEOUT = 20000; // seconds51 final static long TIMEOUT = 20000; // 20 seconds
5252
53 static int _threadCount = 8;53 static int _threadCount = 8;
54 static int _iterationsPerThread = 25000;54 static int _iterationsPerThread = 25000;
@@ -222,6 +222,20 @@
222 assertTrue("ATC has very old transaction",222 assertTrue("ATC has very old transaction",
223 ti.getActiveTransactionCeiling() - ti.getActiveTransactionFloor() < 10000);223 ti.getActiveTransactionCeiling() - ti.getActiveTransactionFloor() < 10000);
224 }224 }
225
226 @Test(expected = IllegalStateException.class)
227 public void illegalStateExceptionOnRollback() throws Exception {
228 Transaction txn = _persistit.getTransaction();
229 txn.rollback();
230 txn.begin();
231 }
232
233 @Test(expected = IllegalStateException.class)
234 public void illegalStateExceptionOnCommit() throws Exception {
235 Transaction txn = _persistit.getTransaction();
236 txn.commit();
237 txn.begin();
238 }
225239
226 public void runIt() {240 public void runIt() {
227 try {241 try {

Subscribers

People subscribed via source and target branches