Merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Peter Beaman
Approved revision: 387
Merged at revision: 377
Proposed branch: lp:~pbeaman/akiban-persistit/fix-several-small-bugs
Merge into: lp:akiban-persistit
Diff against target: 770 lines (+287/-80)
18 files modified
src/main/java/com/persistit/BufferPool.java (+6/-5)
src/main/java/com/persistit/CheckpointManager.java (+2/-2)
src/main/java/com/persistit/JournalManager.java (+3/-20)
src/main/java/com/persistit/JournalManagerBench.java (+2/-1)
src/main/java/com/persistit/Persistit.java (+53/-27)
src/main/java/com/persistit/SessionId.java (+10/-0)
src/main/java/com/persistit/Transaction.java (+0/-1)
src/main/java/com/persistit/TransactionIndexBucket.java (+5/-9)
src/main/java/com/persistit/TransactionStatus.java (+20/-0)
src/main/java/com/persistit/VolumeHeader.java (+9/-1)
src/main/java/com/persistit/logging/LogBase.java (+3/-0)
src/main/java/com/persistit/util/Util.java (+4/-0)
src/test/java/com/persistit/IOFailureTest.java (+2/-0)
src/test/java/com/persistit/TransactionTest2.java (+43/-9)
src/test/java/com/persistit/stress/AbstractStressTest.java (+3/-1)
src/test/java/com/persistit/stress/AbstractSuite.java (+4/-4)
src/test/java/com/persistit/stress/StartStop.java (+2/-0)
src/test/java/com/persistit/unit/ExchangeTest.java (+116/-0)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs
Reviewer: Nathan Williams
Review status: Pending
Review via email: mp+128568@code.launchpad.net

This proposal supersedes a proposal from 2012-10-05.

Description of the change

Fixes several small unrelated bugs:

https://bugs.launchpad.net/akiban-persistit/+bug/1023549 - the bug, as written, is invalid. This proposal adds new unit tests to ExchangeTest to demonstrate that.

https://bugs.launchpad.net/akiban-persistit/+bug/1013259 - cleans up Persistit shut-down handling. Changes ensure that concurrent transactions are interrupted and closed properly. The IllegalMonitorStateException occurred because two threads were racing to close a transaction on shutdown; this condition is now eliminated. A new unit test was added to TransactionTest2 to verify correct shutdown behavior.

https://bugs.launchpad.net/akiban-persistit/+bug/1029942 - incorrect constant was used in the log statement. Constants NS_PER_S, NS_PER_MS and MS_PER_S were consolidated into the com.persistit.util.Util class.

Bug #1062315: Assertion failure in TransactionIndexBucket#allocateTransactionStatus - adds a small change that avoids the assertion when a benign race condition occurs.
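
The benign race in bug #1062315 can be pictured with a standalone sketch. The class below is not Persistit code: BriefLockSketch and SHORT_TIMEOUT_MS are hypothetical names, and a java.util.concurrent ReentrantLock stands in for the wwLock. It only illustrates the idea of waiting briefly for a transient lock holder to let go, then releasing at once, rather than asserting that the lock is already free. Unlike the briefLock() in the diff, this version returns a boolean so the outcome can be observed.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class BriefLockSketch {
    // Hypothetical timeout; the real TransactionIndex.SHORT_TIMEOUT value is
    // not shown in this proposal.
    static final long SHORT_TIMEOUT_MS = 10;

    // Stand-in for the wwLock that another thread may hold transiently.
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Wait up to SHORT_TIMEOUT_MS for the current holder to release the lock.
     * If the lock is acquired it is released immediately; the only purpose is
     * to let the transient holder finish.
     *
     * @return true if the lock was acquired (and released) within the timeout
     */
    boolean briefLock() {
        boolean locked = false;
        try {
            locked = lock.tryLock(SHORT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ie) {
            // Preserve the interrupt so the caller can react to it.
            Thread.currentThread().interrupt();
        } finally {
            if (locked) {
                lock.unlock();
            }
        }
        return locked;
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    public static void main(final String[] args) {
        final BriefLockSketch status = new BriefLockSketch();
        System.out.println("acquired: " + status.briefLock());
        System.out.println("still locked: " + status.isLocked());
    }
}
```

A caller that previously asserted "not locked" would, under this sketch, call briefLock() first when it observes the lock held, and only then proceed.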

Nathan Williams (nwilliams) wrote : Posted in a previous version of this proposal

VolumeStorageV2#updateMetaData() - including the timestamp change in the result means the meta data is always "dirty" and is therefore always written. Is that what we want?

ExchangeTest#traverseEQfalse0 - The bug has 'false' in the title and 'true' in the description. Could we add cases for 'true' as well? Additionally, the server use case probably did it inside of a transaction. It should work as is if another test method just calls the new one, but inside of a transaction block. I have no reason to suspect a failure, but it is simple to add.

Persistit#close() - _journalManager.stopCopier() is no longer called. Should it be? The rest of the related changes look plausible and the new test is good.

review: Needs Information
Peter Beaman (pbeaman) wrote : Posted in a previous version of this proposal

VolumeStorageV2#updateMetaData - good catch - forgot to change it back after an experiment.

ExchangeTest#traverseEQfalse0 - good catch on not testing the transaction case. Changed the names, added a transactional version, and the tests now cover both true and false values of the 'deep' parameter.

Persistit#close() no longer calls stopCopier - yes, there is a change. The JOURNAL_COPIER thread used to be responsible for various cleanup activities and therefore had to be stopped early. That is no longer the case, so JournalManager#close() is where the copier now gets stopped.
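
A minimal sketch of the ownership change described above, assuming hypothetical names (CopierOwnerSketch, open, copierAlive) and a plain java.lang.Thread in place of the JOURNAL_COPIER. It is not the JournalManager implementation. The point is only that the component owning the background thread stops and joins it inside its own close(), so the caller does not need a separate stop call beforehand.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CopierOwnerSketch {
    private final AtomicBoolean closed = new AtomicBoolean();
    private final Thread copier;

    private CopierOwnerSketch() {
        // Hypothetical background worker: idles until the owner is closed.
        copier = new Thread(() -> {
            while (!closed.get()) {
                try {
                    Thread.sleep(5);
                } catch (final InterruptedException e) {
                    return; // interrupted by close(): exit promptly
                }
            }
        }, "COPIER_SKETCH");
    }

    /** Create the owner and start its background worker. */
    static CopierOwnerSketch open() {
        final CopierOwnerSketch owner = new CopierOwnerSketch();
        owner.copier.start();
        return owner;
    }

    /** Stop the worker as part of closing the owner, then wait for it to exit. */
    void close() {
        closed.set(true);
        copier.interrupt(); // wake the worker if it is sleeping
        try {
            copier.join();
        } catch (final InterruptedException e) {
            // Preserve the caller's interrupt status.
            Thread.currentThread().interrupt();
        }
    }

    boolean copierAlive() {
        return copier.isAlive();
    }

    public static void main(final String[] args) {
        final CopierOwnerSketch owner = CopierOwnerSketch.open();
        System.out.println("alive before close: " + owner.copierAlive());
        owner.close();
        System.out.println("alive after close: " + owner.copierAlive());
    }
}
```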

Nathan Williams (nwilliams) wrote : Posted in a previous version of this proposal

Thanks for the tweaks and clarifications.

review: Approve
Peter Beaman (pbeaman) wrote : Posted in a previous version of this proposal

Has a conflict. Marked Rejected. Will fix and re-approve.

Peter Beaman (pbeaman) wrote :

Merge from trunk (now with branch fix-dynamic-volumes) and fix the conflict in JournalManager.

Peter Beaman (pbeaman) wrote :

Approving this since the only changes are mechanical.

Preview Diff

1=== modified file 'src/main/java/com/persistit/BufferPool.java'
2--- src/main/java/com/persistit/BufferPool.java 2012-09-27 18:22:25 +0000
3+++ src/main/java/com/persistit/BufferPool.java 2012-10-08 19:12:23 +0000
4@@ -15,6 +15,8 @@
5
6 package com.persistit;
7
8+import static com.persistit.util.Util.NS_PER_S;
9+
10 import java.io.DataOutputStream;
11 import java.io.IOException;
12 import java.nio.ByteBuffer;
13@@ -97,11 +99,10 @@
14 */
15 private final static int INVENTORY_VERSIONS = 3;
16
17- private final static long NS_PER_SEC = 1000000000;
18 /**
19 * Preload log message interval, in seconds
20 */
21- private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC;
22+ private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_S;
23
24 /**
25 * The Persistit instance that references this BufferPool.
26@@ -1514,8 +1515,8 @@
27 count++;
28 final long now = System.nanoTime();
29 if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) {
30- _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime)
31- / NS_PER_SEC);
32+ _persistit.getLogBase().bufferInventoryProgress
33+ .log(count, total, (now - reportTime) / NS_PER_S);
34 reportTime = now;
35 }
36 if (count >= _bufferCount) {
37@@ -1533,7 +1534,7 @@
38 _persistit.getLogBase().bufferInventoryException.log(e);
39 } finally {
40 final long now = System.nanoTime();
41- _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC);
42+ _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_S);
43 }
44 }
45
46
47=== modified file 'src/main/java/com/persistit/CheckpointManager.java'
48--- src/main/java/com/persistit/CheckpointManager.java 2012-08-24 14:00:17 +0000
49+++ src/main/java/com/persistit/CheckpointManager.java 2012-10-08 19:12:23 +0000
50@@ -15,6 +15,8 @@
51
52 package com.persistit;
53
54+import static com.persistit.util.Util.NS_PER_S;
55+
56 import java.text.SimpleDateFormat;
57 import java.util.ArrayList;
58 import java.util.Date;
59@@ -86,8 +88,6 @@
60 }
61 }
62
63- private final static long NS_PER_S = 1000000000L;
64-
65 /**
66 * Default interval in nanoseconds between checkpoints - two minutes.
67 */
68
69=== modified file 'src/main/java/com/persistit/JournalManager.java'
70--- src/main/java/com/persistit/JournalManager.java 2012-10-03 18:19:49 +0000
71+++ src/main/java/com/persistit/JournalManager.java 2012-10-08 19:12:23 +0000
72@@ -19,6 +19,7 @@
73 import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_A;
74 import static com.persistit.util.SequencerConstants.RECOVERY_PRUNING_B;
75 import static com.persistit.util.ThreadSequencer.sequence;
76+import static com.persistit.util.Util.NS_PER_MS;
77
78 import java.io.File;
79 import java.io.IOException;
80@@ -79,7 +80,6 @@
81 final static int HALF_URGENT = 5;
82 final static int URGENT_COMMIT_DELAY_MILLIS = 50;
83 final static int GENTLE_COMMIT_DELAY_MILLIS = 12;
84- private final static long NS_PER_MS = 1000000L;
85 private final static int IO_MEASUREMENT_CYCLES = 8;
86 private final static int TOO_MANY_WARN_THRESHOLD = 5;
87 private final static int TOO_MANY_ERROR_THRESHOLD = 10;
88@@ -1271,11 +1271,6 @@
89 return address % _blockSize;
90 }
91
92- void stopCopier() {
93- _copier.setShouldStop(true);
94- _persistit.waitForIOTaskStop(_copier);
95- }
96-
97 void setWriteBufferSize(final int size) {
98 if (size < MINIMUM_BUFFER_SIZE || size > MAXIMUM_BUFFER_SIZE) {
99 throw new IllegalArgumentException("Invalid write buffer size: " + size);
100@@ -1284,11 +1279,7 @@
101 }
102
103 public void close() throws PersistitException {
104-
105- synchronized (this) {
106- _closed.set(true);
107- }
108-
109+ _closed.set(true);
110 rollover();
111
112 final JournalCopier copier = _copier;
113@@ -2456,10 +2447,7 @@
114 int handle = -1;
115
116 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
117- if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
118- list.clear();
119- break;
120- }
121+
122 final PageNode pageNode = iterator.next();
123 if (pageNode.isInvalid()) {
124 iterator.remove();
125@@ -2520,11 +2508,6 @@
126 final Set<Volume> volumes = new HashSet<Volume>();
127
128 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
129- if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
130- list.clear();
131- break;
132- }
133-
134 final PageNode pageNode = iterator.next();
135
136 if (pageNode.getVolumeHandle() != handle) {
137
138=== modified file 'src/main/java/com/persistit/JournalManagerBench.java'
139--- src/main/java/com/persistit/JournalManagerBench.java 2012-09-12 20:36:27 +0000
140+++ src/main/java/com/persistit/JournalManagerBench.java 2012-10-08 19:12:23 +0000
141@@ -15,6 +15,8 @@
142
143 package com.persistit;
144
145+import static com.persistit.util.Util.NS_PER_S;
146+
147 import java.io.File;
148 import java.io.RandomAccessFile;
149 import java.nio.ByteBuffer;
150@@ -44,7 +46,6 @@
151 */
152 public class JournalManagerBench {
153
154- private final long NS_PER_S = 1000000000L;
155 private final byte[] NULLS = new byte[65536];
156
157 private final String[] ARG_TEMPLATE = new String[] { "duration|int:10:10:86400|Duration of test in seconds",
158
159=== modified file 'src/main/java/com/persistit/Persistit.java'
160--- src/main/java/com/persistit/Persistit.java 2012-10-03 16:04:16 +0000
161+++ src/main/java/com/persistit/Persistit.java 2012-10-08 19:12:23 +0000
162@@ -15,6 +15,8 @@
163
164 package com.persistit;
165
166+import static com.persistit.util.Util.NS_PER_S;
167+
168 import java.io.BufferedReader;
169 import java.io.File;
170 import java.io.FileReader;
171@@ -1514,7 +1516,7 @@
172 return _bufferPoolTable;
173 }
174
175- public void cleanup() {
176+ void cleanup() {
177 final Set<SessionId> sessionIds;
178 synchronized (_transactionSessionMap) {
179 sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
180@@ -1659,12 +1661,8 @@
181 }
182 recordBufferPoolInventory();
183
184- /*
185- * The copier is responsible for background pruning of aborted
186- * transactions. Halt it so Transaction#close() can be called
187- * without being concerned about its state changing.
188- */
189- _journalManager.stopCopier();
190+ _cleanupManager.close(flush);
191+ waitForIOTaskStop(_cleanupManager);
192
193 getTransaction().close();
194 cleanup();
195@@ -1681,9 +1679,6 @@
196 }
197 }
198
199- _cleanupManager.close(flush);
200- waitForIOTaskStop(_cleanupManager);
201-
202 _checkpointManager.close(flush);
203 waitForIOTaskStop(_checkpointManager);
204
205@@ -1693,26 +1688,13 @@
206 pool.close();
207 }
208
209- /*
210- * Close (and abort) all remaining transactions.
211- */
212- Set<Transaction> transactions;
213- synchronized (_transactionSessionMap) {
214- transactions = new HashSet<Transaction>(_transactionSessionMap.values());
215- _transactionSessionMap.clear();
216- }
217- for (final Transaction txn : transactions) {
218- try {
219- txn.close();
220- } catch (final PersistitException e) {
221- _logBase.exception.log(e);
222- }
223- }
224-
225 _journalManager.close();
226 final IOTaskRunnable task = _transactionIndex.close();
227 waitForIOTaskStop(task);
228
229+ interruptActiveThreads(SHORT_DELAY);
230+ closeZombieTransactions();
231+
232 for (final Volume volume : volumes) {
233 volume.close();
234 }
235@@ -1730,6 +1712,50 @@
236 releaseAllResources();
237 }
238
239+ private void closeZombieTransactions() {
240+ final Set<SessionId> sessionIds;
241+ synchronized (_transactionSessionMap) {
242+ sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
243+ }
244+ for (final SessionId sessionId : sessionIds) {
245+ Transaction transaction = null;
246+ synchronized (_transactionSessionMap) {
247+ transaction = _transactionSessionMap.remove(sessionId);
248+ }
249+ if (!sessionId.isAlive()) {
250+ if (transaction != null) {
251+ try {
252+ transaction.close();
253+ } catch (final Exception e) {
254+ _logBase.exception.log(e);
255+ }
256+ }
257+ }
258+ }
259+ }
260+
261+ private void interruptActiveThreads(final long timeout) throws PersistitInterruptedException {
262+ final long expires = System.currentTimeMillis() + timeout;
263+ boolean remaining = false;
264+ do {
265+ final Set<SessionId> sessionIds;
266+ synchronized (_transactionSessionMap) {
267+ sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
268+ }
269+ for (final SessionId sessionId : sessionIds) {
270+ if (sessionId.isAlive()) {
271+ if (sessionId.interrupt()) {
272+ _logBase.interruptedAtClose.log(sessionId.ownerName());
273+ }
274+ remaining = true;
275+ }
276+ }
277+ if (remaining) {
278+ Util.spinSleep();
279+ }
280+ } while (remaining && System.currentTimeMillis() < expires);
281+ }
282+
283 /**
284 * Abruptly stop (using {@link Thread#stop()}) the writer and collector
285 * processes. This method should be used only by tests.
286@@ -1885,7 +1911,7 @@
287 }
288 final long now = System.currentTimeMillis();
289 if (now > _nextCloseTime) {
290- _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / 1000);
291+ _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / NS_PER_S);
292 _nextCloseTime += CLOSE_LOG_INTERVAL;
293 }
294 }
295
296=== modified file 'src/main/java/com/persistit/SessionId.java'
297--- src/main/java/com/persistit/SessionId.java 2012-09-26 13:31:13 +0000
298+++ src/main/java/com/persistit/SessionId.java 2012-10-08 19:12:23 +0000
299@@ -77,6 +77,16 @@
300 _owner.set(Thread.currentThread());
301 }
302
303+ boolean interrupt() {
304+ final Thread t = _owner.get();
305+ if (t != null && t != Thread.currentThread()) {
306+ t.interrupt();
307+ return true;
308+ } else {
309+ return false;
310+ }
311+ }
312+
313 public String ownerName() {
314 final Thread t = _owner.get();
315 if (t == null) {
316
317=== modified file 'src/main/java/com/persistit/Transaction.java'
318--- src/main/java/com/persistit/Transaction.java 2012-08-24 13:57:19 +0000
319+++ src/main/java/com/persistit/Transaction.java 2012-10-08 19:12:23 +0000
320@@ -725,7 +725,6 @@
321 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,
322 _persistit.getTimestampAllocator().getCurrentTimestamp());
323 _rollbackCompleted = true;
324-
325 }
326 }
327 }
328
329=== modified file 'src/main/java/com/persistit/TransactionIndexBucket.java'
330--- src/main/java/com/persistit/TransactionIndexBucket.java 2012-08-24 13:57:19 +0000
331+++ src/main/java/com/persistit/TransactionIndexBucket.java 2012-10-08 19:12:23 +0000
332@@ -170,10 +170,13 @@
333 _lock.unlock();
334 }
335
336- TransactionStatus allocateTransactionStatus() {
337+ TransactionStatus allocateTransactionStatus() throws InterruptedException {
338 assert _lock.isHeldByCurrentThread();
339 final TransactionStatus status = _free;
340 if (status != null) {
341+ if (status.isLocked()) {
342+ status.briefLock();
343+ }
344 assert !status.isLocked();
345 _free = status.getNext();
346 _freeCount--;
347@@ -569,14 +572,7 @@
348 }
349 }
350 } else if (tc < 0 && tc != ABORTED && -tc < timestamp) {
351- boolean locked = false;
352- try {
353- locked = status.wwLock(TransactionIndex.SHORT_TIMEOUT);
354- } finally {
355- if (locked) {
356- status.wwUnlock();
357- }
358- }
359+ status.briefLock();
360 _transactionIndex.incrementAccumulatorSnapshotRetryCounter();
361 throw RetryException.SINGLE;
362 }
363
364=== modified file 'src/main/java/com/persistit/TransactionStatus.java'
365--- src/main/java/com/persistit/TransactionStatus.java 2012-08-24 13:57:19 +0000
366+++ src/main/java/com/persistit/TransactionStatus.java 2012-10-08 19:12:23 +0000
367@@ -327,6 +327,26 @@
368 }
369
370 /**
371+ * Block briefly until another thread transiently holding the wwLock
372+ * vacates. Times out and returns after
373+ * {@value TransactionIndex#SHORT_TIMEOUT} milliseconds.
374+ *
375+ * @throws InterruptedException
376+ */
377+ void briefLock() throws InterruptedException {
378+ boolean locked = false;
379+ try {
380+ locked = wwLock(TransactionIndex.SHORT_TIMEOUT);
381+ } catch (final InterruptedException ie) {
382+ Thread.currentThread().interrupt();
383+ } finally {
384+ if (locked) {
385+ wwUnlock();
386+ }
387+ }
388+ }
389+
390+ /**
391 * <p>
392 * Acquire a lock on this TransactionStatus. This supports the
393 * {@link TransactionIndex#wwDependency(long, long, long)} method. While a
394
395=== modified file 'src/main/java/com/persistit/VolumeHeader.java'
396--- src/main/java/com/persistit/VolumeHeader.java 2012-08-24 13:57:19 +0000
397+++ src/main/java/com/persistit/VolumeHeader.java 2012-10-08 19:12:23 +0000
398@@ -287,11 +287,12 @@
399 */
400 public static boolean verifyVolumeHeader(final VolumeSpecification specification, final long systemTimestamp)
401 throws CorruptVolumeException, InvalidVolumeSpecificationException, PersistitIOException {
402+ FileInputStream stream = null;
403 try {
404 final File file = new File(specification.getPath());
405 if (file.exists()) {
406 if (file.isFile()) {
407- final FileInputStream stream = new FileInputStream(file);
408+ stream = new FileInputStream(file);
409 final byte[] bytes = new byte[SIZE];
410 final int readSize = stream.read(bytes);
411 if (readSize < SIZE) {
412@@ -336,6 +337,13 @@
413 }
414 } catch (final IOException ioe) {
415 throw new PersistitIOException(ioe);
416+ } finally {
417+ if (stream != null) {
418+ try {
419+ stream.close();
420+ } catch (final IOException e) {
421+ }
422+ }
423 }
424 }
425
426
427=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
428--- src/main/java/com/persistit/logging/LogBase.java 2012-09-06 20:48:31 +0000
429+++ src/main/java/com/persistit/logging/LogBase.java 2012-10-08 19:12:23 +0000
430@@ -253,6 +253,9 @@
431 @Message("WARNING|Exception while writing buffer pool inventory %s")
432 public final LogItem bufferInventoryException = PersistitLogMessage.empty();
433
434+ @Message("WARNING|Thread %s interrupted due to shutdown")
435+ public final LogItem interruptedAtClose = PersistitLogMessage.empty();
436+
437 public static String recurring(final String message, final int count, final long duration) {
438 return String.format(RECURRING, message, count, duration);
439 }
440
441=== modified file 'src/main/java/com/persistit/util/Util.java'
442--- src/main/java/com/persistit/util/Util.java 2012-08-24 13:57:19 +0000
443+++ src/main/java/com/persistit/util/Util.java 2012-10-08 19:12:23 +0000
444@@ -38,6 +38,10 @@
445 public final static String NEW_LINE = System.getProperty("line.separator");
446 private final static String REGEX_QUOTE = "^$*+?()[].";
447
448+ public final static long NS_PER_S = 1000000000L;
449+ public final static long MS_PER_S = 1000L;
450+ public final static long NS_PER_MS = 1000000L;
451+
452 public final static char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D',
453 'E', 'F' };
454
455
456=== modified file 'src/test/java/com/persistit/IOFailureTest.java'
457--- src/test/java/com/persistit/IOFailureTest.java 2012-08-24 14:00:17 +0000
458+++ src/test/java/com/persistit/IOFailureTest.java 2012-10-08 19:12:23 +0000
459@@ -308,6 +308,8 @@
460 final long size0 = channel0.size();
461 channel0.truncate(100);
462
463+ channel0.close();
464+
465 final File file1 = jman.addressToFile(currentAddress);
466 final FileChannel channel1 = new RandomAccessFile(file1, "rw").getChannel();
467 final long size1 = channel1.size();
468
469=== modified file 'src/test/java/com/persistit/TransactionTest2.java'
470--- src/test/java/com/persistit/TransactionTest2.java 2012-08-24 13:57:19 +0000
471+++ src/test/java/com/persistit/TransactionTest2.java 2012-10-08 19:12:23 +0000
472@@ -29,6 +29,7 @@
473 import org.junit.Test;
474
475 import com.persistit.Transaction.CommitPolicy;
476+import com.persistit.exception.PersistitClosedException;
477 import com.persistit.exception.PersistitException;
478 import com.persistit.exception.PersistitIOException;
479 import com.persistit.exception.PersistitInterruptedException;
480@@ -47,15 +48,17 @@
481
482 final static CommitPolicy policy = CommitPolicy.SOFT;
483
484- final static long TIMEOUT = 20000; // 20 seconds
485+ final static long TIMEOUT = 10000; // 10 seconds
486+ final static int ITERATIONS_PER_THREAD = 25000;
487
488 static int _threadCount = 8;
489- static int _iterationsPerThread = 25000;
490+ static int _iterationsPerThread = ITERATIONS_PER_THREAD;
491 static int _accounts = 5000;
492
493 static AtomicInteger _retriedTransactionCount = new AtomicInteger();
494 static AtomicInteger _completedTransactionCount = new AtomicInteger();
495 static AtomicInteger _failedTransactionCount = new AtomicInteger();
496+ static AtomicInteger _strandedThreads = new AtomicInteger();
497
498 static int _threadCounter = 0;
499
500@@ -118,7 +121,7 @@
501 threadArray[index] = new Thread(new Runnable() {
502 @Override
503 public void run() {
504- runIt();
505+ runIt(_iterationsPerThread);
506 }
507 }, "TransactionThread_" + index);
508
509@@ -182,7 +185,7 @@
510 final Thread thread = new Thread(new Runnable() {
511 @Override
512 public void run() {
513- runIt();
514+ runIt(Integer.MAX_VALUE);
515 }
516 }, "TransactionThread_" + ++index);
517 threads.add(thread);
518@@ -238,19 +241,44 @@
519 txn.begin();
520 }
521
522- public void runIt() {
523+ @Test
524+ public void transactionsConcurrentWithPersistitClose() throws Exception {
525+ new Thread(new Runnable() {
526+ @Override
527+ public void run() {
528+ final Thread[] threadArray = new Thread[_threadCount];
529+ for (int index = 0; index < _threadCount; index++) {
530+ threadArray[index] = new Thread(new Runnable() {
531+ @Override
532+ public void run() {
533+ runIt(Integer.MAX_VALUE);
534+ }
535+ }, "TransactionThread_" + index);
536+ }
537+ for (int index = 0; index < _threadCount; index++) {
538+ threadArray[index].start();
539+ }
540+ }
541+ }).start();
542+ /*
543+ * Let the threads crank up
544+ */
545+ Thread.sleep(1000);
546+ _persistit.close();
547+ assertEquals("All threads should have exited correctly", 0, _strandedThreads.get());
548+ }
549+
550+ public void runIt(final int limit) {
551+ _strandedThreads.incrementAndGet();
552 try {
553 final Exchange accountEx = _persistit.getExchange("persistit", "account", true);
554 //
555 final Random random = new Random();
556- for (int iterations = 1; iterations <= _iterationsPerThread; iterations++) {
557+ for (int iterations = 1; iterations <= limit; iterations++) {
558 final int accountNo1 = random.nextInt(_accounts);
559- // int accountNo2 = random.nextInt(_accounts - 1);
560- // if (accountNo2 == accountNo1) accountNo2++;
561 final int accountNo2 = random.nextInt(_accounts);
562
563 final int delta = random.nextInt(10000);
564- // final int delta = 1;
565
566 transfer(accountEx, accountNo1, accountNo2, delta);
567 _completedTransactionCount.incrementAndGet();
568@@ -260,10 +288,16 @@
569 System.out.flush();
570 }
571 }
572+ _strandedThreads.decrementAndGet();
573 } catch (final PersistitInterruptedException exception) {
574+ _strandedThreads.decrementAndGet();
575+ // expected
576+ } catch (final PersistitClosedException exception) {
577+ _strandedThreads.decrementAndGet();
578 // expected
579 } catch (final PersistitIOException exception) {
580 if (InterruptedIOException.class.equals(exception.getCause().getClass())) {
581+ _strandedThreads.decrementAndGet();
582 // expected
583 } else {
584 exception.printStackTrace();
585
586=== modified file 'src/test/java/com/persistit/stress/AbstractStressTest.java'
587--- src/test/java/com/persistit/stress/AbstractStressTest.java 2012-08-24 13:57:19 +0000
588+++ src/test/java/com/persistit/stress/AbstractStressTest.java 2012-10-08 19:12:23 +0000
589@@ -15,6 +15,8 @@
590
591 package com.persistit.stress;
592
593+import static com.persistit.util.Util.NS_PER_S;
594+
595 import com.persistit.Persistit;
596
597 /**
598@@ -128,7 +130,7 @@
599 _result = new TestResult(false, throwable);
600 }
601 forceStop();
602- final long elapsed = (System.nanoTime() - _startTime) / AbstractSuite.NS_PER_S;
603+ final long elapsed = (System.nanoTime() - _startTime) / NS_PER_S;
604 System.err.printf("\n%s at %,d seconds: %s\n", this, elapsed, _result);
605 }
606
607
608=== modified file 'src/test/java/com/persistit/stress/AbstractSuite.java'
609--- src/test/java/com/persistit/stress/AbstractSuite.java 2012-08-24 14:00:17 +0000
610+++ src/test/java/com/persistit/stress/AbstractSuite.java 2012-10-08 19:12:23 +0000
611@@ -15,6 +15,10 @@
612
613 package com.persistit.stress;
614
615+import static com.persistit.util.Util.MS_PER_S;
616+import static com.persistit.util.Util.NS_PER_MS;
617+import static com.persistit.util.Util.NS_PER_S;
618+
619 import java.io.File;
620 import java.io.FileWriter;
621 import java.io.IOException;
622@@ -42,10 +46,6 @@
623
624 protected final static long PROGRESS_LOG_INTERVAL = 600000;
625
626- protected final static long NS_PER_MS = 1000000;
627- protected final static long MS_PER_S = 1000;
628- protected final static long NS_PER_S = NS_PER_MS * MS_PER_S;
629-
630 private long _nextReport;
631 private long _accumulatedWork;
632
633
634=== modified file 'src/test/java/com/persistit/stress/StartStop.java'
635--- src/test/java/com/persistit/stress/StartStop.java 2012-08-24 13:57:19 +0000
636+++ src/test/java/com/persistit/stress/StartStop.java 2012-10-08 19:12:23 +0000
637@@ -15,6 +15,8 @@
638
639 package com.persistit.stress;
640
641+import static com.persistit.util.Util.NS_PER_S;
642+
643 import com.persistit.IntegrityCheck;
644 import com.persistit.Persistit;
645 import com.persistit.Transaction.CommitPolicy;
646
647=== modified file 'src/test/java/com/persistit/unit/ExchangeTest.java'
648--- src/test/java/com/persistit/unit/ExchangeTest.java 2012-08-24 13:57:19 +0000
649+++ src/test/java/com/persistit/unit/ExchangeTest.java 2012-10-08 19:12:23 +0000
650@@ -368,4 +368,120 @@
651 assertTrue("Not enough methods were tested: " + tested, tested > 10);
652 }
653
654+ /**
655+ * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549:
656+ *
657+ * traverse(EQ, false, 0) returns incorrect result
658+ *
659+ * This method returns true even when the tree is empty. traverse(EQ, true,
660+ * 0) returns the correct value.
661+ *
662+ * @throws Exception
663+ */
664+ @Test
665+ public void traverse_EQ_false_0__IsCorrect() throws Exception {
666+ traverseCases(false);
667+ traverseCases(true);
668+ }
669+
670+ /**
671+ * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549:
672+ *
673+ * traverse(EQ, false, 0) returns incorrect result
674+ *
675+ * This method returns true even when the tree is empty. traverse(EQ, true,
676+ * 0) returns the correct value.
677+ *
678+ * @throws Exception
679+ */
680+ @Test
681+ public void traverse_EQ_false_0__IsCorrect_Txn() throws Exception {
682+ final Transaction txn = _persistit.getTransaction();
683+ txn.begin();
684+ traverseCases(false);
685+ txn.commit();
686+ txn.end();
687+
688+ txn.begin();
689+ traverseCases(true);
690+ txn.commit();
691+ txn.end();
692+ }
693+
694+ private void traverseCases(final boolean deep) throws Exception {
695+ final Exchange ex = _persistit.getExchange("persistit", "gogo", true);
696+
697+ ex.removeAll();
698+ ex.clear();
699+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
700+ ex.clear();
701+ assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, -1));
702+ ex.clear();
703+ assertEquals("Should be false", false, ex.traverse(Key.GT, deep, -1));
704+ ex.clear();
705+ assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, -1));
706+ ex.clear();
707+ assertEquals("Should be false", false, ex.traverse(Key.LT, deep, -1));
708+ ex.clear();
709+
710+ ex.append(1).append(2).store();
711+ ex.clear().append(Key.BEFORE);
712+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
713+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1));
714+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1));
715+ ex.clear().append(1);
716+ assertEquals("Should be " + !deep, !deep, ex.traverse(Key.EQ, deep, -1));
717+
718+ ex.clear().append(Key.AFTER);
719+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
720+ ex.clear().append(Key.AFTER);
721+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1));
722+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1));
723+
724+ ex.removeAll();
725+ ex.clear();
726+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
727+ keyCheck(ex, "{{before}}");
728+ ex.clear();
729+ assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, 0));
730+ keyCheck(ex, "{{before}}");
731+ ex.clear();
732+ assertEquals("Should be false", false, ex.traverse(Key.GT, deep, 0));
733+ keyCheck(ex, "{{before}}");
734+ ex.clear();
735+ assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, 0));
736+ keyCheck(ex, "{{after}}");
737+ ex.clear();
738+ assertEquals("Should be false", false, ex.traverse(Key.LT, deep, 0));
739+ keyCheck(ex, "{{after}}");
740+ ex.clear();
741+
742+ ex.append(1).append(2).store();
743+ ex.clear().append(Key.BEFORE);
744+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
745+ keyCheck(ex, "{{before}}");
746+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0));
747+ keyCheck(ex, deep ? "{1,2}" : "{1}");
748+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0));
749+ keyCheck(ex, deep ? "{1,2}" : "{1}");
750+ assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0));
751+ keyCheck(ex, deep ? "{1,2}" : "{1}");
752+
753+ ex.clear().append(Key.AFTER);
754+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
755+ keyCheck(ex, "{{before}}");
756+ ex.clear().append(Key.AFTER);
757+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0));
758+ keyCheck(ex, deep ? "{1,2}" : "{1}");
759+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0));
760+ keyCheck(ex, deep ? "{1,2}" : "{1}");
761+ assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0));
762+ keyCheck(ex, deep ? "{1,2}" : "{1}");
763+
764+ }
765+
766+ private void keyCheck(final Exchange ex, final String expected) {
767+ assertEquals("Key should be " + expected, expected, ex.getKey().toString());
768+ }
769+
770 }
