Merge lp:~pbeaman/akiban-persistit/transaction-tree-management into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Merged at revision: 430
Proposed branch: lp:~pbeaman/akiban-persistit/transaction-tree-management
Merge into: lp:akiban-persistit
Diff against target: 3507 lines (+1893/-414)
37 files modified
pom.xml (+1/-1)
src/main/java/com/persistit/Buffer.java (+1/-2)
src/main/java/com/persistit/CheckpointManager.java (+1/-0)
src/main/java/com/persistit/ClassIndex.java (+12/-1)
src/main/java/com/persistit/Exchange.java (+60/-38)
src/main/java/com/persistit/JournalManager.java (+5/-0)
src/main/java/com/persistit/MVV.java (+1/-1)
src/main/java/com/persistit/Persistit.java (+93/-43)
src/main/java/com/persistit/RecoveryManager.java (+27/-0)
src/main/java/com/persistit/SharedResource.java (+3/-3)
src/main/java/com/persistit/TimelyResource.java (+505/-0)
src/main/java/com/persistit/TransactionPlayer.java (+107/-68)
src/main/java/com/persistit/Tree.java (+165/-46)
src/main/java/com/persistit/ValueHelper.java (+12/-0)
src/main/java/com/persistit/Version.java (+69/-0)
src/main/java/com/persistit/Volume.java (+4/-1)
src/main/java/com/persistit/VolumeStructure.java (+47/-54)
src/main/java/com/persistit/logging/LogBase.java (+3/-0)
src/main/java/com/persistit/util/SequencerConstants.java (+1/-12)
src/test/java/com/persistit/AccumulatorRecoveryTest.java (+5/-0)
src/test/java/com/persistit/AccumulatorTest.java (+1/-1)
src/test/java/com/persistit/Bug1018526Test.java (+2/-2)
src/test/java/com/persistit/Bug920754Test.java (+2/-1)
src/test/java/com/persistit/Bug932097Test.java (+1/-1)
src/test/java/com/persistit/CorruptVolumeTest.java (+2/-1)
src/test/java/com/persistit/IOFailureTest.java (+4/-1)
src/test/java/com/persistit/IntegrityCheckTest.java (+2/-0)
src/test/java/com/persistit/JournalManagerTest.java (+12/-0)
src/test/java/com/persistit/MVCCPruneBufferTest.java (+9/-4)
src/test/java/com/persistit/PersistitUnitTestCase.java (+11/-0)
src/test/java/com/persistit/RecoveryTest.java (+27/-19)
src/test/java/com/persistit/SplitPolicyTest.java (+1/-1)
src/test/java/com/persistit/TimelyResourceTest.java (+290/-0)
src/test/java/com/persistit/TreeLifetimeTest.java (+26/-88)
src/test/java/com/persistit/TreeTransactionalLifetimeTest.java (+277/-0)
src/test/java/com/persistit/unit/ConcurrentUtil.java (+103/-24)
src/test/java/com/persistit/unit/TransactionTest1.java (+1/-1)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/transaction-tree-management
Reviewer          Review Type   Date Requested   Status
Nathan Williams                                  Needs Fixing
Review via email: mp+145415@code.launchpad.net

Description of the change

This branch makes the creation and removal of Tree instances transactional. That is, a Tree created within a transaction is not visible outside of that transaction until it commits (and then only to transactions that start after the creator's commit timestamp), and a Tree removed within a transaction remains visible to concurrent transactions; its removal is visible only to transactions that start after the removing transaction commits. Attempts by concurrent transactions to alter the existence of a Tree (either to create or remove it) result in write-write conflicts so that only one such transaction performing a tree creation or deletion can commit.

These changes will support DDL operations on Akiban Server. For example, a new index can be created inside of a transaction; the entire Tree becomes visible only when the transaction commits. Further, since the entire Tree would be removed in the event the transaction rolls back, the updates within the tree are made without writing MVVs that need to be pruned.

Similarly, the removal of an index by removing its Tree within a transaction will be visible only to transactions that start after the removal commits; concurrent transactions continue to see the tree.
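
As a rough illustration of the visibility rule described above, here is a simplified model. It is not code from this branch: the class TreeExistence and its methods are hypothetical, and it ignores the creating transaction's own view and uncommitted state. It only shows that a snapshot sees a tree if the create committed before the snapshot started and no removal had committed by then.

```java
/** Hypothetical model of transactional tree visibility; not Persistit code. */
final class TreeExistence {

    // Sentinel meaning "no removal has committed" (an assumption of this sketch).
    private static final long NOT_REMOVED = Long.MAX_VALUE;

    private final long createCommitTs;
    private long removeCommitTs = NOT_REMOVED;

    TreeExistence(final long createCommitTs) {
        this.createCommitTs = createCommitTs;
    }

    /** Record the commit timestamp of the transaction that removed the tree. */
    void commitRemoval(final long commitTs) {
        this.removeCommitTs = commitTs;
    }

    /**
     * A transaction sees the tree if it started after the creator committed
     * and did not start after a removal committed.
     */
    boolean visibleTo(final long startTs) {
        final boolean created = createCommitTs < startTs;
        final boolean removed = removeCommitTs < startTs;
        return created && !removed;
    }
}
```

With a create committed at timestamp 10 and a removal committed at 20, a transaction that started at 15 continues to see the tree while one that started at 21 does not.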

To accommodate these behaviors, removing a tree within a transaction does not immediately add the tree's physical pages to the garbage chain. Instead, a version is recorded in memory, and when that version is eventually pruned the physical storage of the tree is removed.

The unit test TreeTransactionalLifetimeTest demonstrates some of the new behaviors.

The changes in the proposal are widespread because of many side effects. The following is a guide. I expect review questions and will expand upon areas that are unclear.
----

The largest body of new code is the TimelyResource class. It provides a general capability of maintaining a versioned history of a cached resource. It is intended as a mechanism within Persistit that can manage any kind of versioned object, where the object represents state created within a transaction. For example, a TimelyResource could be used to house versions of metadata (the AIS and/or schema) so that concurrent transactions can acquire and use the correct version. For this proposal, it is used to hold Tree.TreeVersion instances.

A TimelyResource manages Version instances, and TimelyResource is generic with respect to a particular Version implementation class. Each TimelyResource is added to a set maintained by the root Persistit instance, and these are polled periodically by the CleanupManager to perform pruning. If a Version maintained by a TimelyResource implements the subinterface PrunableVersion, then its prune() method is called for any Version that has become obsolete according to our standard MVCC notion of liveness (i.e., no other active transaction needs the Version for a valid snapshot). The semantics of TimelyResource are documented in its JavaDoc, hopefully accurately and clearly.
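
To make the pruning rule concrete, the following is a minimal single-threaded sketch. It is not the TimelyResource implementation: SketchVersion, SketchPrunableVersion and VersionHistory are hypothetical names, versions are assumed to be added in increasing commit order, and uncommitted or aborted versions are not modeled. A version is treated as obsolete once a newer version is already visible to the oldest active transaction.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical stand-ins for the Version / PrunableVersion interfaces. */
interface SketchVersion {
}

interface SketchPrunableVersion extends SketchVersion {
    void prune();
}

/** Simplified model of a versioned resource; not the real TimelyResource. */
final class VersionHistory<V extends SketchVersion> {

    private static final class Entry<V> {
        final long commitTs;
        final V version;

        Entry(final long commitTs, final V version) {
            this.commitTs = commitTs;
            this.version = version;
        }
    }

    // Newest version first; callers add versions in increasing commit order.
    private final Deque<Entry<V>> entries = new ArrayDeque<>();

    void addCommitted(final long commitTs, final V version) {
        entries.addFirst(new Entry<>(commitTs, version));
    }

    /** Newest version committed before the given transaction started. */
    V visibleTo(final long startTs) {
        for (final Entry<V> e : entries) {
            if (e.commitTs < startTs) {
                return e.version;
            }
        }
        return null;
    }

    /** Drop versions no active snapshot can need; returns the number pruned. */
    int prune(final long oldestActiveStartTs) {
        boolean keptNewestVisible = false;
        int pruned = 0;
        final Iterator<Entry<V>> it = entries.iterator();
        while (it.hasNext()) {
            final Entry<V> e = it.next();
            if (e.commitTs >= oldestActiveStartTs) {
                continue; // committed too recently; newer snapshots need it
            }
            if (!keptNewestVisible) {
                keptNewestVisible = true; // what the oldest active txn sees
                continue;
            }
            it.remove();
            if (e.version instanceof SketchPrunableVersion) {
                ((SketchPrunableVersion) e.version).prune();
            }
            pruned++;
        }
        return pruned;
    }

    int size() {
        return entries.size();
    }
}
```

In this model the prune() callback is where a tree version in the removed state would release its physical pages.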

Each Tree now uses one TimelyResource instance to manage its transactional state. A Tree created within a transaction has a Tree.TreeVersion instance within the TimelyResource. That version contains most of the state that was formerly held by the Tree object itself. Removing a Tree within a transaction adds a TreeVersion denoting the removed state; the actual reclamation of space held by that tree is done when the versions are pruned.

Most of the accessors for state on the Tree object now delegate to the TreeVersion that is visible according to the currently active transaction.

There are some messy bits:

- The serialized state of a Tree represents its existence (which is transactional) and its physical B-Tree allocation (which is not). There are accommodations for this in the VolumeStructure class. We might have preferred a cleaner serialized representation, but the current version works, does not fundamentally limit the design, and is backward compatible. Another accommodation is that the update operations that change the serialized state of the Tree within the directory tree itself must not be reapplied during recovery. Instead, the state changes of the directory tree are implied by the operations that create and remove the trees themselves. Therefore the Transaction#store method explicitly prevents writing of certain directory tree updates even though they occur within the scope of a transaction.

- To make the new Exchange#lock method work properly, it was necessary to disable the transactional nature of tree creation/removal in the temporary volume used for locks. On the basis that this is likely the only temporary volume that will be shared among multiple threads, the current test creates and removes all trees in temporary volumes in a non-transactional (primordial) fashion. We could make this more precise, or we could simply define the transactional tree semantics as affecting only non-temporary volumes (which I think I prefer).

- The tree used by ClassIndex needs to be primordial to avoid unwanted write-write dependencies. Therefore there's an explicit call to create the tree during initialization.

- TreeStatistics are now maintained within a TreeVersion. Because the serialized form needs to be a snapshot view, these are now flushed within the checkpoint cycle only, and not periodically. As a consequence I believe the recovered tree statistics are always consistent with their state at checkpoint plus whatever updates are performed during recovery. Needless to say, the count of read operations performed after the keystone checkpoint is lost. This is not keeping me awake at night.

Because the semantic change here is pretty big, I am proposing a new major version increment to 3.3. Unlike other changes, this one does not merely add new methods; it subtly changes the fundamental behavior.

At the moment I have not run the server tests within this, nor have I run benchmarks to assess performance changes. These should be done before we approve the proposal.

416. By Peter Beaman

Skip directory tree updates in RecoveryManager#DefaultRecoveryListener rather than Transaction#store/#remove

Peter Beaman (pbeaman) wrote :

Updated to store but not recover directory tree updates in the journal. Needed because rollback needs the records in order to prune. Otherwise we get an assertion on a negative mvvCount.

Peter Beaman (pbeaman) wrote :

Information only. Here's what we need to deal with in server:

Results :

Failed tests: deleteMissingRow(com.akiban.server.test.it.dxl.CBasicIT): open cursors remaining:{CursorId[session 2712, table 39, cursor 1407]=ScanData[cursor=Cursor(created=1359482331405, state=FINISHED, request=com.akiban.server.api.dml.scan.ScanAllRequest@60ac2d55), columns=[0, 1]]}
  testYaml [test-bug1025059](com.akiban.sql.pg.PostgresServerMiscYamlIT): src/test/resources/com/akiban/sql/pg/yaml/bugs/test-bug1025059.yaml: org.postgresql.util.PSQLException: An I/O error occured while sending to the backend.

Tests in error:
  dropGroupingForeignKeyMiddleOfGroup(com.akiban.server.test.it.dxl.AlterTableBasicIT): PERSISTIT_ERROR: Persistit Data Layer error: null
  setDataType_C_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  setDataType_O_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropColumn_C_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropColumn_O_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropPrimaryKey_C(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropPrimaryKey_O(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  addGroupingForeignKey_C(com.akiban.server.test.it.dxl.AlterTableCAOIIT): PERSISTIT_ERROR: Persistit Data Layer error: null
  dropGroupingForeignKey_O(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  setDataType_O_cid(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropColumn_O_cid(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  deleteMissingRow(com.akiban.server.test.it.dxl.CBasicIT): Unexpected exception, expected<com.akiban.server.error.NoSuchRowException> but was<com.akiban.server.error.PersistitAdapterException>
  testRepeated(com.akiban.sql.pg.PostgresServerCacheIT): An I/O error occured while sending to the backend.
  testMultipleUpdate [commit-1](com.akiban.sql.pg.PostgresServerMultipleUpdateIT): An I/O error occured while sending to the backend.

Tests run: 2245, Failures: 2, Errors: 14, Skipped: 99

Nathan Williams (nwilliams) wrote :

Very nice! The meat of the changes really isn't too complicated and everything seems to fit nicely.

The only code change I would suggest, ask about really, is the live-ness determination in TimelyResource#prune(). I don't suspect it to be wrong but it is performing the same thing as MVV#prune(). They are operating on structures so different that I don't see a simple way to merge, but I had to convince myself they did the same thing. Can you think of anything to share the logic between them?

I would also like to see a handful of tests around what is in the directory tree, shutdown and recovery where multiple versions were present, and the (lack of) MVVs in writes to trees created in the same transaction.

The rest is a smattering of feedback, ranging from small to nits.

TimelyResource
- JavaDoc has a number of invalid links. Lots of other docs like that too, but this is all new.
- Should getVersionCount() be public? Fairly harmless, but I don't think we leak information like that from other places (Exchange, etc.).
- Should delete() check for _first != null? Seems like deleting a resource with no versions should be exception or assert worthy.

Exchange
- Why was the assertCorrectThread() call removed from init(), hasChildren(), and isDirectoryExchange()?
- The semantic change on checkThread (null-ing out of !set) is subtle. Perhaps renaming the method to checkAndSetThread or checkSetOrClearThread or something would help.
- A couple places call throttle() and now have multiple conditions. A helper may be in order.
- The addition of 'spareValue.isDefined() || !_tree.isTransactionPrivate' in store() isn't clear to me. When would doMVCC be true and the value be empty?

Tree
- End of the new paragraph of JavaDoc has two open <code> tags instead of open and close.
- version() has a // TODO about throwing a RuntimeException
- There are now a number of places where isValid() and isDeleted() must be called together. Maybe a new helper? A more drastic change, but perhaps safer, would be to make it protected in SharedResource and not promote it to public in Tree.

VolumeStructure
- removeTree() now only returns true (or throws)

TimelyResourceTest
- testAndPruneResources(boolean) might want to append withTransactions to the assert messages. Does the CleanupManager need to be disabled to avoid spurious failures?
- doConcurrentTransaction() is called from threads so using fail() won't cause the test to fail but only generate console output. This needs to get propagated back to the main thread somewhere. There are lightly used helpers in ConcurrentUtil for this.
- deleteResources() uses a raw assert instead of JUnit's.

TreeTransactionalLifetimeTest
- simplePruning() has a 5s sleep and a commented-out manual cleanup call. The latter seems cleaner and more deterministic; does it not work?
- I'd like to see some tests around the behavior regarding steps. I believe having trees obey it just like data makes the most sense, which is what it does now, but confirming it would be good.
- The TExec helper is duplicating part of the functionality in ConcurrentUtil and doesn't handle errors in a way that will fail the test.
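
For the two comments above about failures raised on background threads, here is a minimal sketch of one way to carry them back to the main thread. This is not the ConcurrentUtil API; ThreadFailureCollector and runAll are hypothetical names. Each task runs on its own thread, any Throwable (including the AssertionError thrown by fail()) is recorded, and the caller inspects the list after joining.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper; not part of ConcurrentUtil. */
final class ThreadFailureCollector {

    /**
     * Runs every task on its own thread, waits for all of them, and returns
     * whatever each thread threw so the caller can fail the test itself.
     */
    static List<Throwable> runAll(final List<Runnable> tasks) {
        final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>());
        final List<Thread> threads = new ArrayList<Thread>();
        for (final Runnable task : tasks) {
            final Thread t = new Thread(() -> {
                try {
                    task.run();
                } catch (final Throwable e) {
                    // Includes AssertionError from fail() and assertTrue().
                    failures.add(e);
                }
            });
            threads.add(t);
            t.start();
        }
        for (final Thread t : threads) {
            try {
                t.join();
            } catch (final InterruptedException e) {
                // Preserve the interrupt and report it as a failure.
                Thread.currentThread().interrupt();
                failures.add(e);
            }
        }
        return failures;
    }
}
```

The test method would then assert on the main thread that the returned list is empty, so a failure in any worker fails the test rather than only printing a stack trace.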

review: Needs Fixing
417. By Peter Beaman

Modifications per review comments

418. By Peter Beaman

Source code cleanup

419. By Peter Beaman

Fix up documentation. Optimization: don't add new version to delete resource created with same version handle

420. By Peter Beaman

Add some crash and stop/start tests

421. By Peter Beaman

& -> &&

422. By Peter Beaman

assert -> assertTrue

Peter Beaman (pbeaman) wrote :

Thanks. The updated version is changed as follows:

1. The only code change I would suggest, ask about really, is the live-ness determination in TimelyResource#prune().

- No change yet. Will look at that again tomorrow.

2. I would also like to see a handful of tests around what is in the directory tree, shutdown and recovery where multiple versions were present, and the (lack of) MVVs in writes to trees created in the same transaction.

- I believe these cases are covered by new tests added to TreeTransactionalLifetimeTest.

3. TimelyResource:

 - JavaDoc fixed. I believe there are no broken links in the Javadoc - if you find some, please let me know.
 - getVersionCount() - reduced to package private
 - delete() - check for null and throw IllegalStateException. Fixed a test that relied on lenient behavior.

Exchange

 - Why was assertCorrectThread() removed - in every case removed I determined there was another call made by a subordinate method. The removed calls were redundant.
 - checkThread(boolean) - I believe the implementations are identical except that the new one does not set _thread to t when it is known already to be t. Since _thread is volatile this may reduce memory contention. (Actually, I'm not sure if it matters.)
 - throttle() - a new helper, as suggested.
 - doMVCC && (spareValue.isDefined() || !tree.isTransactionPrivate()). This is an attempted optimization. In particular, it might be helpful when building a new index within a transaction by avoiding the need to write and prune MVVs. A rollback range-deletes the entire tree, so no value written inside the tree can ever become visible, even though it was written as primordial. There was an error related to different steps - that's fixed with the rather arcane new boolean argument of isTransactionPrivate(boolean). (I thought about two methods with different names but lacked the imagination to name them.) The main point is that in the case where the tree is unknown to any other transaction and there are no step-py things going on, we can simply write a primordial value since (a) no other transaction can see the tree, and (b) any step that could see the tree must be later than the step that writes the primordial value. To answer your specific question, I believe doMVCC is true whenever a transaction writes a value to a key that did not previously exist.
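
For reference, the condition discussed in the last item can be read as a small truth table. The sketch below restates it as a pure function; WritePathSketch and its parameter names are hypothetical and only mirror the expression quoted above, not the surrounding store logic.

```java
/** Hypothetical restatement of the store() condition; not Persistit code. */
final class WritePathSketch {

    /**
     * Returns true when the value must be written as an MVV, false when a
     * plain (primordial) write suffices.
     *
     * @param doMVCC                   the store is happening inside an active transaction
     * @param existingValueDefined     a value already exists under the key
     * @param treeIsTransactionPrivate the tree is visible only to the writing transaction
     */
    static boolean storeAsMvv(final boolean doMVCC, final boolean existingValueDefined,
            final boolean treeIsTransactionPrivate) {
        return doMVCC && (existingValueDefined || !treeIsTransactionPrivate);
    }
}
```

The only case that skips the MVV while a transaction is active is a first write to a key in a tree that no other transaction can see, which is the index-build scenario described above.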

Tree
 - Javadoc issues: fixed
 - version() now throws a subclass of RuntimeException and does not have a TODO
 - isValid() && !isDeleted() is now computed by helper isLive()

VolumeStructure
- removeTree() is now properly void

TimelyResourceTest
- Append withTransactions - done
- doConcurrentTransaction() - reworked to use ConcurrentUtil. Note that I modified ConcurrentUtil extensively to expose more options for use. This was done to allow doConcurrentTransactions to replace threads during execution. I now fully understand the elegance of ConcurrentUtil (which I had not previously paid much attention to) and will endeavor to use it to replace lots of cases in unit tests that create and join background threads.
- assert - fixed

TreeTransactionalLifetimeTest
- simplePruning() sleep - fixed
- I'd like to see new tests...


Peter Beaman (pbeaman) wrote :

Note that Akiban Server requires changes to work with transactional trees. Branch lp:~pbeaman/akiban-server/try-transactional-tree contains more or less minimal changes necessary to complete tests. It does not make use of transactional trees to enhance DDL operations, however.

Nathan Williams (nwilliams) wrote :

Thanks for all the tweaks. Aside from the (possibly unrealistic) desire to unify the live-ness determination, this all looks good to me.

Have we run the long stress suite or tpcc against this? I assume we'll want to before putting into trunk.

423. By Peter Beaman

Merge from trunk with changes for lock pruning fix

424. By Peter Beaman

Synchronize volume open

Peter Beaman (pbeaman) wrote :

Stress tests have run four successful nightly iterations.

425. By Peter Beaman

Merge from trunk

Preview Diff

=== modified file 'pom.xml'
--- pom.xml 2013-04-10 20:09:48 +0000
+++ pom.xml 2013-04-29 21:38:25 +0000
@@ -4,7 +4,7 @@
 
   <groupId>com.akiban</groupId>
   <artifactId>akiban-persistit</artifactId>
-  <version>3.2.9-SNAPSHOT</version>
+  <version>3.3-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>

=== modified file 'src/main/java/com/persistit/Buffer.java'
--- src/main/java/com/persistit/Buffer.java 2013-03-06 16:20:57 +0000
+++ src/main/java/com/persistit/Buffer.java 2013-04-29 21:38:25 +0000
@@ -1424,7 +1424,6 @@
         if (Debug.ENABLED) {
             assertVerify();
         }
-
         final boolean exactMatch = (foundAt & EXACT_MASK) > 0;
         final int p = foundAt & P_MASK;
 
@@ -3869,7 +3868,7 @@
      * @return a human-readable inventory of the contents of this buffer
      */
     public String toStringDetail() {
-        return toStringDetail(-1, 42, 42, 0, true);
+        return toStringDetail(-1, 42, 82, 0, true);
     }
 
     /**

=== modified file 'src/main/java/com/persistit/CheckpointManager.java'
--- src/main/java/com/persistit/CheckpointManager.java 2013-02-04 16:32:35 +0000
+++ src/main/java/com/persistit/CheckpointManager.java 2013-04-29 21:38:25 +0000
@@ -245,6 +245,7 @@
             final List<Accumulator> accumulators = _persistit.takeCheckpointAccumulators(txn.getStartTimestamp());
             _persistit.getTransactionIndex().checkpointAccumulatorSnapshots(txn.getStartTimestamp(), accumulators);
             Accumulator.saveAccumulatorCheckpointValues(accumulators);
+            _persistit.flushStatistics();
             txn.commit(CommitPolicy.HARD);
             _currentCheckpoint = new Checkpoint(txn.getStartTimestamp(), System.currentTimeMillis());
             _outstandingCheckpoints.add(_currentCheckpoint);

=== modified file 'src/main/java/com/persistit/ClassIndex.java'
--- src/main/java/com/persistit/ClassIndex.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/ClassIndex.java 2013-04-29 21:38:25 +0000
@@ -94,11 +94,21 @@
      *
      * @param persistit
      *            Owning Persistit instance.
+     * @throws PersistitException
      */
     ClassIndex(final Persistit persistit) {
         _persistit = persistit;
     }
 
+    void initialize() throws PersistitException {
+        /*
+         * Called during Persistit initialization. This has the desired
+         * side-effect of the class index tree outside of a transaction so that
+         * its existence is primordial.
+         */
+        getExchange();
+    }
+
     /**
      * @return Number of <code>ClassInfo</code> objects currently stored in this
      *         ClassIndex.
@@ -178,8 +188,9 @@
             } catch (final PersistitException pe) {
                 throw new ConversionException(pe);
             } finally {
-                if (ex != null)
+                if (ex != null) {
                     releaseExchange(ex);
+                }
             }
         }
     }

=== modified file 'src/main/java/com/persistit/Exchange.java'
--- src/main/java/com/persistit/Exchange.java 2013-03-23 21:59:48 +0000
+++ src/main/java/com/persistit/Exchange.java 2013-04-29 21:38:25 +0000
@@ -413,7 +413,6 @@
     }
 
     void init(final Volume volume, final String treeName, final boolean create) throws PersistitException {
-        assertCorrectThread(true);
         if (volume == null) {
             throw new NullPointerException();
         }
@@ -441,6 +440,7 @@
             _tree = tree;
             _treeHolder = new ReentrantResourceHolder(_tree);
             _cachedTreeGeneration = -1;
+            _isDirectoryExchange = tree == _volume.getDirectoryTree();
             initCache();
         }
         _splitPolicy = _persistit.getDefaultSplitPolicy();
@@ -503,7 +503,7 @@
 
     private void checkLevelCache() throws PersistitException {
 
-        if (!_tree.isValid()) {
+        if (!_tree.isLive()) {
             if (_tree.getVolume().isTemporary()) {
                 _tree = _tree.getVolume().getTree(_tree.getName(), true);
                 _treeHolder = new ReentrantResourceHolder(_tree);
@@ -1356,11 +1356,7 @@
         if (!isDirectoryExchange()) {
             _persistit.checkSuspended();
         }
-        if (!_ignoreTransactions && !_transaction.isActive()) {
-            _persistit.getJournalManager().throttle();
-        }
-        // TODO: directoryExchange, and lots of tests, don't use transactions.
-        // Skip MVCC for now.
+        throttle();
         int options = StoreOptions.WAIT;
         options |= (!_ignoreTransactions && _transaction.isActive()) ? StoreOptions.MVCC : 0;
         storeInternal(key, value, 0, options);
@@ -1551,7 +1547,13 @@
                         }
                     }
 
-                    if (doMVCC) {
+                    /*
+                     * If the Tree is private to an active transaction, and
+                     * if this is a virgin value, then we can store it
+                     * primordially because if the transaction rolls back,
+                     * the entire Tree will be removed.
+                     */
+                    if (doMVCC && (_spareValue.isDefined() || !_tree.isTransactionPrivate(true))) {
                         valueToStore = spareValue;
                         final int valueSize = value.getEncodedSize();
                         int retries = VERSIONS_OUT_OF_ORDER_RETRY_COUNT;
@@ -3188,7 +3190,6 @@
      * @throws PersistitException
      */
     public boolean hasChildren() throws PersistitException {
-        assertCorrectThread(true);
         _key.copyTo(_spareKey2);
         final int size = _key.getEncodedSize();
         final boolean result = traverse(GT, true, 0, _key.getDepth() + 1, size, null);
@@ -3228,30 +3229,16 @@
      */
     public void removeTree() throws PersistitException {
         assertCorrectThread(true);
+        _persistit.checkSuspended();
         _persistit.checkClosed();
 
-        final long timestamp = _persistit.getCurrentTimestamp();
-        for (int i = 0; i < 100; i++) {
-            _persistit.checkClosed();
-            _persistit.checkSuspended();
-            _persistit.getJournalManager().pruneObsoleteTransactions();
-            if (_persistit.getJournalManager().getEarliestAbortedTransactionTimestamp() > timestamp) {
-                break;
-            }
-            Util.sleep(1000);
-        }
+        _volume.getStructure().removeTree(_tree);
         if (!_ignoreTransactions) {
+            assert !isDirectoryExchange();
             _transaction.removeTree(this);
         }
-
-        clear();
-
+        _key.clear();
         _value.clear();
-        /*
-         * Remove from directory tree.
-         */
-        _volume.getStructure().removeTree(_tree);
-
         initCache();
     }
 
@@ -3410,9 +3397,8 @@
         if (!isDirectoryExchange()) {
             _persistit.checkSuspended();
         }
-        if (!_ignoreTransactions && !_transaction.isActive()) {
-            _persistit.getJournalManager().throttle();
-        }
+
+        throttle();
 
         if (_ignoreTransactions || !_transaction.isActive()) {
             return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false);
@@ -3422,6 +3408,15 @@
 
         _transaction.remove(this, key1, key2);
 
+        /*
+         * If the Tree was created within this transaction then we can just
+         * range-delete the tree since it is not visible outside this
+         * transaction.
+         */
+        if (_tree.isTransactionPrivate(true)) {
+            return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false);
+        }
+
         checkLevelCache();
 
         _value.clear().putAntiValueMVV();
@@ -3994,7 +3989,7 @@
 
     boolean prune(final Key key) throws PersistitException {
         Buffer buffer = null;
-        Debug.$assert1.t(_tree.isValid());
+        Debug.$assert1.t(_tree.isLive());
         try {
             search(key, true);
             buffer = _levelCache[0]._buffer;
@@ -4014,7 +4009,7 @@
         Buffer buffer = null;
         boolean pruned = false;
 
-        Debug.$assert1.t(_tree.isValid());
+        Debug.$assert1.t(_tree.isLive());
         try {
             search(key1, true);
             buffer = _levelCache[0]._buffer;
@@ -4132,13 +4127,30 @@
         assert checkThread(set) : "Thread " + Thread.currentThread() + " must not use " + this + " owned by " + _thread;
     }
 
+    /**
+     * Ensure the this Exchange is compatible with the current Thread; if a
+     * Thread was previously assigned then this thread must be the same one.
+     *
+     * @param set
+     *            whether to assign the current thread
+     * @return true if and only if there was no assigned Thread or the assigned
+     *         Thread is same as the current Thread.
+     */
     private boolean checkThread(final boolean set) {
         final Thread t = Thread.currentThread();
-        final boolean okay = _thread == null || _thread == t;
-        if (okay) {
-            _thread = set ? t : null;
-        }
-        return okay;
+        if (_thread == t) {
+            if (!set) {
+                _thread = null;
+            }
+            return true;
+        }
+        if (_thread == null) {
+            if (set) {
+                _thread = t;
+            }
+            return true;
+        }
+        return false;
     }
 
     /**
@@ -4187,7 +4199,6 @@
      *         <code>false</code>.
      */
     boolean isDirectoryExchange() {
-        assertCorrectThread(true);
         return _isDirectoryExchange;
     }
 
@@ -4426,4 +4437,15 @@
             _ignoreMVCCFetch = savedIgnore;
         }
     }
+
+    private void throttle() throws PersistitInterruptedException {
+        /*
+         * Don't throttle operations on the directory tree since that makes some
+         * unit tests very slow. This test is now necessary because a directory
+         * tree update can now occur within the scope of a transaction.
+         */
+        if (!_ignoreTransactions && !_transaction.isActive() && !isDirectoryExchange()) {
+            _persistit.getJournalManager().throttle();
+        }
+    }
 }

=== modified file 'src/main/java/com/persistit/JournalManager.java'
--- src/main/java/com/persistit/JournalManager.java 2013-02-15 15:39:42 +0000
+++ src/main/java/com/persistit/JournalManager.java 2013-04-29 21:38:25 +0000
@@ -2862,6 +2862,11 @@
2862 return false;2862 return false;
2863 }2863 }
28642864
2865 @Override
2866 public boolean createTree(final long timestamp) throws PersistitException {
2867 return false;
2868 }
2869
2865 }2870 }
28662871
2867 /**2872 /**
28682873
=== modified file 'src/main/java/com/persistit/MVV.java'
--- src/main/java/com/persistit/MVV.java 2012-10-03 14:43:25 +0000
+++ src/main/java/com/persistit/MVV.java 2013-04-29 21:38:25 +0000
@@ -451,7 +451,7 @@
451 if (tc == UNCOMMITTED) {451 if (tc == UNCOMMITTED) {
452 final long ts = vh2ts(versionHandle);452 final long ts = vh2ts(versionHandle);
453 if (uncommittedTransactionTs != 0 && uncommittedTransactionTs != ts) {453 if (uncommittedTransactionTs != 0 && uncommittedTransactionTs != ts) {
454 throw new CorruptValueException("Multiple uncommitted version");454 throw new CorruptValueException("Multiple uncommitted versions");
455 }455 }
456 uncommittedTransactionTs = ts;456 uncommittedTransactionTs = ts;
457 mark(bytes, from);457 mark(bytes, from);
458458
=== modified file 'src/main/java/com/persistit/Persistit.java'
--- src/main/java/com/persistit/Persistit.java 2013-03-20 16:04:52 +0000
+++ src/main/java/com/persistit/Persistit.java 2013-04-29 21:38:25 +0000
@@ -29,6 +29,7 @@
29import java.lang.management.MemoryMXBean;29import java.lang.management.MemoryMXBean;
30import java.lang.management.MemoryUsage;30import java.lang.management.MemoryUsage;
31import java.lang.ref.SoftReference;31import java.lang.ref.SoftReference;
32import java.lang.ref.WeakReference;
32import java.rmi.RemoteException;33import java.rmi.RemoteException;
33import java.util.ArrayList;34import java.util.ArrayList;
34import java.util.Collections;35import java.util.Collections;
@@ -264,6 +265,8 @@
264265
265 private final Set<AccumulatorRef> _accumulators = new HashSet<AccumulatorRef>();266 private final Set<AccumulatorRef> _accumulators = new HashSet<AccumulatorRef>();
266267
268 private final Set<WeakReference<TimelyResource<?>>> _timelyResourceSet = new HashSet<WeakReference<TimelyResource<?>>>();
269
267 private final WeakHashMap<SessionId, CLI> _cliSessionMap = new WeakHashMap<SessionId, CLI>();270 private final WeakHashMap<SessionId, CLI> _cliSessionMap = new WeakHashMap<SessionId, CLI>();
268271
269 private boolean _readRetryEnabled;272 private boolean _readRetryEnabled;
@@ -440,6 +443,7 @@
440 startJournal();443 startJournal();
441 startBufferPools();444 startBufferPools();
442 preloadBufferPools();445 preloadBufferPools();
446 initializeClassIndex();
443 finishRecovery();447 finishRecovery();
444 startTransactionIndexPollTask();448 startTransactionIndexPollTask();
445 flush();449 flush();
@@ -672,6 +676,10 @@
672 _enableBufferInventory.set(_configuration.isBufferInventoryEnabled());676 _enableBufferInventory.set(_configuration.isBufferInventoryEnabled());
673 }677 }
674678
679 private void initializeClassIndex() throws PersistitException {
680 _classIndex.initialize();
681 }
682
675 void startCheckpointManager() {683 void startCheckpointManager() {
676 _checkpointManager.start();684 _checkpointManager.start();
677 }685 }
@@ -793,17 +801,21 @@
793 }801 }
794 }802 }
795803
796 synchronized void addVolume(final Volume volume) throws VolumeAlreadyExistsException {804 void addVolume(final Volume volume) throws VolumeAlreadyExistsException {
797 Volume otherVolume;805 synchronized (_volumes) {
798 otherVolume = getVolume(volume.getName());806 Volume otherVolume;
799 if (otherVolume != null) {807 otherVolume = getVolume(volume.getName());
800 throw new VolumeAlreadyExistsException("Volume " + otherVolume);808 if (otherVolume != null) {
809 throw new VolumeAlreadyExistsException("Volume " + otherVolume);
810 }
811 _volumes.add(volume);
801 }812 }
802 _volumes.add(volume);
803 }813 }
804814
805 synchronized void removeVolume(final Volume volume) throws PersistitInterruptedException {815 void removeVolume(final Volume volume) throws PersistitInterruptedException {
806 _volumes.remove(volume);816 synchronized (_volumes) {
817 _volumes.remove(volume);
818 }
807 }819 }
808820
809 /**821 /**
@@ -965,7 +977,6 @@
965 }977 }
966 List<Exchange> stack;978 List<Exchange> stack;
967 final SessionId sessionId = getSessionId();979 final SessionId sessionId = getSessionId();
968
969 synchronized (_exchangePoolMap) {980 synchronized (_exchangePoolMap) {
970 stack = _exchangePoolMap.get(sessionId);981 stack = _exchangePoolMap.get(sessionId);
971 if (stack == null) {982 if (stack == null) {
@@ -986,7 +997,9 @@
986 * @return the List997 * @return the List
987 */998 */
988 public List<Volume> getVolumes() {999 public List<Volume> getVolumes() {
989 return new ArrayList<Volume>(_volumes);1000 synchronized (_volumes) {
1001 return new ArrayList<Volume>(_volumes);
1002 }
990 }1003 }
9911004
992 /**1005 /**
@@ -1000,9 +1013,10 @@
1000 * @return the List1013 * @return the List
1001 * @throws PersistitException1014 * @throws PersistitException
1002 */1015 */
1003 public synchronized List<Tree> getSelectedTrees(final TreeSelector selector) throws PersistitException {1016 public List<Tree> getSelectedTrees(final TreeSelector selector) throws PersistitException {
1004 final List<Tree> list = new ArrayList<Tree>();1017 final List<Tree> list = new ArrayList<Tree>();
1005 for (final Volume volume : _volumes) {1018 final List<Volume> volumes = getVolumes();
1019 for (final Volume volume : volumes) {
1006 if (selector.isSelected(volume)) {1020 if (selector.isSelected(volume)) {
1007 if (selector.isVolumeOnlySelection(volume.getName())) {1021 if (selector.isVolumeOnlySelection(volume.getName())) {
1008 list.add(volume.getDirectoryTree());1022 list.add(volume.getDirectoryTree());
@@ -1239,10 +1253,10 @@
1239 if (name == null) {1253 if (name == null) {
1240 throw new NullPointerException("Null volume name");1254 throw new NullPointerException("Null volume name");
1241 }1255 }
1256 final List<Volume> volumes = getVolumes();
1242 Volume result = null;1257 Volume result = null;
12431258 for (int i = 0; i < volumes.size(); i++) {
1244 for (int i = 0; i < _volumes.size(); i++) {1259 final Volume vol = volumes.get(i);
1245 final Volume vol = _volumes.get(i);
1246 if (name.equals(vol.getName())) {1260 if (name.equals(vol.getName())) {
1247 if (result == null)1261 if (result == null)
1248 result = vol;1262 result = vol;
@@ -1256,8 +1270,8 @@
1256 }1270 }
12571271
1258 final File file = new File(name).getAbsoluteFile();1272 final File file = new File(name).getAbsoluteFile();
1259 for (int i = 0; i < _volumes.size(); i++) {1273 for (int i = 0; i < volumes.size(); i++) {
1260 final Volume vol = _volumes.get(i);1274 final Volume vol = volumes.get(i);
1261 if (file.equals(vol.getAbsoluteFile())) {1275 if (file.equals(vol.getAbsoluteFile())) {
1262 if (result == null)1276 if (result == null)
1263 result = vol;1277 result = vol;
@@ -1477,16 +1491,17 @@
1477 */1491 */
1478 private Volume getSpecialVolume(final String propName, final String dflt) throws VolumeNotFoundException {1492 private Volume getSpecialVolume(final String propName, final String dflt) throws VolumeNotFoundException {
1479 final String volumeName = _configuration.getSysVolume();1493 final String volumeName = _configuration.getSysVolume();
14801494 synchronized (_volumes) {
1481 Volume volume = getVolume(volumeName);
1482 if (volume == null) {
1483 if ((_volumes.size() == 1) && (volumeName.equals(dflt))) {1495 if ((_volumes.size() == 1) && (volumeName.equals(dflt))) {
1484 volume = _volumes.get(0);1496 return _volumes.get(0);
1485 } else {
1486 throw new VolumeNotFoundException(volumeName);
1487 }1497 }
1488 }1498 }
1489 return volume;1499 final Volume volume = getVolume(volumeName);
1500 if (volume == null) {
1501 throw new VolumeNotFoundException(volumeName);
1502 } else {
1503 return volume;
1504 }
1490 }1505 }
14911506
1492 /**1507 /**
@@ -1512,13 +1527,8 @@
1512 */1527 */
1513 void cleanup() {1528 void cleanup() {
1514 closeZombieTransactions(false);1529 closeZombieTransactions(false);
1515 final List<Volume> volumes;1530 _transactionIndex.updateActiveTransactionCache();
1516 synchronized (this) {1531 pruneTimelyResources();
1517 volumes = new ArrayList<Volume>(_volumes);
1518 }
1519 for (final Volume volume : volumes) {
1520 volume.getStructure().flushStatistics();
1521 }
1522 }1532 }
15231533
1524 /**1534 /**
@@ -1653,7 +1663,6 @@
16531663
1654 if (flush) {1664 if (flush) {
1655 for (final Volume volume : volumes) {1665 for (final Volume volume : volumes) {
1656 volume.getStructure().flushStatistics();
1657 volume.getStorage().flush();1666 volume.getStorage().flush();
1658 }1667 }
1659 }1668 }
@@ -1755,7 +1764,7 @@
1755 // the volume files - otherwise there will be left over channels1764 // the volume files - otherwise there will be left over channels
1756 // and FileLocks that interfere with subsequent tests.1765 // and FileLocks that interfere with subsequent tests.
1757 //1766 //
1758 final List<Volume> volumes = new ArrayList<Volume>(_volumes);1767 final List<Volume> volumes = getVolumes();
1759 for (final Volume volume : volumes) {1768 for (final Volume volume : volumes) {
1760 try {1769 try {
1761 volume.getStorage().close();1770 volume.getStorage().close();
@@ -1824,8 +1833,11 @@
1824 synchronized (_accumulators) {1833 synchronized (_accumulators) {
1825 _accumulators.clear();1834 _accumulators.clear();
1826 }1835 }
1827 synchronized (this) {1836 synchronized (_volumes) {
1828 _volumes.clear();1837 _volumes.clear();
1838 }
1839
1840 synchronized (this) {
1829 _alertMonitors.clear();1841 _alertMonitors.clear();
1830 _bufferPoolTable.clear();1842 _bufferPoolTable.clear();
1831 _intArrayThreadLocal.set(null);1843 _intArrayThreadLocal.set(null);
@@ -1867,8 +1879,8 @@
1867 if (_closed.get() || !_initialized.get()) {1879 if (_closed.get() || !_initialized.get()) {
1868 return false;1880 return false;
1869 }1881 }
1870 for (final Volume volume : _volumes) {1882 final List<Volume> volumes = getVolumes();
1871 volume.getStructure().flushStatistics();1883 for (final Volume volume : volumes) {
1872 volume.getStorage().flush();1884 volume.getStorage().flush();
1873 volume.getStorage().force();1885 volume.getStorage().force();
1874 }1886 }
@@ -1894,6 +1906,13 @@
1894 }1906 }
1895 }1907 }
18961908
1909 void flushStatistics() throws PersistitException {
1910 final List<Volume> volumes = getVolumes();
1911 for (final Volume volume : volumes) {
1912 volume.getStructure().flushStatistics();
1913 }
1914 }
1915
1897 void waitForIOTaskStop(final IOTaskRunnable task) {1916 void waitForIOTaskStop(final IOTaskRunnable task) {
1898 if (_beginCloseTime == 0) {1917 if (_beginCloseTime == 0) {
1899 _beginCloseTime = System.nanoTime();1918 _beginCloseTime = System.nanoTime();
@@ -1926,7 +1945,7 @@
1926 if (_closed.get() || !_initialized.get()) {1945 if (_closed.get() || !_initialized.get()) {
1927 return;1946 return;
1928 }1947 }
1929 final ArrayList<Volume> volumes = _volumes;1948 final List<Volume> volumes = getVolumes();
19301949
1931 for (int index = 0; index < volumes.size(); index++) {1950 for (int index = 0; index < volumes.size(); index++) {
1932 final Volume volume = volumes.get(index);1951 final Volume volume = volumes.get(index);
@@ -2236,10 +2255,6 @@
2236 return _transactionIndex;2255 return _transactionIndex;
2237 }2256 }
22382257
2239 public long getCheckpointIntervalNanos() {
2240 return _checkpointManager.getCheckpointIntervalNanos();
2241 }
2242
2243 /**2258 /**
2244 * Replaces the current logger implementation.2259 * Replaces the current logger implementation.
2245 * 2260 *
@@ -2286,8 +2301,9 @@
2286 */2301 */
2287 public void checkAllVolumes() throws PersistitException {2302 public void checkAllVolumes() throws PersistitException {
2288 final IntegrityCheck icheck = new IntegrityCheck(this);2303 final IntegrityCheck icheck = new IntegrityCheck(this);
2289 for (int index = 0; index < _volumes.size(); index++) {2304 final List<Volume> volumes = getVolumes();
2290 final Volume volume = _volumes.get(index);2305 for (int index = 0; index < volumes.size(); index++) {
2306 final Volume volume = volumes.get(index);
2291 System.out.println("Checking " + volume + " ");2307 System.out.println("Checking " + volume + " ");
2292 try {2308 try {
2293 icheck.checkVolume(volume);2309 icheck.checkVolume(volume);
@@ -2403,6 +2419,12 @@
2403 _suspendUpdates.set(suspended);2419 _suspendUpdates.set(suspended);
2404 }2420 }
24052421
2422 void addTimelyResource(final TimelyResource<? extends Version> resource) {
2423 synchronized (_timelyResourceSet) {
2424 _timelyResourceSet.add(new WeakReference<TimelyResource<? extends Version>>(resource));
2425 }
2426 }
2427
2406 void addAccumulator(final Accumulator accumulator) throws PersistitException {2428 void addAccumulator(final Accumulator accumulator) throws PersistitException {
2407 int checkpointCount = 0;2429 int checkpointCount = 0;
2408 synchronized (_accumulators) {2430 synchronized (_accumulators) {
@@ -2462,6 +2484,34 @@
2462 return result;2484 return result;
2463 }2485 }
24642486
2487 void pruneTimelyResources() {
2488 final List<TimelyResource<?>> resourcesToPrune = new ArrayList<TimelyResource<?>>();
2489 synchronized (_timelyResourceSet) {
2490 for (final Iterator<WeakReference<TimelyResource<?>>> iter = _timelyResourceSet.iterator(); iter.hasNext();) {
2491 final WeakReference<TimelyResource<?>> ref = iter.next();
2492 final TimelyResource<?> resource = ref.get();
2493 if (resource != null) {
2494 resourcesToPrune.add(resource);
2495 }
2496 }
2497 }
2498 for (final TimelyResource<?> resource : resourcesToPrune) {
2499 try {
2500 resource.prune();
2501 } catch (final PersistitException e) {
2502 _logBase.timelyResourcePruneException.log(e, resource);
2503 }
2504 }
2505 synchronized (_timelyResourceSet) {
2506 for (final Iterator<WeakReference<TimelyResource<?>>> iter = _timelyResourceSet.iterator(); iter.hasNext();) {
2507 final WeakReference<TimelyResource<?>> ref = iter.next();
2508 if (ref.get() == null) {
2509 iter.remove();
2510 }
2511 }
2512 }
2513 }
2514
2465 synchronized CLI getSessionCLI() {2515 synchronized CLI getSessionCLI() {
2466 CLI cli = _cliSessionMap.get(getSessionId());2516 CLI cli = _cliSessionMap.get(getSessionId());
2467 if (cli == null) {2517 if (cli == null) {
24682518
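The pruneTimelyResources() method added above follows a snapshot, act, sweep pattern: collect the live referents under the lock, call prune() outside the lock, then remove cleared WeakReference objects under the lock again. A minimal sketch of that pattern follows. It is not the Persistit implementation: Prunable and PrunableRegistry are hypothetical names, and a caught exception is simply ignored where the original logs it.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TimelyResource; only prune() matters here.
interface Prunable {
    void prune() throws Exception;
}

final class PrunableRegistry {

    private final Set<WeakReference<Prunable>> refs = new HashSet<WeakReference<Prunable>>();

    void add(final Prunable p) {
        synchronized (refs) {
            refs.add(new WeakReference<Prunable>(p));
        }
    }

    // Returns the number of resources whose prune() completed normally.
    int pruneAll() {
        // Step 1: snapshot the live referents while holding the lock.
        final List<Prunable> live = new ArrayList<Prunable>();
        synchronized (refs) {
            for (final WeakReference<Prunable> ref : refs) {
                final Prunable p = ref.get();
                if (p != null) {
                    live.add(p);
                }
            }
        }
        // Step 2: prune outside the lock so a slow prune() does not block add().
        int pruned = 0;
        for (final Prunable p : live) {
            try {
                p.prune();
                pruned++;
            } catch (final Exception e) {
                // The original logs the failure; this sketch just moves on.
            }
        }
        // Step 3: sweep references whose referents have been garbage collected.
        synchronized (refs) {
            for (final Iterator<WeakReference<Prunable>> it = refs.iterator(); it.hasNext();) {
                if (it.next().get() == null) {
                    it.remove();
                }
            }
        }
        return pruned;
    }

    int size() {
        synchronized (refs) {
            return refs.size();
        }
    }
}
```

Holding only weak references means the registry never keeps a resource alive on its own; a resource that is no longer referenced elsewhere simply disappears from the set on a later sweep.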
=== modified file 'src/main/java/com/persistit/RecoveryManager.java'
--- src/main/java/com/persistit/RecoveryManager.java 2013-02-15 15:39:42 +0000
+++ src/main/java/com/persistit/RecoveryManager.java 2013-04-29 21:38:25 +0000
@@ -224,14 +224,32 @@
224 private final TransactionPlayer _player = new TransactionPlayer(new RecoveryTransactionPlayerSupport());224 private final TransactionPlayer _player = new TransactionPlayer(new RecoveryTransactionPlayerSupport());
225225
226 static class DefaultRecoveryListener implements TransactionPlayerListener {226 static class DefaultRecoveryListener implements TransactionPlayerListener {
227
227 @Override228 @Override
228 public void store(final long address, final long timestamp, final Exchange exchange) throws PersistitException {229 public void store(final long address, final long timestamp, final Exchange exchange) throws PersistitException {
230 if (exchange.isDirectoryExchange() && exchange.getValue().isDefined()
231 && exchange.getValue().getTypeHandle() == Value.CLASS_TREE) {
232 /*
233 * Don't recover tree structure updates within transactions
234 * because the allocation of root pages is not transactional.
235 * The intent of the change is conveyed by the implicit creation
236 * of new trees and explicit remove tree records.
237 */
238 return;
239 }
229 exchange.store();240 exchange.store();
230 }241 }
231242
232 @Override243 @Override
233 public void removeKeyRange(final long address, final long timestamp, final Exchange exchange, final Key from,244 public void removeKeyRange(final long address, final long timestamp, final Exchange exchange, final Key from,
234 final Key to) throws PersistitException {245 final Key to) throws PersistitException {
246 if (exchange.isDirectoryExchange()) {
247 /*
248 * Don't recover directory tree removes because they are implied
249 * by Remove Tree records in the journal.
250 */
251 return;
252 }
235 exchange.raw_removeKeyRangeInternal(from, to, false, false);253 exchange.raw_removeKeyRangeInternal(from, to, false, false);
236 }254 }
237255
@@ -275,6 +293,10 @@
275 return true;293 return true;
276 }294 }
277295
296 @Override
297 public boolean createTree(final long timestamp) throws PersistitException {
298 return true;
299 }
278 }300 }
279301
280 class DefaultRollbackListener implements TransactionPlayerListener {302 class DefaultRollbackListener implements TransactionPlayerListener {
@@ -336,6 +358,11 @@
336 return false;358 return false;
337 }359 }
338360
361 @Override
362 public boolean createTree(final long timestamp) throws PersistitException {
363 return false;
364 }
365
339 }366 }
340367
341 private class RecoveryTransactionPlayerSupport implements TransactionPlayerSupport {368 private class RecoveryTransactionPlayerSupport implements TransactionPlayerSupport {
342369
=== modified file 'src/main/java/com/persistit/SharedResource.java'
--- src/main/java/com/persistit/SharedResource.java 2013-04-10 20:09:48 +0000
+++ src/main/java/com/persistit/SharedResource.java 2013-04-29 21:38:25 +0000
@@ -38,7 +38,7 @@
38 * 38 *
39 * @author peter39 * @author peter
40 */40 */
41class SharedResource {41abstract class SharedResource {
4242
43 /**43 /**
44 * Default maximum time to wait for access to this resource. Methods throw44 * Default maximum time to wait for access to this resource. Methods throw
@@ -260,10 +260,10 @@
260 /**260 /**
261 * A counter that increments every time the resource is changed.261 * A counter that increments every time the resource is changed.
262 */262 */
263 protected AtomicLong _generation = new AtomicLong();263 private final AtomicLong _generation = new AtomicLong();
264264
265 protected SharedResource(final Persistit persistit) {265 protected SharedResource(final Persistit persistit) {
266 this._persistit = persistit;266 _persistit = persistit;
267 }267 }
268268
269 public boolean isAvailable(final boolean writer) {269 public boolean isAvailable(final boolean writer) {
270270
=== added file 'src/main/java/com/persistit/TimelyResource.java'
--- src/main/java/com/persistit/TimelyResource.java 1970-01-01 00:00:00 +0000
+++ src/main/java/com/persistit/TimelyResource.java 2013-04-29 21:38:25 +0000
@@ -0,0 +1,505 @@
1/**
2 * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
3 *
4 * This program and the accompanying materials are made available
5 * under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * This program may also be available under different license terms.
10 * For more information, see www.akiban.com or contact licensing@akiban.com.
11 *
12 * Contributors:
13 * Akiban Technologies, Inc.
14 */
15
16package com.persistit;
17
18import static com.persistit.TransactionIndex.tss2vh;
19import static com.persistit.TransactionIndex.vh2step;
20import static com.persistit.TransactionIndex.vh2ts;
21import static com.persistit.TransactionStatus.ABORTED;
22import static com.persistit.TransactionStatus.PRIMORDIAL;
23import static com.persistit.TransactionStatus.TIMED_OUT;
24import static com.persistit.TransactionStatus.UNCOMMITTED;
25
26import java.util.ArrayList;
27import java.util.List;
28
29import com.persistit.Version.PrunableVersion;
30import com.persistit.Version.VersionCreator;
31import com.persistit.exception.PersistitException;
32import com.persistit.exception.PersistitInterruptedException;
33import com.persistit.exception.RollbackException;
34import com.persistit.exception.TimeoutException;
35import com.persistit.exception.WWRetryException;
36
37/**
38 * <p>
39 * Transactionally manage multiple versions of a resource. For example, a
40 * resource which caches state created and either committed or rolled back by a
41 * transaction may use a TimelyResource to hold its versions. Each version
42 * must implement the {@link Version} interface and may optionally implement
43 * {@link PrunableVersion} in which case its {@link PrunableVersion#prune()}
44 * method is called when <code>TimelyResource</code> detects that the version is
45 * obsolete.
46 * </p>
47 * <p>
48 * The method {@link #addVersion(Version, Transaction)} attempts to add a new
49 * version on behalf of the supplied transaction.
50 * </p>
51 * <p>
52 * The method {@link #getVersion()} returns the snapshot version associated with
53 * the transaction, in other words, an instance of V which was either
54 * committed last before this transaction started, or which was created by this
55 * transaction. If there is no such version the method returns <code>null</code>
56 * .
57 * </p>
58 * <p>
59 * The method {@link #getVersion(com.persistit.Version.VersionCreator)} does the
60 * same thing, except that if there is no snapshot version the
61 * {@link VersionCreator#createVersion(TimelyResource)} method is called to
62 * construct a new version.
63 * </p>
64 *
65 * @author peter
66 *
67 * @param <V>
68 * specific type of {@link Version} this <code>TimelyResource</code>
69 * manages
70 */
71public class TimelyResource<V extends Version> {
72
73 private final Persistit _persistit;
74 private volatile Entry _first;
75
76 public TimelyResource(final Persistit persistit) {
77 _persistit = persistit;
78 _persistit.addTimelyResource(this);
79 }
80
81 private long tss2v(final Transaction txn) {
82 if (txn == null || !txn.isActive()) {
83 return tss2vh(_persistit.getTimestampAllocator().updateTimestamp(), 0);
84 } else {
85 return tss2vh(txn.getStartTimestamp(), txn.getStep());
86 }
87 }
88
89 public synchronized void delete() throws RollbackException, PersistitException {
90 if (_first == null) {
91 throw new IllegalStateException("There is no resource to delete");
92 }
93 if (!_first.isDeleted()) {
94 final Transaction txn = _persistit.getTransaction();
95 final long version = tss2v(txn);
96 if (version == _first.getVersion()) {
97 _first.setDeleted();
98 } else {
99 final V resource = _first.getResource();
100 final Entry entry = new Entry(version, resource);
101 entry.setDeleted();
102 addVersion(entry, txn);
103 }
104 }
105 }
106
107 public void addVersion(final V resource, final Transaction txn) throws PersistitInterruptedException,
108 RollbackException {
109 if (resource == null) {
110 throw new NullPointerException("Null resource");
111 }
112 addVersion(new Entry(tss2v(txn), resource), txn);
113 }
114
115 public V getVersion() throws TimeoutException, PersistitInterruptedException {
116 final Entry first = _first;
117 if (first != null && first.getVersion() == PRIMORDIAL) {
118 return first.getResource();
119 }
120 final Transaction txn = _persistit.getTransaction();
121 return getVersion(tss2v(txn));
122 }
123
124 public V getVersion(final VersionCreator<V> creator) throws PersistitException, RollbackException {
125 final Entry first = _first;
126 if (first != null && first.getVersion() == PRIMORDIAL) {
127 return first.getResource();
128 }
129 final Transaction txn = _persistit.getTransaction();
130 V version = getVersion(tss2v(txn));
131 if (version == null) {
132 version = creator.createVersion(this);
133 addVersion(version, txn);
134 }
135 return version;
136 }
137
138 /**
139 * @return <code>true</code> if and only if this <code>TimelyResource</code>
140 * has no <code>Version</code> instances.
141 */
142 public boolean isEmpty() throws TimeoutException, PersistitInterruptedException {
143 Entry first = _first;
144 if (first == null) {
145 return true;
146 }
147 if (first.getVersion() == PRIMORDIAL) {
148 return false;
149 }
150 first = getEntry(tss2v(_persistit.getTransaction()));
151 if (first == null) {
152 return true;
153 }
154 if (first.isDeleted()) {
155 return true;
156 }
157 return false;
158 }
159
160 /**
161 *
162 * @return Whether this resource exists only within the context of the
163 * current transaction.
164 * @throws TimeoutException
165 * @throws PersistitInterruptedException
166 */
167
168 public boolean isTransactionPrivate(final boolean byStep) throws TimeoutException, PersistitInterruptedException {
169 Entry entry = _first;
170 if (entry != null && entry.getVersion() == PRIMORDIAL) {
171 return false;
172 }
173 final Transaction txn = _persistit.getTransaction();
174 final long versionHandle = tss2v(txn);
175 entry = getEntry(versionHandle);
176 if (entry == null) {
177 return true;
178 } else {
179 if (byStep) {
180 return entry.getVersion() == versionHandle;
181 } else {
182 return vh2ts(entry.getVersion()) == vh2ts(versionHandle);
183 }
184 }
185 }
186
187 /**
188 * @return Count of versions currently being managed.
189 */
190 int getVersionCount() {
191 int count = 0;
192 for (Entry e = _first; e != null; e = e._previous) {
193 count++;
194 }
195 return count;
196 }
197
198 @Override
199 public String toString() {
200 final StringBuilder sb = new StringBuilder("TimelyResource(");
201 boolean first = true;
202 for (Entry entry = _first; entry != null; entry = entry.getPrevious()) {
203 if (sb.length() > 1000) {
204 sb.append("...");
205 break;
206 }
207 if (!first) {
208 sb.append(',');
209 } else {
210 first = false;
211 }
212 sb.append(entry);
213 }
214 sb.append(')');
215 return sb.toString();
216 }
217
218 synchronized void setPrimordial() {
219 if (_first != null && _first.getPrevious() == null) {
220 _first.setPrimordial();
221 } else {
222 throw new IllegalStateException("Cannot be made primordial: " + this);
223 }
224 }
225
226 /**
227 * Remove all obsolete <code>Version</code> instances. For each instance
228 * that implements <code>PrunableVersion</code>, invoke its
229 * {@link PrunableVersion#prune()} method.
230 *
231 * @throws TimeoutException
232 * @throws PersistitException
233 */
234 void prune() throws TimeoutException, PersistitException {
235 final List<Entry> entriesToPrune = new ArrayList<Entry>();
236 PrunableVersion versionToVacate = null;
237
238 final TransactionIndex ti = _persistit.getTransactionIndex();
239
240 synchronized (this) {
241 try {
242
243 Entry newer = null;
244 Entry latest = null;
245 boolean isPrimordial = true;
246 long lastCommit = UNCOMMITTED;
247
248 for (Entry entry = _first; entry != null; entry = entry.getPrevious()) {
249 boolean keepIt = false;
250 final long versionHandle = entry.getVersion();
251 final long tc = ti.commitStatus(versionHandle, UNCOMMITTED, 0);
252 if (tc >= PRIMORDIAL) {
253 if (tc == UNCOMMITTED) {
254 keepIt = true;
255 isPrimordial = false;
256 } else {
257 final boolean hasConcurrent = ti.hasConcurrentTransaction(tc, lastCommit);
258 if (latest == null || hasConcurrent) {
259 keepIt = true;
260 if (latest == null) {
261 latest = entry;
262 }
263 }
264 if (keepIt && ti.hasConcurrentTransaction(0, tc)) {
265 isPrimordial = false;
266 }
267 }
268 lastCommit = tc;
269 } else {
270 assert tc == ABORTED;
271 }
272 if (keepIt) {
273 newer = entry;
274 } else {
275 if (tc == ABORTED ^ entry.isDeleted()) {
276 entriesToPrune.add(entry);
277 }
278 if (newer == null) {
279 _first = entry.getPrevious();
280 } else {
281 newer.setPrevious(entry.getPrevious());
282 }
283 }
284 }
285 if (isPrimordial && _first != null) {
286 assert _first.getPrevious() == null;
287 if (_first.isDeleted()) {
288 final V version = _first.getResource();
289 if (version instanceof PrunableVersion) {
290 versionToVacate = (PrunableVersion) version;
291 }
292 entriesToPrune.add(_first);
293 _first = null;
294 } else {
295 _first.setPrimordial();
296 }
297 }
298 } catch (final InterruptedException ie) {
299 throw new PersistitInterruptedException(ie);
300 }
301 }
302 for (final Entry e : entriesToPrune) {
303 if (versionToVacate != null) {
304 versionToVacate.vacate();
305 }
306 e.prune();
307 }
308 }
309
310 /**
311 * Helper method that adds an Entry containing a Version and its timestamp
312 * information. This method checks for write-write dependencies and throws a
313 * RollbackException if there is a conflict.
314 *
315 * @param entry
316 * @param txn
317 * @throws PersistitException
318 * @throws RollbackException
319 */
320 private void addVersion(final Entry entry, final Transaction txn) throws PersistitInterruptedException,
321 RollbackException {
322 final TransactionIndex ti = _persistit.getTransactionIndex();
323 while (true) {
324 try {
325 synchronized (this) {
326 if (_first != null) {
327 if (_first.getVersion() > entry.getVersion()) {
328 /*
329 * This thread lost a race to make the most recent
330 * version
331 */
332 throw new RollbackException();
333 }
334 if (txn.isActive()) {
335 for (Entry e = _first; e != null; e = e.getPrevious()) {
336 final long version = e.getVersion();
337 final long depends = ti.wwDependency(version, txn.getTransactionStatus(), 0);
338 if (depends == TIMED_OUT) {
339 throw new WWRetryException(version);
340 }
341 if (depends != 0 && depends != ABORTED) {
342 /*
343 * version is from a concurrent transaction
344 * that already committed or timed out
345 * waiting to see. Either way, must abort.
346 */
347 throw new RollbackException();
348 }
349 }
350 }
351 }
352 entry.setPrevious(_first);
353 _first = entry;
354 break;
355 }
356 } catch (final WWRetryException re) {
357 try {
358 final long depends = _persistit.getTransactionIndex().wwDependency(re.getVersionHandle(),
359 txn.getTransactionStatus(), SharedResource.DEFAULT_MAX_WAIT_TIME);
360 if (depends != 0 && depends != ABORTED) {
361 /*
362 * version is from concurrent txn that already committed
363 * or timed out waiting to see. Either way, must abort.
364 */
365 throw new RollbackException();
366 }
367 } catch (final InterruptedException ie) {
368 throw new PersistitInterruptedException(ie);
369 }
370 } catch (final InterruptedException ie) {
371 throw new PersistitInterruptedException(ie);
372 }
373 }
374 }
375
376 /**
377 * Get the <code>Version</code> from the snapshot view specified by the
378 * supplied version handle.
379 *
380 * @param version
381 * versionHandle
382 * @return <code>Version</code> for given version handle
383 * @throws TimeoutException
384 * @throws PersistitInterruptedException
385 */
386 V getVersion(final long version) throws TimeoutException, PersistitInterruptedException {
387 final Entry e = getEntry(version);
388 return e == null ? null : e.getResource();
389 }
390
391 Entry getEntry(final long version) throws TimeoutException, PersistitInterruptedException {
392 final TransactionIndex ti = _persistit.getTransactionIndex();
393 try {
394 /*
395 * Note: not necessary to synchronize here. A concurrent transaction
396 * may modify _first, but this method does not need to see that
397 * version since it has not been committed. Conversely, if there is
398 * some transaction that committed before this transaction's start
399 * timestamp, then there is a happened-before relationship due to
400 * the synchronization in transaction registration; since _first is
401 * volatile, we are guaranteed to see the modification made by the
402 * committed transaction.
403 */
404 for (Entry e = _first; e != null; e = e._previous) {
405 final long commitTs = ti.commitStatus(e.getVersion(), vh2ts(version), vh2step(version));
406 if (commitTs >= 0 && commitTs != UNCOMMITTED) {
407 if (e.isDeleted()) {
408 return null;
409 }
410 return e;
411 }
412 }
413 return null;
414 } catch (final InterruptedException e) {
415 throw new PersistitInterruptedException(e);
416 }
417 }
418
419 /**
420 * <p>
421 * Holder for one instance of an object, such as a {@link Tree}, whose
422 * availability is governed by visibility within a transaction.
423 * </p>
424 * <p>
425 * An Entry has a version handle just like a version in an {@link MVV}. It
426 * is invisible to any transactions that started before the start timestamp,
427 * in write-write conflict with any concurrent transaction, and available to
428 * any transaction that starts after the version has committed. Visibility
429 * within the transaction that creates the Entry is determined
430 * by the relative step numbers of the current transaction and the version
431 * handle. If there exists an Entry with start and commit timestamps
432 * ts1, tc1, then any attempt to add another Entry with start timestamp ts2
433 * will fail unless ts2 > tc1.
434 * </p>
435 *
436 * @author peter
437 */
438
439 private class Entry {
440
441 private long _version;
442 private V _resource;
443 private volatile boolean _deleted;
444 private volatile Entry _previous;
445
446 private Entry(final long versionHandle, final V resource) {
447 _version = versionHandle;
448 _resource = resource;
449 }
450
451 private V getResource() {
452 return _resource;
453 }
454
455 private void setResource(final V resource) {
456 _resource = resource;
457 }
458
459 private Entry getPrevious() {
460 return _previous;
461 }
462
463 private void setPrevious(final Entry tr) {
464 _previous = tr;
465 }
466
467 private long getVersion() {
468 return _version;
469 }
470
471 private void setPrimordial() {
472 _version = PRIMORDIAL;
473 }
474
475 private void setDeleted() {
476 _deleted = true;
477 }
478
479 private boolean isDeleted() {
480 return _deleted;
481 }
482
483 private boolean prune() throws PersistitException {
484 if (_resource instanceof PrunableVersion) {
485 return ((PrunableVersion) _resource).prune();
486 } else {
487 return true;
488 }
489 }
490
491 @Override
492 public String toString() {
493 String tcStatus;
494 try {
495 final long tc = _persistit.getTransactionIndex().commitStatus(_version, Long.MAX_VALUE, 0);
496 tcStatus = TransactionStatus.tcString(tc);
497 } catch (final Exception e) {
498 tcStatus = e.toString();
499 }
500 return String.format("(ts=%s tc=%s)->%s%s", TransactionStatus.versionString(_version), tcStatus, _resource,
501 _previous != null ? "*" : "");
502 }
503 }
504
505}
0506
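Reviewer note: the visibility rule in getEntry can be easier to follow in isolation. The following is a minimal sketch, not the Persistit implementation. It assumes a hypothetical class VersionChainSketch in which a plain map from start timestamp to commit timestamp stands in for TransactionIndex.commitStatus, and it ignores step numbers and aborted transactions. It walks the chain from newest to oldest and returns the first version committed before the reader's timestamp, or null when that version is a delete marker.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a version chain; not the Persistit API. */
final class VersionChainSketch<V> {

    /** One link in the chain, newest first, like Entry._previous in the diff. */
    private final class Entry {
        final long startTs;
        final V resource;
        final boolean deleted;
        final Entry previous;

        Entry(final long startTs, final V resource, final boolean deleted, final Entry previous) {
            this.startTs = startTs;
            this.resource = resource;
            this.deleted = deleted;
            this.previous = previous;
        }
    }

    // Assumption: start timestamp -> commit timestamp; absent means uncommitted.
    private final Map<Long, Long> commits = new HashMap<>();
    private Entry first;

    void add(final long startTs, final V resource) {
        first = new Entry(startTs, resource, false, first);
    }

    void delete(final long startTs) {
        first = new Entry(startTs, null, true, first);
    }

    void commit(final long startTs, final long commitTs) {
        commits.put(startTs, commitTs);
    }

    /** Returns the version visible to a reader that started at readerTs, or null. */
    V getVersion(final long readerTs) {
        for (Entry e = first; e != null; e = e.previous) {
            final Long commitTs = commits.get(e.startTs);
            if (commitTs != null && commitTs < readerTs) {
                // The newest committed entry decides: a delete marker hides older versions.
                return e.deleted ? null : e.resource;
            }
        }
        return null;
    }
}
```

In this model a reader that started before a delete committed still sees the older version, which mirrors the behavior the Tree Javadoc in this branch describes for removed trees.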
=== modified file 'src/main/java/com/persistit/TransactionPlayer.java'
--- src/main/java/com/persistit/TransactionPlayer.java 2012-11-28 18:52:21 +0000
+++ src/main/java/com/persistit/TransactionPlayer.java 2013-04-29 21:38:25 +0000
@@ -15,11 +15,8 @@
1515
16package com.persistit;16package com.persistit;
1717
18/**18import static com.persistit.TransactionIndex.ts2vh;
19 * 19
20 * Read and apply transaction from the journal to the live database. To apply
21 * a transaction, this class calls methods of a
22 */
23import java.nio.ByteBuffer;20import java.nio.ByteBuffer;
24import java.util.ArrayList;21import java.util.ArrayList;
25import java.util.List;22import java.util.List;
@@ -39,6 +36,11 @@
39import com.persistit.exception.PersistitException;36import com.persistit.exception.PersistitException;
40import com.persistit.exception.VolumeNotFoundException;37import com.persistit.exception.VolumeNotFoundException;
4138
39/**
40 * Reads and applies transactions from the journal to the live database. To apply a
41 * transaction, this class calls methods of a TransactionPlayerListener.
42 */
43
42class TransactionPlayer {44class TransactionPlayer {
4345
44 private final AtomicLong appliedUpdates = new AtomicLong();46 private final AtomicLong appliedUpdates = new AtomicLong();
@@ -67,6 +69,7 @@
6769
68 boolean requiresLongRecordConversion();70 boolean requiresLongRecordConversion();
6971
72 boolean createTree(long timestamp) throws PersistitException;
70 }73 }
7174
72 final TransactionPlayerSupport _support;75 final TransactionPlayerSupport _support;
@@ -153,93 +156,106 @@
153 case SR.TYPE: {156 case SR.TYPE: {
154 final int keySize = SR.getKeySize(bb);157 final int keySize = SR.getKeySize(bb);
155 final int treeHandle = SR.getTreeHandle(bb);158 final int treeHandle = SR.getTreeHandle(bb);
156 final Exchange exchange = getExchange(treeHandle, address, startTimestamp);159
157 exchange.ignoreTransactions();160 final Exchange exchange = getExchange(treeHandle, address, startTimestamp, listener);
158 final Key key = exchange.getKey();161 if (exchange != null) {
159 final Value value = exchange.getValue();162 exchange.ignoreTransactions();
160 System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD, key.getEncodedBytes(), 0, keySize);163 final Key key = exchange.getKey();
161 key.setEncodedSize(keySize);164 final Value value = exchange.getValue();
162 final int valueSize = innerSize - SR.OVERHEAD - keySize;165 System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD, key.getEncodedBytes(), 0, keySize);
163 value.ensureFit(valueSize);166 key.setEncodedSize(keySize);
164 System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD + keySize, value.getEncodedBytes(), 0,167 final int valueSize = innerSize - SR.OVERHEAD - keySize;
165 valueSize);168 value.ensureFit(valueSize);
166 value.setEncodedSize(valueSize);169 System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD + keySize, value.getEncodedBytes(), 0,
167170 valueSize);
168 if (value.getEncodedSize() >= Buffer.LONGREC_SIZE171 value.setEncodedSize(valueSize);
169 && (value.getEncodedBytes()[0] & 0xFF) == Buffer.LONGREC_TYPE) {172
173 if (value.getEncodedSize() >= Buffer.LONGREC_SIZE
174 && (value.getEncodedBytes()[0] & 0xFF) == Buffer.LONGREC_TYPE) {
175 /*
176 * convertToLongRecord will pollute the
177 * getReadBuffer(). Therefore before calling it we
178 * need to copy the TX record to a fresh ByteBuffer.
179 */
180 if (bb == _support.getReadBuffer()) {
181 end = recordSize - (position - start);
182 bb = ByteBuffer.allocate(end);
183 bb.put(_support.getReadBuffer().array(), position, end);
184 bb.flip();
185 position = 0;
186 }
187 if (listener.requiresLongRecordConversion()) {
188 _support.convertToLongRecord(value, treeHandle, address, commitTimestamp);
189 }
190 }
191
192 listener.store(address, startTimestamp, exchange);
170 /*193 /*
171 * convertToLongRecord will pollute the getReadBuffer().194 * Don't keep exchanges with enlarged value - let them
172 * Therefore before calling it we need to copy the TX195 * be GC'd
173 * record to a fresh ByteBuffer.
174 */196 */
175 if (bb == _support.getReadBuffer()) {197 if (exchange.getValue().getMaximumSize() < Value.DEFAULT_MAXIMUM_SIZE) {
176 end = recordSize - (position - start);198 releaseExchange(exchange);
177 bb = ByteBuffer.allocate(end);
178 bb.put(_support.getReadBuffer().array(), position, end);
179 bb.flip();
180 position = 0;
181 }
182 if (listener.requiresLongRecordConversion()) {
183 _support.convertToLongRecord(value, treeHandle, address, commitTimestamp);
184 }199 }
185 }200 }
186
187 listener.store(address, startTimestamp, exchange);
188 appliedUpdates.incrementAndGet();201 appliedUpdates.incrementAndGet();
189 // Don't keep exchanges with enlarged value - let them be
190 // GC'd
191 if (exchange.getValue().getMaximumSize() < Value.DEFAULT_MAXIMUM_SIZE) {
192 releaseExchange(exchange);
193 }
194 break;202 break;
195 }203 }
196204
197 case DR.TYPE: {205 case DR.TYPE: {
198 final int key1Size = DR.getKey1Size(bb);206 final int key1Size = DR.getKey1Size(bb);
199 final int elisionCount = DR.getKey2Elision(bb);207 final int elisionCount = DR.getKey2Elision(bb);
200 final Exchange exchange = getExchange(DR.getTreeHandle(bb), address, startTimestamp);208 final Exchange exchange = getExchange(DR.getTreeHandle(bb), address, startTimestamp, listener);
201 exchange.ignoreTransactions();209 if (exchange != null) {
202 final Key key1 = exchange.getAuxiliaryKey3();210 exchange.ignoreTransactions();
203 final Key key2 = exchange.getAuxiliaryKey4();211 final Key key1 = exchange.getAuxiliaryKey3();
204 System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD, key1.getEncodedBytes(), 0, key1Size);212 final Key key2 = exchange.getAuxiliaryKey4();
205 key1.setEncodedSize(key1Size);213 System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD, key1.getEncodedBytes(), 0, key1Size);
206 final int key2Size = innerSize - DR.OVERHEAD - key1Size;214 key1.setEncodedSize(key1Size);
207 System.arraycopy(key1.getEncodedBytes(), 0, key2.getEncodedBytes(), 0, elisionCount);215 final int key2Size = innerSize - DR.OVERHEAD - key1Size;
208 System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD + key1Size, key2.getEncodedBytes(),216 System.arraycopy(key1.getEncodedBytes(), 0, key2.getEncodedBytes(), 0, elisionCount);
209 elisionCount, key2Size);217 System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD + key1Size, key2.getEncodedBytes(),
210 key2.setEncodedSize(key2Size + elisionCount);218 elisionCount, key2Size);
211 listener.removeKeyRange(address, startTimestamp, exchange, key1, key2);219 key2.setEncodedSize(key2Size + elisionCount);
220 listener.removeKeyRange(address, startTimestamp, exchange, key1, key2);
221 releaseExchange(exchange);
222 }
212 appliedUpdates.incrementAndGet();223 appliedUpdates.incrementAndGet();
213 releaseExchange(exchange);
214 break;224 break;
215 }225 }
216226
217 case DT.TYPE: {227 case DT.TYPE: {
218 final Exchange exchange = getExchange(DT.getTreeHandle(bb), address, startTimestamp);228 final Exchange exchange = getExchange(DT.getTreeHandle(bb), address, startTimestamp, listener);
219 listener.removeTree(address, startTimestamp, exchange);229 if (exchange != null) {
230 listener.removeTree(address, startTimestamp, exchange);
231 releaseExchange(exchange);
232 }
220 appliedUpdates.incrementAndGet();233 appliedUpdates.incrementAndGet();
221 releaseExchange(exchange);
222 break;234 break;
223 }235 }
224236
225 case D0.TYPE: {237 case D0.TYPE: {
226 final Exchange exchange = getExchange(D0.getTreeHandle(bb), address, startTimestamp);238 final Exchange exchange = getExchange(D0.getTreeHandle(bb), address, startTimestamp, listener);
227 /*239 if (exchange != null) {
228 * Note that the commitTimestamp, not startTimestamp is240 /*
229 * passed to the delta method. The241 * Note that the commitTimestamp, not startTimestamp is
230 * Accumulator#updateBaseValue method needs the242 * passed to the delta method. The
231 * commitTimestamp.243 * Accumulator#updateBaseValue method needs the
232 */244 * commitTimestamp.
233 listener.delta(address, commitTimestamp, exchange.getTree(), D0.getIndex(bb),245 */
234 D0.getAccumulatorTypeOrdinal(bb), 1);246 listener.delta(address, commitTimestamp, exchange.getTree(), D0.getIndex(bb),
235 appliedUpdates.incrementAndGet();247 D0.getAccumulatorTypeOrdinal(bb), 1);
248 appliedUpdates.incrementAndGet();
249 }
236 break;250 break;
237 }251 }
238252
239 case D1.TYPE: {253 case D1.TYPE: {
240 final Exchange exchange = getExchange(D1.getTreeHandle(bb), address, startTimestamp);254 final Exchange exchange = getExchange(D1.getTreeHandle(bb), address, startTimestamp, listener);
241 listener.delta(address, startTimestamp, exchange.getTree(), D1.getIndex(bb),255 if (exchange != null) {
242 D1.getAccumulatorTypeOrdinal(bb), D1.getValue(bb));256 listener.delta(address, startTimestamp, exchange.getTree(), D1.getIndex(bb),
257 D1.getAccumulatorTypeOrdinal(bb), D1.getValue(bb));
258 }
243 appliedUpdates.incrementAndGet();259 appliedUpdates.incrementAndGet();
244 break;260 break;
245 }261 }
@@ -281,7 +297,26 @@
281 return String.format("JournalAddress %,d{%,d}", address, timestamp);297 return String.format("JournalAddress %,d{%,d}", address, timestamp);
282 }298 }
283299
284 private Exchange getExchange(final int treeHandle, final long from, final long timestamp) throws PersistitException {300 /**
301 * Returns an Exchange on which an operation can be applied or rolled back.
302 * For a {@link TransactionPlayerListener} that performs rollbacks, it is
303 * important not to create a new tree when none exists. Therefore this
304 * method may return <code>null</code> to indicate that no tree exists and
305 * therefore the requested operation should be ignored. Whether to create a
306 * new tree is determined by the
307 * {@link TransactionPlayerListener#createTree(long)} method.
308 *
309 * @param treeHandle
310 * @param from
311 * @param timestamp
312 * @param listener
313 * @return the <code>Exchange</code> on which a recovery operation should be
314 * applied, or <code>null</code> if there is no backing
315 * <code>Tree</code>.
316 * @throws PersistitException
317 */
318 private Exchange getExchange(final int treeHandle, final long from, final long timestamp,
319 final TransactionPlayerListener listener) throws PersistitException {
285 final TreeDescriptor td = _support.getPersistit().getJournalManager().lookupTreeHandle(treeHandle);320 final TreeDescriptor td = _support.getPersistit().getJournalManager().lookupTreeHandle(treeHandle);
286 if (td == null) {321 if (td == null) {
287 throw new CorruptJournalException("Tree handle " + treeHandle + " is undefined at "322 throw new CorruptJournalException("Tree handle " + treeHandle + " is undefined at "
@@ -295,6 +330,10 @@
295 if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) {330 if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) {
296 return volume.getStructure().directoryExchange();331 return volume.getStructure().directoryExchange();
297 } else {332 } else {
333 final Tree tree = volume.getStructure().getTreeInternal(td.getTreeName());
334 if (!listener.createTree(timestamp) && (tree == null || !tree.hasVersion(ts2vh(timestamp)))) {
335 return null;
336 }
298 final Exchange exchange = _support.getPersistit().getExchange(volume, td.getTreeName(), true);337 final Exchange exchange = _support.getPersistit().getExchange(volume, td.getTreeName(), true);
299 exchange.ignoreTransactions();338 exchange.ignoreTransactions();
300 return exchange;339 return exchange;
301340
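Reviewer note: the new null-returning getExchange changes the contract for every record type, so a small model may help. This is a sketch under stated assumptions, not the actual TransactionPlayer code. PlayerGuardSketch, its Listener interface and the String standing in for an Exchange are all hypothetical, and the real method additionally checks tree.hasVersion(ts2vh(timestamp)), which is omitted here.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the "skip when no tree exists" guard; not the Persistit API. */
final class PlayerGuardSketch {

    /** Stand-in for TransactionPlayerListener#createTree(long). */
    interface Listener {
        boolean createTree(long timestamp);
    }

    private final Set<String> trees = new HashSet<>();
    int applied;

    /** Returns the tree to operate on, or null when the operation should be ignored. */
    String getExchange(final String treeName, final long timestamp, final Listener listener) {
        if (!trees.contains(treeName)) {
            if (!listener.createTree(timestamp)) {
                // A rollback-style listener must not bring a missing tree into existence.
                return null;
            }
            trees.add(treeName);
        }
        return treeName;
    }

    /** Applies a store-like operation only when a backing tree is available. */
    boolean store(final String treeName, final long timestamp, final Listener listener) {
        final String exchange = getExchange(treeName, timestamp, listener);
        if (exchange == null) {
            return false;
        }
        applied++;
        return true;
    }
}
```

A recovery-style listener would return true from createTree, while a rollback-style listener would return false so that operations against a tree that was never created are dropped.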
=== modified file 'src/main/java/com/persistit/Tree.java'
--- src/main/java/com/persistit/Tree.java 2013-04-11 15:31:16 +0000
+++ src/main/java/com/persistit/Tree.java 2013-04-29 21:38:25 +0000
@@ -23,8 +23,13 @@
23import com.persistit.Accumulator.MinAccumulator;23import com.persistit.Accumulator.MinAccumulator;
24import com.persistit.Accumulator.SeqAccumulator;24import com.persistit.Accumulator.SeqAccumulator;
25import com.persistit.Accumulator.SumAccumulator;25import com.persistit.Accumulator.SumAccumulator;
26import com.persistit.Version.PrunableVersion;
27import com.persistit.Version.VersionCreator;
26import com.persistit.exception.CorruptVolumeException;28import com.persistit.exception.CorruptVolumeException;
27import com.persistit.exception.PersistitException;29import com.persistit.exception.PersistitException;
30import com.persistit.exception.PersistitInterruptedException;
31import com.persistit.exception.RollbackException;
32import com.persistit.exception.TimeoutException;
28import com.persistit.util.Debug;33import com.persistit.util.Debug;
29import com.persistit.util.Util;34import com.persistit.util.Util;
3035
@@ -36,6 +41,20 @@
36 * {@link Accumulator}s for a B-Tree.41 * {@link Accumulator}s for a B-Tree.
37 * </p>42 * </p>
38 * <p>43 * <p>
44 * As of Persistit 3.3, this class supports versions within transactions. A new
45 * <code>Tree</code> created within the scope of a {@link Transaction} is not
46 * visible to other transactions until it commits. Similarly, if a
47 * <code>Tree</code> is removed within the scope of a transaction, other
48 * transactions that started before the current transaction commits will
49 * continue to be able to read and write the <code>Tree</code>. As a
50 * side-effect, the physical storage for a <code>Tree</code> is not deallocated
51 * until there are no remaining active transactions that started before the
52 * commit timestamp of the current transaction. Concurrent transactions that
53 * attempt to create or remove the same <code>Tree</code> instance are subject
54 * to a write-write dependency (see {@link Transaction}); all but one such
55 * transaction must roll back.
56 * </p>
57 * <p>
39 * <code>Tree</code> instances are created by58 * <code>Tree</code> instances are created by
40 * {@link Volume#getTree(String, boolean)}. If the <code>Volume</code> already59 * {@link Volume#getTree(String, boolean)}. If the <code>Volume</code> already
41 * has a B-Tree with the specified name, then the <code>Tree</code> object60 * has a B-Tree with the specified name, then the <code>Tree</code> object
@@ -67,15 +86,76 @@
6786
68 private final String _name;87 private final String _name;
69 private final Volume _volume;88 private final Volume _volume;
70 private volatile long _rootPageAddr;
71 private volatile int _depth;
72 private final AtomicLong _changeCount = new AtomicLong(0);
73 private final AtomicReference<Object> _appCache = new AtomicReference<Object>();89 private final AtomicReference<Object> _appCache = new AtomicReference<Object>();
74 private final AtomicInteger _handle = new AtomicInteger();90 private final AtomicInteger _handle = new AtomicInteger();
7591
76 private final Accumulator[] _accumulators = new Accumulator[MAX_ACCUMULATOR_COUNT];92 private final TimelyResource<TreeVersion> _timelyResource;
7793
78 private final TreeStatistics _treeStatistics = new TreeStatistics();94 private final VersionCreator<TreeVersion> _creator = new VersionCreator<TreeVersion>() {
95
96 @Override
97 public TreeVersion createVersion(final TimelyResource<? extends TreeVersion> resource)
98 throws PersistitException {
99 return new TreeVersion();
100 }
101 };
102
103 class TreeVersion implements PrunableVersion {
104 volatile long _rootPageAddr;
105 volatile int _depth;
106 volatile long _generation = _persistit.getTimestampAllocator().updateTimestamp();
107 final AtomicLong _changeCount = new AtomicLong();
108 volatile boolean _pruned;
109 private final Accumulator[] _accumulators = new Accumulator[MAX_ACCUMULATOR_COUNT];
110 private final TreeStatistics _treeStatistics = new TreeStatistics();
111
112 @Override
113 public boolean prune() throws PersistitException {
114 assert !_pruned;
115 _volume.getStructure().deallocateTree(_rootPageAddr, _depth);
116 discardAccumulators();
117 _pruned = true;
118 _rootPageAddr = -1;
119 return true;
120 }
121
122 @Override
123 public void vacate() {
124 clearValid();
125 _volume.getStructure().removed(Tree.this);
126 }
127
128 @Override
129 public String toString() {
130 return String.format("Tree(%d,%d)%s", _rootPageAddr, _depth, _pruned ? "#" : "");
131 }
132
133 /**
134 * Forget about any instantiated accumulator and remove it from the
135 * active list in Persistit. This should only be called during
136 * the process of removing a tree.
137 */
138 void discardAccumulators() {
139 for (int i = 0; i < _accumulators.length; ++i) {
140 if (_accumulators[i] != null) {
141 _persistit.removeAccumulator(_accumulators[i]);
142 _accumulators[i] = null;
143 }
144 }
145 }
146 }
147
148 /**
149 * Unchecked wrapper for PersistitException thrown while trying to acquire a
150 * TreeVersion.
151 */
152 public static class TreeVersionException extends RuntimeException {
153 private static final long serialVersionUID = -6372589972106489591L;
154
155 TreeVersionException(final Exception e) {
156 super(e);
157 }
158 }
79159
80 Tree(final Persistit persistit, final Volume volume, final String name) {160 Tree(final Persistit persistit, final Volume volume, final String name) {
81 super(persistit);161 super(persistit);
@@ -86,7 +166,35 @@
86 }166 }
87 _name = name;167 _name = name;
88 _volume = volume;168 _volume = volume;
89 _generation.set(1);169 _timelyResource = new TimelyResource<TreeVersion>(persistit);
170 }
171
172 TreeVersion version() {
173 try {
174 return _timelyResource.getVersion(_creator);
175 } catch (final PersistitException e) {
176 throw new TreeVersionException(e);
177 }
178 }
179
180 public boolean isDeleted() throws TimeoutException, PersistitInterruptedException {
181 return _timelyResource.isEmpty();
182 }
183
184 boolean isLive() throws TimeoutException, PersistitInterruptedException {
185 return isValid() && !isDeleted();
186 }
187
188 boolean isTransactionPrivate(final boolean byStep) throws TimeoutException, PersistitInterruptedException {
189 return _timelyResource.isTransactionPrivate(byStep);
190 }
191
192 boolean hasVersion(final long versionHandle) throws TimeoutException, PersistitInterruptedException {
193 return _timelyResource.getVersion(versionHandle) != null;
194 }
195
196 void delete() throws RollbackException, PersistitException {
197 _timelyResource.delete();
90 }198 }
91199
92 /**200 /**
@@ -110,7 +218,9 @@
110218
111 @Override219 @Override
112 public boolean equals(final Object o) {220 public boolean equals(final Object o) {
113 if (o instanceof Tree) {221 if (o == this) {
222 return true;
223 } else if (o instanceof Tree) {
114 final Tree tree = (Tree) o;224 final Tree tree = (Tree) o;
115 return _name.equals(tree._name) && _volume.equals(tree.getVolume());225 return _name.equals(tree._name) && _volume.equals(tree.getVolume());
116 } else {226 } else {
@@ -126,20 +236,32 @@
126 * @return The page address236 * @return The page address
127 */237 */
128 public long getRootPageAddr() {238 public long getRootPageAddr() {
129 return _rootPageAddr;239 final TreeVersion version = version();
240 return version._rootPageAddr;
130 }241 }
131242
132 /**243 /**
133 * @return the number of levels of the <code>Tree</code>.244 * @return the number of levels of the <code>Tree</code>.
134 */245 */
135 public int getDepth() {246 public int getDepth() {
136 return _depth;247 return version()._depth;
248 }
249
250 @Override
251 public long getGeneration() {
252 return version()._generation;
253 }
254
255 @Override
256 void bumpGeneration() {
257 version()._generation = _persistit.getTimestampAllocator().updateTimestamp();
137 }258 }
138259
139 void changeRootPageAddr(final long rootPageAddr, final int deltaDepth) throws PersistitException {260 void changeRootPageAddr(final long rootPageAddr, final int deltaDepth) throws PersistitException {
140 Debug.$assert0.t(isOwnedAsWriterByMe());261 Debug.$assert0.t(isOwnedAsWriterByMe());
141 _rootPageAddr = rootPageAddr;262 final TreeVersion version = version();
142 _depth += deltaDepth;263 version._rootPageAddr = rootPageAddr;
264 version._depth += deltaDepth;
143 }265 }
144266
145 void bumpChangeCount() {267 void bumpChangeCount() {
@@ -147,7 +269,7 @@
147 // Note: the changeCount only gets written when there's a structure269 // Note: the changeCount only gets written when there's a structure
148 // change in the tree that causes it to be committed.270 // change in the tree that causes it to be committed.
149 //271 //
150 _changeCount.incrementAndGet();272 version()._changeCount.incrementAndGet();
151 }273 }
152274
153 /**275 /**
@@ -155,7 +277,7 @@
155 * this tree; does not including replacement of an existing value277 * this tree; does not including replacement of an existing value
156 */278 */
157 long getChangeCount() {279 long getChangeCount() {
158 return _changeCount.get();280 return version()._changeCount.get();
159 }281 }
160282
161 /**283 /**
@@ -165,9 +287,10 @@
165 */287 */
166 int store(final byte[] bytes, final int index) {288 int store(final byte[] bytes, final int index) {
167 final byte[] nameBytes = Util.stringToBytes(_name);289 final byte[] nameBytes = Util.stringToBytes(_name);
168 Util.putLong(bytes, index, _rootPageAddr);290 final TreeVersion version = version();
169 Util.putLong(bytes, index + 8, getChangeCount());291 Util.putLong(bytes, index, version._rootPageAddr);
170 Util.putShort(bytes, index + 16, _depth);292 Util.putLong(bytes, index + 8, version._changeCount.get());
293 Util.putShort(bytes, index + 16, version._depth);
171 Util.putShort(bytes, index + 18, nameBytes.length);294 Util.putShort(bytes, index + 18, nameBytes.length);
172 Util.putBytes(bytes, index + 20, nameBytes);295 Util.putBytes(bytes, index + 20, nameBytes);
173 return 20 + nameBytes.length;296 return 20 + nameBytes.length;
@@ -187,9 +310,10 @@
187 if (!_name.equals(name)) {310 if (!_name.equals(name)) {
188 throw new IllegalStateException("Invalid tree name recorded: " + name + " for tree " + _name);311 throw new IllegalStateException("Invalid tree name recorded: " + name + " for tree " + _name);
189 }312 }
190 _rootPageAddr = Util.getLong(bytes, index);313 final TreeVersion version = version();
191 _changeCount.set(Util.getLong(bytes, index + 8));314 version._rootPageAddr = Util.getLong(bytes, index);
192 _depth = Util.getShort(bytes, index + 16);315 version._changeCount.set(Util.getLong(bytes, index + 8));
316 version._depth = Util.getShort(bytes, index + 16);
193 return length;317 return length;
194 }318 }
195319
@@ -200,7 +324,8 @@
200 * @throws PersistitException324 * @throws PersistitException
201 */325 */
202 void setRootPageAddress(final long rootPageAddr) throws PersistitException {326 void setRootPageAddress(final long rootPageAddr) throws PersistitException {
203 if (_rootPageAddr != rootPageAddr) {327 final TreeVersion version = version();
328 if (version._rootPageAddr != rootPageAddr) {
204 // Derive the index depth329 // Derive the index depth
205 Buffer buffer = null;330 Buffer buffer = null;
206 try {331 try {
@@ -210,8 +335,8 @@
210 throw new CorruptVolumeException(String.format("Tree root page %,d has invalid type %s",335 throw new CorruptVolumeException(String.format("Tree root page %,d has invalid type %s",
211 rootPageAddr, buffer.getPageTypeName()));336 rootPageAddr, buffer.getPageTypeName()));
212 }337 }
213 _rootPageAddr = rootPageAddr;338 version._rootPageAddr = rootPageAddr;
214 _depth = type - Buffer.PAGE_TYPE_DATA + 1;339 version._depth = type - Buffer.PAGE_TYPE_DATA + 1;
215 } finally {340 } finally {
216 if (buffer != null) {341 if (buffer != null) {
217 buffer.releaseTouched();342 buffer.releaseTouched();
@@ -226,10 +351,15 @@
226 * <code>Tree</code> to fail.351 * <code>Tree</code> to fail.
227 */352 */
228 void invalidate() {353 void invalidate() {
354 final TreeVersion version = version();
229 super.clearValid();355 super.clearValid();
230 _depth = -1;356 version._depth = -1;
231 _rootPageAddr = -1;357 version._rootPageAddr = -1;
232 _generation.set(-1);358 version._generation = _persistit.getTimestampAllocator().updateTimestamp();
359 }
360
361 void setPrimordial() {
362 _timelyResource.setPrimordial();
233 }363 }
234364
235 /**365 /**
@@ -238,7 +368,7 @@
238 * <code>Tree</code>368 * <code>Tree</code>
239 */369 */
240 public TreeStatistics getStatistics() {370 public TreeStatistics getStatistics() {
241 return _treeStatistics;371 return version()._treeStatistics;
242 }372 }
243373
244 /**374 /**
@@ -248,8 +378,9 @@
248 */378 */
249 @Override379 @Override
250 public String toString() {380 public String toString() {
251 return "<Tree " + _name + " in volume " + _volume.getName() + " rootPageAddr=" + _rootPageAddr + " depth="381 final TreeVersion version = version();
252 + _depth + " status=" + getStatusDisplayString() + ">";382 return "<Tree " + _name + " in volume " + _volume.getName() + " rootPageAddr=" + version._rootPageAddr
383 + " depth=" + version._depth + " status=" + getStatusDisplayString() + ">";
253 }384 }
254385
255 /**386 /**
@@ -278,8 +409,8 @@
278 }409 }
279410
280 /**411 /**
281 * Set the tree handle. The tree must may not be a member of a temporary412 * Assign and set the tree handle. The tree must not be a member of a
282 * volume.413 * temporary volume.
283 * 414 *
284 * @throws PersistitException415 * @throws PersistitException
285 */416 */
@@ -397,7 +528,8 @@
397 if (index < 0 || index >= MAX_ACCUMULATOR_COUNT) {528 if (index < 0 || index >= MAX_ACCUMULATOR_COUNT) {
398 throw new IllegalArgumentException("Invalid accumulator index: " + index);529 throw new IllegalArgumentException("Invalid accumulator index: " + index);
399 }530 }
400 Accumulator accumulator = _accumulators[index];531 final TreeVersion version = version();
532 Accumulator accumulator = version._accumulators[index];
401 if (accumulator == null) {533 if (accumulator == null) {
402 final AccumulatorState saved = Accumulator.getAccumulatorState(this, index);534 final AccumulatorState saved = Accumulator.getAccumulatorState(this, index);
403 long savedValue = 0;535 long savedValue = 0;
@@ -411,7 +543,7 @@
411 savedValue = saved.getValue();543 savedValue = saved.getValue();
412 }544 }
413 accumulator = Accumulator.accumulator(type, this, index, savedValue, _persistit.getTransactionIndex());545 accumulator = Accumulator.accumulator(type, this, index, savedValue, _persistit.getTransactionIndex());
414 _accumulators[index] = accumulator;546 version._accumulators[index] = accumulator;
415 _persistit.addAccumulator(accumulator);547 _persistit.addAccumulator(accumulator);
416 } else if (accumulator.getType() != type) {548 } else if (accumulator.getType() != type) {
417 throw new IllegalStateException("Wrong type " + accumulator + " is not a " + type + " accumulator");549 throw new IllegalStateException("Wrong type " + accumulator + " is not a " + type + " accumulator");
@@ -442,17 +574,4 @@
442 _handle.set(0);574 _handle.set(0);
443 }575 }
444576
445 /**
446 * Forget about any instantiated accumulator and remove it from the active
447 * list in Persistit. This should only be called in the during the process
448 * of removing a tree.
449 */
450 void discardAccumulators() {
451 for (int i = 0; i < _accumulators.length; ++i) {
452 if (_accumulators[i] != null) {
453 _persistit.removeAccumulator(_accumulators[i]);
454 _accumulators[i] = null;
455 }
456 }
457 }
458}577}
459578
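Reviewer note: most of the Tree.java changes follow one pattern. Per-version state moves into TreeVersion, accessors delegate through version(), and the checked PersistitException is wrapped in the unchecked TreeVersionException so that existing signatures such as getDepth() stay unchanged. The following sketch illustrates that pattern only. VersionedTreeSketch is hypothetical, it holds a single lazily created version rather than a TimelyResource chain, and it uses java.lang.Exception where the branch uses PersistitException.

```java
/** Hypothetical model of delegating tree state to a version object; not the Persistit API. */
final class VersionedTreeSketch {

    /** Stand-in for Version.VersionCreator. */
    interface VersionCreator<V> {
        V createVersion() throws Exception;
    }

    /** State that used to live directly on the tree. */
    static final class TreeVersion {
        volatile long rootPageAddr;
        volatile int depth;
    }

    /** Unchecked wrapper, so accessors need not declare a checked exception. */
    static final class TreeVersionException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        TreeVersionException(final Exception e) {
            super(e);
        }
    }

    private final VersionCreator<TreeVersion> creator;
    private TreeVersion current;

    VersionedTreeSketch(final VersionCreator<TreeVersion> creator) {
        this.creator = creator;
    }

    /** Returns the current version, creating it on first use. */
    synchronized TreeVersion version() {
        try {
            if (current == null) {
                current = creator.createVersion();
            }
            return current;
        } catch (final Exception e) {
            throw new TreeVersionException(e);
        }
    }

    long getRootPageAddr() {
        return version().rootPageAddr;
    }

    int getDepth() {
        return version().depth;
    }

    void changeRootPageAddr(final long rootPageAddr, final int deltaDepth) {
        // Resolve the version once so both fields are updated on the same object.
        final TreeVersion v = version();
        v.rootPageAddr = rootPageAddr;
        v.depth += deltaDepth;
    }
}
```

One consequence worth checking in review is that callers such as toString() and getStatistics() can now throw the unchecked wrapper where they previously could not fail.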
=== modified file 'src/main/java/com/persistit/ValueHelper.java'
--- src/main/java/com/persistit/ValueHelper.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/ValueHelper.java 2013-04-29 21:38:25 +0000
@@ -15,6 +15,8 @@
1515
16package com.persistit;16package com.persistit;
1717
18import com.persistit.util.Util;
19
18interface ValueHelper {20interface ValueHelper {
1921
20 int requiredLength(final byte[] target, int targetOffset, int targetLength);22 int requiredLength(final byte[] target, int targetOffset, int targetLength);
@@ -102,6 +104,11 @@
102 public boolean isMVV() {104 public boolean isMVV() {
103 return false;105 return false;
104 }106 }
107
108 @Override
109 public String toString() {
110 return _value != null ? Util.hexDump(_value.getEncodedBytes(), 0, _value.getEncodedSize()) : "null";
111 }
105 };112 };
106113
107 static class MVVValueWriter implements ValueHelper {114 static class MVVValueWriter implements ValueHelper {
@@ -145,5 +152,10 @@
145 public boolean isMVV() {152 public boolean isMVV() {
146 return true;153 return true;
147 }154 }
155
156 @Override
157 public String toString() {
158 return _value != null ? Util.hexDump(_value.getEncodedBytes(), 0, _value.getEncodedSize()) : "null";
159 }
148 };160 };
149}161}
150162
=== added file 'src/main/java/com/persistit/Version.java'
--- src/main/java/com/persistit/Version.java 1970-01-01 00:00:00 +0000
+++ src/main/java/com/persistit/Version.java 2013-04-29 21:38:25 +0000
@@ -0,0 +1,69 @@
1/**
2 * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
3 *
4 * This program and the accompanying materials are made available
5 * under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * This program may also be available under different license terms.
10 * For more information, see www.akiban.com or contact licensing@akiban.com.
11 *
12 * Contributors:
13 * Akiban Technologies, Inc.
14 */
15
16package com.persistit;
17
18import com.persistit.exception.PersistitException;
19
20/**
21 * Marker interface implemented by objects managed within the
22 * {@link TimelyResource} framework.
23 *
24 * @author peter
25 */
26interface Version {
27 /**
28 * Sub-interface describing <code>Version</code> types that require pruning
29 * when obsolete.
30 *
31 * @author peter
32 */
33 interface PrunableVersion extends Version {
34 /**
35 * Clean up any state held by this resource. For example, when a
36 * {@link Tree} is pruned, all pages allocated to its content are
37 * deallocated. This method is called when a newer
38 * <code>TimelyResource</code> has been created and is visible to all
39 * active transactions.
40 *
41 * @return <code>true</code> if all pruning work for this resource has
42 * been completed, <code>false</code> if the prune method should
43 * be called again later
44 */
45 boolean prune() throws PersistitException;
46
47 /**
48 * Called after the last known <code>Version</code> managed by a
49 * <code>TimelyResource</code> has been pruned.
50 *
51 * @throws PersistitException
52 */
53 void vacate() throws PersistitException;
54
55 }
56
57 /**
58 * Interface for a factory that creates Versions of the specified type.
59 *
60 * @author peter
61 *
62 * @param <T>
63 * @param <V>
64 */
65 interface VersionCreator<V> {
66 V createVersion(final TimelyResource<? extends V> resource) throws PersistitException;
67 }
68
69}
\ No newline at end of file
171
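The `prune()` contract documented in `Version.java` (return `true` when all pruning work is done, `false` if the method should be called again later) might be exercised roughly as in the sketch below. It is a minimal, self-contained illustration, not code from this branch: `PrunableSketch`, `Prunable`, `PagedVersion`, and `pruneUntilDone` are hypothetical names. The nested interface is a stand-in for the package-private `Version.PrunableVersion`, with the `throws PersistitException` clauses omitted. The retry loop is an assumption about how a caller could drive the contract; it does not reproduce the logic in `TimelyResource`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PrunableSketch {

    /** Hypothetical stand-in for the package-private Version.PrunableVersion. */
    interface Prunable {
        /** Returns true when all pruning work is complete, false to be called again. */
        boolean prune();

        /** Called once nothing remains to prune. */
        void vacate();
    }

    /** Hypothetical version that releases at most `batch` pages per prune() call. */
    static class PagedVersion implements Prunable {
        private final Deque<Long> pages = new ArrayDeque<>();
        private final int batch;
        boolean vacated;

        PagedVersion(final int pageCount, final int batch) {
            for (long p = 1; p <= pageCount; p++) {
                pages.add(p);
            }
            this.batch = batch;
        }

        @Override
        public boolean prune() {
            // Release one batch of pages, then report whether any remain.
            for (int i = 0; i < batch && !pages.isEmpty(); i++) {
                pages.poll();
            }
            return pages.isEmpty();
        }

        @Override
        public void vacate() {
            vacated = true;
        }
    }

    /** Calls prune() until it reports completion, then vacate(); returns the number of prune() calls. */
    static int pruneUntilDone(final Prunable version) {
        int calls = 1;
        while (!version.prune()) {
            calls++;
        }
        version.vacate();
        return calls;
    }

    public static void main(final String[] args) {
        final PagedVersion version = new PagedVersion(10, 4);
        System.out.println("prune() calls: " + pruneUntilDone(version) + ", vacated: " + version.vacated);
    }
}
```

Returning a boolean rather than blocking until finished lets a background cleanup task spread expensive deallocation over several passes. A real caller would presumably reschedule the work instead of spinning in a loop as this sketch does.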
=== modified file 'src/main/java/com/persistit/Volume.java'
--- src/main/java/com/persistit/Volume.java 2013-03-13 17:14:51 +0000
+++ src/main/java/com/persistit/Volume.java 2013-04-29 21:38:25 +0000
@@ -456,8 +456,11 @@
456 * 456 *
457 * @throws PersistitException457 * @throws PersistitException
458 */458 */
459 void open(final Persistit persistit) throws PersistitException {459 synchronized void open(final Persistit persistit) throws PersistitException {
460 checkClosing();460 checkClosing();
461 if (isOpened()) {
462 return;
463 }
461 if (_specification == null) {464 if (_specification == null) {
462 throw new IllegalStateException("Missing VolumeSpecification");465 throw new IllegalStateException("Missing VolumeSpecification");
463 }466 }
464467
=== modified file 'src/main/java/com/persistit/VolumeStructure.java'
--- src/main/java/com/persistit/VolumeStructure.java 2013-03-13 16:27:04 +0000
+++ src/main/java/com/persistit/VolumeStructure.java 2013-04-29 21:38:25 +0000
@@ -15,17 +15,12 @@
1515
16package com.persistit;16package com.persistit;
1717
18import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_A;
19import static com.persistit.util.ThreadSequencer.sequence;
20
21import java.lang.ref.WeakReference;18import java.lang.ref.WeakReference;
22import java.util.ArrayList;19import java.util.ArrayList;
23import java.util.HashMap;20import java.util.HashMap;
24import java.util.List;21import java.util.List;
25import java.util.Map;22import java.util.Map;
2623
27import com.persistit.AlertMonitor.AlertLevel;
28import com.persistit.AlertMonitor.Event;
29import com.persistit.exception.BufferSizeUnavailableException;24import com.persistit.exception.BufferSizeUnavailableException;
30import com.persistit.exception.CorruptVolumeException;25import com.persistit.exception.CorruptVolumeException;
31import com.persistit.exception.InUseException;26import com.persistit.exception.InUseException;
@@ -131,7 +126,6 @@
131126
132 Exchange directoryExchange() throws BufferSizeUnavailableException {127 Exchange directoryExchange() throws BufferSizeUnavailableException {
133 final Exchange ex = new Exchange(_directoryTree);128 final Exchange ex = new Exchange(_directoryTree);
134 ex.ignoreTransactions();
135 return ex;129 return ex;
136 }130 }
137131
@@ -191,21 +185,30 @@
191 if (DIRECTORY_TREE_NAME.equals(name)) {185 if (DIRECTORY_TREE_NAME.equals(name)) {
192 throw new IllegalArgumentException("Tree name is reserved: " + name);186 throw new IllegalArgumentException("Tree name is reserved: " + name);
193 }187 }
194 Tree tree;188 Tree tree = null;
195 final WeakReference<Tree> treeRef = _treeNameHashMap.get(name);189 final WeakReference<Tree> treeRef = _treeNameHashMap.get(name);
196 if (treeRef != null) {190 if (treeRef != null) {
197 tree = treeRef.get();191 tree = treeRef.get();
198 if (tree != null && tree.isValid()) {192 if (tree != null) {
199 return tree;193 if (tree.isLive()) {
194 return tree;
195 } else {
196 if (!createIfNecessary) {
197 return null;
198 }
199 }
200 }200 }
201 }201 }
202 if (tree == null) {
203 tree = new Tree(_persistit, _volume, name);
204 }
202 final Exchange ex = directoryExchange();205 final Exchange ex = directoryExchange();
203 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(name);206 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(name);
204 final Value value = ex.fetch().getValue();207 final Value value = ex.fetch().getValue();
205 tree = new Tree(_persistit, _volume, name);
206 if (value.isDefined()) {208 if (value.isDefined()) {
207 value.get(tree);209 value.get(tree);
208 loadTreeStatistics(tree);210 loadTreeStatistics(tree);
211 tree.setPrimordial();
209 tree.setValid();212 tree.setValid();
210 } else if (createIfNecessary) {213 } else if (createIfNecessary) {
211 final long rootPageAddr = createTreeRoot(tree);214 final long rootPageAddr = createTreeRoot(tree);
@@ -216,10 +219,14 @@
216 } else {219 } else {
217 return null;220 return null;
218 }221 }
222 if (_volume.isTemporary() || _volume.isLockVolume()) {
223 tree.setPrimordial();
224 }
219 if (!_volume.isTemporary()) {225 if (!_volume.isTemporary()) {
220 tree.loadHandle();226 tree.loadHandle();
221 }227 }
222 _treeNameHashMap.put(name, new WeakReference<Tree>(tree));228 _treeNameHashMap.put(name, new WeakReference<Tree>(tree));
229
223 return tree;230 return tree;
224 }231 }
225232
@@ -249,13 +256,16 @@
249 }256 }
250 } else {257 } else {
251 final Exchange ex = directoryExchange();258 final Exchange ex = directoryExchange();
259 if (!tree.isTransactionPrivate(false)) {
260 ex.ignoreTransactions();
261 }
252 ex.getValue().put(tree);262 ex.getValue().put(tree);
253 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(tree.getName()).store();263 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(tree.getName()).store();
254 }264 }
255 }265 }
256266
257 void storeTreeStatistics(final Tree tree) throws PersistitException {267 void storeTreeStatistics(final Tree tree) throws PersistitException {
258 if (tree.getStatistics().isDirty() && !DIRECTORY_TREE_NAME.equals(tree.getName())) {268 if (tree.isLive() && tree.getStatistics().isDirty() && tree != _directoryTree) {
259 final Exchange ex = directoryExchange();269 final Exchange ex = directoryExchange();
260 if (!ex.getVolume().isReadOnly()) {270 if (!ex.getVolume().isReadOnly()) {
261 ex.getValue().put(tree.getStatistics());271 ex.getValue().put(tree.getStatistics());
@@ -273,43 +283,30 @@
273 }283 }
274 }284 }
275285
276 boolean removeTree(final Tree tree) throws PersistitException {286 void removeTree(final Tree tree) throws PersistitException {
277 if (tree == _directoryTree) {287 if (tree == _directoryTree) {
278 throw new IllegalArgumentException("Can't delete the Directory tree");288 throw new IllegalArgumentException("Can't delete the Directory tree");
279 }289 }
280 _persistit.checkSuspended();
281290
282 if (!tree.claim(true)) {291 if (!tree.claim(true)) {
283 throw new InUseException("Unable to acquire writer claim on " + tree);292 throw new InUseException("Unable to acquire writer claim on " + tree);
284 }293 }
285
286 final int treeDepth = tree.getDepth();
287 final long treeRootPage = tree.getRootPageAddr();
288
289 try {294 try {
290 tree.discardAccumulators();295 final Exchange ex = directoryExchange();
291296 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(tree.getName()).remove(Key.GTEQ);
292 synchronized (this) {297 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_STATS).append(tree.getName()).remove(Key.GTEQ);
293 _treeNameHashMap.remove(tree.getName());298 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ACCUMULATOR).append(tree.getName()).remove(Key.GTEQ);
294 tree.bumpGeneration();299 tree.delete();
295 tree.invalidate();
296
297 tree.changeRootPageAddr(-1, 0);
298 final Exchange ex = directoryExchange();
299 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(tree.getName()).remove(Key.GTEQ);
300 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_STATS).append(tree.getName()).remove(Key.GTEQ);
301 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ACCUMULATOR).append(tree.getName()).remove(Key.GTEQ);
302 }
303 sequence(TREE_CREATE_REMOVE_A);
304 } finally {300 } finally {
305 tree.release();301 tree.release();
306 }302 }
307303 }
308 // The Tree is now gone. The following deallocates the304
309 // pages formerly associated with it. If this fails we'll be305 synchronized void removed(final Tree tree) {
310 // left with allocated pages that are not available on the garbage306 _treeNameHashMap.remove(tree.getName());
311 // chain for reuse.307 }
312308
309 void deallocateTree(final long treeRootPage, final int treeDepth) throws PersistitException {
313 int depth = treeDepth;310 int depth = treeDepth;
314 long page = treeRootPage;311 long page = treeRootPage;
315 while (page != -1) {312 while (page != -1) {
@@ -341,7 +338,6 @@
341 deallocateGarbageChain(deallocate, 0);338 deallocateGarbageChain(deallocate, 0);
342 }339 }
343 }340 }
344 return true;
345 }341 }
346342
347 /**343 /**
@@ -362,25 +358,22 @@
362 /**358 /**
363 * Flush dirty {@link TreeStatistics} instances. Called periodically on the359 * Flush dirty {@link TreeStatistics} instances. Called periodically on the
364 * PAGE_WRITER thread from {@link Persistit#cleanup()}.360 * PAGE_WRITER thread from {@link Persistit#cleanup()}.
361 *
362 * @throws PersistitException
365 */363 */
366 void flushStatistics() {364 void flushStatistics() throws PersistitException {
367 try {365 final List<Tree> trees = new ArrayList<Tree>();
368 final List<Tree> trees = new ArrayList<Tree>();366 synchronized (this) {
369 synchronized (this) {367 for (final WeakReference<Tree> ref : _treeNameHashMap.values()) {
370 for (final WeakReference<Tree> ref : _treeNameHashMap.values()) {368 final Tree tree = ref.get();
371 final Tree tree = ref.get();369 if (tree != null && tree != _directoryTree) {
372 if (tree != null && tree != _directoryTree) {370 trees.add(tree);
373 trees.add(tree);
374 }
375 }371 }
376 }372 }
377 for (final Tree tree : trees) {373 }
378 storeTreeStatistics(tree);374
379 }375 for (final Tree tree : trees) {
380 } catch (final Exception e) {376 storeTreeStatistics(tree);
381 _persistit.getAlertMonitor().post(
382 new Event(AlertLevel.ERROR, _persistit.getLogBase().adminFlushException, e),
383 AlertMonitor.FLUSH_STATISTICS_CATEGORY);
384 }377 }
385 }378 }
386379
387380
=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
--- src/main/java/com/persistit/logging/LogBase.java 2012-10-04 20:23:10 +0000
+++ src/main/java/com/persistit/logging/LogBase.java 2013-04-29 21:38:25 +0000
@@ -226,6 +226,9 @@
226 @Message("WARNING|%s while pruning transaction record %s")226 @Message("WARNING|%s while pruning transaction record %s")
227 public final LogItem pruneException = PersistitLogMessage.empty();227 public final LogItem pruneException = PersistitLogMessage.empty();
228228
229 @Message("WARNING|%s while pruning TimelyResource %s")
230 public final LogItem timelyResourcePruneException = PersistitLogMessage.empty();
231
229 @Message("WARNING|Transaction %s pruning incomplete at %s after rollback")232 @Message("WARNING|Transaction %s pruning incomplete at %s after rollback")
230 public final LogItem pruningIncomplete = PersistitLogMessage.empty();233 public final LogItem pruningIncomplete = PersistitLogMessage.empty();
231234
232235
=== modified file 'src/main/java/com/persistit/util/SequencerConstants.java'
--- src/main/java/com/persistit/util/SequencerConstants.java 2012-10-10 16:06:49 +0000
+++ src/main/java/com/persistit/util/SequencerConstants.java 2013-04-29 21:38:25 +0000
@@ -61,17 +61,6 @@
61 array(WRITE_WRITE_STORE_A, WRITE_WRITE_STORE_C) };61 array(WRITE_WRITE_STORE_A, WRITE_WRITE_STORE_C) };
6262
63 /*63 /*
64 * Used in testing sequencing between tree creation and removal
65 */
66 int TREE_CREATE_REMOVE_A = allocate("TREE_CREATE_REMOVE_A");
67 int TREE_CREATE_REMOVE_B = allocate("TREE_CREATE_REMOVE_B");
68 int TREE_CREATE_REMOVE_C = allocate("TREE_CREATE_REMOVE_C");
69
70 int[][] TREE_CREATE_REMOVE_SCHEDULE = new int[][] { array(TREE_CREATE_REMOVE_A, TREE_CREATE_REMOVE_B),
71 array(TREE_CREATE_REMOVE_B), array(TREE_CREATE_REMOVE_A, TREE_CREATE_REMOVE_C),
72 array(TREE_CREATE_REMOVE_A, TREE_CREATE_REMOVE_C) };
73
74 /*
75 * Used in testing sequencing between pageNode reading and invalidation in64 * Used in testing sequencing between pageNode reading and invalidation in
76 * JournalManager65 * JournalManager
77 */66 */
@@ -104,7 +93,7 @@
104 array(DEALLOCATE_CHAIN_A, DEALLOCATE_CHAIN_C) };93 array(DEALLOCATE_CHAIN_A, DEALLOCATE_CHAIN_C) };
10594
106 /*95 /*
107 * Used in testing delete/deallocate sequence in Bug1022567Test96 * Used in testing delete/deallocate sequence in Bug1064565Test
108 */97 */
109 int ACCUMULATOR_CHECKPOINT_A = allocate("ACCUMULATOR_CHECKPOINT_A");98 int ACCUMULATOR_CHECKPOINT_A = allocate("ACCUMULATOR_CHECKPOINT_A");
110 int ACCUMULATOR_CHECKPOINT_B = allocate("ACCUMULATOR_CHECKPOINT_B");99 int ACCUMULATOR_CHECKPOINT_B = allocate("ACCUMULATOR_CHECKPOINT_B");
111100
=== modified file 'src/test/java/com/persistit/AccumulatorRecoveryTest.java'
--- src/test/java/com/persistit/AccumulatorRecoveryTest.java 2013-04-10 20:09:48 +0000
+++ src/test/java/com/persistit/AccumulatorRecoveryTest.java 2013-04-29 21:38:25 +0000
@@ -167,6 +167,11 @@
167 return true;167 return true;
168 }168 }
169169
170 @Override
171 public boolean createTree(final long timestamp) throws PersistitException {
172 return true;
173 }
174
170 };175 };
171 plan.applyAllRecoveredTransactions(commitListener, plan.getDefaultRollbackListener());176 plan.applyAllRecoveredTransactions(commitListener, plan.getDefaultRollbackListener());
172 assertEquals(15, recoveryTimestamps.size());177 assertEquals(15, recoveryTimestamps.size());
173178
=== modified file 'src/test/java/com/persistit/AccumulatorTest.java'
--- src/test/java/com/persistit/AccumulatorTest.java 2013-04-10 20:09:48 +0000
+++ src/test/java/com/persistit/AccumulatorTest.java 2013-04-29 21:38:25 +0000
@@ -375,7 +375,7 @@
375 */375 */
376 @Test376 @Test
377 public void testRecreateAccumulatorAfterCheckpoint() throws PersistitException {377 public void testRecreateAccumulatorAfterCheckpoint() throws PersistitException {
378 final int PASS_COUNT = 2;378 final int PASS_COUNT = 5;
379 final int ROW_COUNT = 10;379 final int ROW_COUNT = 10;
380 final int ACCUM_INDEX = 0;380 final int ACCUM_INDEX = 0;
381 final String TEST_VOLUME_NAME = UnitTestProperties.VOLUME_NAME;381 final String TEST_VOLUME_NAME = UnitTestProperties.VOLUME_NAME;
382382
=== modified file 'src/test/java/com/persistit/Bug1018526Test.java'
--- src/test/java/com/persistit/Bug1018526Test.java 2012-11-20 17:45:51 +0000
+++ src/test/java/com/persistit/Bug1018526Test.java 2013-04-29 21:38:25 +0000
@@ -92,7 +92,7 @@
92 final TreeDescriptor td = map.remove(handle);92 final TreeDescriptor td = map.remove(handle);
93 assertNotNull("Permanent Tree should be un the tree map", td);93 assertNotNull("Permanent Tree should be un the tree map", td);
94 }94 }
95 // expect 1: the directory tree95 // expect 2: _directory and _classIndex
96 assertEquals("Recovered tree map should contain only permanent trees", 1, map.size());96 assertEquals("Recovered tree map should contain only permanent trees", 2, map.size());
97 }97 }
98}98}
9999
=== modified file 'src/test/java/com/persistit/Bug920754Test.java'
--- src/test/java/com/persistit/Bug920754Test.java 2013-04-10 20:09:48 +0000
+++ src/test/java/com/persistit/Bug920754Test.java 2013-04-29 21:38:25 +0000
@@ -68,6 +68,7 @@
68 while (dir.next(true)) {68 while (dir.next(true)) {
69 keys++;69 keys++;
70 }70 }
71 assertEquals("There should be no remaining keys in the directory tree", 0, keys);71 // _classIndex
72 assertEquals("There should be one remaining key in the directory tree", 1, keys);
72 }73 }
73}74}
7475
=== modified file 'src/test/java/com/persistit/Bug932097Test.java'
--- src/test/java/com/persistit/Bug932097Test.java 2012-11-25 20:14:58 +0000
+++ src/test/java/com/persistit/Bug932097Test.java 2013-04-29 21:38:25 +0000
@@ -21,6 +21,7 @@
2121
22 @Test22 @Test
23 public void testInjectedAbortTransactionStatus() throws Exception {23 public void testInjectedAbortTransactionStatus() throws Exception {
24 final Exchange ex = _persistit.getExchange("persistit", "test", true);
24 /*25 /*
25 * Create a bunch of incomplete transactions26 * Create a bunch of incomplete transactions
26 */27 */
@@ -28,7 +29,6 @@
28 _persistit.setSessionId(new SessionId());29 _persistit.setSessionId(new SessionId());
29 final Transaction txn = _persistit.getTransaction();30 final Transaction txn = _persistit.getTransaction();
30 txn.begin();31 txn.begin();
31 final Exchange ex = _persistit.getExchange("persistit", "test", true);
32 ex.getValue().put(RED_FOX);32 ex.getValue().put(RED_FOX);
33 txn.begin();33 txn.begin();
34 for (int k = 1; k < 10; k++) {34 for (int k = 1; k < 10; k++) {
3535
=== modified file 'src/test/java/com/persistit/CorruptVolumeTest.java'
--- src/test/java/com/persistit/CorruptVolumeTest.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/CorruptVolumeTest.java 2013-04-29 21:38:25 +0000
@@ -37,7 +37,8 @@
37 exchange.to(i).store();37 exchange.to(i).store();
38 }38 }
39 // Corrupt the volume by zonking the the index page39 // Corrupt the volume by zonking the the index page
40 final Buffer buffer = exchange.getBufferPool().get(exchange.getVolume(), 4, true, true);40 final long pageAddr = exchange.fetchBufferCopy(1).getPageAddress();
41 final Buffer buffer = exchange.getBufferPool().get(exchange.getVolume(), pageAddr, true, true);
41 Arrays.fill(buffer.getBytes(), 20, 200, (byte) 0);42 Arrays.fill(buffer.getBytes(), 20, 200, (byte) 0);
42 buffer.setDirtyAtTimestamp(_persistit.getTimestampAllocator().updateTimestamp());43 buffer.setDirtyAtTimestamp(_persistit.getTimestampAllocator().updateTimestamp());
43 buffer.releaseTouched();44 buffer.releaseTouched();
4445
=== modified file 'src/test/java/com/persistit/IOFailureTest.java'
--- src/test/java/com/persistit/IOFailureTest.java 2012-11-20 17:45:51 +0000
+++ src/test/java/com/persistit/IOFailureTest.java 2013-04-29 21:38:25 +0000
@@ -283,7 +283,6 @@
283283
284 @Test284 @Test
285 public void testJournalEOFonRecovery() throws Exception {285 public void testJournalEOFonRecovery() throws Exception {
286 final Properties properties = _persistit.getProperties();
287 final JournalManager jman = _persistit.getJournalManager();286 final JournalManager jman = _persistit.getJournalManager();
288 final Exchange exchange = _persistit.getExchange(_volumeName, "RecoveryTest", true);287 final Exchange exchange = _persistit.getExchange(_volumeName, "RecoveryTest", true);
289 exchange.getValue().put(RED_FOX);288 exchange.getValue().put(RED_FOX);
@@ -465,6 +464,10 @@
465 boolean done = false;464 boolean done = false;
466 while (System.currentTimeMillis() < expires) {465 while (System.currentTimeMillis() < expires) {
467 try {466 try {
467 /*
468 * Needed to avoid leaving a dirty page during checkpoint
469 */
470 _persistit.flushStatistics();
468 _persistit.copyBackPages();471 _persistit.copyBackPages();
469 done = true;472 done = true;
470 break;473 break;
471474
=== modified file 'src/test/java/com/persistit/IntegrityCheckTest.java'
--- src/test/java/com/persistit/IntegrityCheckTest.java 2012-11-20 17:45:51 +0000
+++ src/test/java/com/persistit/IntegrityCheckTest.java 2013-04-29 21:38:25 +0000
@@ -173,6 +173,8 @@
173 txn.end();173 txn.end();
174 }174 }
175 }175 }
176 _persistit.checkAllVolumes();
177 System.out.println();
176 final Configuration config = _persistit.getConfiguration();178 final Configuration config = _persistit.getConfiguration();
177 _persistit.crash();179 _persistit.crash();
178 _persistit = new Persistit();180 _persistit = new Persistit();
179181
=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
--- src/test/java/com/persistit/JournalManagerTest.java 2013-04-10 20:09:48 +0000
+++ src/test/java/com/persistit/JournalManagerTest.java 2013-04-29 21:38:25 +0000
@@ -59,6 +59,12 @@
5959
60 private final String _volumeName = "persistit";60 private final String _volumeName = "persistit";
6161
62 @Override
63 public void setUp() throws Exception {
64 super.setUp();
65 _persistit.getExchange(_volumeName, "JournalManagerTest1", true);
66 }
67
62 @Test68 @Test
63 public void testJournalRecords() throws Exception {69 public void testJournalRecords() throws Exception {
64 store1();70 store1();
@@ -216,6 +222,11 @@
216 return true;222 return true;
217 }223 }
218224
225 @Override
226 public boolean createTree(final long timestamp) throws PersistitException {
227 return true;
228 }
229
219 };230 };
220 rman.applyAllRecoveredTransactions(actor, rman.getDefaultRollbackListener());231 rman.applyAllRecoveredTransactions(actor, rman.getDefaultRollbackListener());
221 assertEquals(commitCount, recoveryTimestamps.size());232 assertEquals(commitCount, recoveryTimestamps.size());
@@ -298,6 +309,7 @@
298 // Allow test to control when pruning will happen309 // Allow test to control when pruning will happen
299 _persistit.getJournalManager().setRollbackPruningEnabled(false);310 _persistit.getJournalManager().setRollbackPruningEnabled(false);
300 _persistit.getJournalManager().setWritePagePruningEnabled(false);311 _persistit.getJournalManager().setWritePagePruningEnabled(false);
312
301 final Transaction txn = _persistit.getTransaction();313 final Transaction txn = _persistit.getTransaction();
302 for (int i = 0; i < 10; i++) {314 for (int i = 0; i < 10; i++) {
303 txn.begin();315 txn.begin();
304316
=== modified file 'src/test/java/com/persistit/MVCCPruneBufferTest.java'
--- src/test/java/com/persistit/MVCCPruneBufferTest.java 2013-03-06 16:20:57 +0000
+++ src/test/java/com/persistit/MVCCPruneBufferTest.java 2013-04-29 21:38:25 +0000
@@ -320,6 +320,8 @@
320320
321 @Test321 @Test
322 public void testWritePagePrune() throws Exception {322 public void testWritePagePrune() throws Exception {
323 _persistit.getJournalManager().setWritePagePruningEnabled(false);
324 final Exchange exchange = exchange(1);
323 final Transaction txn = _persistit.getTransaction();325 final Transaction txn = _persistit.getTransaction();
324 try {326 try {
325 txn.begin();327 txn.begin();
@@ -328,8 +330,8 @@
328 } finally {330 } finally {
329 txn.end();331 txn.end();
330 }332 }
331 final Volume volume = _persistit.getVolume(TEST_VOLUME_NAME);333 final long pageAddr = exchange.fetchBufferCopy(0).getPageAddress();
332 final Buffer buffer = volume.getPool().get(volume, 3, true, true);334 final Buffer buffer = exchange.getBufferPool().get(exchange.getVolume(), pageAddr, true, true);
333 assertTrue("Should have multiple MVV records", buffer.getMvvCount() > 2);335 assertTrue("Should have multiple MVV records", buffer.getMvvCount() > 2);
334 _persistit.getJournalManager().setWritePagePruningEnabled(true);336 _persistit.getJournalManager().setWritePagePruningEnabled(true);
335 _persistit.getTransactionIndex().updateActiveTransactionCache();337 _persistit.getTransactionIndex().updateActiveTransactionCache();
@@ -340,14 +342,17 @@
340 }342 }
341343
342 private void storeNewVersion(final int cycle) throws Exception {344 private void storeNewVersion(final int cycle) throws Exception {
343 final Exchange exchange = _persistit.getExchange(TEST_VOLUME_NAME,345 final Exchange exchange = exchange(cycle);
344 String.format("%s%04d", TEST_TREE_NAME, cycle), true);
345 exchange.getValue().put(String.format("%s%04d", RED_FOX, cycle));346 exchange.getValue().put(String.format("%s%04d", RED_FOX, cycle));
346 for (int i = 1; i <= 100; i++) {347 for (int i = 1; i <= 100; i++) {
347 exchange.to(i).store();348 exchange.to(i).store();
348 }349 }
349 }350 }
350351
352 private Exchange exchange(final int cycle) throws PersistitException {
353 return _persistit.getExchange(TEST_VOLUME_NAME, String.format("%s%04d", TEST_TREE_NAME, cycle), true);
354 }
355
351 private void removeKeys(final int cycle) throws Exception {356 private void removeKeys(final int cycle) throws Exception {
352 final Exchange exchange = _persistit.getExchange(TEST_VOLUME_NAME, TEST_TREE_NAME, true);357 final Exchange exchange = _persistit.getExchange(TEST_VOLUME_NAME, TEST_TREE_NAME, true);
353 for (int i = (cycle % 2) + 1; i <= 100; i += 2) {358 for (int i = (cycle % 2) + 1; i <= 100; i += 2) {
354359
=== modified file 'src/test/java/com/persistit/PersistitUnitTestCase.java'
--- src/test/java/com/persistit/PersistitUnitTestCase.java 2012-11-20 18:18:02 +0000
+++ src/test/java/com/persistit/PersistitUnitTestCase.java 2013-04-29 21:38:25 +0000
@@ -136,4 +136,15 @@
136 _persistit.getCleanupManager().setPollInterval(-1);136 _persistit.getCleanupManager().setPollInterval(-1);
137 _persistit.getJournalManager().setWritePagePruningEnabled(false);137 _persistit.getJournalManager().setWritePagePruningEnabled(false);
138 }138 }
139
140 protected void drainJournal() throws Exception {
141 _persistit.flush();
142 /*
143 * Causes all TreeStatistics to be marked clean so that the subsequent
144 * checkpoint will not add another dirty page.
145 */
146 _persistit.flushStatistics();
147 _persistit.checkpoint();
148 _persistit.getJournalManager().copyBack();
149 }
139}150}
140151
=== modified file 'src/test/java/com/persistit/RecoveryTest.java'
--- src/test/java/com/persistit/RecoveryTest.java 2012-11-20 17:45:51 +0000
+++ src/test/java/com/persistit/RecoveryTest.java 2013-04-29 21:38:25 +0000
@@ -83,9 +83,7 @@
83 store1();83 store1();
84 JournalManager jman = _persistit.getJournalManager();84 JournalManager jman = _persistit.getJournalManager();
85 assertTrue(jman.getPageMapSize() > 0);85 assertTrue(jman.getPageMapSize() > 0);
86 _persistit.flush();86 drainJournal();
87 _persistit.checkpoint();
88 jman.copyBack();
89 assertEquals(0, jman.getPageMapSize());87 assertEquals(0, jman.getPageMapSize());
90 _persistit.close();88 _persistit.close();
91 _persistit = new Persistit(_config);89 _persistit = new Persistit(_config);
@@ -159,6 +157,11 @@
159 return true;157 return true;
160 }158 }
161159
160 @Override
161 public boolean createTree(final long timestamp) throws PersistitException {
162 return true;
163 }
164
162 };165 };
163 plan.applyAllRecoveredTransactions(actor, plan.getDefaultRollbackListener());166 plan.applyAllRecoveredTransactions(actor, plan.getDefaultRollbackListener());
164 assertEquals(15, recoveryTimestamps.size());167 assertEquals(15, recoveryTimestamps.size());
@@ -193,8 +196,7 @@
193 store1();196 store1();
194197
195 jman.rollover();198 jman.rollover();
196 _persistit.checkpoint();199 drainJournal();
197 jman.copyBack();
198 assertEquals(0, jman.getPageMapSize());200 assertEquals(0, jman.getPageMapSize());
199201
200 final Transaction txn = _persistit.getTransaction();202 final Transaction txn = _persistit.getTransaction();
@@ -203,8 +205,10 @@
203 store1();205 store1();
204 txn.commit();206 txn.commit();
205 txn.end();207 txn.end();
206 // Flush an uncommitted version of this transaction - should208 /*
207 // prevent journal cleanup.209 * Flush an uncommitted version of this transaction - should prevent
210 * journal cleanup.
211 */
208 txn.begin();212 txn.begin();
209 store0();213 store0();
210 txn.flushTransactionBuffer(true);214 txn.flushTransactionBuffer(true);
@@ -212,14 +216,13 @@
212 txn.end();216 txn.end();
213217
214 jman.rollover();218 jman.rollover();
215 _persistit.checkpoint();219 drainJournal();
216 jman.copyBack();
217 //
218 // Because JournalManager thinks there's an open transaction
219 // it should preserve the journal file containing the TX record
220 // for the transaction.
221 //
222 assertEquals(0, jman.getPageMapSize());220 assertEquals(0, jman.getPageMapSize());
221 /*
222 * Because JournalManager thinks there's an open transaction it should
223 * preserve the journal file containing the TX record for the
224 * transaction.
225 */
223 assertTrue(jman.getBaseAddress() < jman.getCurrentAddress());226 assertTrue(jman.getBaseAddress() < jman.getCurrentAddress());
224 txn.begin();227 txn.begin();
225 store1();228 store1();
@@ -229,6 +232,7 @@
229 jman.unitTestClearTransactionMap();232 jman.unitTestClearTransactionMap();
230233
231 jman.rollover();234 jman.rollover();
235 _persistit.flushStatistics();
232 _persistit.checkpoint();236 _persistit.checkpoint();
233 /*237 /*
234 * The TI active transaction cache may be a bit out of date, which can238 * The TI active transaction cache may be a bit out of date, which can
@@ -238,8 +242,7 @@
238 * copier does the right thing.242 * copier does the right thing.
239 */243 */
240 _persistit.getTransactionIndex().updateActiveTransactionCache();244 _persistit.getTransactionIndex().updateActiveTransactionCache();
241 jman.copyBack();245 drainJournal();
242
243 assertEquals(jman.getBaseAddress(), jman.getCurrentAddress());246 assertEquals(jman.getBaseAddress(), jman.getCurrentAddress());
244 assertEquals(0, jman.getPageMapSize());247 assertEquals(0, jman.getPageMapSize());
245248
@@ -381,9 +384,13 @@
381 assertTrue(_persistit.getJournalManager().getHandleCount() > updatedHandleValue);384 assertTrue(_persistit.getJournalManager().getHandleCount() > updatedHandleValue);
382 }385 }
383386
387 private final static int T1 = 1000;
388 private final static int T2 = 2000;
389
384 @Test390 @Test
385 public void testIndexHoles() throws Exception {391 public void testIndexHoles() throws Exception {
386 _persistit.getJournalManager().setAppendOnly(true);392 _persistit.getJournalManager().setAppendOnly(true);
393
387 final Transaction transaction = _persistit.getTransaction();394 final Transaction transaction = _persistit.getTransaction();
388 final StringBuilder sb = new StringBuilder();395 final StringBuilder sb = new StringBuilder();
389 while (sb.length() < 1000) {396 while (sb.length() < 1000) {
@@ -392,7 +399,7 @@
392399
393 final String s = sb.toString();400 final String s = sb.toString();
394 for (int cycle = 0; cycle < 2; cycle++) {401 for (int cycle = 0; cycle < 2; cycle++) {
395 for (int i = 1000; i < 2000; i++) {402 for (int i = T1; i < T2; i++) {
396 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);403 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);
397 transaction.begin();404 transaction.begin();
398 try {405 try {
@@ -407,7 +414,7 @@
407 }414 }
408415
409 for (int j = 0; j < 20; j++) {416 for (int j = 0; j < 20; j++) {
410 for (int i = 1000; i < 2000; i++) {417 for (int i = T1; i < T2; i++) {
411 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);418 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);
412 transaction.begin();419 transaction.begin();
413 try {420 try {
@@ -419,7 +426,7 @@
419 }426 }
420 }427 }
421428
422 for (int i = 1000; i < 2000; i += 2) {429 for (int i = T1; i < T2; i += 2) {
423 transaction.begin();430 transaction.begin();
424 try {431 try {
425 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);432 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);
@@ -430,6 +437,7 @@
430 }437 }
431 }438 }
432 }439 }
440 _persistit.checkAllVolumes();
433 _persistit.crash();441 _persistit.crash();
434442
435 _persistit = new Persistit();443 _persistit = new Persistit();
436444
=== modified file 'src/test/java/com/persistit/SplitPolicyTest.java'
--- src/test/java/com/persistit/SplitPolicyTest.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/SplitPolicyTest.java 2013-04-29 21:38:25 +0000
@@ -253,7 +253,7 @@
253 //253 //
254 // forward sequential254 // forward sequential
255 //255 //
256 for (long page = 2; page < 20; page++) {256 for (long page = ex.clear().append(0).fetchBufferCopy(0).getPageAddress(); page < 20; page++) {
257 final Buffer buffer = ex.getBufferPool().get(ex.getVolume(), page, false, true);257 final Buffer buffer = ex.getBufferPool().get(ex.getVolume(), page, false, true);
258 if (buffer.isDataPage()) {258 if (buffer.isDataPage()) {
259 final int available = buffer.getAvailableSize();259 final int available = buffer.getAvailableSize();
260260
=== added file 'src/test/java/com/persistit/TimelyResourceTest.java'
--- src/test/java/com/persistit/TimelyResourceTest.java 1970-01-01 00:00:00 +0000
+++ src/test/java/com/persistit/TimelyResourceTest.java 2013-04-29 21:38:25 +0000
@@ -0,0 +1,290 @@
1/**
2 * Copyright © 2011-2012 Akiban Technologies, Inc. All rights reserved.
3 *
4 * This program and the accompanying materials are made available
5 * under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * This program may also be available under different license terms.
10 * For more information, see www.akiban.com or contact licensing@akiban.com.
11 *
12 * Contributors:
13 * Akiban Technologies, Inc.
14 */
15
16package com.persistit;
17
18import static com.persistit.TransactionIndex.tss2vh;
19import static com.persistit.TransactionStatus.UNCOMMITTED;
20import static com.persistit.unit.ConcurrentUtil.assertSuccess;
21import static com.persistit.unit.ConcurrentUtil.createThread;
22import static com.persistit.unit.ConcurrentUtil.join;
23import static com.persistit.unit.ConcurrentUtil.start;
24import static com.persistit.util.Util.NS_PER_S;
25import static org.junit.Assert.assertEquals;
26import static org.junit.Assert.assertTrue;
27
28import java.util.ArrayList;
29import java.util.Iterator;
30import java.util.List;
31import java.util.Random;
32import java.util.concurrent.Semaphore;
33import java.util.concurrent.atomic.AtomicInteger;
34
35import org.junit.Test;
36
37import com.persistit.Version.PrunableVersion;
38import com.persistit.Version.VersionCreator;
39import com.persistit.exception.PersistitException;
40import com.persistit.exception.RollbackException;
41import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
42import com.persistit.unit.ConcurrentUtil.UncaughtExceptionHandler;
43import com.persistit.util.Util;
44
45public class TimelyResourceTest extends PersistitUnitTestCase {
46
47 private int _idCounter;
48
49 static class TestVersion implements PrunableVersion {
50 final int _id;
51 final TimelyResourceTest _test;
52 final AtomicInteger _pruned = new AtomicInteger();
53
54 TestVersion(final int id, final TimelyResourceTest test) {
55 _id = id;
56 _test = test;
57 }
58
59 @Override
60 public boolean prune() {
61 _pruned.incrementAndGet();
62 return true;
63 }
64
65 @Override
66 public void vacate() {
67 System.out.println("No more versions");
68 }
69
70 @Override
71 public String toString() {
72 return String.format("<%,d:%,d>", _id, _pruned.get());
73 }
74
75 TimelyResourceTest getContainer() {
76 return _test;
77 }
78 }
79
80 @Test
81 public void testAddAndPruneResources() throws Exception {
82 testAddAndPruneResources1(false);
83 testAddAndPruneResources1(true);
84 }
85
86 private void testAddAndPruneResources1(final boolean withTransactions) throws Exception {
87 final Transaction txn = _persistit.getTransaction();
88 final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
89 final long[] history = new long[5];
90 final TestVersion[] resources = new TestVersion[5];
91 for (int i = 0; i < 5; i++) {
92 if (withTransactions) {
93 txn.begin();
94 }
95 final TestVersion resource = new TestVersion(i, this);
96 resources[i] = resource;
97 if (!tr.isEmpty()) {
98 tr.delete();
99 }
100 tr.addVersion(resource, txn);
101 if (withTransactions) {
102 txn.commit();
103 txn.end();
104 }
105 history[i] = _persistit.getTimestampAllocator().updateTimestamp();
106 }
107 assertEquals("Incorrect version count " + withTransactions, 9, tr.getVersionCount());
108
109 for (int i = 0; i < 5; i++) {
110 final TestVersion t = tr.getVersion(tss2vh(history[i], 0));
111 assertTrue("Missing version " + withTransactions, t != null);
112 assertEquals("Wrong version " + withTransactions, i, t._id);
113 }
114 _persistit.getTransactionIndex().updateActiveTransactionCache();
115 tr.prune();
116 assertEquals("Should have one version left " + withTransactions, 1, tr.getVersionCount());
117 assertEquals("Wrong version " + withTransactions, 4, tr.getVersion(tss2vh(UNCOMMITTED, 0))._id);
118
119 tr.delete();
120
121 assertEquals("Should have two versions left " + withTransactions, 2, tr.getVersionCount());
122 _persistit.getTransactionIndex().updateActiveTransactionCache();
123 tr.prune();
124 assertEquals("Should have no versions left " + withTransactions, 0, tr.getVersionCount());
125
126 for (int i = 0; i < 5; i++) {
127 assertEquals("Should have been pruned " + withTransactions, 1, resources[i]._pruned.get());
128 }
129 }
130
131 @Test
132 public void concurrentAddAndPruneResources() throws Exception {
133 final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
134 final Random random = new Random(1);
135 final long expires = System.nanoTime() + 10 * NS_PER_S;
136 final AtomicInteger sequence = new AtomicInteger();
137 final AtomicInteger rollbackCount = new AtomicInteger();
138 final List<Thread> threads = new ArrayList<Thread>();
139 final UncaughtExceptionHandler handler = new UncaughtExceptionHandler();
140 int threadCounter = 0;
141 while (System.nanoTime() < expires) {
142 for (final Iterator<Thread> iter = threads.iterator(); iter.hasNext();) {
143 if (!iter.next().isAlive()) {
144 iter.remove();
145 }
146 }
147 while (threads.size() < 20) {
148 final Thread t = createThread(String.format("Thread_%06d", ++threadCounter), new ThrowingRunnable() {
149 @Override
150 public void run() throws Exception {
151 doConcurrentTransaction(tr, random, sequence, rollbackCount);
152 }
153 });
154 threads.add(t);
155 start(handler, t);
156 }
157 Util.sleep(10);
158 tr.prune();
159 }
160 join(Long.MAX_VALUE, handler.getThrowableMap(), threads.toArray(new Thread[threads.size()]));
161 assertSuccess(handler.getThrowableMap());
162 assertTrue("Every transaction rolled back", rollbackCount.get() < sequence.get());
163 System.out.printf("%,d entries, %,d rollbacks\n", sequence.get(), rollbackCount.get());
164 }
165
166 private void doConcurrentTransaction(final TimelyResource<TestVersion> tr, final Random random,
167 final AtomicInteger sequence, final AtomicInteger rollbackCount) throws PersistitException {
168 try {
169 final Transaction txn = _persistit.getTransaction();
170 for (int i = 0; i < 25; i++) {
171 txn.begin();
172 try {
173 final int id = sequence.incrementAndGet();
174 tr.addVersion(new TestVersion(id, this), txn);
175 final int delay = (1 << random.nextInt(3));
176 // 1, 2 or 4 milliseconds (1 << nextInt(3))
177 Util.sleep(delay);
178 final TestVersion mine = tr.getVersion();
179 assertEquals("Should not have been pruned yet", 0, mine._pruned.get());
180 assertEquals("Wrong resource", id, mine._id);
181 if (random.nextInt(10) == 0) {
182 txn.rollback();
183 } else {
184 txn.commit();
185 }
186 } catch (final RollbackException e) {
187 txn.rollback();
188 rollbackCount.incrementAndGet();
189 } finally {
190 txn.end();
191 }
192 }
193 } catch (final RollbackException e) {
194 rollbackCount.incrementAndGet();
195 }
196 }
197
198 @Test
199 public void deleteResource() throws Exception {
200 final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
201 _idCounter = 0;
202 final VersionCreator<TestVersion> creator = new VersionCreator<TestVersion>() {
203
204 @Override
205 public TestVersion createVersion(final TimelyResource<? extends TestVersion> resource)
206 throws PersistitException {
207 return new TestVersion(++_idCounter, TimelyResourceTest.this);
208 }
209 };
210 final Transaction txn1 = _persistit.getTransaction();
211 _persistit.setSessionId(new SessionId());
212 final Transaction txn2 = _persistit.getTransaction();
213 TestVersion v1;
214 txn1.begin();
215 v1 = tr.getVersion(creator);
216 assertTrue("Version ID mismatch", v1._id == _idCounter);
217 txn2.begin();
218 txn1.incrementStep();
219 tr.delete();
220 tr.prune();
221 assertEquals("Should still be two versions", 2, tr.getVersionCount());
222 txn2.commit();
223 txn1.commit();
224 _persistit.getTransactionIndex().updateActiveTransactionCache();
225 tr.prune();
226 assertEquals("Should now have no versions", 0, tr.getVersionCount());
227 txn1.end();
228 txn2.end();
229
230 }
231
232 @Test
233 public void versions() throws Exception {
234 final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
235 _idCounter = 0;
236 final VersionCreator<TestVersion> creator = new VersionCreator<TestVersion>() {
237
238 @Override
239 public TestVersion createVersion(final TimelyResource<? extends TestVersion> resource)
240 throws PersistitException {
241 return new TestVersion(++_idCounter, TimelyResourceTest.this);
242 }
243 };
244 final Semaphore semaphore1 = new Semaphore(0);
245 final Transaction txn = _persistit.getTransaction();
246 final Thread[] threads = new Thread[10];
247 for (int i = 0; i < 10; i++) {
248 try {
249 txn.begin();
250 tr.addVersion(creator.createVersion(tr), txn);
251 txn.commit();
252 } finally {
253 txn.end();
254 }
255 final Semaphore semaphore2 = new Semaphore(0);
256 threads[i] = new Thread(new Runnable() {
257 @Override
258 public void run() {
259 final Transaction txn = _persistit.getTransaction();
260 try {
261 txn.begin();
262 final TestVersion t = tr.getVersion();
263 assertEquals(t._id, _idCounter);
264 semaphore2.release();
265 semaphore1.acquire();
266 txn.commit();
267 } catch (final Exception e) {
268 e.printStackTrace();
269 } finally {
270 txn.end();
271 }
272 }
273 });
274 threads[i].start();
275 semaphore2.acquire();
276 }
277 _persistit.getTransactionIndex().updateActiveTransactionCache();
278 tr.prune();
279 assertEquals(10, tr.getVersionCount());
280 semaphore1.release(10);
281 for (final Thread thread : threads) {
282 thread.join();
283 }
284 _persistit.getTransactionIndex().updateActiveTransactionCache();
285 tr.prune();
286 assertEquals(1, tr.getVersionCount());
287 assertEquals("Surviving primordial version should be last one committed", 10, tr.getVersion(null)._id);
288 }
289
290}
0291
=== modified file 'src/test/java/com/persistit/TreeLifetimeTest.java'
--- src/test/java/com/persistit/TreeLifetimeTest.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/TreeLifetimeTest.java 2013-04-29 21:38:25 +0000
@@ -15,20 +15,6 @@
1515
16package com.persistit;16package com.persistit;
1717
18import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_A;
19import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_B;
20import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_C;
21import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_SCHEDULE;
22import static com.persistit.util.ThreadSequencer.addSchedules;
23import static com.persistit.util.ThreadSequencer.array;
24import static com.persistit.util.ThreadSequencer.describeHistory;
25import static com.persistit.util.ThreadSequencer.describePartialOrdering;
26import static com.persistit.util.ThreadSequencer.disableSequencer;
27import static com.persistit.util.ThreadSequencer.enableSequencer;
28import static com.persistit.util.ThreadSequencer.historyMeetsPartialOrdering;
29import static com.persistit.util.ThreadSequencer.out;
30import static com.persistit.util.ThreadSequencer.rawSequenceHistoryCopy;
31import static com.persistit.util.ThreadSequencer.sequence;
32import static org.junit.Assert.assertEquals;18import static org.junit.Assert.assertEquals;
33import static org.junit.Assert.assertFalse;19import static org.junit.Assert.assertFalse;
34import static org.junit.Assert.assertNotNull;20import static org.junit.Assert.assertNotNull;
@@ -38,7 +24,6 @@
3824
39import java.util.Arrays;25import java.util.Arrays;
40import java.util.List;26import java.util.List;
41import java.util.concurrent.ConcurrentLinkedQueue;
4227
43import org.junit.Test;28import org.junit.Test;
4429
@@ -49,9 +34,6 @@
4934
50public class TreeLifetimeTest extends PersistitUnitTestCase {35public class TreeLifetimeTest extends PersistitUnitTestCase {
51 private static final String TREE_NAME = "tree_one";36 private static final String TREE_NAME = "tree_one";
52 final int A = TREE_CREATE_REMOVE_A;
53 final int B = TREE_CREATE_REMOVE_B;
54 final int C = TREE_CREATE_REMOVE_C;
5537
56 private Volume getVolume() {38 private Volume getVolume() {
57 return _persistit.getVolume(UnitTestProperties.VOLUME_NAME);39 return _persistit.getVolume(UnitTestProperties.VOLUME_NAME);
@@ -62,11 +44,34 @@
62 }44 }
6345
64 @Test46 @Test
65 public void testRemovedTreeGoesToGarbageChain() throws PersistitException {47 public void testRemovedTreeGoesToGarbageChainNoTxn() throws PersistitException {
48 Exchange ex = getExchange(true);
49 for (int i = 0; i < 5; ++i) {
50 ex.clear().append(i).getValue().clear().put(i);
51 ex.store();
52 }
53 _persistit.releaseExchange(ex);
54 ex = null;
55
56 ex = getExchange(false);
57 final long treeRoot = ex.getTree().getRootPageAddr();
58 ex.removeTree();
59 _persistit.releaseExchange(ex);
60 ex = null;
61 _persistit.cleanup();
62
63 final List<Long> garbage = getVolume().getStructure().getGarbageList();
64 assertTrue("Expected tree root <" + treeRoot + "> in garbage list <" + garbage.toString() + ">",
65 garbage.contains(treeRoot));
66 }
67
68 @Test
69 public void testRemovedTreeGoesToGarbageChainTxn() throws PersistitException {
66 final Transaction txn = _persistit.getTransaction();70 final Transaction txn = _persistit.getTransaction();
71 Exchange ex;
6772
68 txn.begin();73 txn.begin();
69 Exchange ex = getExchange(true);74 ex = getExchange(true);
70 for (int i = 0; i < 5; ++i) {75 for (int i = 0; i < 5; ++i) {
71 ex.clear().append(i).getValue().clear().put(i);76 ex.clear().append(i).getValue().clear().put(i);
72 ex.store();77 ex.store();
@@ -84,7 +89,7 @@
84 txn.end();89 txn.end();
85 _persistit.releaseExchange(ex);90 _persistit.releaseExchange(ex);
86 ex = null;91 ex = null;
8792 _persistit.cleanup();
88 final List<Long> garbage = getVolume().getStructure().getGarbageList();93 final List<Long> garbage = getVolume().getStructure().getGarbageList();
89 assertTrue("Expected tree root <" + treeRoot + "> in garbage list <" + garbage.toString() + ">",94 assertTrue("Expected tree root <" + treeRoot + "> in garbage list <" + garbage.toString() + ">",
90 garbage.contains(treeRoot));95 garbage.contains(treeRoot));
@@ -143,71 +148,4 @@
143 assertNull("Tree should not exist after cleanup action", getVolume().getTree(TREE_NAME, false));148 assertNull("Tree should not exist after cleanup action", getVolume().getTree(TREE_NAME, false));
144 }149 }
145150
146 @Test
147 public void testReanimatedTreeCreateAndRemoveSynchronization() throws PersistitException, InterruptedException {
148 enableSequencer(true);
149 addSchedules(TREE_CREATE_REMOVE_SCHEDULE);
150
151 final ConcurrentLinkedQueue<Throwable> threadErrors = new ConcurrentLinkedQueue<Throwable>();
152
153 final Exchange origEx = getExchange(true);
154 for (int i = 0; i < 5; ++i) {
155 origEx.clear().append(i).store();
156 }
157 _persistit.releaseExchange(origEx);
158
159 final Thread thread1 = new Thread(new Runnable() {
160 @Override
161 public void run() {
162 Exchange ex = null;
163 try {
164 ex = getExchange(false);
165 ex.removeTree();
166 } catch (final Throwable t) {
167 threadErrors.add(t);
168 }
169 if (ex != null) {
170 _persistit.releaseExchange(ex);
171 }
172 }
173 });
174
175 final Thread thread2 = new Thread(new Runnable() {
176 @Override
177 public void run() {
178 sequence(TREE_CREATE_REMOVE_B);
179 Exchange ex = null;
180 try {
181 ex = getExchange(true);
182 int count = 0;
183 while (ex.next(true)) {
184 ++count;
185 }
186 sequence(TREE_CREATE_REMOVE_C);
187 assertEquals("New tree has zero keys in it", 0, count);
188 } catch (final Throwable t) {
189 threadErrors.add(t);
190 }
191 if (ex != null) {
192 _persistit.releaseExchange(ex);
193 }
194 }
195 });
196
197 thread1.start();
198 thread2.start();
199
200 thread1.join();
201 thread2.join();
202
203 assertEquals("Threads had no exceptions", "[]", threadErrors.toString());
204
205 final int[] actual = rawSequenceHistoryCopy();
206 final int[][] expectedSequence = { array(A, B), array(out(B)), array(C), array(out(A), out(C)) };
207 if (!historyMeetsPartialOrdering(actual, expectedSequence)) {
208 assertEquals("Unexpected sequencing", describePartialOrdering(expectedSequence), describeHistory(actual));
209 }
210
211 disableSequencer();
212 }
213}151}
214152
=== added file 'src/test/java/com/persistit/TreeTransactionalLifetimeTest.java'
--- src/test/java/com/persistit/TreeTransactionalLifetimeTest.java 1970-01-01 00:00:00 +0000
+++ src/test/java/com/persistit/TreeTransactionalLifetimeTest.java 2013-04-29 21:38:25 +0000
@@ -0,0 +1,277 @@
1/**
2 * Copyright © 2013 Akiban Technologies, Inc. All rights reserved.
3 *
4 * This program and the accompanying materials are made available
5 * under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * This program may also be available under different license terms.
10 * For more information, see www.akiban.com or contact licensing@akiban.com.
11 *
12 * Contributors:
13 * Akiban Technologies, Inc.
14 */
15
16package com.persistit;
17
18import static com.persistit.unit.ConcurrentUtil.assertSuccess;
19import static com.persistit.unit.ConcurrentUtil.createThread;
20import static com.persistit.unit.ConcurrentUtil.join;
21import static com.persistit.unit.ConcurrentUtil.start;
22import static org.junit.Assert.assertEquals;
23import static org.junit.Assert.assertNull;
24import static org.junit.Assert.assertTrue;
25
26import java.util.Map;
27import java.util.concurrent.Semaphore;
28
29import org.junit.Test;
30
31import com.persistit.exception.PersistitException;
32import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
33
34public class TreeTransactionalLifetimeTest extends PersistitUnitTestCase {
35 final static int TIMEOUT_MS = 10000;
36
37 final Semaphore semA = new Semaphore(0);
38 final Semaphore semB = new Semaphore(0);
39 final Semaphore semT = new Semaphore(0);
40 final Semaphore semU = new Semaphore(0);
41
42 /*
43 * This class needs to be in com.persistit because of some package-private
44 * methods used in controlling the test.
45 */
46
47 private Tree tree(final String name) throws PersistitException {
48 return vstruc().getTree(name, false);
49 }
50
51 private Exchange exchange(final String name) throws PersistitException {
52 return _persistit.getExchange("persistit", name, true);
53 }
54
55 private VolumeStructure vstruc() {
56 return _persistit.getVolume("persistit").getStructure();
57 }
58
59 @Test
60 public void simplePruning() throws Exception {
61
62 final Transaction txn = _persistit.getTransaction();
63 for (int i = 0; i < 2; i++) {
64 txn.begin();
65 final Exchange ex = exchange("ttlt");
66 ex.getValue().put(RED_FOX);
67 ex.to(1).store();
68 ex.to(2).store();
69 txn.rollback();
70 txn.end();
71 }
72 _persistit.cleanup();
73 assertEquals("There should be no tree", null, tree("ttlt"));
74 assertTrue(vstruc().getGarbageRoot() != 0);
75 }
76
77 @Test
78 public void createdTreeIsNotVisibleUntilCommit() throws Exception {
79 final Thread t = createThread("ttlt", new TExec() {
80 @Override
81 void exec(final Transaction txn) throws Exception {
82 final Exchange ex1 = exchange("ttlt");
83 ex1.getValue().put(RED_FOX);
84 ex1.to(1).store();
85 semA.release();
86 semB.acquire();
87 txn.commit();
88 }
89 });
90 final Map<Thread, Throwable> errors = start(t);
91 semA.acquire();
92 assertEquals(null, _persistit.getVolume("persistit").getTree("ttlt", false));
93 semB.release();
94 join(TIMEOUT_MS, errors, t);
95 assertSuccess(errors);
96 final Exchange ex = exchange("ttlt");
97 assertTrue(ex.to(Key.BEFORE).next());
98 assertEquals(1, ex.getKey().decodeInt());
99 }
100
101 @Test
102 public void removeTreeIsNotVisibleUntilCommit() throws Exception {
103 final Exchange ex = exchange("ttlt");
104 ex.getValue().put(RED_FOX);
105 ex.to(1).store();
106 final Thread t = createThread("ttlt", new TExec() {
107 @Override
108 void exec(final Transaction txn) throws Exception {
109 final Exchange ex1 = exchange("ttlt");
110 ex1.removeTree();
111 semA.release();
112 semB.acquire();
113 txn.commit();
114 }
115 });
116 final Map<Thread, Throwable> errors = start(t);
117 semA.acquire();
118 assertEquals(ex.getTree(), ex.getVolume().getTree("ttlt", false));
119 semB.release();
120 join(TIMEOUT_MS, errors, t);
121 assertNull(ex.getVolume().getTree("ttlt", false));
122 }
123
124 @Test
125 public void removeCreateRemove() throws Exception {
126
127 final Exchange ex = exchange("ttlt");
128 ex.getValue().put(RED_FOX);
129 ex.to(1).store();
130 final Thread t = createThread("ttlt", new TExec() {
131 @Override
132 void exec(final Transaction txn) throws Exception {
133 final Exchange ex1 = exchange("ttlt");
134 ex1.removeTree();
135 final Exchange ex2 = exchange("ttlt");
136 ex2.getValue().put(RED_FOX);
137 ex2.to(2).store();
138 ex2.removeTree();
139 final Exchange ex3 = exchange("ttlt");
140 ex3.getValue().put(RED_FOX);
141 ex3.to(3).store();
142 final Exchange ex4 = exchange("ttlt");
143 ex4.to(Key.BEFORE);
144 assertTrue(ex4.next());
145 assertEquals(3, ex4.getKey().decodeInt());
146 assertTrue(!ex4.next());
147 semA.release();
148 semB.acquire();
149 txn.rollback();
150 }
151 });
152 final Map<Thread, Throwable> errors = start(t);
153 semA.acquire();
154 assertEquals(ex.getTree(), ex.getVolume().getTree("ttlt", false));
155 semB.release();
156 join(TIMEOUT_MS, errors, t);
157 _persistit.getTransactionIndex().updateActiveTransactionCache();
158 _persistit.pruneTimelyResources();
159 assertTrue(ex.to(Key.BEFORE).next());
160 assertEquals(1, ex.getKey().decodeInt());
161 assertTrue(!ex.next());
162 assertTrue(ex.getVolume().getStructure().getGarbageRoot() != 0);
163 }
164
165 @Test
166 public void createRemoveByStep() throws Exception {
167 createRemoveByStepHelper("ttlt1", false, true, false, false, "0,1:,2:a=step2,3,4:b=step4", "0:b=step4");
168 createRemoveByStepHelper("ttlt2", false, false, false, false, "0,1:,2:a=step2,3,4:b=step4", "0");
169 createRemoveByStepHelper("ttlt3", true, true, false, false, "0:,1:,2:a=step2,3,4:b=step4", "0:b=step4");
170 createRemoveByStepHelper("ttlt4", true, false, false, false, "0:,1:,2:a=step2,3,4:b=step4", "0:");
171
172 createRemoveByStepHelper("ttlt5", false, true, false, true, "0,1:,2:a=step2,3,4:b=step4", "0:b=step4");
173 createRemoveByStepHelper("ttlt6", false, false, false, true, "0,1:,2:a=step2,3,4:b=step4", "0");
174 createRemoveByStepHelper("ttlt7", true, true, false, true, "0:,1:,2:a=step2,3,4:b=step4", "0:b=step4");
175 createRemoveByStepHelper("ttlt8", true, false, false, true, "0:,1:,2:a=step2,3,4:b=step4", "0:");
176
177 createRemoveByStepHelper("ttlt1cr", false, true, true, false, "0,1:,2:a=step2,3,4:b=step4", "0");
178 createRemoveByStepHelper("ttlt2cr", false, false, true, false, "0,1:,2:a=step2,3,4:b=step4", "0");
179 createRemoveByStepHelper("ttlt3cr", true, true, true, false, "0:,1:,2:a=step2,3,4:b=step4", "0:");
180 createRemoveByStepHelper("ttlt4cr", true, false, false, false, "0:,1:,2:a=step2,3,4:b=step4", "0:");
181
182 createRemoveByStepHelper("ttlt5cr", false, true, true, true, "0,1:,2:a=step2,3,4:b=step4", "0");
183 createRemoveByStepHelper("ttlt6cr", false, false, true, true, "0,1:,2:a=step2,3,4:b=step4", "0");
184 createRemoveByStepHelper("ttlt7cr", true, true, true, true, "0:,1:,2:a=step2,3,4:b=step4", "0:");
185 createRemoveByStepHelper("ttlt8cr", true, false, true, true, "0:,1:,2:a=step2,3,4:b=step4", "0:");
186 }
187
188 private void createRemoveByStepHelper(final String treeName, final boolean primordial, final boolean commit,
189 final boolean crash, final boolean restart, final String expected1, final String expected2)
190 throws Exception {
191
192 final Transaction txn = _persistit.getTransaction();
193 final Volume volume = _persistit.getVolume("persistit");
194 if (primordial) {
195 volume.getTree(treeName, true);
196 }
197 txn.begin();
198 try {
199 txn.setStep(1);
200 volume.getTree(treeName, true);
201
202 txn.setStep(2);
203 final Exchange ex1 = exchange(treeName);
204 ex1.getValue().put("step2");
205 ex1.to("a").store();
206
207 txn.setStep(3);
208 ex1.removeTree();
209
210 txn.setStep(4);
211 final Exchange ex2 = exchange(treeName);
212 ex2.getValue().put("step4");
213 ex2.to("b").store();
214
215 assertEquals("Expected contents at steps", expected1, computeCreateRemoveState(treeName, 5));
216
217 if (crash) {
218 _persistit.checkpoint();
219 _persistit.crash();
220 } else {
221 if (commit) {
222 txn.commit();
223 } else {
224 txn.rollback();
225 }
226 }
227 } finally {
228 if (!crash) {
229 txn.end();
230 }
231 }
232 if (restart) {
233 _persistit.close();
234 }
235 if (crash || restart) {
236 _persistit = new Persistit(_config);
237 _persistit.initialize();
238 }
239 assertEquals("Expected contents at steps", expected2, computeCreateRemoveState(treeName, 1));
240 }
241
242 private String computeCreateRemoveState(final String treeName, final int steps) throws PersistitException {
243 final StringBuilder sb = new StringBuilder();
244 for (int step = 0; step < steps; step++) {
245 _persistit.getTransaction().setStep(step);
246 if (sb.length() > 0) {
247 sb.append(",");
248 }
249 sb.append(step);
250 if (_persistit.getVolume("persistit").getTree(treeName, false) != null) {
251 sb.append(":");
252 final Exchange ex = exchange(treeName);
253 ex.append(Key.BEFORE);
254 while (ex.next()) {
255 sb.append(ex.getKey().decodeString()).append("=").append(ex.getValue().getString());
256 }
257 }
258 }
259 return sb.toString();
260 }
261
262 abstract class TExec extends ThrowingRunnable {
263
264 @Override
265 public void run() throws Exception {
266 final Transaction txn = _persistit.getTransaction();
267 txn.begin();
268 try {
269 exec(txn);
270 } finally {
271 txn.end();
272 }
273 }
274
275 abstract void exec(final Transaction txn) throws Exception;
276 }
277}
0278
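
Reviewer note: several tests in TreeTransactionalLifetimeTest above hold a transaction open with a pair of semaphores (semA, semB) so the main thread can inspect state before the worker commits or rolls back. Below is a minimal, Persistit-free sketch of that handshake. The class name SemaphoreHandshakeSketch, the observe() method, and the AtomicReference standing in for "committed state" are all hypothetical and are not part of this branch.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SemaphoreHandshakeSketch {

    /**
     * Returns what the main thread saw while the worker was paused at its
     * checkpoint, followed by what it saw after the worker finished.
     */
    public static String[] observe() {
        final Semaphore semA = new Semaphore(0); // worker -> main: "checkpoint reached"
        final Semaphore semB = new Semaphore(0); // main -> worker: "you may continue"
        // Stand-in for state that becomes visible only on commit (an assumption).
        final AtomicReference<String> published = new AtomicReference<String>("none");

        final Thread worker = new Thread(() -> {
            final String pending = "value"; // private work, not yet published
            semA.release();
            try {
                // Bounded wait so a broken test cannot hang forever.
                if (!semB.tryAcquire(10, TimeUnit.SECONDS)) {
                    return;
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            published.set(pending); // stand-in for txn.commit()
        }, "worker");

        try {
            worker.start();
            semA.acquire(); // wait until the worker is parked
            final String during = published.get();
            semB.release(); // let the worker "commit"
            worker.join();
            final String after = published.get();
            return new String[] { during, after };
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while coordinating", e);
        }
    }

    public static void main(final String[] args) {
        final String[] seen = observe();
        System.out.println("during=" + seen[0] + " after=" + seen[1]);
    }
}
```

The worker cannot publish until semB is released, and the main thread reads the "during" value before releasing it, so the two observations are ordered without any sleeps.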
=== modified file 'src/test/java/com/persistit/unit/ConcurrentUtil.java'
--- src/test/java/com/persistit/unit/ConcurrentUtil.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/unit/ConcurrentUtil.java 2013-04-29 21:38:25 +0000
@@ -21,11 +21,49 @@
21import java.util.HashMap;21import java.util.HashMap;
22import java.util.Map;22import java.util.Map;
2323
24/**
25 * Helper methods to create, start, join and check error status of test threads.
26 * The key element is a Map<Thread, Throwable> that can be checked after threads
27 * run for unhandled exceptions. The {@link #assertSuccess(Map)} method is
28 * called in the main thread to aggregate and report all exceptions that
29 * occurred in other threads.
30 */
24public class ConcurrentUtil {31public class ConcurrentUtil {
32
33 /**
34 * An implementation of {@link Thread.UncaughtExceptionHandler} which
35 * records any uncaught errors or exceptions in a map. A test case can pass
36 * the map to the {@link ConcurrentUtil#assertSuccess(Map)} method to verify
37 * that no exceptions or errors were caught on test threads.
38 *
39 */
40 public static class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
41 final Map<Thread, Throwable> throwableMap = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
42
43 @Override
44 public void uncaughtException(final Thread t, final Throwable e) {
45 throwableMap.put(t, e);
46 }
47
48 public Map<Thread, Throwable> getThrowableMap() {
49 return throwableMap;
50 }
51 }
52
53 /**
54 * A version of Runnable in which the #run method may throw any Throwable.
55 */
25 public static abstract class ThrowingRunnable {56 public static abstract class ThrowingRunnable {
26 public abstract void run() throws Throwable;57 public abstract void run() throws Throwable;
27 }58 }
2859
60 /**
61 * Create a named thread from a ThrowingRunnable.
62 *
63 * @param name
64 * @param runnable
65 * @return
66 */
29 public static Thread createThread(final String name, final ThrowingRunnable runnable) {67 public static Thread createThread(final String name, final ThrowingRunnable runnable) {
30 return new Thread(new Runnable() {68 return new Thread(new Runnable() {
31 @Override69 @Override
@@ -40,34 +78,49 @@
40 }78 }
4179
42 /**80 /**
43 * Start and join on all given threads. Wait on each thread, individually,81 * Start all given threads. Return a map on which unhandled exceptions will
44 * for <code>timeout</code> milliseconds. The {@link Thread#join(long)}82 * be reported.
45 * method is used for this (<code>0</code> means indefinite).
46 * 83 *
47 * @param timeout
48 * How long to join on each thread for.
49 * @param threads84 * @param threads
50 * Threads to start and join.85 * Threads to start.
51 * 86 *
52 * @return A map with an entry for each thread that had an unhandled87 * @return A map with an entry for each thread that had an unhandled
53 * exception or did not complete in the allotted time. This map will88 * exception or did not complete in the allotted time. This map will
54 * be empty if all threads completed successfully.89 * be empty if all threads completed successfully.
55 */90 */
56 public static Map<Thread, Throwable> startAndJoin(final long timeout, final Thread... threads) {91 public static Map<Thread, Throwable> start(final Thread... threads) {
57 final Map<Thread, Throwable> throwableMap = Collections.synchronizedMap(new HashMap<Thread, Throwable>());92 final UncaughtExceptionHandler handler = new UncaughtExceptionHandler();
5893 start(handler, threads);
59 final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {94 return handler.getThrowableMap();
60 @Override95 }
61 public void uncaughtException(final Thread t, final Throwable e) {96
62 throwableMap.put(t, e);97 /**
63 }98 * Start all given threads with the supplied UncaughtExceptionHandler. The
64 };99 * handler will record any uncaught exceptions or errors in a map associated
65100 * with the handler.
101 *
102 * @param handler
103 * @param threads
104 */
105 public static void start(final UncaughtExceptionHandler handler, final Thread... threads) {
66 for (final Thread t : threads) {106 for (final Thread t : threads) {
67 t.setUncaughtExceptionHandler(handler);107 t.setUncaughtExceptionHandler(handler);
68 t.start();108 t.start();
69 }109 }
70110
111 }
112
113 /**
114 * Wait on each thread, individually, for <code>timeout</code> milliseconds.
115 * The {@link Thread#join(long)} method is used for this (<code>0</code>
116 * means indefinite). Add an Exception to the error map for any thread that
117 * did not end within its timeout.
118 *
119 * @param timeout
120 * @param throwableMap
121 * @param threads
122 */
123 public static void join(final long timeout, final Map<Thread, Throwable> throwableMap, final Thread... threads) {
71 for (final Thread t : threads) {124 for (final Thread t : threads) {
72 Throwable error = null;125 Throwable error = null;
73 try {126 try {
@@ -78,12 +131,42 @@
78 } catch (final InterruptedException e) {131 } catch (final InterruptedException e) {
79 error = e;132 error = e;
80 }133 }
81
82 if (error != null) {134 if (error != null) {
83 throwableMap.put(t, error);135 throwableMap.put(t, error);
84 }136 }
85 }137 }
86138 }
139
140 /**
141 * Assert that no thread had any unhandled exceptions or timeouts.
142 *
143 * @param throwableMap
144 * map in which threads accumulated any unhandled Exceptions
145 */
146 public static void assertSuccess(final Map<Thread, Throwable> throwableMap) {
147 String description = "";
148 for (final Map.Entry<Thread, Throwable> entry : throwableMap.entrySet()) {
149 description += " " + entry.getKey().getName() + "=" + entry.getValue().toString();
150 }
151 assertEquals("All threads completed successfully", "{}", "{" + description + "}");
152 }
153
154 /**
155 * Call {@link #start(Thread...)} for all threads and then
156 * {@link #join(long, Map, Thread...)} for all threads.
157 *
158 * @param timeout
159 * How long to join on each thread for.
160 * @param threads
161 * Threads to start and join.
162 *
163 * @return A map with an entry for each thread that had an unhandled
164 * exception or did not complete in the allotted time. This map will
165 * be empty if all threads completed successfully.
166 */
167 public static Map<Thread, Throwable> startAndJoin(final long timeout, final Thread... threads) {
168 final Map<Thread, Throwable> throwableMap = start(threads);
169 join(timeout, throwableMap, threads);
87 return throwableMap;170 return throwableMap;
88 }171 }
89172
@@ -98,11 +181,7 @@
98 * Threads to start and join.181 * Threads to start and join.
99 */182 */
100 public static void startAndJoinAssertSuccess(final long timeout, final Thread... threads) {183 public static void startAndJoinAssertSuccess(final long timeout, final Thread... threads) {
101 final Map<Thread, Throwable> errors = startAndJoin(timeout, threads);184 final Map<Thread, Throwable> throwableMap = startAndJoin(timeout, threads);
102 String description = "";185 assertSuccess(throwableMap);
103 for (final Map.Entry<Thread, Throwable> entry : errors.entrySet()) {
104 description += " " + entry.getKey().getName() + "=" + entry.getValue().toString();
105 }
106 assertEquals("All threads completed successfully", "{}", "{" + description + "}");
107 }186 }
108}187}
109188
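
Reviewer note: the ConcurrentUtil change above splits the old startAndJoin into separate start, join and assertSuccess steps that share one error map. The following is a minimal standalone sketch of that flow, assuming nothing from Persistit or JUnit. ThreadErrorMapSketch, RecordingHandler and describe are hypothetical names chosen for illustration; describe only mirrors the string that assertSuccess builds and does not assert anything itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ThreadErrorMapSketch {

    /** Records uncaught throwables per thread, in the spirit of the handler in the diff. */
    static class RecordingHandler implements Thread.UncaughtExceptionHandler {
        final Map<Thread, Throwable> errors =
                Collections.synchronizedMap(new HashMap<Thread, Throwable>());

        @Override
        public void uncaughtException(final Thread t, final Throwable e) {
            errors.put(t, e);
        }
    }

    /** Start every thread with a shared handler and return the handler's error map. */
    public static Map<Thread, Throwable> start(final Thread... threads) {
        final RecordingHandler handler = new RecordingHandler();
        for (final Thread t : threads) {
            t.setUncaughtExceptionHandler(handler);
            t.start();
        }
        return handler.errors;
    }

    /** Join each thread for up to timeoutMs; record a timeout or interrupt as an error. */
    public static void join(final long timeoutMs, final Map<Thread, Throwable> errors,
            final Thread... threads) {
        for (final Thread t : threads) {
            try {
                t.join(timeoutMs);
                if (t.isAlive()) {
                    errors.put(t, new IllegalStateException("Thread did not finish in " + timeoutMs + " ms"));
                }
            } catch (final InterruptedException e) {
                errors.put(t, e);
            }
        }
    }

    /** Render the error map the way assertSuccess does: "{}" means no failures. */
    public static String describe(final Map<Thread, Throwable> errors) {
        final StringBuilder sb = new StringBuilder("{");
        for (final Map.Entry<Thread, Throwable> entry : errors.entrySet()) {
            sb.append(" ").append(entry.getKey().getName()).append("=").append(entry.getValue());
        }
        return sb.append("}").toString();
    }

    public static void main(final String[] args) {
        final Thread ok = new Thread(() -> { }, "ok");
        final Thread bad = new Thread(() -> {
            throw new IllegalStateException("boom");
        }, "bad");
        final Map<Thread, Throwable> errors = start(ok, bad);
        // Between start and join the caller is free to do its own work,
        // which is what the split in the diff makes possible.
        join(1000, errors, ok, bad);
        System.out.println(describe(errors));
    }
}
```

Separating start from join lets a test interleave its own checks with the running threads, as concurrentAddAndPruneResources does when it prunes while workers are still active.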
=== modified file 'src/test/java/com/persistit/unit/TransactionTest1.java'
--- src/test/java/com/persistit/unit/TransactionTest1.java 2012-08-24 13:57:19 +0000
+++ src/test/java/com/persistit/unit/TransactionTest1.java 2013-04-29 21:38:25 +0000
@@ -88,7 +88,7 @@
88 } finally {88 } finally {
89 txn.end();89 txn.end();
90 }90 }
91 assertTrue(!ex.getTree().isValid());91 assertTrue(ex.getTree().isDeleted());
92 try {92 try {
93 ex.clear().append("test1").hasChildren();93 ex.clear().append("test1").hasChildren();
94 fail("Should have thrown an exception");94 fail("Should have thrown an exception");
