Merge lp:~pbeaman/akiban-persistit/fix-1032701-interrupts-leave-latches into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 349
Merged at revision: 349
Proposed branch: lp:~pbeaman/akiban-persistit/fix-1032701-interrupts-leave-latches
Merge into: lp:akiban-persistit
Diff against target: 180 lines (+63/-23)
4 files modified
src/main/java/com/persistit/BufferPool.java (+13/-2)
src/main/java/com/persistit/Transaction.java (+10/-1)
src/main/java/com/persistit/TransactionIndex.java (+25/-19)
src/test/java/com/persistit/TransactionTest2.java (+15/-1)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-1032701-interrupts-leave-latches
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+118253@code.launchpad.net

Description of the change

Fixes bug 1032701 - Interrupt causes Thread to exit without releasing claims. This bug was caused by an interrupt occurring at a dangerous place in the code. Specifically, in the BufferPool#get(...) method, the try/finally around the actual reading of a page from disk contained a call to BufferPool#invalidate(Buffer). In the event the read failed (in the tested case, due to an interrupt) and then a second interrupt occurred in the invalidate method, the subsequent call to release() never happened.

This branch fixes this by ensuring that the invalidate(Buffer) method does not throw an interrupt. Because this is asserted by a change in the signature of the invalidate(Buffer) and because injecting timing conditions to test this is a bit awkward, there is no unit test for this. Also, there is no proof that no other code paths can cause a similar condition. However, I carefully reviewed all the finally{} blocks that could have the same effect, and also have verified that the test which originally detected the failure no longer fails.

Note that the invalidate() method polls the detach(Buffer) method, which is necessary to avoid deadlock. That it does so is not a change; the only change is that this loop is no longer interruptible.

Also changed are a couple of other items noticed while debugging this:

If a commit() fails due to an I/O failure or an interrupt, the intended action is to abort the transaction. That was being done correctly, but the _rollbackPending flag was not set, causing a spurious warning from the end() method such as:

[TransactionThread_344] WARNING Transaction neither committed nor rolled back Transaction_100000347 depth=1 status=<ts=3,480,654 tc=ABORTED mvv=1> owner=TransactionThread_344

(That the TransactionStatus is ABORTED is inconsistent with the warning.) The _rollbackPending and _rollbackCompleted flags are now managemed more correctly.

