Merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Peter Beaman
Approved revision: 387
Merged at revision: 377
Proposed branch: lp:~pbeaman/akiban-persistit/fix-several-small-bugs
Merge into: lp:akiban-persistit
Diff against target: 770 lines (+287/-80)
18 files modified
src/main/java/com/persistit/BufferPool.java (+6/-5)
src/main/java/com/persistit/CheckpointManager.java (+2/-2)
src/main/java/com/persistit/JournalManager.java (+3/-20)
src/main/java/com/persistit/JournalManagerBench.java (+2/-1)
src/main/java/com/persistit/Persistit.java (+53/-27)
src/main/java/com/persistit/SessionId.java (+10/-0)
src/main/java/com/persistit/Transaction.java (+0/-1)
src/main/java/com/persistit/TransactionIndexBucket.java (+5/-9)
src/main/java/com/persistit/TransactionStatus.java (+20/-0)
src/main/java/com/persistit/VolumeHeader.java (+9/-1)
src/main/java/com/persistit/logging/LogBase.java (+3/-0)
src/main/java/com/persistit/util/Util.java (+4/-0)
src/test/java/com/persistit/IOFailureTest.java (+2/-0)
src/test/java/com/persistit/TransactionTest2.java (+43/-9)
src/test/java/com/persistit/stress/AbstractStressTest.java (+3/-1)
src/test/java/com/persistit/stress/AbstractSuite.java (+4/-4)
src/test/java/com/persistit/stress/StartStop.java (+2/-0)
src/test/java/com/persistit/unit/ExchangeTest.java (+116/-0)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs
Reviewer Review Type Date Requested Status
Nathan Williams Pending
Review via email: mp+128568@code.launchpad.net

This proposal supersedes a proposal from 2012-10-05.

Description of the change

Fixes several small unrelated bugs:

https://bugs.launchpad.net/akiban-persistit/+bug/1023549 - as written, is invalid, and this proposal includes a new unit test to prove it.

https://bugs.launchpad.net/akiban-persistit/+bug/1013259 - cleans up Persistit shut-down handling. Changes ensure that concurrent transactions are interrupted and closed properly. The IllegalMonitorStateException occurred because two threads were racing to close a transaction on shutdown; this condition is now eliminated. A new unit test was added to TransactionTest2 to verify correct shutdown behavior.

https://bugs.launchpad.net/akiban-persistit/+bug/1029942 - incorrect constant was used in the log statement. Constants NS_PER_S, NS_PER_MS and MS_PER_S were consolidated into the com.persistit.util.Util class.

Bug #1062315: Assertion failure in TransactionIndexBucket#allocateTransactionStatus - a small change to avoid asserting in the case of a benign race condition was added.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote : Posted in a previous version of this proposal

VolumeStorageV2#updateMetaData() - including timestamp changed in result will make meta data always "dirty" and then written. Is that what we want?

ExchangeTest#traverseEQfalse0 - The bug has has 'false' in the title and 'true' in the description. Could we add cases for 'true' as well? Additionally, the server use case probably did it inside of a transaction. It should work as is if another test method just calls the new one, but inside of a transaction block. I have no reason to suspect a failure, but simple to add.

Persistit#close() - _journalManager.stopCopier() is no longer called. Should it be? The rest of the related changes look plausible and the new test is good.

review: Needs Information
Revision history for this message
Peter Beaman (pbeaman) wrote : Posted in a previous version of this proposal

VolumeStorageV2#updateMetaData - good catch - forgot to change it back after an experiment.

ExchangeTest#traverseEQfalse0 - good catch on not testing the transaction case. Changed the names, added a transactional version, and also tests for both true and false values of the 'deep' parameter.

Persistit#close() no longer calls stopCopier - yes, there is a change. The JOURNAL_COPIER thread used to be responsible for various cleanup activities and therefore had to be stopped early. That is no longer the case, so JournalManager#close() is where the copier now gets stopped.

Revision history for this message
Nathan Williams (nwilliams) wrote : Posted in a previous version of this proposal

Thanks for the tweaks and clarifications.

review: Approve
Revision history for this message
Peter Beaman (pbeaman) wrote : Posted in a previous version of this proposal

Has a conflict. Marked Rejected. Will fix and re-approve.

Revision history for this message
Peter Beaman (pbeaman) wrote :

Merge from trunk (now with branch fix-dynamic-volumes) and fix the conflict in JournalManager.

Revision history for this message
Peter Beaman (pbeaman) wrote :

