Merge lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status into lp:akiban-persistit

Proposed by Nathan Williams
Status: Merged
Approved by: Peter Beaman
Approved revision: 428
Merged at revision: 423
Proposed branch: lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status
Merge into: lp:akiban-persistit
Diff against target: 308 lines (+174/-31)
5 files modified
src/main/java/com/persistit/Persistit.java (+12/-26)
src/main/java/com/persistit/Transaction.java (+1/-0)
src/main/java/com/persistit/TransactionIndexBucket.java (+3/-5)
src/main/java/com/persistit/TransactionStatus.java (+28/-0)
src/test/java/com/persistit/TransactionAbandonedTest.java (+130/-0)
To merge this branch: bzr merge lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status
Reviewer             Review Type   Date Requested   Status
Nathan Williams                                     Needs Resubmitting
Akiban Build User                                   Needs Fixing
Review via email: mp+149643@code.launchpad.net

Description of the change

Fix closing of transactions from threads that have been abandoned.

If a thread is abandoned with an open transaction, the wwLock member of TransactionStatus will be left in a locked state. See the associated bug for more details.

This isn't easily recoverable, as the thread holding the lock is now gone. The only reason the lock needs to be released is so the status can be re-used, so instead mark the status as abandoned and throw it away when it is no longer needed.

Minor cleanup and helper usage to eliminate duplicate code.

Includes simple test case and one requiring MVV cleanup.
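
As a minimal sketch of why the stuck lock is not easily recoverable, assuming wwLock behaves like a java.util.concurrent.locks.ReentrantLock (the class AbandonedLockDemo and its helper names are hypothetical and not part of Persistit):

```java
import java.util.concurrent.locks.ReentrantLock;

public class AbandonedLockDemo {

    /** Returns a lock that was acquired by a thread which exited without unlocking. */
    static ReentrantLock lockFromDyingThread() {
        final ReentrantLock lock = new ReentrantLock();
        final Thread abandoner = new Thread(lock::lock, "abandoner");
        abandoner.start();
        try {
            abandoner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lock;
    }

    /** Returns true if an unlock attempt from the calling thread is rejected. */
    static boolean unlockFails(final ReentrantLock lock) {
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            // Only the owning thread may unlock, and that thread is gone.
            return true;
        }
    }

    public static void main(String[] args) {
        final ReentrantLock lock = lockFromDyingThread();
        System.out.println("still locked: " + lock.isLocked());
        System.out.println("unlock rejected: " + unlockFails(lock));
    }
}
```

Under that assumption both lines print true: the lock stays held after its owner exits, and no other thread can release it, which is why the status is discarded rather than unlocked.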

Revision history for this message
Peter Beaman (pbeaman) wrote :

From cell phone, so it's hard to Approve. But it looks fine to me so please
go ahead. Alternatively I was thinking about replacing the lock object and
allowing the ts to be recycled. Either way is fine with me. Good detective
work!
On Feb 20, 2013 12:38 PM, "Nathan Williams" <email address hidden> wrote:

> Nathan Williams has proposed merging
> lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status into
> lp:akiban-persistit.
>
> Requested reviews:
> Akiban Technologies (akiban-technologies)
> Related bugs:
> Bug #1126297 in Akiban Persistit: "Assertion failure in
> TransactionIndexBucket#allocateTransactionStatus"
> https://bugs.launchpad.net/akiban-persistit/+bug/1126297
>
> For more details, see:
>
> https://code.launchpad.net/~nwilliams/akiban-persistit/handle-abandoned-txn-status/+merge/149643
>
> Fix closing of transactions from threads that have been abandoned.
>
> If a thread is abandoned with an open transaction the wwLock member of
> TransactionStatus will be in a locked state. See associated bug for more
> details.
>
> This isn't easily recoverable as the thread holding the lock is now gone.
> The only reason it needs to be unlocked is for re-use, mark it as abandoned
> and throw it away when it is no longer needed.
>
> Minor cleanup and helper usage to eliminate duplicate code.
>
> Includes simple test case and one requiring MVV cleanup.
> --
>
> https://code.launchpad.net/~nwilliams/akiban-persistit/handle-abandoned-txn-status/+merge/149643
> Your team Akiban Technologies is requested to review the proposed merge of
> lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status into
> lp:akiban-persistit.
>
> === modified file 'src/main/java/com/persistit/Persistit.java'
> --- src/main/java/com/persistit/Persistit.java 2013-02-15 15:39:42 +0000
> +++ src/main/java/com/persistit/Persistit.java 2013-02-20 17:37:24 +0000
> @@ -1496,26 +1496,12 @@
> return _bufferPoolTable;
> }
>
> + /**
> + * Remove any sessions that have expired and close transactions
> associated
> + * with them. Also flush statistics for all known volumes.
> + */
> void cleanup() {
> - final Set<SessionId> sessionIds;
> - synchronized (_transactionSessionMap) {
> - sessionIds = new
> HashSet<SessionId>(_transactionSessionMap.keySet());
> - }
> - for (final SessionId sessionId : sessionIds) {
> - if (!sessionId.isAlive()) {
> - Transaction transaction = null;
> - synchronized (_transactionSessionMap) {
> - transaction =
> _transactionSessionMap.remove(sessionId);
> - }
> - if (transaction != null) {
> - try {
> - transaction.close();
> - } catch (final PersistitException e) {
> - _logBase.exception.log(e);
> - }
> - }
> - }
> - }
> + closeZombieTransactions();
> final List<Volume> volumes;
> synchronized (this) {
> volumes = new ArrayList<Volume>(_volumes);
>
> === modified file 'src/main/java/com/persistit/Transaction.jav...

Revision history for this message
Nathan Williams (nwilliams) wrote :

I considered that, but the implications for other transactions were daunting (e.g. what if someone was waiting on a ww-dependency check?). That, coupled with abandonment being rare (i.e. losing a status object is not a big deal), led me to this solution.

I'm happy to discuss it further. I don't think there is a huge rush, either, as the customer has a workaround.
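
For illustration, the discard-instead-of-recycle approach can be sketched roughly as below. This is a simplified stand-in, not the Persistit implementation: Status, StatusFreeListSketch and release() are hypothetical names, and the real free list is a linked list inside TransactionIndexBucket.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class StatusFreeListSketch {

    /** Hypothetical stand-in for TransactionStatus; only the abandoned flag matters here. */
    static final class Status {
        private volatile boolean abandoned;

        void markAbandoned() {
            abandoned = true;
        }

        boolean isAbandoned() {
            return abandoned;
        }
    }

    private final Deque<Status> free = new ArrayDeque<>();
    private final int maxFree;

    StatusFreeListSketch(final int maxFree) {
        this.maxFree = maxFree;
    }

    /** Recycles the status if there is room and it was not abandoned; returns true if recycled. */
    boolean release(final Status status) {
        if (free.size() < maxFree && !status.isAbandoned()) {
            free.push(status);
            return true;
        }
        // Abandoned statuses may still be locked, so they are dropped for the GC.
        return false;
    }

    int freeCount() {
        return free.size();
    }

    public static void main(String[] args) {
        final StatusFreeListSketch list = new StatusFreeListSketch(4);
        final Status healthy = new Status();
        final Status zombie = new Status();
        zombie.markAbandoned();
        System.out.println("healthy recycled: " + list.release(healthy));
        System.out.println("zombie recycled: " + list.release(zombie));
    }
}
```

The trade-off is the one described above: an abandoned status costs one lost object, but no waiter ever sees its lock swapped out from under it.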

Revision history for this message
Peter Beaman (pbeaman) wrote :

Looks good. Approving.

Revision history for this message
Akiban Build User (build-akiban) wrote :

There were 2 failures during build/test:

* job persistit-build failed at build number 544: http://172.16.20.104:8080/job/persistit-build/544/

* view must-pass failed: persistit-build is aborted

review: Needs Fixing
Revision history for this message
Nathan Williams (nwilliams) wrote :

Unrelated.

Revision history for this message
Akiban Build User (build-akiban) wrote :

There were 2 failures during build/test:

* job persistit-build failed at build number 546: http://172.16.20.104:8080/job/persistit-build/546/

* view must-pass failed: persistit-build is aborted

review: Needs Fixing
428. By Nathan Williams

Add parameter to refactored helper

Revision history for this message
Nathan Williams (nwilliams) wrote :

The closeZombieTransactions() helper actually removed all sessions from the map, which of course we don't want to do from cleanup. Add a boolean parameter so both usages can be served.
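
A rough sketch of the two usages the flag is meant to serve, using hypothetical stand-ins (Session for SessionId, a String for the Transaction, and sweep() for the helper) rather than the real Persistit types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

public class ZombieSweepSketch {

    /** Hypothetical stand-in for SessionId; alive mirrors SessionId#isAlive(). */
    static final class Session {
        final boolean alive;

        Session(final boolean alive) {
            this.alive = alive;
        }
    }

    /**
     * Removes dead sessions, or every session when removeAll is true, and
     * returns the transactions that were removed and would be closed.
     */
    static List<String> sweep(final Map<Session, String> sessions, final boolean removeAll) {
        final List<String> closed = new ArrayList<>();
        // Iterate over a snapshot so entries can be removed from the map safely.
        for (final Session session : new HashSet<>(sessions.keySet())) {
            if (!session.alive || removeAll) {
                final String transaction = sessions.remove(session);
                if (transaction != null) {
                    closed.add(transaction);
                }
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        final Map<Session, String> sessions = new HashMap<>();
        sessions.put(new Session(true), "live-txn");
        sessions.put(new Session(false), "dead-txn");
        System.out.println("cleanup closes: " + sweep(sessions, false));
        System.out.println("shutdown closes: " + sweep(sessions, true));
    }
}
```

With the flag false (the cleanup case) live sessions stay in the map; with it true (the shutdown case) everything is removed.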

review: Needs Resubmitting
Revision history for this message
Peter Beaman (pbeaman) wrote :

Oops. I didn't notice before that this proposal was still pending. Looks good - Approving.

Preview Diff

=== modified file 'src/main/java/com/persistit/Persistit.java'
--- src/main/java/com/persistit/Persistit.java 2013-02-15 15:39:42 +0000
+++ src/main/java/com/persistit/Persistit.java 2013-02-26 17:25:24 +0000
@@ -1496,26 +1496,12 @@
         return _bufferPoolTable;
     }
 
+    /**
+     * Remove any sessions that have expired and close transactions associated
+     * with them. Also flush statistics for all known volumes.
+     */
     void cleanup() {
-        final Set<SessionId> sessionIds;
-        synchronized (_transactionSessionMap) {
-            sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
-        }
-        for (final SessionId sessionId : sessionIds) {
-            if (!sessionId.isAlive()) {
-                Transaction transaction = null;
-                synchronized (_transactionSessionMap) {
-                    transaction = _transactionSessionMap.remove(sessionId);
-                }
-                if (transaction != null) {
-                    try {
-                        transaction.close();
-                    } catch (final PersistitException e) {
-                        _logBase.exception.log(e);
-                    }
-                }
-            }
-        }
+        closeZombieTransactions(false);
         final List<Volume> volumes;
         synchronized (this) {
             volumes = new ArrayList<Volume>(_volumes);
@@ -1676,7 +1662,7 @@
         waitForIOTaskStop(task);
 
         interruptActiveThreads(SHORT_DELAY);
-        closeZombieTransactions();
+        closeZombieTransactions(true);
 
         for (final Volume volume : volumes) {
             volume.close();
@@ -1695,17 +1681,17 @@
         releaseAllResources();
     }
 
-    private void closeZombieTransactions() {
+    private void closeZombieTransactions(boolean removeAllSessions) {
         final Set<SessionId> sessionIds;
         synchronized (_transactionSessionMap) {
             sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
         }
         for (final SessionId sessionId : sessionIds) {
-            Transaction transaction = null;
-            synchronized (_transactionSessionMap) {
-                transaction = _transactionSessionMap.remove(sessionId);
-            }
-            if (!sessionId.isAlive()) {
+            if (!sessionId.isAlive() || removeAllSessions) {
+                Transaction transaction = null;
+                synchronized (_transactionSessionMap) {
+                    transaction = _transactionSessionMap.remove(sessionId);
+                }
                 if (transaction != null) {
                     try {
                         transaction.close();

=== modified file 'src/main/java/com/persistit/Transaction.java'
--- src/main/java/com/persistit/Transaction.java 2013-02-01 11:46:13 +0000
+++ src/main/java/com/persistit/Transaction.java 2013-02-26 17:25:24 +0000
@@ -508,6 +508,7 @@
         if (_nestedDepth > 0 && !_commitCompleted && !_rollbackCompleted) {
             final TransactionStatus ts = _transactionStatus;
             if (ts != null && ts.getTs() == _startTimestamp && !_commitCompleted && !_rollbackCompleted) {
+                _transactionStatus.markAbandoned();
                 rollback();
                 _persistit.getLogBase().txnAbandoned.log(this);
             }

=== modified file 'src/main/java/com/persistit/TransactionIndexBucket.java'
--- src/main/java/com/persistit/TransactionIndexBucket.java 2013-02-18 13:57:21 +0000
+++ src/main/java/com/persistit/TransactionIndexBucket.java 2013-02-26 17:25:24 +0000
@@ -258,8 +258,7 @@
         if (ts >= getFloor()) {
             for (TransactionStatus s = getCurrent(); s != null; s = s.getNext()) {
                 if (s == status) {
-                    s.complete(timestamp);
-                    status.wwUnlock();
+                    status.completeAndUnlock(timestamp);
                     if (s.getTs() == getFloor() || hasFloorMoved()) {
                         reduce();
                     }
@@ -272,8 +271,7 @@
                 if (s == status) {
                     final TransactionStatus next = s.getNext();
                     assert s.getTc() != UNCOMMITTED;
-                    s.complete(timestamp);
-                    status.wwUnlock();
+                    status.completeAndUnlock(timestamp);
                     boolean moved = false;
                     if (s.getTc() == ABORTED) {
                         aggregate(s, false);
@@ -610,7 +608,7 @@
          * May be held by another thread briefly while status being checked
          */
         assert !status.isHeldByCurrentThread();
-        if (_freeCount < _transactionIndex.getMaxFreeListSize()) {
+        if (_freeCount < _transactionIndex.getMaxFreeListSize() && !status.isAbandoned()) {
             status.setNext(_free);
             _free = status;
             _freeCount++;

=== modified file 'src/main/java/com/persistit/TransactionStatus.java'
--- src/main/java/com/persistit/TransactionStatus.java 2013-02-18 13:57:21 +0000
+++ src/main/java/com/persistit/TransactionStatus.java 2013-02-26 17:25:24 +0000
@@ -124,6 +124,12 @@
      */
     private volatile boolean _notified;
 
+    /**
+     * Indicates whether the owning transaction has been abandoned. This status
+     * may be in a locked state and is not available for re-use.
+     */
+    private volatile boolean _abandoned;
+
     TransactionStatus(final TransactionIndexBucket bucket) {
         _bucket = bucket;
     }
@@ -244,6 +250,13 @@
         _notified = true;
     }
 
+    void completeAndUnlock(final long timestamp) {
+        complete(timestamp);
+        if (!isAbandoned()) {
+            wwUnlock();
+        }
+    }
+
     boolean isLocked() {
         return _wwLock.isLocked();
     }
@@ -374,6 +387,7 @@
      * Release the lock acquired by {@link #wwLock(long)}.
      */
     void wwUnlock() {
+        assert !isAbandoned() : "Attempt to unlock abandoned: " + this;
         _wwLock.unlock();
     }
 
@@ -409,6 +423,20 @@
         _notified = true;
     }
 
+    /**
+     * Mark this status as abandoned. This will prevent its unlock and re-use.
+     */
+    void markAbandoned() {
+        _abandoned = true;
+    }
+
+    /**
+     * @return <code>true</code> if this status has been abandoned.
+     */
+    boolean isAbandoned() {
+        return _abandoned;
+    }
+
     @Override
     public String toString() {
         return String.format("<ts=%,d tc=%s mvv=%,d>", _ts, tcString(_tc), _mvvCount.get());

=== added file 'src/test/java/com/persistit/TransactionAbandonedTest.java'
--- src/test/java/com/persistit/TransactionAbandonedTest.java 1970-01-01 00:00:00 +0000
+++ src/test/java/com/persistit/TransactionAbandonedTest.java 2013-02-26 17:25:24 +0000
@@ -0,0 +1,130 @@
+/**
+ * Copyright © 2005-2013 Akiban Technologies, Inc. All rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * This program may also be available under different license terms.
+ * For more information, see www.akiban.com or contact licensing@akiban.com.
+ *
+ * Contributors:
+ * Akiban Technologies, Inc.
+ */
+
+package com.persistit;
+
+import com.persistit.exception.PersistitException;
+import com.persistit.unit.ConcurrentUtil;
+import com.persistit.unit.UnitTestProperties;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * <p>
+ * Inspired by bug1126297: Assertion failure in
+ * TransactionIndexBucket#allocateTransactionStatus
+ * </p>
+ * <p>
+ * The symptom was the bug (a locked TransactionStatus on the free list) but the
+ * cause was mishandling of abandoned transactions from the
+ * {@link Persistit#cleanup()} method.
+ * </p>
+ * <p>
+ * When attempting to rollback the abandoned transaction, the status was
+ * notified and then unlocked. Since the lock was held by a now dead thread, an
+ * IllegalMonitorStateException occurred. It was then put on the free list
+ * during the next cleanup of that bucket since it had been notified.
+ * </p>
+ */
+public class TransactionAbandonedTest extends PersistitUnitTestCase {
+    private static final String TREE = TransactionAbandonedTest.class.getSimpleName();
+    private static final int KEY_START = 1;
+    private static final int KEY_RANGE = 10;
+    private static final long MAX_TIMEOUT_MS = 10 * 1000;
+
+    private static class TxnAbandoner extends ConcurrentUtil.ThrowingRunnable {
+        private final Persistit persistit;
+        private final boolean doRead;
+        private final boolean doWrite;
+
+        public TxnAbandoner(Persistit persistit, boolean doRead, boolean doWrite) {
+            this.persistit = persistit;
+            this.doRead = doRead;
+            this.doWrite = doWrite;
+        }
+
+        @Override
+        public void run() throws PersistitException {
+            Transaction txn = persistit.getTransaction();
+            txn.begin();
+            if (doRead) {
+                assertEquals("Traverse count", KEY_RANGE, scanAndCount(getExchange(persistit)));
+            }
+            if (doWrite) {
+                loadData(persistit, KEY_START + KEY_RANGE, KEY_RANGE);
+            }
+        }
+    }
+
+    private static Exchange getExchange(Persistit persistit) throws PersistitException {
+        return persistit.getExchange(UnitTestProperties.VOLUME_NAME, TREE, true);
+    }
+
+    private static void loadData(Persistit persistit, int keyOffset, int count) throws PersistitException {
+        Exchange ex = getExchange(persistit);
+        for (int i = 0; i < count; ++i) {
+            ex.clear().append(keyOffset + i).store();
+        }
+    }
+
+    private static int scanAndCount(Exchange ex) throws PersistitException {
+        ex.clear().append(Key.BEFORE);
+        int saw = 0;
+        while (ex.next()) {
+            ++saw;
+        }
+        return saw;
+    }
+
+    @Before
+    public void disableAndLoad() throws PersistitException {
+        disableBackgroundCleanup();
+        loadData(_persistit, KEY_START, KEY_RANGE);
+    }
+
+    private void runAndCleanup(String name, boolean doRead, boolean doWrite) {
+        Thread t = ConcurrentUtil.createThread(name, new TxnAbandoner(_persistit, doRead, doWrite));
+        ConcurrentUtil.startAndJoinAssertSuccess(MAX_TIMEOUT_MS, t);
+        // Threw exception before fix
+        _persistit.cleanup();
+    }
+
+    @Test
+    public void noReadsOrWrites() {
+        runAndCleanup("NoReadNoWrite", false, false);
+    }
+
+    @Test
+    public void readOnly() throws PersistitException {
+        runAndCleanup("ReadOnly", true, false);
+        assertEquals("Traversed after abandoned", KEY_RANGE, scanAndCount(getExchange(_persistit)));
+    }
+
+    @Test
+    public void readAndWrite() throws Exception {
+        runAndCleanup("ReadAndWrite", true, true);
+        assertEquals("Traversed after abandoned", KEY_RANGE, scanAndCount(getExchange(_persistit)));
+        // Check that the abandoned was pruned
+        CleanupManager cm = _persistit.getCleanupManager();
+        for (int i = 0; i < 5 && cm.getEnqueuedCount() > 0; ++i) {
+            cm.runTask();
+        }
+        Exchange rawEx = getExchange(_persistit);
+        rawEx.ignoreMVCCFetch(true);
+        assertEquals("Raw traversed after abandoned", KEY_RANGE, scanAndCount(rawEx));
+    }
+}
