Merge lp:~pbeaman/akiban-persistit/fix-1174352-commit-thread into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 435
Merged at revision: 431
Proposed branch: lp:~pbeaman/akiban-persistit/fix-1174352-commit-thread
Merge into: lp:akiban-persistit
Prerequisite: lp:~pbeaman/akiban-persistit/apache-license
Diff against target: 313 lines (+151/-53)
5 files modified
src/main/java/com/persistit/Transaction.java (+0/-1)
src/main/java/com/persistit/TransactionIndexBucket.java (+2/-5)
src/main/java/com/persistit/TransactionStatus.java (+26/-46)
src/test/java/com/persistit/TransactionSessionSwitchTest.java (+122/-0)
src/test/java/com/persistit/stress/unit/AccumulatorRestart.java (+1/-1)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-1174352-commit-thread
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+161824@code.launchpad.net

Description of the change

Use Semaphore instead of ReentrantLock in TransactionStatus#wwLock. This allows Thread #1 to register the transaction and Thread #2 to commit it. (ReentrantLock throws an IllegalMonitorStateException if a different thread attempts to unlock it.)

This is important to support the documented behavior allowing SessionId management, and that is important to allow a fixed number of threads in a thread pool to handled a potentially much larger number of sessions.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Semaphore has availablePermits() which we could use for the removed assert and short sleep, if desired.

This is fine by me as-is, too.

435. By Peter Beaman

Reinstate TransactionStatus.isLocked and an assert that uses it.

Revision history for this message
Peter Beaman (pbeaman) wrote :

Reinstated TransactionStatus#isLocked and an assert that uses it.

Revision history for this message
Nathan Williams (nwilliams) wrote :

Looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/Transaction.java'
2--- src/main/java/com/persistit/Transaction.java 2013-05-01 15:05:31 +0000
3+++ src/main/java/com/persistit/Transaction.java 2013-05-01 15:05:32 +0000
4@@ -517,7 +517,6 @@
5 if (_nestedDepth > 0 && !_commitCompleted && !_rollbackCompleted) {
6 final TransactionStatus ts = _transactionStatus;
7 if (ts != null && ts.getTs() == _startTimestamp && !_commitCompleted && !_rollbackCompleted) {
8- _transactionStatus.markAbandoned();
9 rollback();
10 _persistit.getLogBase().txnAbandoned.log(this);
11 }
12
13=== modified file 'src/main/java/com/persistit/TransactionIndexBucket.java'
14--- src/main/java/com/persistit/TransactionIndexBucket.java 2013-05-01 15:05:31 +0000
15+++ src/main/java/com/persistit/TransactionIndexBucket.java 2013-05-01 15:05:32 +0000
16@@ -605,11 +605,8 @@
17
18 private void free(final TransactionStatus status) {
19 assert _lock.isHeldByCurrentThread();
20- /*
21- * May be held by another thread briefly while status being checked
22- */
23- assert !status.isHeldByCurrentThread();
24- if (_freeCount < _transactionIndex.getMaxFreeListSize() && !status.isAbandoned()) {
25+
26+ if (_freeCount < _transactionIndex.getMaxFreeListSize()) {
27 status.setNext(_free);
28 _free = status;
29 _freeCount++;
30
31=== modified file 'src/main/java/com/persistit/TransactionStatus.java'
32--- src/main/java/com/persistit/TransactionStatus.java 2013-05-01 15:05:31 +0000
33+++ src/main/java/com/persistit/TransactionStatus.java 2013-05-01 15:05:32 +0000
34@@ -16,9 +16,9 @@
35
36 package com.persistit;
37
38+import java.util.concurrent.Semaphore;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41-import java.util.concurrent.locks.ReentrantLock;
42
43 import com.persistit.Accumulator.Delta;
44
45@@ -96,18 +96,20 @@
46 private AtomicInteger _mvvCount = new AtomicInteger();
47
48 /**
49- * Lock used to manage ww dependencies. An attempt to update an MVV that
50- * already contains a value version from a concurrently executing
51- * transaction must wait for that other transaction to commit or abort.
52+ * Semaphore used to manage ww dependencies. An attempt to update an MVV
53+ * that already contains a value version from a concurrently executing
54+ * transaction must wait for that other transaction to commit or abort. The
55+ * protocol uses a Semaphore rather than a ReentrantLock because it may be
56+ * acquired in one thread but released in another.
57 */
58- private final ReentrantLock _wwLock = new ReentrantLock(true);
59+ private final Semaphore _wwLock = new Semaphore(1);
60 /**
61 * Pointer to next member of singly-linked list.
62 */
63 private TransactionStatus _next;
64
65 /**
66- * Pointer to TransactionStatus on which we intend to claim a lock. (For
67+ * Pointer to TransactionStatus on which we intend to claim a permit. (For
68 * deadlock detection.)
69 */
70 private volatile TransactionStatus _depends;
71@@ -125,12 +127,6 @@
72 */
73 private volatile boolean _notified;
74
75- /**
76- * Indicates whether the owning transaction had been abandoned. This status
77- * may be in a locked state and is not available for re-use.
78- */
79- private volatile boolean _abandoned;
80-
81 TransactionStatus(final TransactionIndexBucket bucket) {
82 _bucket = bucket;
83 }
84@@ -253,17 +249,7 @@
85
86 void completeAndUnlock(final long timestamp) {
87 complete(timestamp);
88- if (!isAbandoned()) {
89- wwUnlock();
90- }
91- }
92-
93- boolean isLocked() {
94- return _wwLock.isLocked();
95- }
96-
97- boolean isHeldByCurrentThread() {
98- return _wwLock.isHeldByCurrentThread();
99+ wwUnlock();
100 }
101
102 Delta getDelta() {
103@@ -363,16 +349,15 @@
104
105 /**
106 * <p>
107- * Acquire a lock on this TransactionStatus. This supports the
108- * {@link TransactionIndex#wwDependency(long, long, long)} method. While a
109- * transaction is running this lock is in a locked state. The lock is
110- * acquired when the transaction is registered (see
111+ * Acquire a permit on this TransactionStatus. This supports the
112+ * {@link TransactionIndex#wwDependency(long, long, long)} method. The
113+ * permit is acquired when the transaction is registered (see
114 * {@link TransactionIndex#registerTransaction(Transaction)} and released
115 * once the transaction is either committed or aborted.
116 * </p>
117 * <p>
118 * The <code>wwDependency</code> method also attempts to acquire, and then
119- * immediately release this lock. This stalls the thread calling
120+ * immediately release this permit. This stalls the thread calling
121 * wwDependency until the commit/abort status of the current transaction is
122 * known.
123 *
124@@ -381,15 +366,24 @@
125 * @throws InterruptedException
126 */
127 boolean wwLock(final long timeout) throws InterruptedException {
128- return _wwLock.tryLock(timeout, TimeUnit.MILLISECONDS);
129+ return _wwLock.tryAcquire(timeout, TimeUnit.MILLISECONDS);
130 }
131
132 /**
133- * Release the lock acquired by {@link #wwLock(long)}.
134+ * Release the permit acquired by {@link #wwLock(long)}.
135 */
136 void wwUnlock() {
137- assert !isAbandoned() : "Attempt to unlock abandoned: " + this;
138- _wwLock.unlock();
139+ _wwLock.release();
140+ }
141+
142+ /**
143+ * Indicate whether this TransactionStatus has been locked. Tested by assert
144+ * statements in various places.
145+ *
146+ * @return true if a thread has acquired a claim on this TransactionStatus.
147+ */
148+ boolean isLocked() {
149+ return _wwLock.availablePermits() == 0;
150 }
151
152 /**
153@@ -424,20 +418,6 @@
154 _notified = true;
155 }
156
157- /**
158- * Make this status as abandoned. This will prevent its unlock and re-use.
159- */
160- void markAbandoned() {
161- _abandoned = true;
162- }
163-
164- /**
165- * @return <code>true</code> if this status has been abandoned.
166- */
167- boolean isAbandoned() {
168- return _abandoned;
169- }
170-
171 @Override
172 public String toString() {
173 return String.format("<ts=%,d tc=%s mvv=%,d>", _ts, tcString(_tc), _mvvCount.get());
174
175=== added file 'src/test/java/com/persistit/TransactionSessionSwitchTest.java'
176--- src/test/java/com/persistit/TransactionSessionSwitchTest.java 1970-01-01 00:00:00 +0000
177+++ src/test/java/com/persistit/TransactionSessionSwitchTest.java 2013-05-01 15:05:32 +0000
178@@ -0,0 +1,122 @@
179+/**
180+ * Copyright 2005-2013 Akiban Technologies, Inc.
181+ *
182+ * Licensed under the Apache License, Version 2.0 (the "License");
183+ * you may not use this file except in compliance with the License.
184+ * You may obtain a copy of the License at
185+ *
186+ * http://www.apache.org/licenses/LICENSE-2.0
187+ *
188+ * Unless required by applicable law or agreed to in writing, software
189+ * distributed under the License is distributed on an "AS IS" BASIS,
190+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
191+ * See the License for the specific language governing permissions and
192+ * limitations under the License.
193+ */
194+
195+package com.persistit;
196+
197+import static org.junit.Assert.*;
198+
199+import java.util.HashMap;
200+import java.util.Map;
201+import java.util.Queue;
202+import java.util.concurrent.ArrayBlockingQueue;
203+import java.util.concurrent.atomic.AtomicInteger;
204+
205+import org.junit.Test;
206+
207+import com.persistit.exception.PersistitException;
208+import com.persistit.unit.ConcurrentUtil;
209+
210+/**
211+ * <p>
212+ * Bug https://bugs.launchpad.net/akiban-persistit/+bug/1174352
213+ * </p>
214+ * <p>
215+ * Documentation for com.persistit.Transaction says that with careful management
216+ * of ownership of a SessionId, it is possible to complete a transaction in a
217+ * different thread than the one that began it. However, this is not the case:
218+ * the second thread receives an IllegalMonitorStateException when attempting to
219+ * commit or abort the transaction.
220+ * </p>
221+ * <p>
222+ * The issue is that the TransactionStatus object used to represent transaction
223+ * state within the TransactionIndex uses a
224+ * java.util.concurrent.locks.ReentrantLock to represent its in-use state, and
225+ * only the thread that locked it may unlock it. *
226+ * </p>
227+ * <p>
228+ */
229+public class TransactionSessionSwitchTest extends PersistitUnitTestCase {
230+
231+ private final static int SESSIONS = 100;
232+ private final static int STEPS = 197;
233+ private final static int THREADS = 17;
234+ private final static long TIMEOUT = 10000;
235+
236+ private Queue<SessionId> sessionQueue = new ArrayBlockingQueue<SessionId>(SESSIONS);
237+ private Map<SessionId, AtomicInteger> sessionState = new HashMap<SessionId, AtomicInteger>();
238+
239+ @Test
240+ public void sessionManagement() throws Exception {
241+ for (int i = 0; i < SESSIONS; i++) {
242+ final SessionId sessionId = new SessionId();
243+ sessionQueue.add(sessionId);
244+ sessionState.put(sessionId, new AtomicInteger(0));
245+ }
246+
247+ Thread[] threads = new Thread[THREADS];
248+
249+ final Tree tree = _persistit.getVolume("persistit").getTree("tt", true);
250+ for (int i = 0; i < THREADS; i++) {
251+ threads[i] = new Thread(new Runnable() {
252+ public void run() {
253+ SessionId session;
254+ while ((session = sessionQueue.poll()) != null) {
255+ int state = sessionState.get(session).get();
256+ try {
257+ _persistit.setSessionId(session);
258+ final Transaction txn = _persistit.getTransaction();
259+ if (state == 0) {
260+ txn.begin();
261+ } else if (state > STEPS) {
262+ if ((session.hashCode() % 3) == 0) {
263+ txn.rollback();
264+ } else {
265+ txn.commit();
266+ }
267+ txn.end();
268+ } else {
269+ Exchange ex = _persistit.getExchange(tree.getVolume(), tree.getName(), false);
270+ ex.getValue().put(Thread.currentThread().getName());
271+ ex.clear().append(session.hashCode()).append(state).store();
272+ _persistit.releaseExchange(ex);
273+ }
274+ } catch (PersistitException e) {
275+ throw new RuntimeException(e);
276+ } finally {
277+ if (state <= STEPS) {
278+ sessionState.get(session).incrementAndGet();
279+ sessionQueue.add(session);
280+ }
281+ }
282+ }
283+ }
284+ });
285+
286+ }
287+ ConcurrentUtil.startAndJoinAssertSuccess(TIMEOUT, threads);
288+
289+ final Exchange ex = _persistit.getExchange(tree.getVolume(), tree.getName(), false);
290+ for (SessionId session : sessionState.keySet()) {
291+ int count = 0;
292+ ex.clear().append(session.hashCode()).append(Key.BEFORE);
293+ while (ex.next()) {
294+ count++;
295+ }
296+ final int expected = (session.hashCode() % 3) == 0 ? 0 : STEPS;
297+ assertEquals("Mismatched count", expected, count);
298+ }
299+ }
300+}
301
302=== modified file 'src/test/java/com/persistit/stress/unit/AccumulatorRestart.java'
303--- src/test/java/com/persistit/stress/unit/AccumulatorRestart.java 2013-05-01 15:05:31 +0000
304+++ src/test/java/com/persistit/stress/unit/AccumulatorRestart.java 2013-05-01 15:05:32 +0000
305@@ -108,7 +108,7 @@
306 min.minimum(bsum(minValue, r));
307 max.maximum(bsum(maxValue, r));
308 seq.allocate();
309- sum.add(1);
310+ sum.add(r);
311 seqValue++;
312 final long minWas = getLong(_ex.to("min"), Long.MAX_VALUE);
313 _ex.getValue().put(Math.min(bsum(minValue, r), minWas));

Subscribers

People subscribed via source and target branches