Approving this since the only changes are mechanical.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/main/java/com/persistit/BufferPool.java'
--- src/main/java/com/persistit/BufferPool.java 2012-09-27 18:22:25 +0000
+++ src/main/java/com/persistit/BufferPool.java 2012-10-08 19:12:23 +0000
@@ -15,6 +15,8 @@
1515
16package com.persistit;16package com.persistit;
1717
18import static com.persistit.util.Util.NS_PER_S;
19
18import java.io.DataOutputStream;20import java.io.DataOutputStream;
19import java.io.IOException;21import java.io.IOException;
20import java.nio.ByteBuffer;22import java.nio.ByteBuffer;
@@ -97,11 +99,10 @@
97 */99 */
98 private final static int INVENTORY_VERSIONS = 3;100 private final static int INVENTORY_VERSIONS = 3;
99101
100 private final static long NS_PER_SEC = 1000000000;
101 /**102 /**
102 * Preload log message interval, in seconds103 * Preload log message interval, in seconds
103 */104 */
104 private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC;105 private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_S;
105106
106 /**107 /**
107 * The Persistit instance that references this BufferPool.108 * The Persistit instance that references this BufferPool.
@@ -1514,8 +1515,8 @@
1514 count++;1515 count++;
1515 final long now = System.nanoTime();1516 final long now = System.nanoTime();
1516 if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) {1517 if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) {
1517 _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime)1518 _persistit.getLogBase().bufferInventoryProgress
1518 / NS_PER_SEC);1519 .log(count, total, (now - reportTime) / NS_PER_S);
1519 reportTime = now;1520 reportTime = now;
1520 }1521 }
1521 if (count >= _bufferCount) {1522 if (count >= _bufferCount) {
@@ -1533,7 +1534,7 @@
1533 _persistit.getLogBase().bufferInventoryException.log(e);1534 _persistit.getLogBase().bufferInventoryException.log(e);
1534 } finally {1535 } finally {
1535 final long now = System.nanoTime();1536 final long now = System.nanoTime();
1536 _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC);1537 _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_S);
1537 }1538 }
1538 }1539 }
15391540
15401541
=== modified file 'src/main/java/com/persistit/CheckpointManager.java'
--- src/main/java/com/persistit/CheckpointManager.java 2012-08-24 14:00:17 +0000
+++ src/main/java/com/persistit/CheckpointManager.java 2012-10-08 19:12:23 +0000
@@ -15,6 +15,8 @@
1515
16package com.persistit;16package com.persistit;
1717
18import static com.persistit.util.Util.NS_PER_S;
19
18import java.text.SimpleDateFormat;20import java.text.SimpleDateFormat;
19import java.util.ArrayList;21import java.util.ArrayList;
20import java.util.Date;22import java.util.Date;
@@ -86,8 +88,6 @@
86 }88 }
87 }89 }
8890
89 private final static long NS_PER_S = 1000000000L;
90
91 /**91 /**
92 * Default interval in nanoseconds between checkpoints - two minutes.92 * Default interval in nanoseconds between checkpoints - two minutes.
93 */93 */
9494
=== modified file 'src/main/java/com/persistit/JournalManager.java'
--- src/main/java/com/persistit/JournalManager.java 2012-10-03 18:19:49 +0000
+++ src/main/java/com/persistit/JournalManager.java 2012-10-08 19:12:23 +0000
@@ -19,6 +19,7 @@
19import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_A;19import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_A;
20import static com.persistit.util.SequencerConstants.RECOVERY_PRUNING_B;20import static com.persistit.util.SequencerConstants.RECOVERY_PRUNING_B;
21import static com.persistit.util.ThreadSequencer.sequence;21import static com.persistit.util.ThreadSequencer.sequence;
22import static com.persistit.util.Util.NS_PER_MS;
2223
23import java.io.File;24import java.io.File;
24import java.io.IOException;25import java.io.IOException;
@@ -79,7 +80,6 @@
79 final static int HALF_URGENT = 5;80 final static int HALF_URGENT = 5;
80 final static int URGENT_COMMIT_DELAY_MILLIS = 50;81 final static int URGENT_COMMIT_DELAY_MILLIS = 50;
81 final static int GENTLE_COMMIT_DELAY_MILLIS = 12;82 final static int GENTLE_COMMIT_DELAY_MILLIS = 12;
82 private final static long NS_PER_MS = 1000000L;
83 private final static int IO_MEASUREMENT_CYCLES = 8;83 private final static int IO_MEASUREMENT_CYCLES = 8;
84 private final static int TOO_MANY_WARN_THRESHOLD = 5;84 private final static int TOO_MANY_WARN_THRESHOLD = 5;
85 private final static int TOO_MANY_ERROR_THRESHOLD = 10;85 private final static int TOO_MANY_ERROR_THRESHOLD = 10;
@@ -1271,11 +1271,6 @@
1271 return address % _blockSize;1271 return address % _blockSize;
1272 }1272 }
12731273
1274 void stopCopier() {
1275 _copier.setShouldStop(true);
1276 _persistit.waitForIOTaskStop(_copier);
1277 }
1278
1279 void setWriteBufferSize(final int size) {1274 void setWriteBufferSize(final int size) {
1280 if (size < MINIMUM_BUFFER_SIZE || size > MAXIMUM_BUFFER_SIZE) {1275 if (size < MINIMUM_BUFFER_SIZE || size > MAXIMUM_BUFFER_SIZE) {
1281 throw new IllegalArgumentException("Invalid write buffer size: " + size);1276 throw new IllegalArgumentException("Invalid write buffer size: " + size);
@@ -1284,11 +1279,7 @@
1284 }1279 }
12851280
1286 public void close() throws PersistitException {1281 public void close() throws PersistitException {
12871282 _closed.set(true);
1288 synchronized (this) {
1289 _closed.set(true);
1290 }
1291
1292 rollover();1283 rollover();
12931284
1294 final JournalCopier copier = _copier;1285 final JournalCopier copier = _copier;
@@ -2456,10 +2447,7 @@
2456 int handle = -1;2447 int handle = -1;
24572448
2458 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {2449 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
2459 if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {2450
2460 list.clear();
2461 break;
2462 }
2463 final PageNode pageNode = iterator.next();2451 final PageNode pageNode = iterator.next();
2464 if (pageNode.isInvalid()) {2452 if (pageNode.isInvalid()) {
2465 iterator.remove();2453 iterator.remove();
@@ -2520,11 +2508,6 @@
2520 final Set<Volume> volumes = new HashSet<Volume>();2508 final Set<Volume> volumes = new HashSet<Volume>();
25212509
2522 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {2510 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
2523 if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
2524 list.clear();
2525 break;
2526 }
2527
2528 final PageNode pageNode = iterator.next();2511 final PageNode pageNode = iterator.next();
25292512
2530 if (pageNode.getVolumeHandle() != handle) {2513 if (pageNode.getVolumeHandle() != handle) {
25312514
=== modified file 'src/main/java/com/persistit/JournalManagerBench.java'
--- src/main/java/com/persistit/JournalManagerBench.java 2012-09-12 20:36:27 +0000
+++ src/main/java/com/persistit/JournalManagerBench.java 2012-10-08 19:12:23 +0000
@@ -15,6 +15,8 @@
1515
16package com.persistit;16package com.persistit;
1717
18import static com.persistit.util.Util.NS_PER_S;
19
18import java.io.File;20import java.io.File;
19import java.io.RandomAccessFile;21import java.io.RandomAccessFile;
20import java.nio.ByteBuffer;22import java.nio.ByteBuffer;
@@ -44,7 +46,6 @@
44 */46 */
45public class JournalManagerBench {47public class JournalManagerBench {
4648
47 private final long NS_PER_S = 1000000000L;
48 private final byte[] NULLS = new byte[65536];49 private final byte[] NULLS = new byte[65536];
4950
50 private final String[] ARG_TEMPLATE = new String[] { "duration|int:10:10:86400|Duration of test in seconds",51 private final String[] ARG_TEMPLATE = new String[] { "duration|int:10:10:86400|Duration of test in seconds",
5152
=== modified file 'src/main/java/com/persistit/Persistit.java'
--- src/main/java/com/persistit/Persistit.java 2012-10-03 16:04:16 +0000
+++ src/main/java/com/persistit/Persistit.java 2012-10-08 19:12:23 +0000
@@ -15,6 +15,8 @@
1515
16package com.persistit;16package com.persistit;
1717
18import static com.persistit.util.Util.NS_PER_S;
19
18import java.io.BufferedReader;20import java.io.BufferedReader;
19import java.io.File;21import java.io.File;
20import java.io.FileReader;22import java.io.FileReader;
@@ -1514,7 +1516,7 @@
1514 return _bufferPoolTable;1516 return _bufferPoolTable;
1515 }1517 }
15161518
1517 public void cleanup() {1519 void cleanup() {
1518 final Set<SessionId> sessionIds;1520 final Set<SessionId> sessionIds;
1519 synchronized (_transactionSessionMap) {1521 synchronized (_transactionSessionMap) {
1520 sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());1522 sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
@@ -1659,12 +1661,8 @@
1659 }1661 }
1660 recordBufferPoolInventory();1662 recordBufferPoolInventory();
16611663
1662 /*1664 _cleanupManager.close(flush);
1663 * The copier is responsible for background pruning of aborted1665 waitForIOTaskStop(_cleanupManager);
1664 * transactions. Halt it so Transaction#close() can be called
1665 * without being concerned about its state changing.
1666 */
1667 _journalManager.stopCopier();
16681666
1669 getTransaction().close();1667 getTransaction().close();
1670 cleanup();1668 cleanup();
@@ -1681,9 +1679,6 @@
1681 }1679 }
1682 }1680 }
16831681
1684 _cleanupManager.close(flush);
1685 waitForIOTaskStop(_cleanupManager);
1686
1687 _checkpointManager.close(flush);1682 _checkpointManager.close(flush);
1688 waitForIOTaskStop(_checkpointManager);1683 waitForIOTaskStop(_checkpointManager);
16891684
@@ -1693,26 +1688,13 @@
1693 pool.close();1688 pool.close();
1694 }1689 }
16951690
1696 /*
1697 * Close (and abort) all remaining transactions.
1698 */
1699 Set<Transaction> transactions;
1700 synchronized (_transactionSessionMap) {
1701 transactions = new HashSet<Transaction>(_transactionSessionMap.values());
1702 _transactionSessionMap.clear();
1703 }
1704 for (final Transaction txn : transactions) {
1705 try {
1706 txn.close();
1707 } catch (final PersistitException e) {
1708 _logBase.exception.log(e);
1709 }
1710 }
1711
1712 _journalManager.close();1691 _journalManager.close();
1713 final IOTaskRunnable task = _transactionIndex.close();1692 final IOTaskRunnable task = _transactionIndex.close();
1714 waitForIOTaskStop(task);1693 waitForIOTaskStop(task);
17151694
1695 interruptActiveThreads(SHORT_DELAY);
1696 closeZombieTransactions();
1697
1716 for (final Volume volume : volumes) {1698 for (final Volume volume : volumes) {
1717 volume.close();1699 volume.close();
1718 }1700 }
@@ -1730,6 +1712,50 @@
1730 releaseAllResources();1712 releaseAllResources();
1731 }1713 }
17321714
1715 private void closeZombieTransactions() {
1716 final Set<SessionId> sessionIds;
1717 synchronized (_transactionSessionMap) {
1718 sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
1719 }
1720 for (final SessionId sessionId : sessionIds) {
1721 Transaction transaction = null;
1722 synchronized (_transactionSessionMap) {
1723 transaction = _transactionSessionMap.remove(sessionId);
1724 }
1725 if (!sessionId.isAlive()) {
1726 if (transaction != null) {
1727 try {
1728 transaction.close();
1729 } catch (final Exception e) {
1730 _logBase.exception.log(e);
1731 }
1732 }
1733 }
1734 }
1735 }
1736
1737 private void interruptActiveThreads(final long timeout) throws PersistitInterruptedException {
1738 final long expires = System.currentTimeMillis() + timeout;
1739 boolean remaining = false;
1740 do {
1741 final Set<SessionId> sessionIds;
1742 synchronized (_transactionSessionMap) {
1743 sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
1744 }
1745 for (final SessionId sessionId : sessionIds) {
1746 if (sessionId.isAlive()) {
1747 if (sessionId.interrupt()) {
1748 _logBase.interruptedAtClose.log(sessionId.ownerName());
1749 }
1750 remaining = true;
1751 }
1752 }
1753 if (remaining) {
1754 Util.spinSleep();
1755 }
1756 } while (remaining && System.currentTimeMillis() < expires);
1757 }
1758
1733 /**1759 /**
1734 * Abruptly stop (using {@link Thread#stop()}) the writer and collector1760 * Abruptly stop (using {@link Thread#stop()}) the writer and collector
1735 * processes. This method should be used only by tests.1761 * processes. This method should be used only by tests.
@@ -1885,7 +1911,7 @@
1885 }1911 }
1886 final long now = System.currentTimeMillis();1912 final long now = System.currentTimeMillis();
1887 if (now > _nextCloseTime) {1913 if (now > _nextCloseTime) {
1888 _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / 1000);1914 _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / NS_PER_S);
1889 _nextCloseTime += CLOSE_LOG_INTERVAL;1915 _nextCloseTime += CLOSE_LOG_INTERVAL;
1890 }1916 }
1891 }1917 }
18921918
=== modified file 'src/main/java/com/persistit/SessionId.java'
--- src/main/java/com/persistit/SessionId.java 2012-09-26 13:31:13 +0000
+++ src/main/java/com/persistit/SessionId.java 2012-10-08 19:12:23 +0000
@@ -77,6 +77,16 @@
77 _owner.set(Thread.currentThread());77 _owner.set(Thread.currentThread());
78 }78 }
7979
80 boolean interrupt() {
81 final Thread t = _owner.get();
82 if (t != null && t != Thread.currentThread()) {
83 t.interrupt();
84 return true;
85 } else {
86 return false;
87 }
88 }
89
80 public String ownerName() {90 public String ownerName() {
81 final Thread t = _owner.get();91 final Thread t = _owner.get();
82 if (t == null) {92 if (t == null) {
8393
=== modified file 'src/main/java/com/persistit/Transaction.java'
--- src/main/java/com/persistit/Transaction.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/Transaction.java 2012-10-08 19:12:23 +0000
@@ -725,7 +725,6 @@
725 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,725 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,
726 _persistit.getTimestampAllocator().getCurrentTimestamp());726 _persistit.getTimestampAllocator().getCurrentTimestamp());
727 _rollbackCompleted = true;727 _rollbackCompleted = true;
728
729 }728 }
730 }729 }
731 }730 }
732731
=== modified file 'src/main/java/com/persistit/TransactionIndexBucket.java'
--- src/main/java/com/persistit/TransactionIndexBucket.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/TransactionIndexBucket.java 2012-10-08 19:12:23 +0000
@@ -170,10 +170,13 @@
170 _lock.unlock();170 _lock.unlock();
171 }171 }
172172
173 TransactionStatus allocateTransactionStatus() {173 TransactionStatus allocateTransactionStatus() throws InterruptedException {
174 assert _lock.isHeldByCurrentThread();174 assert _lock.isHeldByCurrentThread();
175 final TransactionStatus status = _free;175 final TransactionStatus status = _free;
176 if (status != null) {176 if (status != null) {
177 if (status.isLocked()) {
178 status.briefLock();
179 }
177 assert !status.isLocked();180 assert !status.isLocked();
178 _free = status.getNext();181 _free = status.getNext();
179 _freeCount--;182 _freeCount--;
@@ -569,14 +572,7 @@
569 }572 }
570 }573 }
571 } else if (tc < 0 && tc != ABORTED && -tc < timestamp) {574 } else if (tc < 0 && tc != ABORTED && -tc < timestamp) {
572 boolean locked = false;575 status.briefLock();
573 try {
574 locked = status.wwLock(TransactionIndex.SHORT_TIMEOUT);
575 } finally {
576 if (locked) {
577 status.wwUnlock();
578 }
579 }
580 _transactionIndex.incrementAccumulatorSnapshotRetryCounter();576 _transactionIndex.incrementAccumulatorSnapshotRetryCounter();
581 throw RetryException.SINGLE;577 throw RetryException.SINGLE;
582 }578 }
583579
=== modified file 'src/main/java/com/persistit/TransactionStatus.java'
--- src/main/java/com/persistit/TransactionStatus.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/TransactionStatus.java 2012-10-08 19:12:23 +0000
@@ -327,6 +327,26 @@
327 }327 }
328328
329 /**329 /**
330 * Block briefly until another thread transiently holding the wwLock
331 * vacates. Times out and returns after
332 * {@value TransactionIndex#SHORT_TIMEOUT} milliseconds.
333 *
334 * @throws InterruptedException
335 */
336 void briefLock() throws InterruptedException {
337 boolean locked = false;
338 try {
339 locked = wwLock(TransactionIndex.SHORT_TIMEOUT);
340 } catch (final InterruptedException ie) {
341 Thread.currentThread().interrupt();
342 } finally {
343 if (locked) {
344 wwUnlock();
345 }
346 }
347 }
348
349 /**
330 * <p>350 * <p>
331 * Acquire a lock on this TransactionStatus. This supports the351 * Acquire a lock on this TransactionStatus. This supports the
332 * {@link TransactionIndex#wwDependency(long, long, long)} method. While a352 * {@link TransactionIndex#wwDependency(long, long, long)} method. While a
333353
=== modified file 'src/main/java/com/persistit/VolumeHeader.java'
--- src/main/java/com/persistit/VolumeHeader.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/VolumeHeader.java 2012-10-08 19:12:23 +0000
@@ -287,11 +287,12 @@
287 */287 */
288 public static boolean verifyVolumeHeader(final VolumeSpecification specification, final long systemTimestamp)288 public static boolean verifyVolumeHeader(final VolumeSpecification specification, final long systemTimestamp)
289 throws CorruptVolumeException, InvalidVolumeSpecificationException, PersistitIOException {289 throws CorruptVolumeException, InvalidVolumeSpecificationException, PersistitIOException {
290 FileInputStream stream = null;
290 try {291 try {
291 final File file = new File(specification.getPath());292 final File file = new File(specification.getPath());
292 if (file.exists()) {293 if (file.exists()) {
293 if (file.isFile()) {294 if (file.isFile()) {
294 final FileInputStream stream = new FileInputStream(file);295 stream = new FileInputStream(file);
295 final byte[] bytes = new byte[SIZE];296 final byte[] bytes = new byte[SIZE];
296 final int readSize = stream.read(bytes);297 final int readSize = stream.read(bytes);
297 if (readSize < SIZE) {298 if (readSize < SIZE) {
@@ -336,6 +337,13 @@
336 }337 }
337 } catch (final IOException ioe) {338 } catch (final IOException ioe) {
338 throw new PersistitIOException(ioe);339 throw new PersistitIOException(ioe);
340 } finally {
341 if (stream != null) {
342 try {
343 stream.close();
344 } catch (final IOException e) {
345 }
346 }
339 }347 }
340 }348 }
341349
342350
=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
--- src/main/java/com/persistit/logging/LogBase.java 2012-09-06 20:48:31 +0000
+++ src/main/java/com/persistit/logging/LogBase.java 2012-10-08 19:12:23 +0000
@@ -253,6 +253,9 @@
253 @Message("WARNING|Exception while writing buffer pool inventory %s")253 @Message("WARNING|Exception while writing buffer pool inventory %s")
254 public final LogItem bufferInventoryException = PersistitLogMessage.empty();254 public final LogItem bufferInventoryException = PersistitLogMessage.empty();
255255
256 @Message("WARNING|Thread %s interrupted due to shutdown")
257 public final LogItem interruptedAtClose = PersistitLogMessage.empty();
258
256 public static String recurring(final String message, final int count, final long duration) {259 public static String recurring(final String message, final int count, final long duration) {
257 return String.format(RECURRING, message, count, duration);260 return String.format(RECURRING, message, count, duration);
258 }261 }
259262
=== modified file 'src/main/java/com/persistit/util/Util.java'
--- src/main/java/com/persistit/util/Util.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/util/Util.java 2012-10-08 19:12:23 +0000
@@ -38,6 +38,10 @@
38 public final static String NEW_LINE = System.getProperty("line.separator");38 public final static String NEW_LINE = System.getProperty("line.separator");
39 private final static String REGEX_QUOTE = "^$*+?()[].";39 private final static String REGEX_QUOTE = "^$*+?()[].";
4040
41 public final static long NS_PER_S = 1000000000L;
42 public final static long MS_PER_S = 1000L;
43 public final static long NS_PER_MS = 1000000L;
44
41 public final static char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D',45 public final static char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D',
42 'E', 'F' };46 'E', 'F' };
4347
4448
=== modified file 'src/test/java/com/persistit/IOFailureTest.java'
--- src/test/java/com/persistit/IOFailureTest.java 2012-08-24 14:00:17 +0000
+++ src/test/java/com/persistit/IOFailureTest.java 2012-10-08 19:12:23 +0000
@@ -308,6 +308,8 @@
308 final long size0 = channel0.size();308 final long size0 = channel0.size();
309 channel0.truncate(100);309 channel0.truncate(100);
310310
311 channel0.close();
312
311 final File file1 = jman.addressToFile(currentAddress);313 final File file1 = jman.addressToFile(currentAddress);
312 final FileChannel channel1 = new RandomAccessFile(file1, "rw").getChannel();314 final FileChannel channel1 = new RandomAccessFile(file1, "rw").getChannel();
313 final long size1 = channel1.size();315 final long size1 = channel1.size();
314316
=== modified file 'src/test/java/com/persistit/TransactionTest2.java'
--- src/test/java/com/persistit/TransactionTest2.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/TransactionTest2.java 2012-10-08 19:12:23 +0000
@@ -29,6 +29,7 @@
29import org.junit.Test;29import org.junit.Test;
3030
31import com.persistit.Transaction.CommitPolicy;31import com.persistit.Transaction.CommitPolicy;
32import com.persistit.exception.PersistitClosedException;
32import com.persistit.exception.PersistitException;33import com.persistit.exception.PersistitException;
33import com.persistit.exception.PersistitIOException;34import com.persistit.exception.PersistitIOException;
34import com.persistit.exception.PersistitInterruptedException;35import com.persistit.exception.PersistitInterruptedException;
@@ -47,15 +48,17 @@
4748
48 final static CommitPolicy policy = CommitPolicy.SOFT;49 final static CommitPolicy policy = CommitPolicy.SOFT;
4950
50 final static long TIMEOUT = 20000; // 20 seconds51 final static long TIMEOUT = 10000; // 10 seconds
52 final static int ITERATIONS_PER_THREAD = 25000;
5153
52 static int _threadCount = 8;54 static int _threadCount = 8;
53 static int _iterationsPerThread = 25000;55 static int _iterationsPerThread = ITERATIONS_PER_THREAD;
54 static int _accounts = 5000;56 static int _accounts = 5000;
5557
56 static AtomicInteger _retriedTransactionCount = new AtomicInteger();58 static AtomicInteger _retriedTransactionCount = new AtomicInteger();
57 static AtomicInteger _completedTransactionCount = new AtomicInteger();59 static AtomicInteger _completedTransactionCount = new AtomicInteger();
58 static AtomicInteger _failedTransactionCount = new AtomicInteger();60 static AtomicInteger _failedTransactionCount = new AtomicInteger();
61 static AtomicInteger _strandedThreads = new AtomicInteger();
5962
60 static int _threadCounter = 0;63 static int _threadCounter = 0;
6164
@@ -118,7 +121,7 @@
118 threadArray[index] = new Thread(new Runnable() {121 threadArray[index] = new Thread(new Runnable() {
119 @Override122 @Override
120 public void run() {123 public void run() {
121 runIt();124 runIt(_iterationsPerThread);
122 }125 }
123 }, "TransactionThread_" + index);126 }, "TransactionThread_" + index);
124127
@@ -182,7 +185,7 @@
182 final Thread thread = new Thread(new Runnable() {185 final Thread thread = new Thread(new Runnable() {
183 @Override186 @Override
184 public void run() {187 public void run() {
185 runIt();188 runIt(Integer.MAX_VALUE);
186 }189 }
187 }, "TransactionThread_" + ++index);190 }, "TransactionThread_" + ++index);
188 threads.add(thread);191 threads.add(thread);
@@ -238,19 +241,44 @@
238 txn.begin();241 txn.begin();
239 }242 }
240243
241 public void runIt() {244 @Test
245 public void transactionsConcurrentWithPersistitClose() throws Exception {
246 new Thread(new Runnable() {
247 @Override
248 public void run() {
249 final Thread[] threadArray = new Thread[_threadCount];
250 for (int index = 0; index < _threadCount; index++) {
251 threadArray[index] = new Thread(new Runnable() {
252 @Override
253 public void run() {
254 runIt(Integer.MAX_VALUE);
255 }
256 }, "TransactionThread_" + index);
257 }
258 for (int index = 0; index < _threadCount; index++) {
259 threadArray[index].start();
260 }
261 }
262 }).start();
263 /*
264 * Let the threads crank up
265 */
266 Thread.sleep(1000);
267 _persistit.close();
268 assertEquals("All threads should have exited correctly", 0, _strandedThreads.get());
269 }
270
271 public void runIt(final int limit) {
272 _strandedThreads.incrementAndGet();
242 try {273 try {
243 final Exchange accountEx = _persistit.getExchange("persistit", "account", true);274 final Exchange accountEx = _persistit.getExchange("persistit", "account", true);
244 //275 //
245 final Random random = new Random();276 final Random random = new Random();
246 for (int iterations = 1; iterations <= _iterationsPerThread; iterations++) {277 for (int iterations = 1; iterations <= limit; iterations++) {
247 final int accountNo1 = random.nextInt(_accounts);278 final int accountNo1 = random.nextInt(_accounts);
248 // int accountNo2 = random.nextInt(_accounts - 1);
249 // if (accountNo2 == accountNo1) accountNo2++;
250 final int accountNo2 = random.nextInt(_accounts);279 final int accountNo2 = random.nextInt(_accounts);
251280
252 final int delta = random.nextInt(10000);281 final int delta = random.nextInt(10000);
253 // final int delta = 1;
254282
255 transfer(accountEx, accountNo1, accountNo2, delta);283 transfer(accountEx, accountNo1, accountNo2, delta);
256 _completedTransactionCount.incrementAndGet();284 _completedTransactionCount.incrementAndGet();
@@ -260,10 +288,16 @@
260 System.out.flush();288 System.out.flush();
261 }289 }
262 }290 }
291 _strandedThreads.decrementAndGet();
263 } catch (final PersistitInterruptedException exception) {292 } catch (final PersistitInterruptedException exception) {
293 _strandedThreads.decrementAndGet();
294 // expected
295 } catch (final PersistitClosedException exception) {
296 _strandedThreads.decrementAndGet();
264 // expected297 // expected
265 } catch (final PersistitIOException exception) {298 } catch (final PersistitIOException exception) {
266 if (InterruptedIOException.class.equals(exception.getCause().getClass())) {299 if (InterruptedIOException.class.equals(exception.getCause().getClass())) {
300 _strandedThreads.decrementAndGet();
267 // expected301 // expected
268 } else {302 } else {
269 exception.printStackTrace();303 exception.printStackTrace();
270304
=== modified file 'src/test/java/com/persistit/stress/AbstractStressTest.java'
--- src/test/java/com/persistit/stress/AbstractStressTest.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/stress/AbstractStressTest.java 2012-10-08 19:12:23 +0000
@@ -15,6 +15,8 @@
1515
16package com.persistit.stress;16package com.persistit.stress;
1717
18import static com.persistit.util.Util.NS_PER_S;
19
18import com.persistit.Persistit;20import com.persistit.Persistit;
1921
20/**22/**
@@ -128,7 +130,7 @@
128 _result = new TestResult(false, throwable);130 _result = new TestResult(false, throwable);
129 }131 }
130 forceStop();132 forceStop();
131 final long elapsed = (System.nanoTime() - _startTime) / AbstractSuite.NS_PER_S;133 final long elapsed = (System.nanoTime() - _startTime) / NS_PER_S;
132 System.err.printf("\n%s at %,d seconds: %s\n", this, elapsed, _result);134 System.err.printf("\n%s at %,d seconds: %s\n", this, elapsed, _result);
133 }135 }
134136
135137
=== modified file 'src/test/java/com/persistit/stress/AbstractSuite.java'
--- src/test/java/com/persistit/stress/AbstractSuite.java 2012-08-24 14:00:17 +0000
+++ src/test/java/com/persistit/stress/AbstractSuite.java 2012-10-08 19:12:23 +0000
@@ -15,6 +15,10 @@
1515
16package com.persistit.stress;16package com.persistit.stress;
1717
18import static com.persistit.util.Util.MS_PER_S;
19import static com.persistit.util.Util.NS_PER_MS;
20import static com.persistit.util.Util.NS_PER_S;
21
18import java.io.File;22import java.io.File;
19import java.io.FileWriter;23import java.io.FileWriter;
20import java.io.IOException;24import java.io.IOException;
@@ -42,10 +46,6 @@
4246
43 protected final static long PROGRESS_LOG_INTERVAL = 600000;47 protected final static long PROGRESS_LOG_INTERVAL = 600000;
4448
45 protected final static long NS_PER_MS = 1000000;
46 protected final static long MS_PER_S = 1000;
47 protected final static long NS_PER_S = NS_PER_MS * MS_PER_S;
48
49 private long _nextReport;49 private long _nextReport;
50 private long _accumulatedWork;50 private long _accumulatedWork;
5151
5252
=== modified file 'src/test/java/com/persistit/stress/StartStop.java'
--- src/test/java/com/persistit/stress/StartStop.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/stress/StartStop.java 2012-10-08 19:12:23 +0000
@@ -15,6 +15,8 @@
1515
16package com.persistit.stress;16package com.persistit.stress;
1717
18import static com.persistit.util.Util.NS_PER_S;
19
18import com.persistit.IntegrityCheck;20import com.persistit.IntegrityCheck;
19import com.persistit.Persistit;21import com.persistit.Persistit;
20import com.persistit.Transaction.CommitPolicy;22import com.persistit.Transaction.CommitPolicy;
2123
=== modified file 'src/test/java/com/persistit/unit/ExchangeTest.java'
--- src/test/java/com/persistit/unit/ExchangeTest.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/unit/ExchangeTest.java 2012-10-08 19:12:23 +0000
@@ -368,4 +368,120 @@
368 assertTrue("Not enough methods were tested: " + tested, tested > 10);368 assertTrue("Not enough methods were tested: " + tested, tested > 10);
369 }369 }
370370
371 /**
372 * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549:
373 *
374 * traverse(EQ, false, 0) returns incorrect result
375 *
376 * This method returns true even when the tree is empty. traverse(EQ, true,
377 * 0) returns the correct value.
378 *
379 * @throws Exception
380 */
381 @Test
382 public void traverse_EQ_false_0__IsCorrect() throws Exception {
383 traverseCases(false);
384 traverseCases(true);
385 }
386
387 /**
388 * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549:
389 *
390 * traverse(EQ, false, 0) returns incorrect result
391 *
392 * This method returns true even when the tree is empty. traverse(EQ, true,
393 * 0) returns the correct value.
394 *
395 * @throws Exception
396 */
397 @Test
398 public void traverse_EQ_false_0__IsCorrect_Txn() throws Exception {
399 final Transaction txn = _persistit.getTransaction();
400 txn.begin();
401 traverseCases(false);
402 txn.commit();
403 txn.end();
404
405 txn.begin();
406 traverseCases(true);
407 txn.commit();
408 txn.end();
409 }
410
411 private void traverseCases(final boolean deep) throws Exception {
412 final Exchange ex = _persistit.getExchange("persistit", "gogo", true);
413
414 ex.removeAll();
415 ex.clear();
416 assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
417 ex.clear();
418 assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, -1));
419 ex.clear();
420 assertEquals("Should be false", false, ex.traverse(Key.GT, deep, -1));
421 ex.clear();
422 assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, -1));
423 ex.clear();
424 assertEquals("Should be false", false, ex.traverse(Key.LT, deep, -1));
425 ex.clear();
426
427 ex.append(1).append(2).store();
428 ex.clear().append(Key.BEFORE);
429 assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
430 assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1));
431 assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1));
432 ex.clear().append(1);
433 assertEquals("Should be " + !deep, !deep, ex.traverse(Key.EQ, deep, -1));
434
435 ex.clear().append(Key.AFTER);
436 assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
437 ex.clear().append(Key.AFTER);
438 assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1));
439 assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1));
440
441 ex.removeAll();
442 ex.clear();
443 assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
444 keyCheck(ex, "{{before}}");
445 ex.clear();
446 assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, 0));
447 keyCheck(ex, "{{before}}");
448 ex.clear();
449 assertEquals("Should be false", false, ex.traverse(Key.GT, deep, 0));
450 keyCheck(ex, "{{before}}");
451 ex.clear();
452 assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, 0));
453 keyCheck(ex, "{{after}}");
454 ex.clear();
455 assertEquals("Should be false", false, ex.traverse(Key.LT, deep, 0));
456 keyCheck(ex, "{{after}}");
457 ex.clear();
458
459 ex.append(1).append(2).store();
460 ex.clear().append(Key.BEFORE);
461 assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
462 keyCheck(ex, "{{before}}");
463 assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0));
464 keyCheck(ex, deep ? "{1,2}" : "{1}");
465 assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0));
466 keyCheck(ex, deep ? "{1,2}" : "{1}");
467 assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0));
468 keyCheck(ex, deep ? "{1,2}" : "{1}");
469
470 ex.clear().append(Key.AFTER);
471 assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
472 keyCheck(ex, "{{before}}");
473 ex.clear().append(Key.AFTER);
474 assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0));
475 keyCheck(ex, deep ? "{1,2}" : "{1}");
476 assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0));
477 keyCheck(ex, deep ? "{1,2}" : "{1}");
478 assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0));
479 keyCheck(ex, deep ? "{1,2}" : "{1}");
480
481 }
482
483 private void keyCheck(final Exchange ex, final String expected) {
484 assertEquals("Key should be " + expected, expected, ex.getKey().toString());
485 }
486
371}487}

Subscribers

People subscribed via source and target branches