Also, an attempt to begin a nested transaction within an aborted transaction context now correctly issues an IllegalStateException. New unit tests verify this.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Explanation and change make sense. Other fixes are good, too.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/BufferPool.java'
2--- src/main/java/com/persistit/BufferPool.java 2012-08-02 04:45:28 +0000
3+++ src/main/java/com/persistit/BufferPool.java 2012-08-04 19:33:20 +0000
4@@ -613,11 +613,22 @@
5 return result;
6 }
7
8- private void invalidate(Buffer buffer) throws PersistitInterruptedException {
9+ private void invalidate(Buffer buffer) {
10 Debug.$assert0.t(buffer.isValid() && buffer.isMine());
11
12 while (!detach(buffer)) {
13- Util.spinSleep();
14+ //
15+ // Spin until detach succeeds. Note: this method must not throw an Exception
16+ // because it is called in at at critical time when cleanup must be done.
17+ // It is not possible to lock the hash bucket here due to possible deadlock.
18+ // However, the likelihood of a lengthy live-lock is infinitesimal so polling
19+ // is acceptable.
20+ //
21+ try {
22+ Thread.sleep(1);
23+ } catch (InterruptedException ie) {
24+ // ignore
25+ }
26 }
27 buffer.clearValid();
28 buffer.clearDirty();
29
30=== modified file 'src/main/java/com/persistit/Transaction.java'
31--- src/main/java/com/persistit/Transaction.java 2012-08-02 04:45:28 +0000
32+++ src/main/java/com/persistit/Transaction.java 2012-08-04 19:33:20 +0000
33@@ -586,11 +586,15 @@
34 if (_commitCompleted) {
35 throw new IllegalStateException("Attempt to begin a committed transaction " + this);
36 }
37+ if (_rollbackPending) {
38+ throw new IllegalStateException("Attempt to begin a transaction with pending rollback" + this);
39+ }
40 if (_nestedDepth == 0) {
41 flushTransactionBuffer(false);
42 try {
43 _transactionStatus = _persistit.getTransactionIndex().registerTransaction();
44 } catch (InterruptedException e) {
45+ _rollbackCompleted = true;
46 throw new PersistitInterruptedException(e);
47 }
48 _rollbackPending = false;
49@@ -609,11 +613,15 @@
50 if (_commitCompleted) {
51 throw new IllegalStateException("Attempt to begin a committed transaction " + this);
52 }
53+ if (_rollbackPending) {
54+ throw new IllegalStateException("Attmpet to begin a transaction with pending rollback" + this);
55+ }
56 if (_nestedDepth == 0) {
57 flushTransactionBuffer(false);
58 try {
59 _transactionStatus = _persistit.getTransactionIndex().registerCheckpointTransaction();
60 } catch (InterruptedException e) {
61+ _rollbackCompleted = true;
62 throw new PersistitInterruptedException(e);
63 }
64 _rollbackPending = false;
65@@ -854,10 +862,11 @@
66 policy == CommitPolicy.GROUP ? _persistit.getTransactionCommitStallTime() : 0);
67 committed = true;
68 } finally {
69+
70 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,
71 committed ? _commitTimestamp : TransactionStatus.ABORTED);
72 _commitCompleted = committed;
73- _rollbackCompleted = !committed;
74+ _rollbackPending = _rollbackCompleted = !committed;
75 }
76 }
77 }
78
79=== modified file 'src/main/java/com/persistit/TransactionIndex.java'
80--- src/main/java/com/persistit/TransactionIndex.java 2012-08-02 04:45:28 +0000
81+++ src/main/java/com/persistit/TransactionIndex.java 2012-08-04 19:33:20 +0000
82@@ -1073,8 +1073,9 @@
83 }
84 bucket.checkpointAccumulatorSnapshots(timestamp);
85 for (final Accumulator accumulator : accumulators) {
86- accumulator.setCheckpointValueAndTimestamp(accumulator.applyValue(accumulator
87- .getCheckpointValue(), accumulator.getCheckpointTemp()), timestamp);
88+ accumulator.setCheckpointValueAndTimestamp(
89+ accumulator.applyValue(accumulator.getCheckpointValue(),
90+ accumulator.getCheckpointTemp()), timestamp);
91 }
92 } catch (RetryException e) {
93 again = true;
94@@ -1086,11 +1087,12 @@
95 }
96
97 /**
98- * Create and return a brand new delta associated with the given
99- * status. Note that it is completely uninitialized and always
100- * allocated from the bucket.
101- *
102- * @param status Status to add the delta to.
103+ * Create and return a brand new delta associated with the given status.
104+ * Note that it is completely uninitialized and always allocated from the
105+ * bucket.
106+ *
107+ * @param status
108+ * Status to add the delta to.
109 * @return The new Delta.
110 */
111 Delta addDelta(final TransactionStatus status) {
112@@ -1107,18 +1109,22 @@
113 }
114
115 /**
116- * Create, or combine, new delta information for the given status.
117- * This method attempts to find a compatible delta
118- * (see {@link Delta#canMerge(Accumulator, int)}) to combine with
119- * before allocating a new one. If one is not found,
120- * {@link #addDelta(TransactionStatus)} is called and initialized
121- * before returning.
122- *
123- * @param status Status to add, or combine, delta to.
124- * @param accumulator Accumulator being modified.
125- * @param step Step value of modification.
126- * @param value The value to add or combine.
127- *
128+ * Create, or combine, new delta information for the given status. This
129+ * method attempts to find a compatible delta (see
130+ * {@link Delta#canMerge(Accumulator, int)}) to combine with before
131+ * allocating a new one. If one is not found,
132+ * {@link #addDelta(TransactionStatus)} is called and initialized before
133+ * returning.
134+ *
135+ * @param status
136+ * Status to add, or combine, delta to.
137+ * @param accumulator
138+ * Accumulator being modified.
139+ * @param step
140+ * Step value of modification.
141+ * @param value
142+ * The value to add or combine.
143+ *
144 * @return Delta that was created or modified.
145 */
146 Delta addOrCombineDelta(TransactionStatus status, Accumulator accumulator, int step, long value) {
147
148=== modified file 'src/test/java/com/persistit/TransactionTest2.java'
149--- src/test/java/com/persistit/TransactionTest2.java 2012-08-02 04:45:28 +0000
150+++ src/test/java/com/persistit/TransactionTest2.java 2012-08-04 19:33:20 +0000
151@@ -48,7 +48,7 @@
152
153 final static CommitPolicy policy = CommitPolicy.SOFT;
154
155- final static long TIMEOUT = 20000; // seconds
156+ final static long TIMEOUT = 20000; // 20 seconds
157
158 static int _threadCount = 8;
159 static int _iterationsPerThread = 25000;
160@@ -222,6 +222,20 @@
161 assertTrue("ATC has very old transaction",
162 ti.getActiveTransactionCeiling() - ti.getActiveTransactionFloor() < 10000);
163 }
164+
165+ @Test(expected = IllegalStateException.class)
166+ public void illegalStateExceptionOnRollback() throws Exception {
167+ Transaction txn = _persistit.getTransaction();
168+ txn.rollback();
169+ txn.begin();
170+ }
171+
172+ @Test(expected = IllegalStateException.class)
173+ public void illegalStateExceptionOnCommit() throws Exception {
174+ Transaction txn = _persistit.getTransaction();
175+ txn.commit();
176+ txn.begin();
177+ }
178
179 public void runIt() {
180 try {

Subscribers

People subscribed via source and target branches