Merge lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status into lp:akiban-persistit

Proposed by Nathan Williams
Status: Merged
Approved by: Peter Beaman
Approved revision: 428
Merged at revision: 423
Proposed branch: lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status
Merge into: lp:akiban-persistit
Diff against target: 308 lines (+174/-31)
5 files modified
src/main/java/com/persistit/Persistit.java (+12/-26)
src/main/java/com/persistit/Transaction.java (+1/-0)
src/main/java/com/persistit/TransactionIndexBucket.java (+3/-5)
src/main/java/com/persistit/TransactionStatus.java (+28/-0)
src/test/java/com/persistit/TransactionAbandonedTest.java (+130/-0)
To merge this branch: bzr merge lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status
Reviewer             Status
Nathan Williams      Needs Resubmitting
Akiban Build User    Needs Fixing
Review via email: mp+149643@code.launchpad.net

Description of the change

Fix closing of transactions from threads that have been abandoned.

If a thread is abandoned with an open transaction, the wwLock member of its TransactionStatus remains locked. See the associated bug for more details.

This isn't easily recoverable, as the thread holding the lock is gone. The lock only needs to be released so that the TransactionStatus can be re-used, so instead mark the status as abandoned and discard it once it is no longer needed.
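A minimal, self-contained sketch of the failure mode follows. It assumes wwLock behaves like java.util.concurrent.locks.ReentrantLock (an assumption: the diff only shows isLocked() and isHeldByCurrentThread() on it), and Status is a hypothetical stand-in for TransactionStatus, not Persistit code. It shows that a lock taken by a thread that then exits stays held, that another thread cannot release it, and that flagging the status is the only safe way to keep it out of circulation.

```java
import java.util.concurrent.locks.ReentrantLock;

public class AbandonedLockDemo {
    /** Hypothetical stand-in for TransactionStatus: just the lock and the flag. */
    static final class Status {
        final ReentrantLock wwLock = new ReentrantLock();
        volatile boolean abandoned;
    }

    /** Locks the status on a short-lived thread that exits without unlocking. */
    static Status lockedByDeadThread() {
        Status status = new Status();
        Thread owner = new Thread(status.wwLock::lock, "abandoner");
        owner.start();
        try {
            owner.join(); // after this, the owning thread has terminated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return status;
    }

    /** Returns true if unlock() from the calling thread is rejected. */
    static boolean unlockFails(ReentrantLock lock) {
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true; // only the owning thread may release a ReentrantLock
        }
    }

    public static void main(String[] args) {
        Status status = lockedByDeadThread();
        System.out.println("still locked: " + status.wwLock.isLocked());     // true
        System.out.println("unlock rejected: " + unlockFails(status.wwLock)); // true

        // The approach taken here: flag the status so it is never unlocked or recycled.
        status.abandoned = true;
        System.out.println("reusable: " + !status.abandoned);                 // false
    }
}
```

A ReentrantLock is not released when its owner thread dies, which is why a cleanup thread cannot simply unlock and recycle the status.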

Also includes minor cleanup, using a shared helper to eliminate duplicate code.

Includes a simple test case and one that requires MVV cleanup.

Revision history for this message
Peter Beaman (pbeaman) wrote :

From my cell phone, so it's hard to Approve, but it looks fine to me, so please
go ahead. Alternatively, I was thinking about replacing the lock object and
allowing the TransactionStatus (ts) to be recycled. Either way is fine with me.
Good detective work!
On Feb 20, 2013 12:38 PM, "Nathan Williams" <email address hidden> wrote:

> Nathan Williams has proposed merging
> lp:~nwilliams/akiban-persistit/handle-abandoned-txn-status into
> lp:akiban-persistit.
>
> Requested reviews:
> Akiban Technologies (akiban-technologies)
> Related bugs:
> Bug #1126297 in Akiban Persistit: "Assertion failure in
> TransactionIndexBucket#allocateTransactionStatus"
> https://bugs.launchpad.net/akiban-persistit/+bug/1126297
>
> For more details, see:
>
> https://code.launchpad.net/~nwilliams/akiban-persistit/handle-abandoned-txn-status/+merge/149643

Revision history for this message
Nathan Williams (nwilliams) wrote :

I considered that, but the implications for other transactions were daunting (e.g. what if someone was waiting on a ww-dependency check?). That, coupled with abandonment being rare (i.e. losing the status is not a big deal), led me to this solution.

I'm happy to discuss it further. I don't think there is a huge rush, either, as the customer has a workaround.

Revision history for this message
Peter Beaman (pbeaman) wrote :

Looks good. Approving.

Revision history for this message
Akiban Build User (build-akiban) wrote :

There were 2 failures during build/test:

* job persistit-build failed at build number 544: http://172.16.20.104:8080/job/persistit-build/544/

* view must-pass failed: persistit-build is aborted

review: Needs Fixing
Revision history for this message
Nathan Williams (nwilliams) wrote :

Unrelated.

Revision history for this message
Akiban Build User (build-akiban) wrote :

There were 2 failures during build/test:

* job persistit-build failed at build number 546: http://172.16.20.104:8080/job/persistit-build/546/

* view must-pass failed: persistit-build is aborted

review: Needs Fixing
428. By Nathan Williams

Add parameter to refactored helper

Revision history for this message
Nathan Williams (nwilliams) wrote :

The closeZombieTransactions() helper actually removed all sessions from the map, which of course we don't want to do from cleanup. Add a boolean parameter so both usages can be served.

review: Needs Resubmitting
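The difference between the two call sites can be modelled in isolation. This is a simplified sketch, not Persistit code: Session and Txn are hypothetical stand-ins for SessionId and Transaction, and the exception handling around close() is omitted. Passing false (the cleanup() case) removes only dead sessions; passing true (the shutdown case) removes every session.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ZombieSessionSweep {
    /** Hypothetical stand-in for SessionId. */
    static final class Session {
        volatile boolean alive;
        Session(boolean alive) { this.alive = alive; }
        boolean isAlive() { return alive; }
    }

    /** Hypothetical stand-in for Transaction. */
    static final class Txn {
        boolean closed;
        void close() { closed = true; }
    }

    final Map<Session, Txn> sessionMap = new HashMap<>();

    /**
     * Removes and closes the transactions of dead sessions. With
     * removeAllSessions == true (the shutdown case), live sessions go too.
     */
    void closeZombieTransactions(boolean removeAllSessions) {
        final Set<Session> snapshot;
        synchronized (sessionMap) {
            snapshot = new HashSet<>(sessionMap.keySet()); // iterate a copy, not the live map
        }
        for (Session s : snapshot) {
            if (!s.isAlive() || removeAllSessions) {
                Txn txn;
                synchronized (sessionMap) {
                    txn = sessionMap.remove(s);
                }
                if (txn != null) {
                    txn.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        ZombieSessionSweep sweep = new ZombieSessionSweep();
        Session live = new Session(true);
        Session dead = new Session(false);
        Txn liveTxn = new Txn();
        Txn deadTxn = new Txn();
        sweep.sessionMap.put(live, liveTxn);
        sweep.sessionMap.put(dead, deadTxn);

        sweep.closeZombieTransactions(false); // periodic cleanup
        System.out.println(sweep.sessionMap.size() + " " + deadTxn.closed + " " + liveTxn.closed); // 1 true false

        sweep.closeZombieTransactions(true); // shutdown
        System.out.println(sweep.sessionMap.size() + " " + liveTxn.closed); // 0 true
    }
}
```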
Revision history for this message
Peter Beaman (pbeaman) wrote :

Oops. I didn't notice before that this proposal was still pending. Looks good - Approving.

Preview Diff

=== modified file 'src/main/java/com/persistit/Persistit.java'
--- src/main/java/com/persistit/Persistit.java 2013-02-15 15:39:42 +0000
+++ src/main/java/com/persistit/Persistit.java 2013-02-26 17:25:24 +0000
@@ -1496,26 +1496,12 @@
         return _bufferPoolTable;
     }
 
+    /**
+     * Remove any sessions that have expired and close transactions associated
+     * with them. Also flush statistics for all known volumes.
+     */
     void cleanup() {
-        final Set<SessionId> sessionIds;
-        synchronized (_transactionSessionMap) {
-            sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
-        }
-        for (final SessionId sessionId : sessionIds) {
-            if (!sessionId.isAlive()) {
-                Transaction transaction = null;
-                synchronized (_transactionSessionMap) {
-                    transaction = _transactionSessionMap.remove(sessionId);
-                }
-                if (transaction != null) {
-                    try {
-                        transaction.close();
-                    } catch (final PersistitException e) {
-                        _logBase.exception.log(e);
-                    }
-                }
-            }
-        }
+        closeZombieTransactions(false);
         final List<Volume> volumes;
         synchronized (this) {
             volumes = new ArrayList<Volume>(_volumes);
@@ -1676,7 +1662,7 @@
         waitForIOTaskStop(task);
 
         interruptActiveThreads(SHORT_DELAY);
-        closeZombieTransactions();
+        closeZombieTransactions(true);
 
         for (final Volume volume : volumes) {
             volume.close();
@@ -1695,17 +1681,17 @@
         releaseAllResources();
     }
 
-    private void closeZombieTransactions() {
+    private void closeZombieTransactions(boolean removeAllSessions) {
         final Set<SessionId> sessionIds;
         synchronized (_transactionSessionMap) {
             sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
         }
         for (final SessionId sessionId : sessionIds) {
-            Transaction transaction = null;
-            synchronized (_transactionSessionMap) {
-                transaction = _transactionSessionMap.remove(sessionId);
-            }
-            if (!sessionId.isAlive()) {
+            if (!sessionId.isAlive() || removeAllSessions) {
+                Transaction transaction = null;
+                synchronized (_transactionSessionMap) {
+                    transaction = _transactionSessionMap.remove(sessionId);
+                }
                 if (transaction != null) {
                     try {
                         transaction.close();
=== modified file 'src/main/java/com/persistit/Transaction.java'
--- src/main/java/com/persistit/Transaction.java 2013-02-01 11:46:13 +0000
+++ src/main/java/com/persistit/Transaction.java 2013-02-26 17:25:24 +0000
@@ -508,6 +508,7 @@
         if (_nestedDepth > 0 && !_commitCompleted && !_rollbackCompleted) {
             final TransactionStatus ts = _transactionStatus;
             if (ts != null && ts.getTs() == _startTimestamp && !_commitCompleted && !_rollbackCompleted) {
+                _transactionStatus.markAbandoned();
                 rollback();
                 _persistit.getLogBase().txnAbandoned.log(this);
             }
=== modified file 'src/main/java/com/persistit/TransactionIndexBucket.java'
--- src/main/java/com/persistit/TransactionIndexBucket.java 2013-02-18 13:57:21 +0000
+++ src/main/java/com/persistit/TransactionIndexBucket.java 2013-02-26 17:25:24 +0000
@@ -258,8 +258,7 @@
         if (ts >= getFloor()) {
             for (TransactionStatus s = getCurrent(); s != null; s = s.getNext()) {
                 if (s == status) {
-                    s.complete(timestamp);
-                    status.wwUnlock();
+                    status.completeAndUnlock(timestamp);
                     if (s.getTs() == getFloor() || hasFloorMoved()) {
                         reduce();
                     }
@@ -272,8 +271,7 @@
                 if (s == status) {
                     final TransactionStatus next = s.getNext();
                     assert s.getTc() != UNCOMMITTED;
-                    s.complete(timestamp);
-                    status.wwUnlock();
+                    status.completeAndUnlock(timestamp);
                     boolean moved = false;
                     if (s.getTc() == ABORTED) {
                         aggregate(s, false);
@@ -610,7 +608,7 @@
          * May be held by another thread briefly while status being checked
          */
         assert !status.isHeldByCurrentThread();
-        if (_freeCount < _transactionIndex.getMaxFreeListSize()) {
+        if (_freeCount < _transactionIndex.getMaxFreeListSize() && !status.isAbandoned()) {
             status.setNext(_free);
             _free = status;
             _freeCount++;
=== modified file 'src/main/java/com/persistit/TransactionStatus.java'
--- src/main/java/com/persistit/TransactionStatus.java 2013-02-18 13:57:21 +0000
+++ src/main/java/com/persistit/TransactionStatus.java 2013-02-26 17:25:24 +0000
@@ -124,6 +124,12 @@
      */
     private volatile boolean _notified;
 
+    /**
+     * Indicates whether the owning transaction has been abandoned. This status
+     * may be in a locked state and is not available for re-use.
+     */
+    private volatile boolean _abandoned;
+
     TransactionStatus(final TransactionIndexBucket bucket) {
         _bucket = bucket;
     }
@@ -244,6 +250,13 @@
         _notified = true;
     }
 
+    void completeAndUnlock(final long timestamp) {
+        complete(timestamp);
+        if (!isAbandoned()) {
+            wwUnlock();
+        }
+    }
+
     boolean isLocked() {
         return _wwLock.isLocked();
     }
@@ -374,6 +387,7 @@
      * Release the lock acquired by {@link #wwLock(long)}.
      */
     void wwUnlock() {
+        assert !isAbandoned() : "Attempt to unlock abandoned: " + this;
         _wwLock.unlock();
     }
 
@@ -409,6 +423,20 @@
         _notified = true;
     }
 
+    /**
+     * Mark this status as abandoned. This will prevent its unlock and re-use.
+     */
+    void markAbandoned() {
+        _abandoned = true;
+    }
+
+    /**
+     * @return <code>true</code> if this status has been abandoned.
+     */
+    boolean isAbandoned() {
+        return _abandoned;
+    }
+
     @Override
     public String toString() {
         return String.format("<ts=%,d tc=%s mvv=%,d>", _ts, tcString(_tc), _mvvCount.get());
=== added file 'src/test/java/com/persistit/TransactionAbandonedTest.java'
--- src/test/java/com/persistit/TransactionAbandonedTest.java 1970-01-01 00:00:00 +0000
+++ src/test/java/com/persistit/TransactionAbandonedTest.java 2013-02-26 17:25:24 +0000
@@ -0,0 +1,130 @@
+/**
+ * Copyright © 2005-2013 Akiban Technologies, Inc. All rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * This program may also be available under different license terms.
+ * For more information, see www.akiban.com or contact licensing@akiban.com.
+ *
+ * Contributors:
+ * Akiban Technologies, Inc.
+ */
+
+package com.persistit;
+
+import com.persistit.exception.PersistitException;
+import com.persistit.unit.ConcurrentUtil;
+import com.persistit.unit.UnitTestProperties;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * <p>
+ * Inspired by bug1126297: Assertion failure in
+ * TransactionIndexBucket#allocateTransactionStatus
+ * </p>
+ * <p>
+ * The symptom was the bug (a locked TransactionStatus on the free list) but the
+ * cause was mishandling of abandoned transactions from the
+ * {@link Persistit#cleanup()} method.
+ * </p>
+ * <p>
+ * When attempting to roll back the abandoned transaction, the status was
+ * notified and then unlocked. Since the lock was held by a now-dead thread, an
+ * IllegalMonitorStateException occurred. It was then put on the free list
+ * during the next cleanup of that bucket since it had been notified.
+ * </p>
+ */
+public class TransactionAbandonedTest extends PersistitUnitTestCase {
+    private static final String TREE = TransactionAbandonedTest.class.getSimpleName();
+    private static final int KEY_START = 1;
+    private static final int KEY_RANGE = 10;
+    private static final long MAX_TIMEOUT_MS = 10 * 1000;
+
+    private static class TxnAbandoner extends ConcurrentUtil.ThrowingRunnable {
+        private final Persistit persistit;
+        private final boolean doRead;
+        private final boolean doWrite;
+
+        public TxnAbandoner(Persistit persistit, boolean doRead, boolean doWrite) {
+            this.persistit = persistit;
+            this.doRead = doRead;
+            this.doWrite = doWrite;
+        }
+
+        @Override
+        public void run() throws PersistitException {
+            Transaction txn = persistit.getTransaction();
+            txn.begin();
+            if (doRead) {
+                assertEquals("Traverse count", KEY_RANGE, scanAndCount(getExchange(persistit)));
+            }
+            if (doWrite) {
+                loadData(persistit, KEY_START + KEY_RANGE, KEY_RANGE);
+            }
+        }
+    }
+
+    private static Exchange getExchange(Persistit persistit) throws PersistitException {
+        return persistit.getExchange(UnitTestProperties.VOLUME_NAME, TREE, true);
+    }
+
+    private static void loadData(Persistit persistit, int keyOffset, int count) throws PersistitException {
+        Exchange ex = getExchange(persistit);
+        for (int i = 0; i < count; ++i) {
+            ex.clear().append(keyOffset + i).store();
+        }
+    }
+
+    private static int scanAndCount(Exchange ex) throws PersistitException {
+        ex.clear().append(Key.BEFORE);
+        int saw = 0;
+        while (ex.next()) {
+            ++saw;
+        }
+        return saw;
+    }
+
+    @Before
+    public void disableAndLoad() throws PersistitException {
+        disableBackgroundCleanup();
+        loadData(_persistit, KEY_START, KEY_RANGE);
+    }
+
+    private void runAndCleanup(String name, boolean doRead, boolean doWrite) {
+        Thread t = ConcurrentUtil.createThread(name, new TxnAbandoner(_persistit, doRead, doWrite));
+        ConcurrentUtil.startAndJoinAssertSuccess(MAX_TIMEOUT_MS, t);
+        // Threw exception before fix
+        _persistit.cleanup();
+    }
+
+    @Test
+    public void noReadsOrWrites() {
+        runAndCleanup("NoReadNoWrite", false, false);
+    }
+
+    @Test
+    public void readOnly() throws PersistitException {
+        runAndCleanup("ReadOnly", true, false);
+        assertEquals("Traversed after abandoned", KEY_RANGE, scanAndCount(getExchange(_persistit)));
+    }
+
+    @Test
+    public void readAndWrite() throws Exception {
+        runAndCleanup("ReadAndWrite", true, true);
+        assertEquals("Traversed after abandoned", KEY_RANGE, scanAndCount(getExchange(_persistit)));
+        // Check that the abandoned versions were pruned
+        CleanupManager cm = _persistit.getCleanupManager();
+        for (int i = 0; i < 5 && cm.getEnqueuedCount() > 0; ++i) {
+            cm.runTask();
+        }
+        Exchange rawEx = getExchange(_persistit);
+        rawEx.ignoreMVCCFetch(true);
+        assertEquals("Raw traversed after abandoned", KEY_RANGE, scanAndCount(rawEx));
+    }
+}
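To show how completeAndUnlock() and the free-list guard work together, here is a simplified, self-contained model. Status and Bucket are hypothetical stand-ins for TransactionStatus and TransactionIndexBucket, wwLock is assumed to behave like a ReentrantLock, and the check in allocate() only approximates the assertion named in bug #1126297 (a locked status must never come off the free list).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

public class AbandonedFreeListSketch {
    /** Hypothetical, simplified TransactionStatus. */
    static final class Status {
        final ReentrantLock wwLock = new ReentrantLock();
        volatile boolean abandoned;
        long tc; // completion timestamp (simplified)

        void markAbandoned() {
            abandoned = true;
        }

        /** Records completion, but releases the lock only if not abandoned. */
        void completeAndUnlock(long timestamp) {
            tc = timestamp;
            if (!abandoned) {
                wwLock.unlock(); // must be called by the owning thread
            }
        }
    }

    /** Hypothetical, simplified bucket with a bounded free list. */
    static final class Bucket {
        final Deque<Status> free = new ArrayDeque<>();
        final int maxFree;

        Bucket(int maxFree) {
            this.maxFree = maxFree;
        }

        /** Recycles a status unless the list is full or the status is abandoned. */
        void free(Status status) {
            if (free.size() < maxFree && !status.abandoned) {
                free.push(status);
            }
            // Otherwise the status is dropped and left to the garbage collector.
        }

        /** Reuses a free status if one exists; a recycled status must be unlocked. */
        Status allocate() {
            Status s = free.isEmpty() ? new Status() : free.pop();
            if (s.wwLock.isLocked()) {
                throw new IllegalStateException("locked status on free list");
            }
            return s;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Bucket bucket = new Bucket(4);

        // Normal path: the owner locks, completes, and the status is recycled.
        Status normal = bucket.allocate();
        normal.wwLock.lock();
        normal.completeAndUnlock(100);
        bucket.free(normal);
        System.out.println(bucket.allocate() == normal); // true

        // Abandoned path: the owning thread exits while still holding the lock.
        Status orphan = new Status();
        Thread owner = new Thread(orphan.wwLock::lock);
        owner.start();
        owner.join();
        orphan.markAbandoned();
        orphan.completeAndUnlock(200); // skips unlock, so no IllegalMonitorStateException
        bucket.free(orphan);           // dropped rather than recycled
        System.out.println(bucket.free.isEmpty()); // true
    }
}
```

Dropping an abandoned status costs one extra allocation later, which the discussion above treats as acceptable because abandonment is rare.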
