Merge lp:~pbeaman/akiban-persistit/transaction-tree-management into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Merged at revision: 430
Proposed branch: lp:~pbeaman/akiban-persistit/transaction-tree-management
Merge into: lp:akiban-persistit
Diff against target: 3507 lines (+1893/-414)
37 files modified
pom.xml (+1/-1)
src/main/java/com/persistit/Buffer.java (+1/-2)
src/main/java/com/persistit/CheckpointManager.java (+1/-0)
src/main/java/com/persistit/ClassIndex.java (+12/-1)
src/main/java/com/persistit/Exchange.java (+60/-38)
src/main/java/com/persistit/JournalManager.java (+5/-0)
src/main/java/com/persistit/MVV.java (+1/-1)
src/main/java/com/persistit/Persistit.java (+93/-43)
src/main/java/com/persistit/RecoveryManager.java (+27/-0)
src/main/java/com/persistit/SharedResource.java (+3/-3)
src/main/java/com/persistit/TimelyResource.java (+505/-0)
src/main/java/com/persistit/TransactionPlayer.java (+107/-68)
src/main/java/com/persistit/Tree.java (+165/-46)
src/main/java/com/persistit/ValueHelper.java (+12/-0)
src/main/java/com/persistit/Version.java (+69/-0)
src/main/java/com/persistit/Volume.java (+4/-1)
src/main/java/com/persistit/VolumeStructure.java (+47/-54)
src/main/java/com/persistit/logging/LogBase.java (+3/-0)
src/main/java/com/persistit/util/SequencerConstants.java (+1/-12)
src/test/java/com/persistit/AccumulatorRecoveryTest.java (+5/-0)
src/test/java/com/persistit/AccumulatorTest.java (+1/-1)
src/test/java/com/persistit/Bug1018526Test.java (+2/-2)
src/test/java/com/persistit/Bug920754Test.java (+2/-1)
src/test/java/com/persistit/Bug932097Test.java (+1/-1)
src/test/java/com/persistit/CorruptVolumeTest.java (+2/-1)
src/test/java/com/persistit/IOFailureTest.java (+4/-1)
src/test/java/com/persistit/IntegrityCheckTest.java (+2/-0)
src/test/java/com/persistit/JournalManagerTest.java (+12/-0)
src/test/java/com/persistit/MVCCPruneBufferTest.java (+9/-4)
src/test/java/com/persistit/PersistitUnitTestCase.java (+11/-0)
src/test/java/com/persistit/RecoveryTest.java (+27/-19)
src/test/java/com/persistit/SplitPolicyTest.java (+1/-1)
src/test/java/com/persistit/TimelyResourceTest.java (+290/-0)
src/test/java/com/persistit/TreeLifetimeTest.java (+26/-88)
src/test/java/com/persistit/TreeTransactionalLifetimeTest.java (+277/-0)
src/test/java/com/persistit/unit/ConcurrentUtil.java (+103/-24)
src/test/java/com/persistit/unit/TransactionTest1.java (+1/-1)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/transaction-tree-management
Reviewer: Nathan Williams
Review status: Needs Fixing
Review via email: mp+145415@code.launchpad.net

Description of the change

This branch makes the creation and removal of Tree instances transactional. That is, a Tree created within a transaction is not visible outside of that transaction until it commits (and then only to transactions that start after the creator's commit timestamp), and a Tree removed within a transaction remains visible to concurrent transactions; its removal is visible only to transactions that start after the removing transaction commits. Attempts by concurrent transactions to alter the existence of a Tree (either to create or remove it) result in write-write conflicts so that only one such transaction performing a tree creation or deletion can commit.
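The visibility and conflict rules above can be sketched with a small model. This is an illustration only, not Persistit code: the class TreeExistence and its methods are hypothetical names, timestamps are plain longs, and uncommitted or aborted transactions are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of transactional tree existence; not the Persistit API. */
class TreeExistence {
    /** One committed change to the tree's existence. */
    private static final class Change {
        final long commitTs;
        final boolean exists;

        Change(final long commitTs, final boolean exists) {
            this.commitTs = commitTs;
            this.exists = exists;
        }
    }

    /** Committed changes, oldest first. */
    private final List<Change> changes = new ArrayList<Change>();

    /**
     * Try to commit a create (exists == true) or a remove (exists == false).
     * Returns false to signal a write-write conflict: another transaction
     * committed a change to this tree after the caller started.
     */
    boolean commit(final long startTs, final long commitTs, final boolean exists) {
        for (final Change c : changes) {
            if (c.commitTs > startTs) {
                return false;
            }
        }
        changes.add(new Change(commitTs, exists));
        return true;
    }

    /** Snapshot read: only changes committed before startTs are visible. */
    boolean isVisibleTo(final long startTs) {
        boolean visible = false;
        for (final Change c : changes) {
            if (c.commitTs < startTs) {
                visible = c.exists;
            }
        }
        return visible;
    }
}
```

In this sketch a tree created by a transaction that commits at timestamp 20 is invisible to a transaction that started at 15 and visible to one that started at 25. If two transactions that both started at 30 try to remove it, only the first commit succeeds, and a transaction that started at 35 (before the removal committed at 40) still sees the tree.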

These changes will support DDL operations on Akiban Server. For example, a new index can be created inside of a transaction; the entire Tree becomes visible only when the transaction commits. Further, since the entire Tree would be removed in the event the transaction rolls back, the updates within the tree are made without writing MVVs that need to be pruned.

Similarly, the removal of an index by removing its Tree within a transaction will be visible only to transactions that start after the removal commits; concurrent transactions continue to see the tree.

To accommodate these behaviors, removing a tree within a transaction does not immediately add the tree's physical pages to the garbage chain. Instead, a version is recorded in memory, and when that version is eventually pruned the physical storage of the tree is removed.

The unit test TreeTransactionalLifetimeTest demonstrates some of the new behaviors.

The changes in the proposal are widespread because of many side effects. The following is a guide. I expect review questions and will expand upon areas that are unclear.
----

The largest body of new code is the TimelyResource class. It provides a general capability of maintaining a versioned history of a cached resource. It is intended as a mechanism within Persistit that can manage any kind of versioned object, where the object represents state created within a transaction. For example, a TimelyResource could be used to house versions of metadata (the AIS and/or schema) so that concurrent transactions can acquire and use the correct version. For this proposal, it is used to hold Tree.TreeVersion instances.

A TimelyResource manages Version instances, and TimelyResource is generic with respect to a particular Version implementation class. Each TimelyResource is added to a set maintained by the root Persistit instance, and these are polled periodically by the CleanupManager to perform pruning. If a Version maintained by a TimelyResource implements the subinterface PrunableVersion, then its prune() method is called once the Version has become obsolete according to our standard MVCC notion of liveness (i.e., no other active transaction needs the Version for a valid snapshot). The semantics of TimelyResource are documented in its JavaDoc, hopefully accurately and clearly.
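A rough sketch of the pruning idea follows. It is not the TimelyResource implementation: the names SketchVersion, SketchPrunableVersion, PageHolder and VersionedResource are hypothetical, only committed versions are modelled, and versions are assumed to be added in increasing commit-timestamp order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for the Version / PrunableVersion idea. */
interface SketchVersion {
}

interface SketchPrunableVersion extends SketchVersion {
    void prune();
}

/** Hypothetical version whose prune() is where storage would be reclaimed. */
class PageHolder implements SketchPrunableVersion {
    boolean pruned;

    @Override
    public void prune() {
        pruned = true;
    }
}

/** Hypothetical versioned holder; not the real TimelyResource API. */
class VersionedResource<V extends SketchVersion> {
    private static final class Entry<T> {
        final long commitTs;
        final T version;

        Entry(final long commitTs, final T version) {
            this.commitTs = commitTs;
            this.version = version;
        }
    }

    /** Committed versions, oldest first. */
    private final List<Entry<V>> entries = new ArrayList<Entry<V>>();

    void addVersion(final long commitTs, final V version) {
        entries.add(new Entry<V>(commitTs, version));
    }

    /** Latest version committed before startTs, or null if none. */
    V getVersion(final long startTs) {
        V result = null;
        for (final Entry<V> e : entries) {
            if (e.commitTs < startTs) {
                result = e.version;
            }
        }
        return result;
    }

    /**
     * Drop every version older than the newest one visible to the oldest
     * active transaction; no snapshot can need those any more.
     */
    int prune(final long oldestActiveStartTs) {
        int newestVisible = -1;
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i).commitTs < oldestActiveStartTs) {
                newestVisible = i;
            }
        }
        int pruned = 0;
        while (pruned < newestVisible) {
            final Entry<V> obsolete = entries.remove(0);
            if (obsolete.version instanceof SketchPrunableVersion) {
                ((SketchPrunableVersion) obsolete.version).prune();
            }
            pruned++;
        }
        return pruned;
    }
}
```

With versions committed at 10, 20 and 30 and an oldest active transaction that started at 25, only the first version is obsolete: the version committed at 20 is still needed by that transaction, and the one committed at 30 is the current state.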

Each Tree now uses one TimelyResource instance to manage its transactional state. A Tree created within a transaction has a Tree.TreeVersion instance within the TimelyResource. That version contains most of the state that was formerly held by the Tree object itself. Removing a Tree within a transaction adds a TreeVersion denoting the removed state; the actual reclamation of space held by that tree is done when the versions are pruned.

Most of the accessors for state on the Tree object now delegate to the TreeVersion that is visible according to the currently active transaction.

There are some messy bits:

- The serialized state of a Tree represents its existence (which is transactional) and its physical B-Tree allocation (which is not). There are accommodations for this in the VolumeStructure class. We might have preferred a cleaner serialized representation, but the current version works, does not fundamentally limit the design, and is backward compatible. Another accommodation is that the update operations that change the serialized state of the Tree within the directory tree itself must not be reapplied during recovery. Instead, the state changes of the directory tree are implied by the operations that create and remove the trees themselves. Therefore the Transaction#store method explicitly prevents writing of certain directory tree updates even though they occur within the scope of a transaction.

- To make the new Exchange#lock method work properly, it was necessary to disable the transactional nature of tree creation/removal in the temporary volume used for locks. On the basis that this is likely the only temporary volume that will be shared among multiple threads, the current test creates and removes all trees in temporary volumes in a non-transactional (primordial) fashion. We could make this more precise, or we could simply define the transactional tree semantics as affecting only non-temporary volumes (which I think I prefer).

- The tree used by ClassIndex needs to be primordial to avoid unwanted write-write dependencies. Therefore there's an explicit call to create the tree during initialization.

- TreeStatistics are now maintained within a TreeVersion. Because the serialized form needs to be a snapshot view, these are now flushed within the checkpoint cycle only, and not periodically. As a consequence I believe the recovered tree statistics are always consistent with their state at checkpoint plus whatever updates are performed during recovery. Needless to say, the count of read operations performed after the keystone checkpoint is lost. This is not keeping me awake at night.

Because the semantic change here is pretty big, I am proposing a new major version increment to 3.3. Unlike other changes, this one does not merely add new methods; it subtly changes the fundamental behavior.

At the moment I have not run the server tests within this, nor have I run benchmarks to assess performance changes. These should be done before we approve the proposal.

416. By Peter Beaman

Skip directory tree updates in RecoveryManager#DefaultRecoveryListener rather than Transaction#store/#remove

Peter Beaman (pbeaman) wrote :

Updated to store but not recover directory tree updates in the journal. Needed because rollback needs the records in order to prune. Otherwise we get an assertion on a negative mvvCount.

Peter Beaman (pbeaman) wrote :

Information only. Here's what we need to deal with in server:

Results :

Failed tests: deleteMissingRow(com.akiban.server.test.it.dxl.CBasicIT): open cursors remaining:{CursorId[session 2712, table 39, cursor 1407]=ScanData[cursor=Cursor(created=1359482331405, state=FINISHED, request=com.akiban.server.api.dml.scan.ScanAllRequest@60ac2d55), columns=[0, 1]]}
  testYaml [test-bug1025059](com.akiban.sql.pg.PostgresServerMiscYamlIT): src/test/resources/com/akiban/sql/pg/yaml/bugs/test-bug1025059.yaml: org.postgresql.util.PSQLException: An I/O error occured while sending to the backend.

Tests in error:
  dropGroupingForeignKeyMiddleOfGroup(com.akiban.server.test.it.dxl.AlterTableBasicIT): PERSISTIT_ERROR: Persistit Data Layer error: null
  setDataType_C_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  setDataType_O_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropColumn_C_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropColumn_O_id(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropPrimaryKey_C(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropPrimaryKey_O(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  addGroupingForeignKey_C(com.akiban.server.test.it.dxl.AlterTableCAOIIT): PERSISTIT_ERROR: Persistit Data Layer error: null
  dropGroupingForeignKey_O(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  setDataType_O_cid(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  dropColumn_O_cid(com.akiban.server.test.it.dxl.AlterTableCAOIIT)
  deleteMissingRow(com.akiban.server.test.it.dxl.CBasicIT): Unexpected exception, expected<com.akiban.server.error.NoSuchRowException> but was<com.akiban.server.error.PersistitAdapterException>
  testRepeated(com.akiban.sql.pg.PostgresServerCacheIT): An I/O error occured while sending to the backend.
  testMultipleUpdate [commit-1](com.akiban.sql.pg.PostgresServerMultipleUpdateIT): An I/O error occured while sending to the backend.

Tests run: 2245, Failures: 2, Errors: 14, Skipped: 99

Nathan Williams (nwilliams) wrote :

Very nice! The meat of the changes really isn't too complicated and everything seems to fit nicely.

The only code change I would suggest, ask about really, is the live-ness determination in TimelyResource#prune(). I don't suspect it to be wrong but it is performing the same thing as MVV#prune(). They are operating on structures so different that I don't see a simple way to merge, but I had to convince myself they did the same thing. Can you think of anything to share the logic between them?

I would also like to see a handful of tests around what is in the directory tree, shutdown and recovery where multiple versions were present, and the (lack of) MVVs in writes to trees created in the same transaction.

The rest is a smattering of feedback, ranging from small to nits.

TimelyResource
- JavaDoc has a number of invalid links. Lots of other docs like that too, but this is all new.
- Should getVersionCount() be public? Fairly harmless, but I don't think we leak information like that from other places (Exchange, etc.)
- Should delete() check for _first != null? Seems like deleting a resource with no versions should be exception or assert worthy.

Exchange
- Why was assertCorrectThread() removed from init(), hasChildren(), and isDirectoryExchange()?
- The semantic change on checkThread (null-ing out of !set) is subtle. Perhaps renaming the method to checkAndSetThread or checkSetOrClearThread or something would help.
- A couple places call throttle() and now have multiple conditions. A helper may be in order.
- The addition of 'spareValue.isDefined() || !_tree.isTransactionPrivate' in store() isn't clear to me. When would doMVCC be true and the value be empty?

Tree
- End of the new paragraph of JavaDoc has two open <code> tags instead of open and close.
- version() has a // TODO about throwing a RuntimeException
- There are now a number of places where isValid() and isDeleted() must be called together. Maybe a new helper? A more drastic change, but perhaps safer, would be to make it protected in SharedResource and not promote it to public in Tree.

VolumeStructure
- removeTree() now only returns true (or throws)

TimelyResourceTest
- testAndPruneResources(boolean) might want to append withTransactions to the assert messages. Does the CleanupManager need to be disabled to avoid spurious failures?
- doConcurrentTransaction() is called from threads so using fail() won't cause the test to fail but only generate console output. This needs to get propagated back to the main thread somewhere. There are lightly used helpers in ConcurrentUtil for this.
- deleteResources() uses a raw assert instead of JUnit's.
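
On the doConcurrentTransaction() point: one common way to get worker-thread failures back to the main thread is to run the work through an ExecutorService and inspect each Future. The helper below is a hypothetical sketch using only java.util.concurrent; it is not the ConcurrentUtil API in this branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper; not the ConcurrentUtil class from this branch. */
class WorkerFailures {
    /**
     * Run each task on its own pool thread and return every Throwable the
     * tasks raised, including AssertionError thrown by fail() or assertTrue().
     */
    static List<Throwable> runAll(final List<Callable<Void>> tasks) {
        final ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        final List<Throwable> failures = new ArrayList<Throwable>();
        try {
            final List<Future<Void>> futures = new ArrayList<Future<Void>>();
            for (final Callable<Void> task : tasks) {
                futures.add(pool.submit(task));
            }
            for (final Future<Void> future : futures) {
                try {
                    // get() rethrows whatever the task threw, wrapped.
                    future.get();
                } catch (final ExecutionException e) {
                    failures.add(e.getCause());
                } catch (final InterruptedException e) {
                    // Preserve the interrupt and report it as a failure.
                    Thread.currentThread().interrupt();
                    failures.add(e);
                }
            }
        } finally {
            pool.shutdown();
        }
        return failures;
    }
}
```

The test method on the main thread can then assert that the returned list is empty, so an assertion that fails inside a worker fails the test instead of only printing a stack trace.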

TreeTransactionalLifetimeTest
- simplePruning() has a 5s sleep and a commented out manual cleanup call. The latter seems cleaner and more deterministic, does it not work?
- I'd like to see some tests around the behavior regarding steps. I believe having trees obey it just like data makes the most sense, which is what it does now, but confirming it would be good.
- The TExec helper is duplicating part of the functionality in ConcurrentUtil and doesn't handle errors in a way that will fail the test.

review: Needs Fixing
417. By Peter Beaman

Modifications per review comments

418. By Peter Beaman

Source code cleanup

419. By Peter Beaman

Fix up documentation. Optimization: don't add new version to delete resource created with same version handle

420. By Peter Beaman

Add some crash and stop/start tests

421. By Peter Beaman

& -> &&

422. By Peter Beaman

assert -> assertTrue

Peter Beaman (pbeaman) wrote :

Thanks. The updated version is changed as follows:

1. The only code change I would suggest, ask about really, is the live-ness determination in TimelyResource#prune().

- No change yet. Will look at that again tomorrow.

2. I would also like to see a handful of tests around what is in the directory tree, shutdown and recovery where multiple versions were present, and the (lack of) MVVs in writes to trees created in the same transaction.

- I believe these cases are covered by new tests added to TreeTransactionalLifetimeTest.

3. TimelyResource:

 - JavaDoc fixed. I believe there are no broken links in the Javadoc - if you find some, please let me know.
 - getVersionCount() - reduced to package private
 - delete() - check for null and throw IllegalStateException. Fixed a test that relied on lenient behavior.

Exchange

 - Why was assertCorrectThread() removed - in every case removed I determined there was another call made by a subordinate method. The removed calls were redundant.
 - checkThread(boolean) - I believe the implementations are identical except that the new one does not set _thread to t when it is known already to be t. Since _thread is volatile this may reduce memory contention. (Actually, I'm not sure if it matters.)
 - throttle() - a new helper, as suggested.
 - doMVCC && (spareValue.isDefined() || !tree.isTransactionPrivate()). This is an attempted optimization. In particular, it might be helpful when building a new index within a transaction by avoiding the need to write and prune MVVs. A rollback range-deletes the entire tree, so no value written inside the tree can ever become visible, even though it was written as primordial. There was an error related to different steps - that's fixed with the rather arcane new boolean argument of isTransactionPrivate(boolean). (I thought about two methods with different names but lacked the imagination to name them.) The main point is that in the case where the tree is unknown to any other transaction and there are no step-py things going on, we can simply write a primordial value since (a) no other transaction can see the tree, and (b) any step that could see the tree must be later than the step that writes the primordial value. To answer your specific question, I believe doMVCC is true whenever a transaction writes a value to a key that did not previously exist.
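
As a reading aid, the store() condition can be restated as a stand-alone predicate. The class and parameter names here are hypothetical, and the step handling behind isTransactionPrivate(boolean) is deliberately left out.

```java
/** Hypothetical restatement of the store() condition; not Persistit code. */
class StoreDecision {
    /**
     * @param doMVCC
     *            the store is happening inside an active transaction
     * @param existingValueDefined
     *            the key already holds a value (spareValue.isDefined())
     * @param treeIsTransactionPrivate
     *            the tree was created by this transaction and is not yet
     *            visible to any other transaction
     * @return true to write a multi-version value, false to write the value
     *         primordially
     */
    static boolean writeAsMvv(final boolean doMVCC, final boolean existingValueDefined,
            final boolean treeIsTransactionPrivate) {
        return doMVCC && (existingValueDefined || !treeIsTransactionPrivate);
    }
}
```

The only transactional case that skips the MVV is a first write to a key in a tree that is private to the writing transaction; every other transactional store still goes through the MVV path.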

Tree
 - Javadoc issues: fixed
 - version() now throws a subclass of RuntimeException and does not have a TODO
 - isValid() && !isDeleted() is now computed by helper isLive()

VolumeStructure
- removeTree() is now properly void

TimelyResourceTest
- Append withTransactions - done
- doConcurrentTransaction() - reworked to use ConcurrentUtil. Note that I modified ConcurrentUtil extensively to expose more options for use. This was done to allow doConcurrentTransactions to replace threads during execution. I now fully understand the elegance of ConcurrentUtil (which I had not previously paid much attention to) and will endeavor to replace lots of cases in unit tests that created and join background threads with it.
- assert - fixed

TreeTransactionalLifetimeTest
- simplePruning() sleep - fixed
- I'd like to see new tests...


Peter Beaman (pbeaman) wrote :

Note that Akiban Server requires changes to work with transactional trees. Branch lp:~pbeaman/akiban-server/try-transactional-tree contains more or less minimal changes necessary to complete tests. It does not make use of transactional trees to enhance DDL operations, however.

Nathan Williams (nwilliams) wrote :

Thanks for all the tweaks. Aside from the (possibly unrealistic) desire to unify the live-ness determination, this all looks good to me.

Have we run the long stress suite or tpcc against this? I assume we'll want to before putting into trunk.

423. By Peter Beaman

Merge from trunk with changes for lock pruning fix

424. By Peter Beaman

Synchronize volume open

Peter Beaman (pbeaman) wrote :

Stress tests have run four successful nightly iterations.

425. By Peter Beaman

Merge from trunk

Preview Diff

1=== modified file 'pom.xml'
2--- pom.xml 2013-04-10 20:09:48 +0000
3+++ pom.xml 2013-04-29 21:38:25 +0000
4@@ -4,7 +4,7 @@
5
6 <groupId>com.akiban</groupId>
7 <artifactId>akiban-persistit</artifactId>
8- <version>3.2.9-SNAPSHOT</version>
9+ <version>3.3-SNAPSHOT</version>
10 <packaging>jar</packaging>
11
12 <parent>
13
14=== modified file 'src/main/java/com/persistit/Buffer.java'
15--- src/main/java/com/persistit/Buffer.java 2013-03-06 16:20:57 +0000
16+++ src/main/java/com/persistit/Buffer.java 2013-04-29 21:38:25 +0000
17@@ -1424,7 +1424,6 @@
18 if (Debug.ENABLED) {
19 assertVerify();
20 }
21-
22 final boolean exactMatch = (foundAt & EXACT_MASK) > 0;
23 final int p = foundAt & P_MASK;
24
25@@ -3869,7 +3868,7 @@
26 * @return a human-readable inventory of the contents of this buffer
27 */
28 public String toStringDetail() {
29- return toStringDetail(-1, 42, 42, 0, true);
30+ return toStringDetail(-1, 42, 82, 0, true);
31 }
32
33 /**
34
35=== modified file 'src/main/java/com/persistit/CheckpointManager.java'
36--- src/main/java/com/persistit/CheckpointManager.java 2013-02-04 16:32:35 +0000
37+++ src/main/java/com/persistit/CheckpointManager.java 2013-04-29 21:38:25 +0000
38@@ -245,6 +245,7 @@
39 final List<Accumulator> accumulators = _persistit.takeCheckpointAccumulators(txn.getStartTimestamp());
40 _persistit.getTransactionIndex().checkpointAccumulatorSnapshots(txn.getStartTimestamp(), accumulators);
41 Accumulator.saveAccumulatorCheckpointValues(accumulators);
42+ _persistit.flushStatistics();
43 txn.commit(CommitPolicy.HARD);
44 _currentCheckpoint = new Checkpoint(txn.getStartTimestamp(), System.currentTimeMillis());
45 _outstandingCheckpoints.add(_currentCheckpoint);
46
47=== modified file 'src/main/java/com/persistit/ClassIndex.java'
48--- src/main/java/com/persistit/ClassIndex.java 2012-08-24 13:57:19 +0000
49+++ src/main/java/com/persistit/ClassIndex.java 2013-04-29 21:38:25 +0000
50@@ -94,11 +94,21 @@
51 *
52 * @param persistit
53 * Owning Persistit instance.
54+ * @throws PersistitException
55 */
56 ClassIndex(final Persistit persistit) {
57 _persistit = persistit;
58 }
59
60+ void initialize() throws PersistitException {
61+ /*
62+ * Called during Persistit initialization. This has the desired
63+ * side-effect of the class index tree outside of a transaction so that
64+ * its existence is primordial.
65+ */
66+ getExchange();
67+ }
68+
69 /**
70 * @return Number of <code>ClassInfo</code> objects currently stored in this
71 * ClassIndex.
72@@ -178,8 +188,9 @@
73 } catch (final PersistitException pe) {
74 throw new ConversionException(pe);
75 } finally {
76- if (ex != null)
77+ if (ex != null) {
78 releaseExchange(ex);
79+ }
80 }
81 }
82 }
83
84=== modified file 'src/main/java/com/persistit/Exchange.java'
85--- src/main/java/com/persistit/Exchange.java 2013-03-23 21:59:48 +0000
86+++ src/main/java/com/persistit/Exchange.java 2013-04-29 21:38:25 +0000
87@@ -413,7 +413,6 @@
88 }
89
90 void init(final Volume volume, final String treeName, final boolean create) throws PersistitException {
91- assertCorrectThread(true);
92 if (volume == null) {
93 throw new NullPointerException();
94 }
95@@ -441,6 +440,7 @@
96 _tree = tree;
97 _treeHolder = new ReentrantResourceHolder(_tree);
98 _cachedTreeGeneration = -1;
99+ _isDirectoryExchange = tree == _volume.getDirectoryTree();
100 initCache();
101 }
102 _splitPolicy = _persistit.getDefaultSplitPolicy();
103@@ -503,7 +503,7 @@
104
105 private void checkLevelCache() throws PersistitException {
106
107- if (!_tree.isValid()) {
108+ if (!_tree.isLive()) {
109 if (_tree.getVolume().isTemporary()) {
110 _tree = _tree.getVolume().getTree(_tree.getName(), true);
111 _treeHolder = new ReentrantResourceHolder(_tree);
112@@ -1356,11 +1356,7 @@
113 if (!isDirectoryExchange()) {
114 _persistit.checkSuspended();
115 }
116- if (!_ignoreTransactions && !_transaction.isActive()) {
117- _persistit.getJournalManager().throttle();
118- }
119- // TODO: directoryExchange, and lots of tests, don't use transactions.
120- // Skip MVCC for now.
121+ throttle();
122 int options = StoreOptions.WAIT;
123 options |= (!_ignoreTransactions && _transaction.isActive()) ? StoreOptions.MVCC : 0;
124 storeInternal(key, value, 0, options);
125@@ -1551,7 +1547,13 @@
126 }
127 }
128
129- if (doMVCC) {
130+ /*
131+ * If the Tree is private to an active transaction, and
132+ * if this is a virgin value, then we can store it
133+ * primordially because if the transaction rolls back,
134+ * the entire Tree will be removed.
135+ */
136+ if (doMVCC && (_spareValue.isDefined() || !_tree.isTransactionPrivate(true))) {
137 valueToStore = spareValue;
138 final int valueSize = value.getEncodedSize();
139 int retries = VERSIONS_OUT_OF_ORDER_RETRY_COUNT;
140@@ -3188,7 +3190,6 @@
141 * @throws PersistitException
142 */
143 public boolean hasChildren() throws PersistitException {
144- assertCorrectThread(true);
145 _key.copyTo(_spareKey2);
146 final int size = _key.getEncodedSize();
147 final boolean result = traverse(GT, true, 0, _key.getDepth() + 1, size, null);
148@@ -3228,30 +3229,16 @@
149 */
150 public void removeTree() throws PersistitException {
151 assertCorrectThread(true);
152+ _persistit.checkSuspended();
153 _persistit.checkClosed();
154
155- final long timestamp = _persistit.getCurrentTimestamp();
156- for (int i = 0; i < 100; i++) {
157- _persistit.checkClosed();
158- _persistit.checkSuspended();
159- _persistit.getJournalManager().pruneObsoleteTransactions();
160- if (_persistit.getJournalManager().getEarliestAbortedTransactionTimestamp() > timestamp) {
161- break;
162- }
163- Util.sleep(1000);
164- }
165+ _volume.getStructure().removeTree(_tree);
166 if (!_ignoreTransactions) {
167+ assert !isDirectoryExchange();
168 _transaction.removeTree(this);
169 }
170-
171- clear();
172-
173+ _key.clear();
174 _value.clear();
175- /*
176- * Remove from directory tree.
177- */
178- _volume.getStructure().removeTree(_tree);
179-
180 initCache();
181 }
182
183@@ -3410,9 +3397,8 @@
184 if (!isDirectoryExchange()) {
185 _persistit.checkSuspended();
186 }
187- if (!_ignoreTransactions && !_transaction.isActive()) {
188- _persistit.getJournalManager().throttle();
189- }
190+
191+ throttle();
192
193 if (_ignoreTransactions || !_transaction.isActive()) {
194 return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false);
195@@ -3422,6 +3408,15 @@
196
197 _transaction.remove(this, key1, key2);
198
199+ /*
200+ * If the Tree was created within this transaction then we can just
201+ * range-delete the tree since it is not visible outside this
202+ * transaction.
203+ */
204+ if (_tree.isTransactionPrivate(true)) {
205+ return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false);
206+ }
207+
208 checkLevelCache();
209
210 _value.clear().putAntiValueMVV();
211@@ -3994,7 +3989,7 @@
212
213 boolean prune(final Key key) throws PersistitException {
214 Buffer buffer = null;
215- Debug.$assert1.t(_tree.isValid());
216+ Debug.$assert1.t(_tree.isLive());
217 try {
218 search(key, true);
219 buffer = _levelCache[0]._buffer;
220@@ -4014,7 +4009,7 @@
221 Buffer buffer = null;
222 boolean pruned = false;
223
224- Debug.$assert1.t(_tree.isValid());
225+ Debug.$assert1.t(_tree.isLive());
226 try {
227 search(key1, true);
228 buffer = _levelCache[0]._buffer;
229@@ -4132,13 +4127,30 @@
230 assert checkThread(set) : "Thread " + Thread.currentThread() + " must not use " + this + " owned by " + _thread;
231 }
232
233+ /**
234+ * Ensure the this Exchange is compatible with the current Thread; if a
235+ * Thread was previously assigned then this thread must be the same one.
236+ *
237+ * @param set
238+ * whether to assign the current thread
239+ * @return true if and only if there was no assigned Thread or the assigned
240+ * Thread is same as the current Thread.
241+ */
242 private boolean checkThread(final boolean set) {
243 final Thread t = Thread.currentThread();
244- final boolean okay = _thread == null || _thread == t;
245- if (okay) {
246- _thread = set ? t : null;
247- }
248- return okay;
249+ if (_thread == t) {
250+ if (!set) {
251+ _thread = null;
252+ }
253+ return true;
254+ }
255+ if (_thread == null) {
256+ if (set) {
257+ _thread = t;
258+ }
259+ return true;
260+ }
261+ return false;
262 }
263
264 /**
265@@ -4187,7 +4199,6 @@
266 * <code>false</code>.
267 */
268 boolean isDirectoryExchange() {
269- assertCorrectThread(true);
270 return _isDirectoryExchange;
271 }
272
273@@ -4426,4 +4437,15 @@
274 _ignoreMVCCFetch = savedIgnore;
275 }
276 }
277+
278+ private void throttle() throws PersistitInterruptedException {
279+ /*
280+ * Don't throttle operations on the directory tree since that makes some
281+ * unit tests very slow. This test is now necessary because a directory
282+ * tree update can now occur within the scope of a transaction.
283+ */
284+ if (!_ignoreTransactions && !_transaction.isActive() && !isDirectoryExchange()) {
285+ _persistit.getJournalManager().throttle();
286+ }
287+ }
288 }
289
290=== modified file 'src/main/java/com/persistit/JournalManager.java'
291--- src/main/java/com/persistit/JournalManager.java 2013-02-15 15:39:42 +0000
292+++ src/main/java/com/persistit/JournalManager.java 2013-04-29 21:38:25 +0000
293@@ -2862,6 +2862,11 @@
294 return false;
295 }
296
297+ @Override
298+ public boolean createTree(final long timestamp) throws PersistitException {
299+ return false;
300+ }
301+
302 }
303
304 /**
305
306=== modified file 'src/main/java/com/persistit/MVV.java'
307--- src/main/java/com/persistit/MVV.java 2012-10-03 14:43:25 +0000
308+++ src/main/java/com/persistit/MVV.java 2013-04-29 21:38:25 +0000
309@@ -451,7 +451,7 @@
310 if (tc == UNCOMMITTED) {
311 final long ts = vh2ts(versionHandle);
312 if (uncommittedTransactionTs != 0 && uncommittedTransactionTs != ts) {
313- throw new CorruptValueException("Multiple uncommitted version");
314+ throw new CorruptValueException("Multiple uncommitted versions");
315 }
316 uncommittedTransactionTs = ts;
317 mark(bytes, from);
318
319=== modified file 'src/main/java/com/persistit/Persistit.java'
320--- src/main/java/com/persistit/Persistit.java 2013-03-20 16:04:52 +0000
321+++ src/main/java/com/persistit/Persistit.java 2013-04-29 21:38:25 +0000
322@@ -29,6 +29,7 @@
323 import java.lang.management.MemoryMXBean;
324 import java.lang.management.MemoryUsage;
325 import java.lang.ref.SoftReference;
326+import java.lang.ref.WeakReference;
327 import java.rmi.RemoteException;
328 import java.util.ArrayList;
329 import java.util.Collections;
330@@ -264,6 +265,8 @@
331
332 private final Set<AccumulatorRef> _accumulators = new HashSet<AccumulatorRef>();
333
334+ private final Set<WeakReference<TimelyResource<?>>> _timelyResourceSet = new HashSet<WeakReference<TimelyResource<?>>>();
335+
336 private final WeakHashMap<SessionId, CLI> _cliSessionMap = new WeakHashMap<SessionId, CLI>();
337
338 private boolean _readRetryEnabled;
339@@ -440,6 +443,7 @@
340 startJournal();
341 startBufferPools();
342 preloadBufferPools();
343+ initializeClassIndex();
344 finishRecovery();
345 startTransactionIndexPollTask();
346 flush();
347@@ -672,6 +676,10 @@
348 _enableBufferInventory.set(_configuration.isBufferInventoryEnabled());
349 }
350
351+ private void initializeClassIndex() throws PersistitException {
352+ _classIndex.initialize();
353+ }
354+
355 void startCheckpointManager() {
356 _checkpointManager.start();
357 }
358@@ -793,17 +801,21 @@
359 }
360 }
361
362- synchronized void addVolume(final Volume volume) throws VolumeAlreadyExistsException {
363- Volume otherVolume;
364- otherVolume = getVolume(volume.getName());
365- if (otherVolume != null) {
366- throw new VolumeAlreadyExistsException("Volume " + otherVolume);
367+ void addVolume(final Volume volume) throws VolumeAlreadyExistsException {
368+ synchronized (_volumes) {
369+ Volume otherVolume;
370+ otherVolume = getVolume(volume.getName());
371+ if (otherVolume != null) {
372+ throw new VolumeAlreadyExistsException("Volume " + otherVolume);
373+ }
374+ _volumes.add(volume);
375 }
376- _volumes.add(volume);
377 }
378
379- synchronized void removeVolume(final Volume volume) throws PersistitInterruptedException {
380- _volumes.remove(volume);
381+ void removeVolume(final Volume volume) throws PersistitInterruptedException {
382+ synchronized (_volumes) {
383+ _volumes.remove(volume);
384+ }
385 }
386
387 /**
388@@ -965,7 +977,6 @@
389 }
390 List<Exchange> stack;
391 final SessionId sessionId = getSessionId();
392-
393 synchronized (_exchangePoolMap) {
394 stack = _exchangePoolMap.get(sessionId);
395 if (stack == null) {
396@@ -986,7 +997,9 @@
397 * @return the List
398 */
399 public List<Volume> getVolumes() {
400- return new ArrayList<Volume>(_volumes);
401+ synchronized (_volumes) {
402+ return new ArrayList<Volume>(_volumes);
403+ }
404 }
405
406 /**
407@@ -1000,9 +1013,10 @@
408 * @return the List
409 * @throws PersistitException
410 */
411- public synchronized List<Tree> getSelectedTrees(final TreeSelector selector) throws PersistitException {
412+ public List<Tree> getSelectedTrees(final TreeSelector selector) throws PersistitException {
413 final List<Tree> list = new ArrayList<Tree>();
414- for (final Volume volume : _volumes) {
415+ final List<Volume> volumes = getVolumes();
416+ for (final Volume volume : volumes) {
417 if (selector.isSelected(volume)) {
418 if (selector.isVolumeOnlySelection(volume.getName())) {
419 list.add(volume.getDirectoryTree());
420@@ -1239,10 +1253,10 @@
421 if (name == null) {
422 throw new NullPointerException("Null volume name");
423 }
424+ final List<Volume> volumes = getVolumes();
425 Volume result = null;
426-
427- for (int i = 0; i < _volumes.size(); i++) {
428- final Volume vol = _volumes.get(i);
429+ for (int i = 0; i < volumes.size(); i++) {
430+ final Volume vol = volumes.get(i);
431 if (name.equals(vol.getName())) {
432 if (result == null)
433 result = vol;
434@@ -1256,8 +1270,8 @@
435 }
436
437 final File file = new File(name).getAbsoluteFile();
438- for (int i = 0; i < _volumes.size(); i++) {
439- final Volume vol = _volumes.get(i);
440+ for (int i = 0; i < volumes.size(); i++) {
441+ final Volume vol = volumes.get(i);
442 if (file.equals(vol.getAbsoluteFile())) {
443 if (result == null)
444 result = vol;
445@@ -1477,16 +1491,17 @@
446 */
447 private Volume getSpecialVolume(final String propName, final String dflt) throws VolumeNotFoundException {
448 final String volumeName = _configuration.getSysVolume();
449-
450- Volume volume = getVolume(volumeName);
451- if (volume == null) {
452+ synchronized (_volumes) {
453 if ((_volumes.size() == 1) && (volumeName.equals(dflt))) {
454- volume = _volumes.get(0);
455- } else {
456- throw new VolumeNotFoundException(volumeName);
457+ return _volumes.get(0);
458 }
459 }
460- return volume;
461+ final Volume volume = getVolume(volumeName);
462+ if (volume == null) {
463+ throw new VolumeNotFoundException(volumeName);
464+ } else {
465+ return volume;
466+ }
467 }
468
469 /**
470@@ -1512,13 +1527,8 @@
471 */
472 void cleanup() {
473 closeZombieTransactions(false);
474- final List<Volume> volumes;
475- synchronized (this) {
476- volumes = new ArrayList<Volume>(_volumes);
477- }
478- for (final Volume volume : volumes) {
479- volume.getStructure().flushStatistics();
480- }
481+ _transactionIndex.updateActiveTransactionCache();
482+ pruneTimelyResources();
483 }
484
485 /**
486@@ -1653,7 +1663,6 @@
487
488 if (flush) {
489 for (final Volume volume : volumes) {
490- volume.getStructure().flushStatistics();
491 volume.getStorage().flush();
492 }
493 }
494@@ -1755,7 +1764,7 @@
495 // the volume files - otherwise there will be left over channels
496 // and FileLocks that interfere with subsequent tests.
497 //
498- final List<Volume> volumes = new ArrayList<Volume>(_volumes);
499+ final List<Volume> volumes = getVolumes();
500 for (final Volume volume : volumes) {
501 try {
502 volume.getStorage().close();
503@@ -1824,8 +1833,11 @@
504 synchronized (_accumulators) {
505 _accumulators.clear();
506 }
507- synchronized (this) {
508+ synchronized (_volumes) {
509 _volumes.clear();
510+ }
511+
512+ synchronized (this) {
513 _alertMonitors.clear();
514 _bufferPoolTable.clear();
515 _intArrayThreadLocal.set(null);
516@@ -1867,8 +1879,8 @@
517 if (_closed.get() || !_initialized.get()) {
518 return false;
519 }
520- for (final Volume volume : _volumes) {
521- volume.getStructure().flushStatistics();
522+ final List<Volume> volumes = getVolumes();
523+ for (final Volume volume : volumes) {
524 volume.getStorage().flush();
525 volume.getStorage().force();
526 }
527@@ -1894,6 +1906,13 @@
528 }
529 }
530
531+ void flushStatistics() throws PersistitException {
532+ final List<Volume> volumes = getVolumes();
533+ for (final Volume volume : volumes) {
534+ volume.getStructure().flushStatistics();
535+ }
536+ }
537+
538 void waitForIOTaskStop(final IOTaskRunnable task) {
539 if (_beginCloseTime == 0) {
540 _beginCloseTime = System.nanoTime();
541@@ -1926,7 +1945,7 @@
542 if (_closed.get() || !_initialized.get()) {
543 return;
544 }
545- final ArrayList<Volume> volumes = _volumes;
546+ final List<Volume> volumes = getVolumes();
547
548 for (int index = 0; index < volumes.size(); index++) {
549 final Volume volume = volumes.get(index);
550@@ -2236,10 +2255,6 @@
551 return _transactionIndex;
552 }
553
554- public long getCheckpointIntervalNanos() {
555- return _checkpointManager.getCheckpointIntervalNanos();
556- }
557-
558 /**
559 * Replaces the current logger implementation.
560 *
561@@ -2286,8 +2301,9 @@
562 */
563 public void checkAllVolumes() throws PersistitException {
564 final IntegrityCheck icheck = new IntegrityCheck(this);
565- for (int index = 0; index < _volumes.size(); index++) {
566- final Volume volume = _volumes.get(index);
567+ final List<Volume> volumes = getVolumes();
568+ for (int index = 0; index < volumes.size(); index++) {
569+ final Volume volume = volumes.get(index);
570 System.out.println("Checking " + volume + " ");
571 try {
572 icheck.checkVolume(volume);
573@@ -2403,6 +2419,12 @@
574 _suspendUpdates.set(suspended);
575 }
576
577+ void addTimelyResource(final TimelyResource<? extends Version> resource) {
578+ synchronized (_timelyResourceSet) {
579+ _timelyResourceSet.add(new WeakReference<TimelyResource<? extends Version>>(resource));
580+ }
581+ }
582+
583 void addAccumulator(final Accumulator accumulator) throws PersistitException {
584 int checkpointCount = 0;
585 synchronized (_accumulators) {
586@@ -2462,6 +2484,34 @@
587 return result;
588 }
589
590+ void pruneTimelyResources() {
591+ final List<TimelyResource<?>> resourcesToPrune = new ArrayList<TimelyResource<?>>();
592+ synchronized (_timelyResourceSet) {
593+ for (final Iterator<WeakReference<TimelyResource<?>>> iter = _timelyResourceSet.iterator(); iter.hasNext();) {
594+ final WeakReference<TimelyResource<?>> ref = iter.next();
595+ final TimelyResource<?> resource = ref.get();
596+ if (resource != null) {
597+ resourcesToPrune.add(resource);
598+ }
599+ }
600+ }
601+ for (final TimelyResource<?> resource : resourcesToPrune) {
602+ try {
603+ resource.prune();
604+ } catch (final PersistitException e) {
605+ _logBase.timelyResourcePruneException.log(e, resource);
606+ }
607+ }
608+ synchronized (_timelyResourceSet) {
609+ for (final Iterator<WeakReference<TimelyResource<?>>> iter = _timelyResourceSet.iterator(); iter.hasNext();) {
610+ final WeakReference<TimelyResource<?>> ref = iter.next();
611+ if (ref.get() == null) {
612+ iter.remove();
613+ }
614+ }
615+ }
616+ }
617+
618 synchronized CLI getSessionCLI() {
619 CLI cli = _cliSessionMap.get(getSessionId());
620 if (cli == null) {
621
622=== modified file 'src/main/java/com/persistit/RecoveryManager.java'
623--- src/main/java/com/persistit/RecoveryManager.java 2013-02-15 15:39:42 +0000
624+++ src/main/java/com/persistit/RecoveryManager.java 2013-04-29 21:38:25 +0000
625@@ -224,14 +224,32 @@
626 private final TransactionPlayer _player = new TransactionPlayer(new RecoveryTransactionPlayerSupport());
627
628 static class DefaultRecoveryListener implements TransactionPlayerListener {
629+
630 @Override
631 public void store(final long address, final long timestamp, final Exchange exchange) throws PersistitException {
632+ if (exchange.isDirectoryExchange() && exchange.getValue().isDefined()
633+ && exchange.getValue().getTypeHandle() == Value.CLASS_TREE) {
634+ /*
635+ * Don't recover tree structure updates within transactions
636+ * because the allocation of root pages is not transactional.
637+ * The intent of the change is conveyed by the implicit creation
638+ * of new trees and explicit remove tree records.
639+ */
640+ return;
641+ }
642 exchange.store();
643 }
644
645 @Override
646 public void removeKeyRange(final long address, final long timestamp, final Exchange exchange, final Key from,
647 final Key to) throws PersistitException {
648+ if (exchange.isDirectoryExchange()) {
649+ /*
650+ * Don't recover directory tree removes because they are implied
651+ * by Remove Tree records in the journal.
652+ */
653+ return;
654+ }
655 exchange.raw_removeKeyRangeInternal(from, to, false, false);
656 }
657
658@@ -275,6 +293,10 @@
659 return true;
660 }
661
662+ @Override
663+ public boolean createTree(final long timestamp) throws PersistitException {
664+ return true;
665+ }
666 }
667
668 class DefaultRollbackListener implements TransactionPlayerListener {
669@@ -336,6 +358,11 @@
670 return false;
671 }
672
673+ @Override
674+ public boolean createTree(final long timestamp) throws PersistitException {
675+ return false;
676+ }
677+
678 }
679
680 private class RecoveryTransactionPlayerSupport implements TransactionPlayerSupport {
681
682=== modified file 'src/main/java/com/persistit/SharedResource.java'
683--- src/main/java/com/persistit/SharedResource.java 2013-04-10 20:09:48 +0000
684+++ src/main/java/com/persistit/SharedResource.java 2013-04-29 21:38:25 +0000
685@@ -38,7 +38,7 @@
686 *
687 * @author peter
688 */
689-class SharedResource {
690+abstract class SharedResource {
691
692 /**
693 * Default maximum time to wait for access to this resource. Methods throw
694@@ -260,10 +260,10 @@
695 /**
696 * A counter that increments every time the resource is changed.
697 */
698- protected AtomicLong _generation = new AtomicLong();
699+ private final AtomicLong _generation = new AtomicLong();
700
701 protected SharedResource(final Persistit persistit) {
702- this._persistit = persistit;
703+ _persistit = persistit;
704 }
705
706 public boolean isAvailable(final boolean writer) {
707
708=== added file 'src/main/java/com/persistit/TimelyResource.java'
709--- src/main/java/com/persistit/TimelyResource.java 1970-01-01 00:00:00 +0000
710+++ src/main/java/com/persistit/TimelyResource.java 2013-04-29 21:38:25 +0000
711@@ -0,0 +1,505 @@
712+/**
713+ * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
714+ *
715+ * This program and the accompanying materials are made available
716+ * under the terms of the Eclipse Public License v1.0 which
717+ * accompanies this distribution, and is available at
718+ * http://www.eclipse.org/legal/epl-v10.html
719+ *
720+ * This program may also be available under different license terms.
721+ * For more information, see www.akiban.com or contact licensing@akiban.com.
722+ *
723+ * Contributors:
724+ * Akiban Technologies, Inc.
725+ */
726+
727+package com.persistit;
728+
729+import static com.persistit.TransactionIndex.tss2vh;
730+import static com.persistit.TransactionIndex.vh2step;
731+import static com.persistit.TransactionIndex.vh2ts;
732+import static com.persistit.TransactionStatus.ABORTED;
733+import static com.persistit.TransactionStatus.PRIMORDIAL;
734+import static com.persistit.TransactionStatus.TIMED_OUT;
735+import static com.persistit.TransactionStatus.UNCOMMITTED;
736+
737+import java.util.ArrayList;
738+import java.util.List;
739+
740+import com.persistit.Version.PrunableVersion;
741+import com.persistit.Version.VersionCreator;
742+import com.persistit.exception.PersistitException;
743+import com.persistit.exception.PersistitInterruptedException;
744+import com.persistit.exception.RollbackException;
745+import com.persistit.exception.TimeoutException;
746+import com.persistit.exception.WWRetryException;
747+
748+/**
749+ * <p>
750+ * Transactionally manage multiple versions of a resource. For example, a
751+ * resource which caches state created and either committed or rolled back by a
752+ * transaction may use a TimelyResource to hold its versions. Each version
753+ * must implement the {@link Version} interface and may optionally implement
754+ * {@link PrunableVersion} in which case its {@link PrunableVersion#prune()}
755+ * method is called when <code>TimelyResource</code> detects that the version is
756+ * obsolete.
757+ * </p>
758+ * <p>
759+ * The method {@link #addVersion(Version, Transaction)} attempts to add a new
760+ * version on behalf of the supplied transaction.
761+ * </p>
762+ * <p>
763+ * The method {@link #getVersion()} returns the snapshot version associated with
764+ * the transaction, in other words, an instance of V which was either
765+ * committed last before this transaction started, or which was created by this
766+ * transaction. If there is no such version the method returns <code>null</code>
767+ * .
768+ * </p>
769+ * <p>
770+ * The method {@link #getVersion(com.persistit.Version.VersionCreator)} does the
771+ * same thing, except that if there is no snapshot version the
772+ * {@link VersionCreator#createVersion(TimelyResource)} method is called to
773+ * construct a new version.
774+ * </p>
775+ *
776+ * @author peter
777+ *
778+ * @param <V>
779+ * specific type of {@link Version} this <code>TimelyResource</code>
780+ * manages
781+ */
782+public class TimelyResource<V extends Version> {
783+
784+ private final Persistit _persistit;
785+ private volatile Entry _first;
786+
787+ public TimelyResource(final Persistit persistit) {
788+ _persistit = persistit;
789+ _persistit.addTimelyResource(this);
790+ }
791+
792+ private long tss2v(final Transaction txn) {
793+ if (txn == null || !txn.isActive()) {
794+ return tss2vh(_persistit.getTimestampAllocator().updateTimestamp(), 0);
795+ } else {
796+ return tss2vh(txn.getStartTimestamp(), txn.getStep());
797+ }
798+ }
799+
800+ public synchronized void delete() throws RollbackException, PersistitException {
801+ if (_first == null) {
802+ throw new IllegalStateException("There is no resource to delete");
803+ }
804+ if (!_first.isDeleted()) {
805+ final Transaction txn = _persistit.getTransaction();
806+ final long version = tss2v(txn);
807+ if (version == _first.getVersion()) {
808+ _first.setDeleted();
809+ } else {
810+ final V resource = _first.getResource();
811+ final Entry entry = new Entry(version, resource);
812+ entry.setDeleted();
813+ addVersion(entry, txn);
814+ }
815+ }
816+ }
817+
818+ public void addVersion(final V resource, final Transaction txn) throws PersistitInterruptedException,
819+ RollbackException {
820+ if (resource == null) {
821+ throw new NullPointerException("Null resource");
822+ }
823+ addVersion(new Entry(tss2v(txn), resource), txn);
824+ }
825+
826+ public V getVersion() throws TimeoutException, PersistitInterruptedException {
827+ final Entry first = _first;
828+ if (first != null && first.getVersion() == PRIMORDIAL) {
829+ return first.getResource();
830+ }
831+ final Transaction txn = _persistit.getTransaction();
832+ return getVersion(tss2v(txn));
833+ }
834+
835+ public V getVersion(final VersionCreator<V> creator) throws PersistitException, RollbackException {
836+ final Entry first = _first;
837+ if (first != null && first.getVersion() == PRIMORDIAL) {
838+ return first.getResource();
839+ }
840+ final Transaction txn = _persistit.getTransaction();
841+ V version = getVersion(tss2v(txn));
842+ if (version == null) {
843+ version = creator.createVersion(this);
844+ addVersion(version, txn);
845+ }
846+ return version;
847+ }
848+
849+ /**
850+ * @return <code>true</code> if and only if this <code>TimelyResource</code>
851+ * has no <code>Version</code> instances.
852+ */
853+ public boolean isEmpty() throws TimeoutException, PersistitInterruptedException {
854+ Entry first = _first;
855+ if (first == null) {
856+ return true;
857+ }
858+ if (first.getVersion() == PRIMORDIAL) {
859+ return false;
860+ }
861+ first = getEntry(tss2v(_persistit.getTransaction()));
862+ if (first == null) {
863+ return true;
864+ }
865+ if (first.isDeleted()) {
866+ return true;
867+ }
868+ return false;
869+ }
870+
871+ /**
872+ *
873+ * @return Whether this resource exists only within the context of the
874+ * current transaction.
875+ * @throws TimeoutException
876+ * @throws PersistitInterruptedException
877+ */
878+
879+ public boolean isTransactionPrivate(final boolean byStep) throws TimeoutException, PersistitInterruptedException {
880+ Entry entry = _first;
881+ if (entry != null && entry.getVersion() == PRIMORDIAL) {
882+ return false;
883+ }
884+ final Transaction txn = _persistit.getTransaction();
885+ final long versionHandle = tss2v(txn);
886+ entry = getEntry(versionHandle);
887+ if (entry == null) {
888+ return true;
889+ } else {
890+ if (byStep) {
891+ return entry.getVersion() == versionHandle;
892+ } else {
893+ return vh2ts(entry.getVersion()) == vh2ts(versionHandle);
894+ }
895+ }
896+ }
897+
898+ /**
899+ * @return Count of versions currently being managed.
900+ */
901+ int getVersionCount() {
902+ int count = 0;
903+ for (Entry e = _first; e != null; e = e._previous) {
904+ count++;
905+ }
906+ return count;
907+ }
908+
909+ @Override
910+ public String toString() {
911+ final StringBuilder sb = new StringBuilder("TimelyResource(");
912+ boolean first = true;
913+ for (Entry entry = _first; entry != null; entry = entry.getPrevious()) {
914+ if (sb.length() > 1000) {
915+ sb.append("...");
916+ break;
917+ }
918+ if (!first) {
919+ sb.append(',');
920+ } else {
921+ first = false;
922+ }
923+ sb.append(entry);
924+ }
925+ sb.append(')');
926+ return sb.toString();
927+ }
928+
929+ synchronized void setPrimordial() {
930+ if (_first != null && _first.getPrevious() == null) {
931+ _first.setPrimordial();
932+ } else {
933+ throw new IllegalStateException("Cannot be made primordial: " + this);
934+ }
935+ }
936+
937+ /**
938+ * Remove all obsolete <code>Version</code> instances. For each instance
939+ * that implements <code>PrunableVersion</code>, invoke its
940+ * {@link PrunableVersion#prune()} method.
941+ *
942+ * @throws TimeoutException
943+ * @throws PersistitException
944+ */
945+ void prune() throws TimeoutException, PersistitException {
946+ final List<Entry> entriesToPrune = new ArrayList<Entry>();
947+ PrunableVersion versionToVacate = null;
948+
949+ final TransactionIndex ti = _persistit.getTransactionIndex();
950+
951+ synchronized (this) {
952+ try {
953+
954+ Entry newer = null;
955+ Entry latest = null;
956+ boolean isPrimordial = true;
957+ long lastCommit = UNCOMMITTED;
958+
959+ for (Entry entry = _first; entry != null; entry = entry.getPrevious()) {
960+ boolean keepIt = false;
961+ final long versionHandle = entry.getVersion();
962+ final long tc = ti.commitStatus(versionHandle, UNCOMMITTED, 0);
963+ if (tc >= PRIMORDIAL) {
964+ if (tc == UNCOMMITTED) {
965+ keepIt = true;
966+ isPrimordial = false;
967+ } else {
968+ final boolean hasConcurrent = ti.hasConcurrentTransaction(tc, lastCommit);
969+ if (latest == null || hasConcurrent) {
970+ keepIt = true;
971+ if (latest == null) {
972+ latest = entry;
973+ }
974+ }
975+ if (keepIt && ti.hasConcurrentTransaction(0, tc)) {
976+ isPrimordial = false;
977+ }
978+ }
979+ lastCommit = tc;
980+ } else {
981+ assert tc == ABORTED;
982+ }
983+ if (keepIt) {
984+ newer = entry;
985+ } else {
986+ if (tc == ABORTED ^ entry.isDeleted()) {
987+ entriesToPrune.add(entry);
988+ }
989+ if (newer == null) {
990+ _first = entry.getPrevious();
991+ } else {
992+ newer.setPrevious(entry.getPrevious());
993+ }
994+ }
995+ }
996+ if (isPrimordial && _first != null) {
997+ assert _first.getPrevious() == null;
998+ if (_first.isDeleted()) {
999+ final V version = _first.getResource();
1000+ if (version instanceof PrunableVersion) {
1001+ versionToVacate = (PrunableVersion) version;
1002+ }
1003+ entriesToPrune.add(_first);
1004+ _first = null;
1005+ } else {
1006+ _first.setPrimordial();
1007+ }
1008+ }
1009+ } catch (final InterruptedException ie) {
1010+ throw new PersistitInterruptedException(ie);
1011+ }
1012+ }
1013+ for (final Entry e : entriesToPrune) {
1014+ if (versionToVacate != null) {
1015+ versionToVacate.vacate();
1016+ }
1017+ e.prune();
1018+ }
1019+ }
1020+
1021+ /**
1022+ * Helper method that adds an Entry containing a Version and its timestamp
1023+ * information. This method checks for write-write dependencies and throws a
1024+ * RollbackException if there is a conflict.
1025+ *
1026+ * @param entry
1027+ * @param txn
1028+ * @throws PersistitException
1029+ * @throws RollbackException
1030+ */
1031+ private void addVersion(final Entry entry, final Transaction txn) throws PersistitInterruptedException,
1032+ RollbackException {
1033+ final TransactionIndex ti = _persistit.getTransactionIndex();
1034+ while (true) {
1035+ try {
1036+ synchronized (this) {
1037+ if (_first != null) {
1038+ if (_first.getVersion() > entry.getVersion()) {
1039+ /*
1040+ * This thread lost a race to make the most recent
1041+ * version
1042+ */
1043+ throw new RollbackException();
1044+ }
1045+ if (txn.isActive()) {
1046+ for (Entry e = _first; e != null; e = e.getPrevious()) {
1047+ final long version = e.getVersion();
1048+ final long depends = ti.wwDependency(version, txn.getTransactionStatus(), 0);
1049+ if (depends == TIMED_OUT) {
1050+ throw new WWRetryException(version);
1051+ }
1052+ if (depends != 0 && depends != ABORTED) {
1053+ /*
1054+ * version is from a concurrent transaction
1055+ * that already committed or timed out
1056+ * waiting to see. Either way, must abort.
1057+ */
1058+ throw new RollbackException();
1059+ }
1060+ }
1061+ }
1062+ }
1063+ entry.setPrevious(_first);
1064+ _first = entry;
1065+ break;
1066+ }
1067+ } catch (final WWRetryException re) {
1068+ try {
1069+ final long depends = _persistit.getTransactionIndex().wwDependency(re.getVersionHandle(),
1070+ txn.getTransactionStatus(), SharedResource.DEFAULT_MAX_WAIT_TIME);
1071+ if (depends != 0 && depends != ABORTED) {
1072+ /*
1073+ * version is from concurrent txn that already committed
1074+ * or timed out waiting to see. Either way, must abort.
1075+ */
1076+ throw new RollbackException();
1077+ }
1078+ } catch (final InterruptedException ie) {
1079+ throw new PersistitInterruptedException(ie);
1080+ }
1081+ } catch (final InterruptedException ie) {
1082+ throw new PersistitInterruptedException(ie);
1083+ }
1084+ }
1085+ }
1086+
1087+ /**
1088+ * Get the <code>Version</code> from the snapshot view specified by the
1089+ * supplied version handle.
1090+ *
1091+ * @param version
1092+ * versionHandle
1093+ * @return <code>Version</code> for given version handle
1094+ * @throws TimeoutException
1095+ * @throws PersistitInterruptedException
1096+ */
1097+ V getVersion(final long version) throws TimeoutException, PersistitInterruptedException {
1098+ final Entry e = getEntry(version);
1099+ return e == null ? null : e.getResource();
1100+ }
1101+
1102+ Entry getEntry(final long version) throws TimeoutException, PersistitInterruptedException {
1103+ final TransactionIndex ti = _persistit.getTransactionIndex();
1104+ try {
1105+ /*
1106+ * Note: not necessary to synchronize here. A concurrent transaction
1107+ * may modify _first, but this method does not need to see that
1108+ * version since it has not been committed. Conversely, if there is
1109+ * some transaction that committed before this transaction's start
1110+ * timestamp, then there is a happened-before relationship due to
1111+ * the synchronization in transaction registration; since _first is
1112+ * volatile, we are guaranteed to see the modification made by the
1113+ * committed transaction.
1114+ */
1115+ for (Entry e = _first; e != null; e = e._previous) {
1116+ final long commitTs = ti.commitStatus(e.getVersion(), vh2ts(version), vh2step(version));
1117+ if (commitTs >= 0 && commitTs != UNCOMMITTED) {
1118+ if (e.isDeleted()) {
1119+ return null;
1120+ }
1121+ return e;
1122+ }
1123+ }
1124+ return null;
1125+ } catch (final InterruptedException e) {
1126+ throw new PersistitInterruptedException(e);
1127+ }
1128+ }
1129+
1130+ /**
1131+ * <p>
1132+ * Holder for one instance of an object, such as a {@link Tree}, whose
1133+ * availability is governed by visibility within a transaction.
1134+ * </p>
1135+ * <p>
1136+ * An Entry has a version handle just like a version in an {@link MVV}. It
1137+ * is invisible to any transactions that started before the start timestamp,
1138+ * in write-write conflict with any concurrent transaction, and available to
1139+ * any transaction that starts after the version has committed. Visibility
1140+ * within the transaction that creates the Entry is determined
1141+ * by the relative step numbers of the current transaction and the version
1142+ * handle. If there exists an Entry with start and commit
1143+ * timestamps ts1, tc1 then any attempt to add another Entry with start timestamp ts2 will
1144+ * fail unless ts2 > tc1.
1145+ * </p>
1146+ *
1147+ * @author peter
1148+ */
1149+
1150+ private class Entry {
1151+
1152+ private long _version;
1153+ private V _resource;
1154+ private volatile boolean _deleted;
1155+ private volatile Entry _previous;
1156+
1157+ private Entry(final long versionHandle, final V resource) {
1158+ _version = versionHandle;
1159+ _resource = resource;
1160+ }
1161+
1162+ private V getResource() {
1163+ return _resource;
1164+ }
1165+
1166+ private void setResource(final V resource) {
1167+ _resource = resource;
1168+ }
1169+
1170+ private Entry getPrevious() {
1171+ return _previous;
1172+ }
1173+
1174+ private void setPrevious(final Entry tr) {
1175+ _previous = tr;
1176+ }
1177+
1178+ private long getVersion() {
1179+ return _version;
1180+ }
1181+
1182+ private void setPrimordial() {
1183+ _version = PRIMORDIAL;
1184+ }
1185+
1186+ private void setDeleted() {
1187+ _deleted = true;
1188+ }
1189+
1190+ private boolean isDeleted() {
1191+ return _deleted;
1192+ }
1193+
1194+ private boolean prune() throws PersistitException {
1195+ if (_resource instanceof PrunableVersion) {
1196+ return ((PrunableVersion) _resource).prune();
1197+ } else {
1198+ return true;
1199+ }
1200+ }
1201+
1202+ @Override
1203+ public String toString() {
1204+ String tcStatus;
1205+ try {
1206+ final long tc = _persistit.getTransactionIndex().commitStatus(_version, Long.MAX_VALUE, 0);
1207+ tcStatus = TransactionStatus.tcString(tc);
1208+ } catch (final Exception e) {
1209+ tcStatus = e.toString();
1210+ }
1211+ return String.format("(ts=%s tc=%s)->%s%s", TransactionStatus.versionString(_version), tcStatus, _resource,
1212+ _previous != null ? "*" : "");
1213+ }
1214+ }
1215+
1216+}
1217
1218=== modified file 'src/main/java/com/persistit/TransactionPlayer.java'
1219--- src/main/java/com/persistit/TransactionPlayer.java 2012-11-28 18:52:21 +0000
1220+++ src/main/java/com/persistit/TransactionPlayer.java 2013-04-29 21:38:25 +0000
1221@@ -15,11 +15,8 @@
1222
1223 package com.persistit;
1224
1225-/**
1226- *
1227- * Read and apply transaction from the journal to the live database. To apply
1228- * a transaction, this class calls methods of a
1229- */
1230+import static com.persistit.TransactionIndex.ts2vh;
1231+
1232 import java.nio.ByteBuffer;
1233 import java.util.ArrayList;
1234 import java.util.List;
1235@@ -39,6 +36,11 @@
1236 import com.persistit.exception.PersistitException;
1237 import com.persistit.exception.VolumeNotFoundException;
1238
1239+/**
1240+ * Read and apply transactions from the journal to the live database. To apply a
1241+ * transaction, this class calls methods of a TransactionPlayerListener.
1242+ */
1243+
1244 class TransactionPlayer {
1245
1246 private final AtomicLong appliedUpdates = new AtomicLong();
1247@@ -67,6 +69,7 @@
1248
1249 boolean requiresLongRecordConversion();
1250
1251+ boolean createTree(long timestamp) throws PersistitException;
1252 }
1253
1254 final TransactionPlayerSupport _support;
1255@@ -153,93 +156,106 @@
1256 case SR.TYPE: {
1257 final int keySize = SR.getKeySize(bb);
1258 final int treeHandle = SR.getTreeHandle(bb);
1259- final Exchange exchange = getExchange(treeHandle, address, startTimestamp);
1260- exchange.ignoreTransactions();
1261- final Key key = exchange.getKey();
1262- final Value value = exchange.getValue();
1263- System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD, key.getEncodedBytes(), 0, keySize);
1264- key.setEncodedSize(keySize);
1265- final int valueSize = innerSize - SR.OVERHEAD - keySize;
1266- value.ensureFit(valueSize);
1267- System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD + keySize, value.getEncodedBytes(), 0,
1268- valueSize);
1269- value.setEncodedSize(valueSize);
1270-
1271- if (value.getEncodedSize() >= Buffer.LONGREC_SIZE
1272- && (value.getEncodedBytes()[0] & 0xFF) == Buffer.LONGREC_TYPE) {
1273+
1274+ final Exchange exchange = getExchange(treeHandle, address, startTimestamp, listener);
1275+ if (exchange != null) {
1276+ exchange.ignoreTransactions();
1277+ final Key key = exchange.getKey();
1278+ final Value value = exchange.getValue();
1279+ System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD, key.getEncodedBytes(), 0, keySize);
1280+ key.setEncodedSize(keySize);
1281+ final int valueSize = innerSize - SR.OVERHEAD - keySize;
1282+ value.ensureFit(valueSize);
1283+ System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD + keySize, value.getEncodedBytes(), 0,
1284+ valueSize);
1285+ value.setEncodedSize(valueSize);
1286+
1287+ if (value.getEncodedSize() >= Buffer.LONGREC_SIZE
1288+ && (value.getEncodedBytes()[0] & 0xFF) == Buffer.LONGREC_TYPE) {
1289+ /*
1290+ * convertToLongRecord will pollute the
1291+ * getReadBuffer(). Therefore before calling it we
1292+ * need to copy the TX record to a fresh ByteBuffer.
1293+ */
1294+ if (bb == _support.getReadBuffer()) {
1295+ end = recordSize - (position - start);
1296+ bb = ByteBuffer.allocate(end);
1297+ bb.put(_support.getReadBuffer().array(), position, end);
1298+ bb.flip();
1299+ position = 0;
1300+ }
1301+ if (listener.requiresLongRecordConversion()) {
1302+ _support.convertToLongRecord(value, treeHandle, address, commitTimestamp);
1303+ }
1304+ }
1305+
1306+ listener.store(address, startTimestamp, exchange);
1307 /*
1308- * convertToLongRecord will pollute the getReadBuffer().
1309- * Therefore before calling it we need to copy the TX
1310- * record to a fresh ByteBuffer.
1311+ * Don't keep exchanges with enlarged value - let them
1312+ * be GC'd
1313 */
1314- if (bb == _support.getReadBuffer()) {
1315- end = recordSize - (position - start);
1316- bb = ByteBuffer.allocate(end);
1317- bb.put(_support.getReadBuffer().array(), position, end);
1318- bb.flip();
1319- position = 0;
1320- }
1321- if (listener.requiresLongRecordConversion()) {
1322- _support.convertToLongRecord(value, treeHandle, address, commitTimestamp);
1323+ if (exchange.getValue().getMaximumSize() < Value.DEFAULT_MAXIMUM_SIZE) {
1324+ releaseExchange(exchange);
1325 }
1326 }
1327-
1328- listener.store(address, startTimestamp, exchange);
1329 appliedUpdates.incrementAndGet();
1330- // Don't keep exchanges with enlarged value - let them be
1331- // GC'd
1332- if (exchange.getValue().getMaximumSize() < Value.DEFAULT_MAXIMUM_SIZE) {
1333- releaseExchange(exchange);
1334- }
1335 break;
1336 }
1337
1338 case DR.TYPE: {
1339 final int key1Size = DR.getKey1Size(bb);
1340 final int elisionCount = DR.getKey2Elision(bb);
1341- final Exchange exchange = getExchange(DR.getTreeHandle(bb), address, startTimestamp);
1342- exchange.ignoreTransactions();
1343- final Key key1 = exchange.getAuxiliaryKey3();
1344- final Key key2 = exchange.getAuxiliaryKey4();
1345- System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD, key1.getEncodedBytes(), 0, key1Size);
1346- key1.setEncodedSize(key1Size);
1347- final int key2Size = innerSize - DR.OVERHEAD - key1Size;
1348- System.arraycopy(key1.getEncodedBytes(), 0, key2.getEncodedBytes(), 0, elisionCount);
1349- System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD + key1Size, key2.getEncodedBytes(),
1350- elisionCount, key2Size);
1351- key2.setEncodedSize(key2Size + elisionCount);
1352- listener.removeKeyRange(address, startTimestamp, exchange, key1, key2);
1353+ final Exchange exchange = getExchange(DR.getTreeHandle(bb), address, startTimestamp, listener);
1354+ if (exchange != null) {
1355+ exchange.ignoreTransactions();
1356+ final Key key1 = exchange.getAuxiliaryKey3();
1357+ final Key key2 = exchange.getAuxiliaryKey4();
1358+ System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD, key1.getEncodedBytes(), 0, key1Size);
1359+ key1.setEncodedSize(key1Size);
1360+ final int key2Size = innerSize - DR.OVERHEAD - key1Size;
1361+ System.arraycopy(key1.getEncodedBytes(), 0, key2.getEncodedBytes(), 0, elisionCount);
1362+ System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD + key1Size, key2.getEncodedBytes(),
1363+ elisionCount, key2Size);
1364+ key2.setEncodedSize(key2Size + elisionCount);
1365+ listener.removeKeyRange(address, startTimestamp, exchange, key1, key2);
1366+ releaseExchange(exchange);
1367+ }
1368 appliedUpdates.incrementAndGet();
1369- releaseExchange(exchange);
1370 break;
1371 }
1372
1373 case DT.TYPE: {
1374- final Exchange exchange = getExchange(DT.getTreeHandle(bb), address, startTimestamp);
1375- listener.removeTree(address, startTimestamp, exchange);
1376+ final Exchange exchange = getExchange(DT.getTreeHandle(bb), address, startTimestamp, listener);
1377+ if (exchange != null) {
1378+ listener.removeTree(address, startTimestamp, exchange);
1379+ releaseExchange(exchange);
1380+ }
1381 appliedUpdates.incrementAndGet();
1382- releaseExchange(exchange);
1383 break;
1384 }
1385
1386 case D0.TYPE: {
1387- final Exchange exchange = getExchange(D0.getTreeHandle(bb), address, startTimestamp);
1388- /*
1389- * Note that the commitTimestamp, not startTimestamp is
1390- * passed to the delta method. The
1391- * Accumulator#updateBaseValue method needs the
1392- * commitTimestamp.
1393- */
1394- listener.delta(address, commitTimestamp, exchange.getTree(), D0.getIndex(bb),
1395- D0.getAccumulatorTypeOrdinal(bb), 1);
1396- appliedUpdates.incrementAndGet();
1397+ final Exchange exchange = getExchange(D0.getTreeHandle(bb), address, startTimestamp, listener);
1398+ if (exchange != null) {
1399+ /*
1400+ * Note that the commitTimestamp, not startTimestamp is
1401+ * passed to the delta method. The
1402+ * Accumulator#updateBaseValue method needs the
1403+ * commitTimestamp.
1404+ */
1405+ listener.delta(address, commitTimestamp, exchange.getTree(), D0.getIndex(bb),
1406+ D0.getAccumulatorTypeOrdinal(bb), 1);
1407+ appliedUpdates.incrementAndGet();
1408+ }
1409 break;
1410 }
1411
1412 case D1.TYPE: {
1413- final Exchange exchange = getExchange(D1.getTreeHandle(bb), address, startTimestamp);
1414- listener.delta(address, startTimestamp, exchange.getTree(), D1.getIndex(bb),
1415- D1.getAccumulatorTypeOrdinal(bb), D1.getValue(bb));
1416+ final Exchange exchange = getExchange(D1.getTreeHandle(bb), address, startTimestamp, listener);
1417+ if (exchange != null) {
1418+ listener.delta(address, startTimestamp, exchange.getTree(), D1.getIndex(bb),
1419+ D1.getAccumulatorTypeOrdinal(bb), D1.getValue(bb));
1420+ }
1421 appliedUpdates.incrementAndGet();
1422 break;
1423 }
1424@@ -281,7 +297,26 @@
1425 return String.format("JournalAddress %,d{%,d}", address, timestamp);
1426 }
1427
1428- private Exchange getExchange(final int treeHandle, final long from, final long timestamp) throws PersistitException {
1429+ /**
1430+ * Returns an Exchange on which an operation can be applied or rolled back.
1431+ * For a {@link TransactionPlayerListener} that performs rollbacks, it is
1432+ * important not to create a new tree when none exists. Therefore this
1433+ * method may return <code>null</code> to indicate that no tree exists and
1434+ * therefore the requested operation should be ignored. Whether to create a
1435+ * new tree is determined by the
1436+ * {@link TransactionPlayerListener#createTree(long)} method.
1437+ *
1438+ * @param treeHandle
1439+ * @param from
1440+ * @param timestamp
1441+ * @param listener
1442+ * @return the <code>Exchange</code> on which a recovery operation should be
1443+ * applied, or <code>null</code> if there is no backing
1444+ * <code>Tree</code>.
1445+ * @throws PersistitException
1446+ */
1447+ private Exchange getExchange(final int treeHandle, final long from, final long timestamp,
1448+ final TransactionPlayerListener listener) throws PersistitException {
1449 final TreeDescriptor td = _support.getPersistit().getJournalManager().lookupTreeHandle(treeHandle);
1450 if (td == null) {
1451 throw new CorruptJournalException("Tree handle " + treeHandle + " is undefined at "
1452@@ -295,6 +330,10 @@
1453 if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) {
1454 return volume.getStructure().directoryExchange();
1455 } else {
1456+ final Tree tree = volume.getStructure().getTreeInternal(td.getTreeName());
1457+ if (!listener.createTree(timestamp) && (tree == null || !tree.hasVersion(ts2vh(timestamp)))) {
1458+ return null;
1459+ }
1460 final Exchange exchange = _support.getPersistit().getExchange(volume, td.getTreeName(), true);
1461 exchange.ignoreTransactions();
1462 return exchange;
1463
1464=== modified file 'src/main/java/com/persistit/Tree.java'
1465--- src/main/java/com/persistit/Tree.java 2013-04-11 15:31:16 +0000
1466+++ src/main/java/com/persistit/Tree.java 2013-04-29 21:38:25 +0000
1467@@ -23,8 +23,13 @@
1468 import com.persistit.Accumulator.MinAccumulator;
1469 import com.persistit.Accumulator.SeqAccumulator;
1470 import com.persistit.Accumulator.SumAccumulator;
1471+import com.persistit.Version.PrunableVersion;
1472+import com.persistit.Version.VersionCreator;
1473 import com.persistit.exception.CorruptVolumeException;
1474 import com.persistit.exception.PersistitException;
1475+import com.persistit.exception.PersistitInterruptedException;
1476+import com.persistit.exception.RollbackException;
1477+import com.persistit.exception.TimeoutException;
1478 import com.persistit.util.Debug;
1479 import com.persistit.util.Util;
1480
1481@@ -36,6 +41,20 @@
1482 * {@link Accumulator}s for a B-Tree.
1483 * </p>
1484 * <p>
1485+ * As of Persistit 3.3, this class supports versioning within transactions. A new
1486+ * <code>Tree</code> created within the scope of a {@link Transaction} is not
1487+ * visible to other transactions until it commits. Similarly, if a
1488+ * <code>Tree</code> is removed within the scope of a transaction, other
1489+ * transactions that started before the current transaction commits will
1490+ * continue to be able to read and write the <code>Tree</code>. As a
1491+ * side-effect, the physical storage for a <code>Tree</code> is not deallocated
1492+ * until there are no remaining active transactions that started before the
1493+ * commit timestamp of the current transaction. Concurrent transactions that
1494+ * attempt to create or remove the same <code>Tree</code> instance are subject
1495+ * to a write-write dependency (see {@link Transaction}); all but one such
1496+ * transaction must roll back.
1497+ * </p>
1498+ * <p>
1499 * <code>Tree</code> instances are created by
1500 * {@link Volume#getTree(String, boolean)}. If the <code>Volume</code> already
1501 * has a B-Tree with the specified name, then the <code>Tree</code> object
1502@@ -67,15 +86,76 @@
1503
1504 private final String _name;
1505 private final Volume _volume;
1506- private volatile long _rootPageAddr;
1507- private volatile int _depth;
1508- private final AtomicLong _changeCount = new AtomicLong(0);
1509 private final AtomicReference<Object> _appCache = new AtomicReference<Object>();
1510 private final AtomicInteger _handle = new AtomicInteger();
1511
1512- private final Accumulator[] _accumulators = new Accumulator[MAX_ACCUMULATOR_COUNT];
1513-
1514- private final TreeStatistics _treeStatistics = new TreeStatistics();
1515+ private final TimelyResource<TreeVersion> _timelyResource;
1516+
1517+ private final VersionCreator<TreeVersion> _creator = new VersionCreator<TreeVersion>() {
1518+
1519+ @Override
1520+ public TreeVersion createVersion(final TimelyResource<? extends TreeVersion> resource)
1521+ throws PersistitException {
1522+ return new TreeVersion();
1523+ }
1524+ };
1525+
1526+ class TreeVersion implements PrunableVersion {
1527+ volatile long _rootPageAddr;
1528+ volatile int _depth;
1529+ volatile long _generation = _persistit.getTimestampAllocator().updateTimestamp();
1530+ final AtomicLong _changeCount = new AtomicLong();
1531+ volatile boolean _pruned;
1532+ private final Accumulator[] _accumulators = new Accumulator[MAX_ACCUMULATOR_COUNT];
1533+ private final TreeStatistics _treeStatistics = new TreeStatistics();
1534+
1535+ @Override
1536+ public boolean prune() throws PersistitException {
1537+ assert !_pruned;
1538+ _volume.getStructure().deallocateTree(_rootPageAddr, _depth);
1539+ discardAccumulators();
1540+ _pruned = true;
1541+ _rootPageAddr = -1;
1542+ return true;
1543+ }
1544+
1545+ @Override
1546+ public void vacate() {
1547+ clearValid();
1548+ _volume.getStructure().removed(Tree.this);
1549+ }
1550+
1551+ @Override
1552+ public String toString() {
1553+ return String.format("Tree(%d,%d)%s", _rootPageAddr, _depth, _pruned ? "#" : "");
1554+ }
1555+
1556+ /**
1557+ * Forget about any instantiated accumulator and remove it from the
1558+ * active list in Persistit. This should only be called during
1559+ * the process of removing a tree.
1560+ */
1561+ void discardAccumulators() {
1562+ for (int i = 0; i < _accumulators.length; ++i) {
1563+ if (_accumulators[i] != null) {
1564+ _persistit.removeAccumulator(_accumulators[i]);
1565+ _accumulators[i] = null;
1566+ }
1567+ }
1568+ }
1569+ }
1570+
1571+ /**
1572+ * Unchecked wrapper for PersistitException thrown while trying to acquire a
1573+ * TreeVersion.
1574+ */
1575+ public static class TreeVersionException extends RuntimeException {
1576+ private static final long serialVersionUID = -6372589972106489591L;
1577+
1578+ TreeVersionException(final Exception e) {
1579+ super(e);
1580+ }
1581+ }
1582
1583 Tree(final Persistit persistit, final Volume volume, final String name) {
1584 super(persistit);
1585@@ -86,7 +166,35 @@
1586 }
1587 _name = name;
1588 _volume = volume;
1589- _generation.set(1);
1590+ _timelyResource = new TimelyResource<TreeVersion>(persistit);
1591+ }
1592+
1593+ TreeVersion version() {
1594+ try {
1595+ return _timelyResource.getVersion(_creator);
1596+ } catch (final PersistitException e) {
1597+ throw new TreeVersionException(e);
1598+ }
1599+ }
1600+
1601+ public boolean isDeleted() throws TimeoutException, PersistitInterruptedException {
1602+ return _timelyResource.isEmpty();
1603+ }
1604+
1605+ boolean isLive() throws TimeoutException, PersistitInterruptedException {
1606+ return isValid() && !isDeleted();
1607+ }
1608+
1609+ boolean isTransactionPrivate(final boolean byStep) throws TimeoutException, PersistitInterruptedException {
1610+ return _timelyResource.isTransactionPrivate(byStep);
1611+ }
1612+
1613+ boolean hasVersion(final long versionHandle) throws TimeoutException, PersistitInterruptedException {
1614+ return _timelyResource.getVersion(versionHandle) != null;
1615+ }
1616+
1617+ void delete() throws RollbackException, PersistitException {
1618+ _timelyResource.delete();
1619 }
1620
1621 /**
1622@@ -110,7 +218,9 @@
1623
1624 @Override
1625 public boolean equals(final Object o) {
1626- if (o instanceof Tree) {
1627+ if (o == this) {
1628+ return true;
1629+ } else if (o instanceof Tree) {
1630 final Tree tree = (Tree) o;
1631 return _name.equals(tree._name) && _volume.equals(tree.getVolume());
1632 } else {
1633@@ -126,20 +236,32 @@
1634 * @return The page address
1635 */
1636 public long getRootPageAddr() {
1637- return _rootPageAddr;
1638+ final TreeVersion version = version();
1639+ return version._rootPageAddr;
1640 }
1641
1642 /**
1643 * @return the number of levels of the <code>Tree</code>.
1644 */
1645 public int getDepth() {
1646- return _depth;
1647+ return version()._depth;
1648+ }
1649+
1650+ @Override
1651+ public long getGeneration() {
1652+ return version()._generation;
1653+ }
1654+
1655+ @Override
1656+ void bumpGeneration() {
1657+ version()._generation = _persistit.getTimestampAllocator().updateTimestamp();
1658 }
1659
1660 void changeRootPageAddr(final long rootPageAddr, final int deltaDepth) throws PersistitException {
1661 Debug.$assert0.t(isOwnedAsWriterByMe());
1662- _rootPageAddr = rootPageAddr;
1663- _depth += deltaDepth;
1664+ final TreeVersion version = version();
1665+ version._rootPageAddr = rootPageAddr;
1666+ version._depth += deltaDepth;
1667 }
1668
1669 void bumpChangeCount() {
1670@@ -147,7 +269,7 @@
1671 // Note: the changeCount only gets written when there's a structure
1672 // change in the tree that causes it to be committed.
1673 //
1674- _changeCount.incrementAndGet();
1675+ version()._changeCount.incrementAndGet();
1676 }
1677
1678 /**
1679@@ -155,7 +277,7 @@
1680 * this tree; does not including replacement of an existing value
1681 */
1682 long getChangeCount() {
1683- return _changeCount.get();
1684+ return version()._changeCount.get();
1685 }
1686
1687 /**
1688@@ -165,9 +287,10 @@
1689 */
1690 int store(final byte[] bytes, final int index) {
1691 final byte[] nameBytes = Util.stringToBytes(_name);
1692- Util.putLong(bytes, index, _rootPageAddr);
1693- Util.putLong(bytes, index + 8, getChangeCount());
1694- Util.putShort(bytes, index + 16, _depth);
1695+ final TreeVersion version = version();
1696+ Util.putLong(bytes, index, version._rootPageAddr);
1697+ Util.putLong(bytes, index + 8, version._changeCount.get());
1698+ Util.putShort(bytes, index + 16, version._depth);
1699 Util.putShort(bytes, index + 18, nameBytes.length);
1700 Util.putBytes(bytes, index + 20, nameBytes);
1701 return 20 + nameBytes.length;
1702@@ -187,9 +310,10 @@
1703 if (!_name.equals(name)) {
1704 throw new IllegalStateException("Invalid tree name recorded: " + name + " for tree " + _name);
1705 }
1706- _rootPageAddr = Util.getLong(bytes, index);
1707- _changeCount.set(Util.getLong(bytes, index + 8));
1708- _depth = Util.getShort(bytes, index + 16);
1709+ final TreeVersion version = version();
1710+ version._rootPageAddr = Util.getLong(bytes, index);
1711+ version._changeCount.set(Util.getLong(bytes, index + 8));
1712+ version._depth = Util.getShort(bytes, index + 16);
1713 return length;
1714 }
1715
1716@@ -200,7 +324,8 @@
1717 * @throws PersistitException
1718 */
1719 void setRootPageAddress(final long rootPageAddr) throws PersistitException {
1720- if (_rootPageAddr != rootPageAddr) {
1721+ final TreeVersion version = version();
1722+ if (version._rootPageAddr != rootPageAddr) {
1723 // Derive the index depth
1724 Buffer buffer = null;
1725 try {
1726@@ -210,8 +335,8 @@
1727 throw new CorruptVolumeException(String.format("Tree root page %,d has invalid type %s",
1728 rootPageAddr, buffer.getPageTypeName()));
1729 }
1730- _rootPageAddr = rootPageAddr;
1731- _depth = type - Buffer.PAGE_TYPE_DATA + 1;
1732+ version._rootPageAddr = rootPageAddr;
1733+ version._depth = type - Buffer.PAGE_TYPE_DATA + 1;
1734 } finally {
1735 if (buffer != null) {
1736 buffer.releaseTouched();
1737@@ -226,10 +351,15 @@
1738 * <code>Tree</code> to fail.
1739 */
1740 void invalidate() {
1741+ final TreeVersion version = version();
1742 super.clearValid();
1743- _depth = -1;
1744- _rootPageAddr = -1;
1745- _generation.set(-1);
1746+ version._depth = -1;
1747+ version._rootPageAddr = -1;
1748+ version._generation = _persistit.getTimestampAllocator().updateTimestamp();
1749+ }
1750+
1751+ void setPrimordial() {
1752+ _timelyResource.setPrimordial();
1753 }
1754
1755 /**
1756@@ -238,7 +368,7 @@
1757 * </code>Tree</code>
1758 */
1759 public TreeStatistics getStatistics() {
1760- return _treeStatistics;
1761+ return version()._treeStatistics;
1762 }
1763
1764 /**
1765@@ -248,8 +378,9 @@
1766 */
1767 @Override
1768 public String toString() {
1769- return "<Tree " + _name + " in volume " + _volume.getName() + " rootPageAddr=" + _rootPageAddr + " depth="
1770- + _depth + " status=" + getStatusDisplayString() + ">";
1771+ final TreeVersion version = version();
1772+ return "<Tree " + _name + " in volume " + _volume.getName() + " rootPageAddr=" + version._rootPageAddr
1773+ + " depth=" + version._depth + " status=" + getStatusDisplayString() + ">";
1774 }
1775
1776 /**
1777@@ -278,8 +409,8 @@
1778 }
1779
1780 /**
1781- * Set the tree handle. The tree must may not be a member of a temporary
1782- * volume.
1783+ * Assign and set the tree handle. The tree must not be a member of a
1784+ * temporary volume.
1785 *
1786 * @throws PersistitException
1787 */
1788@@ -397,7 +528,8 @@
1789 if (index < 0 || index >= MAX_ACCUMULATOR_COUNT) {
1790 throw new IllegalArgumentException("Invalid accumulator index: " + index);
1791 }
1792- Accumulator accumulator = _accumulators[index];
1793+ final TreeVersion version = version();
1794+ Accumulator accumulator = version._accumulators[index];
1795 if (accumulator == null) {
1796 final AccumulatorState saved = Accumulator.getAccumulatorState(this, index);
1797 long savedValue = 0;
1798@@ -411,7 +543,7 @@
1799 savedValue = saved.getValue();
1800 }
1801 accumulator = Accumulator.accumulator(type, this, index, savedValue, _persistit.getTransactionIndex());
1802- _accumulators[index] = accumulator;
1803+ version._accumulators[index] = accumulator;
1804 _persistit.addAccumulator(accumulator);
1805 } else if (accumulator.getType() != type) {
1806 throw new IllegalStateException("Wrong type " + accumulator + " is not a " + type + " accumulator");
1807@@ -442,17 +574,4 @@
1808 _handle.set(0);
1809 }
1810
1811- /**
1812- * Forget about any instantiated accumulator and remove it from the active
1813- * list in Persistit. This should only be called in the during the process
1814- * of removing a tree.
1815- */
1816- void discardAccumulators() {
1817- for (int i = 0; i < _accumulators.length; ++i) {
1818- if (_accumulators[i] != null) {
1819- _persistit.removeAccumulator(_accumulators[i]);
1820- _accumulators[i] = null;
1821- }
1822- }
1823- }
1824 }
1825
1826=== modified file 'src/main/java/com/persistit/ValueHelper.java'
1827--- src/main/java/com/persistit/ValueHelper.java 2012-08-24 13:57:19 +0000
1828+++ src/main/java/com/persistit/ValueHelper.java 2013-04-29 21:38:25 +0000
1829@@ -15,6 +15,8 @@
1830
1831 package com.persistit;
1832
1833+import com.persistit.util.Util;
1834+
1835 interface ValueHelper {
1836
1837 int requiredLength(final byte[] target, int targetOffset, int targetLength);
1838@@ -102,6 +104,11 @@
1839 public boolean isMVV() {
1840 return false;
1841 }
1842+
1843+ @Override
1844+ public String toString() {
1845+ return _value != null ? Util.hexDump(_value.getEncodedBytes(), 0, _value.getEncodedSize()) : "null";
1846+ }
1847 };
1848
1849 static class MVVValueWriter implements ValueHelper {
1850@@ -145,5 +152,10 @@
1851 public boolean isMVV() {
1852 return true;
1853 }
1854+
1855+ @Override
1856+ public String toString() {
1857+ return _value != null ? Util.hexDump(_value.getEncodedBytes(), 0, _value.getEncodedSize()) : "null";
1858+ }
1859 };
1860 }
1861
1862=== added file 'src/main/java/com/persistit/Version.java'
1863--- src/main/java/com/persistit/Version.java 1970-01-01 00:00:00 +0000
1864+++ src/main/java/com/persistit/Version.java 2013-04-29 21:38:25 +0000
1865@@ -0,0 +1,69 @@
1866+/**
1867+ * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
1868+ *
1869+ * This program and the accompanying materials are made available
1870+ * under the terms of the Eclipse Public License v1.0 which
1871+ * accompanies this distribution, and is available at
1872+ * http://www.eclipse.org/legal/epl-v10.html
1873+ *
1874+ * This program may also be available under different license terms.
1875+ * For more information, see www.akiban.com or contact licensing@akiban.com.
1876+ *
1877+ * Contributors:
1878+ * Akiban Technologies, Inc.
1879+ */
1880+
1881+package com.persistit;
1882+
1883+import com.persistit.exception.PersistitException;
1884+
1885+/**
1886+ * Marker interface implemented by objects managed within the
1887+ * {@link TimelyResource} framework.
1888+ *
1889+ * @author peter
1890+ */
1891+interface Version {
1892+ /**
1893+ * Sub-interface describing <code>Version</code> types that require pruning
1894+ * when obsolete.
1895+ *
1896+ * @author peter
1897+ */
1898+ interface PrunableVersion extends Version {
1899+ /**
1900+ * Clean up any state held by this resource. For example, when a
1901+ * {@link Tree} is pruned, all pages allocated to its content are
1902+ * deallocated. This method is called when a newer
1903+ * <code>TimelyResource</code> has been created and is visible to all
1904+ * active transactions.
1905+ *
1906+ * @return <code>true</code> if all pruning work for this resource has
1907+ * been completed, <code>false</code> if the prune method should
1908+ * be called again later
1909+ */
1910+ boolean prune() throws PersistitException;
1911+
1912+ /**
1913+ * Called after the last known <code>Version</code> managed by a
1914+ * <code>TimelyResource</code> has been pruned.
1915+ *
1916+ * @throws PersistitException
1917+ */
1918+ void vacate() throws PersistitException;
1919+
1920+ }
1921+
1922+ /**
1923+ * Interface for a factory that creates Versions of the specified type.
1924+ *
1925+ * @author peter
1926+ *
1927+ * @param <V>
1928+ * the type of <code>Version</code> this factory creates
1929+ */
1930+ interface VersionCreator<V> {
1931+ V createVersion(final TimelyResource<? extends V> resource) throws PersistitException;
1932+ }
1933+
1934+}
1935\ No newline at end of file
1936
1937=== modified file 'src/main/java/com/persistit/Volume.java'
1938--- src/main/java/com/persistit/Volume.java 2013-03-13 17:14:51 +0000
1939+++ src/main/java/com/persistit/Volume.java 2013-04-29 21:38:25 +0000
1940@@ -456,8 +456,11 @@
1941 *
1942 * @throws PersistitException
1943 */
1944- void open(final Persistit persistit) throws PersistitException {
1945+ synchronized void open(final Persistit persistit) throws PersistitException {
1946 checkClosing();
1947+ if (isOpened()) {
1948+ return;
1949+ }
1950 if (_specification == null) {
1951 throw new IllegalStateException("Missing VolumeSpecification");
1952 }
1953
1954=== modified file 'src/main/java/com/persistit/VolumeStructure.java'
1955--- src/main/java/com/persistit/VolumeStructure.java 2013-03-13 16:27:04 +0000
1956+++ src/main/java/com/persistit/VolumeStructure.java 2013-04-29 21:38:25 +0000
1957@@ -15,17 +15,12 @@
1958
1959 package com.persistit;
1960
1961-import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_A;
1962-import static com.persistit.util.ThreadSequencer.sequence;
1963-
1964 import java.lang.ref.WeakReference;
1965 import java.util.ArrayList;
1966 import java.util.HashMap;
1967 import java.util.List;
1968 import java.util.Map;
1969
1970-import com.persistit.AlertMonitor.AlertLevel;
1971-import com.persistit.AlertMonitor.Event;
1972 import com.persistit.exception.BufferSizeUnavailableException;
1973 import com.persistit.exception.CorruptVolumeException;
1974 import com.persistit.exception.InUseException;
1975@@ -131,7 +126,6 @@
1976
1977 Exchange directoryExchange() throws BufferSizeUnavailableException {
1978 final Exchange ex = new Exchange(_directoryTree);
1979- ex.ignoreTransactions();
1980 return ex;
1981 }
1982
1983@@ -191,21 +185,30 @@
1984 if (DIRECTORY_TREE_NAME.equals(name)) {
1985 throw new IllegalArgumentException("Tree name is reserved: " + name);
1986 }
1987- Tree tree;
1988+ Tree tree = null;
1989 final WeakReference<Tree> treeRef = _treeNameHashMap.get(name);
1990 if (treeRef != null) {
1991 tree = treeRef.get();
1992- if (tree != null && tree.isValid()) {
1993- return tree;
1994+ if (tree != null) {
1995+ if (tree.isLive()) {
1996+ return tree;
1997+ } else {
1998+ if (!createIfNecessary) {
1999+ return null;
2000+ }
2001+ }
2002 }
2003 }
2004+ if (tree == null) {
2005+ tree = new Tree(_persistit, _volume, name);
2006+ }
2007 final Exchange ex = directoryExchange();
2008 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(name);
2009 final Value value = ex.fetch().getValue();
2010- tree = new Tree(_persistit, _volume, name);
2011 if (value.isDefined()) {
2012 value.get(tree);
2013 loadTreeStatistics(tree);
2014+ tree.setPrimordial();
2015 tree.setValid();
2016 } else if (createIfNecessary) {
2017 final long rootPageAddr = createTreeRoot(tree);
2018@@ -216,10 +219,14 @@
2019 } else {
2020 return null;
2021 }
2022+ if (_volume.isTemporary() || _volume.isLockVolume()) {
2023+ tree.setPrimordial();
2024+ }
2025 if (!_volume.isTemporary()) {
2026 tree.loadHandle();
2027 }
2028 _treeNameHashMap.put(name, new WeakReference<Tree>(tree));
2029+
2030 return tree;
2031 }
2032
2033@@ -249,13 +256,16 @@
2034 }
2035 } else {
2036 final Exchange ex = directoryExchange();
2037+ if (!tree.isTransactionPrivate(false)) {
2038+ ex.ignoreTransactions();
2039+ }
2040 ex.getValue().put(tree);
2041 ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(tree.getName()).store();
2042 }
2043 }
2044
2045 void storeTreeStatistics(final Tree tree) throws PersistitException {
2046- if (tree.getStatistics().isDirty() && !DIRECTORY_TREE_NAME.equals(tree.getName())) {
2047+ if (tree.isLive() && tree.getStatistics().isDirty() && tree != _directoryTree) {
2048 final Exchange ex = directoryExchange();
2049 if (!ex.getVolume().isReadOnly()) {
2050 ex.getValue().put(tree.getStatistics());
2051@@ -273,43 +283,30 @@
2052 }
2053 }
2054
2055- boolean removeTree(final Tree tree) throws PersistitException {
2056+ void removeTree(final Tree tree) throws PersistitException {
2057 if (tree == _directoryTree) {
2058 throw new IllegalArgumentException("Can't delete the Directory tree");
2059 }
2060- _persistit.checkSuspended();
2061
2062 if (!tree.claim(true)) {
2063 throw new InUseException("Unable to acquire writer claim on " + tree);
2064 }
2065-
2066- final int treeDepth = tree.getDepth();
2067- final long treeRootPage = tree.getRootPageAddr();
2068-
2069 try {
2070- tree.discardAccumulators();
2071-
2072- synchronized (this) {
2073- _treeNameHashMap.remove(tree.getName());
2074- tree.bumpGeneration();
2075- tree.invalidate();
2076-
2077- tree.changeRootPageAddr(-1, 0);
2078- final Exchange ex = directoryExchange();
2079- ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(tree.getName()).remove(Key.GTEQ);
2080- ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_STATS).append(tree.getName()).remove(Key.GTEQ);
2081- ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ACCUMULATOR).append(tree.getName()).remove(Key.GTEQ);
2082- }
2083- sequence(TREE_CREATE_REMOVE_A);
2084+ final Exchange ex = directoryExchange();
2085+ ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ROOT).append(tree.getName()).remove(Key.GTEQ);
2086+ ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_STATS).append(tree.getName()).remove(Key.GTEQ);
2087+ ex.clear().append(DIRECTORY_TREE_NAME).append(TREE_ACCUMULATOR).append(tree.getName()).remove(Key.GTEQ);
2088+ tree.delete();
2089 } finally {
2090 tree.release();
2091 }
2092-
2093- // The Tree is now gone. The following deallocates the
2094- // pages formerly associated with it. If this fails we'll be
2095- // left with allocated pages that are not available on the garbage
2096- // chain for reuse.
2097-
2098+ }
2099+
2100+ synchronized void removed(final Tree tree) {
2101+ _treeNameHashMap.remove(tree.getName());
2102+ }
2103+
2104+ void deallocateTree(final long treeRootPage, final int treeDepth) throws PersistitException {
2105 int depth = treeDepth;
2106 long page = treeRootPage;
2107 while (page != -1) {
2108@@ -341,7 +338,6 @@
2109 deallocateGarbageChain(deallocate, 0);
2110 }
2111 }
2112- return true;
2113 }
2114
2115 /**
2116@@ -362,25 +358,22 @@
2117 /**
2118 * Flush dirty {@link TreeStatistics} instances. Called periodically on the
2119 * PAGE_WRITER thread from {@link Persistit#cleanup()}.
2120+ *
2121+ * @throws PersistitException
2122 */
2123- void flushStatistics() {
2124- try {
2125- final List<Tree> trees = new ArrayList<Tree>();
2126- synchronized (this) {
2127- for (final WeakReference<Tree> ref : _treeNameHashMap.values()) {
2128- final Tree tree = ref.get();
2129- if (tree != null && tree != _directoryTree) {
2130- trees.add(tree);
2131- }
2132+ void flushStatistics() throws PersistitException {
2133+ final List<Tree> trees = new ArrayList<Tree>();
2134+ synchronized (this) {
2135+ for (final WeakReference<Tree> ref : _treeNameHashMap.values()) {
2136+ final Tree tree = ref.get();
2137+ if (tree != null && tree != _directoryTree) {
2138+ trees.add(tree);
2139 }
2140 }
2141- for (final Tree tree : trees) {
2142- storeTreeStatistics(tree);
2143- }
2144- } catch (final Exception e) {
2145- _persistit.getAlertMonitor().post(
2146- new Event(AlertLevel.ERROR, _persistit.getLogBase().adminFlushException, e),
2147- AlertMonitor.FLUSH_STATISTICS_CATEGORY);
2148+ }
2149+
2150+ for (final Tree tree : trees) {
2151+ storeTreeStatistics(tree);
2152 }
2153 }
2154
2155
2156=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
2157--- src/main/java/com/persistit/logging/LogBase.java 2012-10-04 20:23:10 +0000
2158+++ src/main/java/com/persistit/logging/LogBase.java 2013-04-29 21:38:25 +0000
2159@@ -226,6 +226,9 @@
2160 @Message("WARNING|%s while pruning transaction record %s")
2161 public final LogItem pruneException = PersistitLogMessage.empty();
2162
2163+ @Message("WARNING|%s while pruning TimelyResource %s")
2164+ public final LogItem timelyResourcePruneException = PersistitLogMessage.empty();
2165+
2166 @Message("WARNING|Transaction %s pruning incomplete at %s after rollback")
2167 public final LogItem pruningIncomplete = PersistitLogMessage.empty();
2168
2169
2170=== modified file 'src/main/java/com/persistit/util/SequencerConstants.java'
2171--- src/main/java/com/persistit/util/SequencerConstants.java 2012-10-10 16:06:49 +0000
2172+++ src/main/java/com/persistit/util/SequencerConstants.java 2013-04-29 21:38:25 +0000
2173@@ -61,17 +61,6 @@
2174 array(WRITE_WRITE_STORE_A, WRITE_WRITE_STORE_C) };
2175
2176 /*
2177- * Used in testing sequencing between tree creation and removal
2178- */
2179- int TREE_CREATE_REMOVE_A = allocate("TREE_CREATE_REMOVE_A");
2180- int TREE_CREATE_REMOVE_B = allocate("TREE_CREATE_REMOVE_B");
2181- int TREE_CREATE_REMOVE_C = allocate("TREE_CREATE_REMOVE_C");
2182-
2183- int[][] TREE_CREATE_REMOVE_SCHEDULE = new int[][] { array(TREE_CREATE_REMOVE_A, TREE_CREATE_REMOVE_B),
2184- array(TREE_CREATE_REMOVE_B), array(TREE_CREATE_REMOVE_A, TREE_CREATE_REMOVE_C),
2185- array(TREE_CREATE_REMOVE_A, TREE_CREATE_REMOVE_C) };
2186-
2187- /*
2188 * Used in testing sequencing between pageNode reading and invalidation in
2189 * JournalManager
2190 */
2191@@ -104,7 +93,7 @@
2192 array(DEALLOCATE_CHAIN_A, DEALLOCATE_CHAIN_C) };
2193
2194 /*
2195- * Used in testing delete/deallocate sequence in Bug1022567Test
2196+ * Used in testing delete/deallocate sequence in Bug1064565Test
2197 */
2198 int ACCUMULATOR_CHECKPOINT_A = allocate("ACCUMULATOR_CHECKPOINT_A");
2199 int ACCUMULATOR_CHECKPOINT_B = allocate("ACCUMULATOR_CHECKPOINT_B");
2200
2201=== modified file 'src/test/java/com/persistit/AccumulatorRecoveryTest.java'
2202--- src/test/java/com/persistit/AccumulatorRecoveryTest.java 2013-04-10 20:09:48 +0000
2203+++ src/test/java/com/persistit/AccumulatorRecoveryTest.java 2013-04-29 21:38:25 +0000
2204@@ -167,6 +167,11 @@
2205 return true;
2206 }
2207
2208+ @Override
2209+ public boolean createTree(final long timestamp) throws PersistitException {
2210+ return true;
2211+ }
2212+
2213 };
2214 plan.applyAllRecoveredTransactions(commitListener, plan.getDefaultRollbackListener());
2215 assertEquals(15, recoveryTimestamps.size());
2216
2217=== modified file 'src/test/java/com/persistit/AccumulatorTest.java'
2218--- src/test/java/com/persistit/AccumulatorTest.java 2013-04-10 20:09:48 +0000
2219+++ src/test/java/com/persistit/AccumulatorTest.java 2013-04-29 21:38:25 +0000
2220@@ -375,7 +375,7 @@
2221 */
2222 @Test
2223 public void testRecreateAccumulatorAfterCheckpoint() throws PersistitException {
2224- final int PASS_COUNT = 2;
2225+ final int PASS_COUNT = 5;
2226 final int ROW_COUNT = 10;
2227 final int ACCUM_INDEX = 0;
2228 final String TEST_VOLUME_NAME = UnitTestProperties.VOLUME_NAME;
2229
2230=== modified file 'src/test/java/com/persistit/Bug1018526Test.java'
2231--- src/test/java/com/persistit/Bug1018526Test.java 2012-11-20 17:45:51 +0000
2232+++ src/test/java/com/persistit/Bug1018526Test.java 2013-04-29 21:38:25 +0000
2233@@ -92,7 +92,7 @@
2234 final TreeDescriptor td = map.remove(handle);
2235 assertNotNull("Permanent Tree should be in the tree map", td);
2236 }
2237- // expect 1: the directory tree
2238- assertEquals("Recovered tree map should contain only permanent trees", 1, map.size());
2239+ // expect 2: _directory and _classIndex
2240+ assertEquals("Recovered tree map should contain only permanent trees", 2, map.size());
2241 }
2242 }
2243
2244=== modified file 'src/test/java/com/persistit/Bug920754Test.java'
2245--- src/test/java/com/persistit/Bug920754Test.java 2013-04-10 20:09:48 +0000
2246+++ src/test/java/com/persistit/Bug920754Test.java 2013-04-29 21:38:25 +0000
2247@@ -68,6 +68,7 @@
2248 while (dir.next(true)) {
2249 keys++;
2250 }
2251- assertEquals("There should be no remaining keys in the directory tree", 0, keys);
2252+ // _classIndex
2253+ assertEquals("There should be one remaining key in the directory tree", 1, keys);
2254 }
2255 }
2256
2257=== modified file 'src/test/java/com/persistit/Bug932097Test.java'
2258--- src/test/java/com/persistit/Bug932097Test.java 2012-11-25 20:14:58 +0000
2259+++ src/test/java/com/persistit/Bug932097Test.java 2013-04-29 21:38:25 +0000
2260@@ -21,6 +21,7 @@
2261
2262 @Test
2263 public void testInjectedAbortTransactionStatus() throws Exception {
2264+ final Exchange ex = _persistit.getExchange("persistit", "test", true);
2265 /*
2266 * Create a bunch of incomplete transactions
2267 */
2268@@ -28,7 +29,6 @@
2269 _persistit.setSessionId(new SessionId());
2270 final Transaction txn = _persistit.getTransaction();
2271 txn.begin();
2272- final Exchange ex = _persistit.getExchange("persistit", "test", true);
2273 ex.getValue().put(RED_FOX);
2274 txn.begin();
2275 for (int k = 1; k < 10; k++) {
2276
2277=== modified file 'src/test/java/com/persistit/CorruptVolumeTest.java'
2278--- src/test/java/com/persistit/CorruptVolumeTest.java 2012-08-24 13:57:19 +0000
2279+++ src/test/java/com/persistit/CorruptVolumeTest.java 2013-04-29 21:38:25 +0000
2280@@ -37,7 +37,8 @@
2281 exchange.to(i).store();
2282 }
2283 // Corrupt the volume by zonking the index page
2284- final Buffer buffer = exchange.getBufferPool().get(exchange.getVolume(), 4, true, true);
2285+ final long pageAddr = exchange.fetchBufferCopy(1).getPageAddress();
2286+ final Buffer buffer = exchange.getBufferPool().get(exchange.getVolume(), pageAddr, true, true);
2287 Arrays.fill(buffer.getBytes(), 20, 200, (byte) 0);
2288 buffer.setDirtyAtTimestamp(_persistit.getTimestampAllocator().updateTimestamp());
2289 buffer.releaseTouched();
2290
2291=== modified file 'src/test/java/com/persistit/IOFailureTest.java'
2292--- src/test/java/com/persistit/IOFailureTest.java 2012-11-20 17:45:51 +0000
2293+++ src/test/java/com/persistit/IOFailureTest.java 2013-04-29 21:38:25 +0000
2294@@ -283,7 +283,6 @@
2295
2296 @Test
2297 public void testJournalEOFonRecovery() throws Exception {
2298- final Properties properties = _persistit.getProperties();
2299 final JournalManager jman = _persistit.getJournalManager();
2300 final Exchange exchange = _persistit.getExchange(_volumeName, "RecoveryTest", true);
2301 exchange.getValue().put(RED_FOX);
2302@@ -465,6 +464,10 @@
2303 boolean done = false;
2304 while (System.currentTimeMillis() < expires) {
2305 try {
2306+ /*
2307+ * Needed to avoid leaving a dirty page during checkpoint
2308+ */
2309+ _persistit.flushStatistics();
2310 _persistit.copyBackPages();
2311 done = true;
2312 break;
2313
2314=== modified file 'src/test/java/com/persistit/IntegrityCheckTest.java'
2315--- src/test/java/com/persistit/IntegrityCheckTest.java 2012-11-20 17:45:51 +0000
2316+++ src/test/java/com/persistit/IntegrityCheckTest.java 2013-04-29 21:38:25 +0000
2317@@ -173,6 +173,8 @@
2318 txn.end();
2319 }
2320 }
2321+ _persistit.checkAllVolumes();
2322+ System.out.println();
2323 final Configuration config = _persistit.getConfiguration();
2324 _persistit.crash();
2325 _persistit = new Persistit();
2326
2327=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
2328--- src/test/java/com/persistit/JournalManagerTest.java 2013-04-10 20:09:48 +0000
2329+++ src/test/java/com/persistit/JournalManagerTest.java 2013-04-29 21:38:25 +0000
2330@@ -59,6 +59,12 @@
2331
2332 private final String _volumeName = "persistit";
2333
2334+ @Override
2335+ public void setUp() throws Exception {
2336+ super.setUp();
2337+ _persistit.getExchange(_volumeName, "JournalManagerTest1", true);
2338+ }
2339+
2340 @Test
2341 public void testJournalRecords() throws Exception {
2342 store1();
2343@@ -216,6 +222,11 @@
2344 return true;
2345 }
2346
2347+ @Override
2348+ public boolean createTree(final long timestamp) throws PersistitException {
2349+ return true;
2350+ }
2351+
2352 };
2353 rman.applyAllRecoveredTransactions(actor, rman.getDefaultRollbackListener());
2354 assertEquals(commitCount, recoveryTimestamps.size());
2355@@ -298,6 +309,7 @@
2356 // Allow test to control when pruning will happen
2357 _persistit.getJournalManager().setRollbackPruningEnabled(false);
2358 _persistit.getJournalManager().setWritePagePruningEnabled(false);
2359+
2360 final Transaction txn = _persistit.getTransaction();
2361 for (int i = 0; i < 10; i++) {
2362 txn.begin();
2363
2364=== modified file 'src/test/java/com/persistit/MVCCPruneBufferTest.java'
2365--- src/test/java/com/persistit/MVCCPruneBufferTest.java 2013-03-06 16:20:57 +0000
2366+++ src/test/java/com/persistit/MVCCPruneBufferTest.java 2013-04-29 21:38:25 +0000
2367@@ -320,6 +320,8 @@
2368
2369 @Test
2370 public void testWritePagePrune() throws Exception {
2371+ _persistit.getJournalManager().setWritePagePruningEnabled(false);
2372+ final Exchange exchange = exchange(1);
2373 final Transaction txn = _persistit.getTransaction();
2374 try {
2375 txn.begin();
2376@@ -328,8 +330,8 @@
2377 } finally {
2378 txn.end();
2379 }
2380- final Volume volume = _persistit.getVolume(TEST_VOLUME_NAME);
2381- final Buffer buffer = volume.getPool().get(volume, 3, true, true);
2382+ final long pageAddr = exchange.fetchBufferCopy(0).getPageAddress();
2383+ final Buffer buffer = exchange.getBufferPool().get(exchange.getVolume(), pageAddr, true, true);
2384 assertTrue("Should have multiple MVV records", buffer.getMvvCount() > 2);
2385 _persistit.getJournalManager().setWritePagePruningEnabled(true);
2386 _persistit.getTransactionIndex().updateActiveTransactionCache();
2387@@ -340,14 +342,17 @@
2388 }
2389
2390 private void storeNewVersion(final int cycle) throws Exception {
2391- final Exchange exchange = _persistit.getExchange(TEST_VOLUME_NAME,
2392- String.format("%s%04d", TEST_TREE_NAME, cycle), true);
2393+ final Exchange exchange = exchange(cycle);
2394 exchange.getValue().put(String.format("%s%04d", RED_FOX, cycle));
2395 for (int i = 1; i <= 100; i++) {
2396 exchange.to(i).store();
2397 }
2398 }
2399
2400+ private Exchange exchange(final int cycle) throws PersistitException {
2401+ return _persistit.getExchange(TEST_VOLUME_NAME, String.format("%s%04d", TEST_TREE_NAME, cycle), true);
2402+ }
2403+
2404 private void removeKeys(final int cycle) throws Exception {
2405 final Exchange exchange = _persistit.getExchange(TEST_VOLUME_NAME, TEST_TREE_NAME, true);
2406 for (int i = (cycle % 2) + 1; i <= 100; i += 2) {
2407
2408=== modified file 'src/test/java/com/persistit/PersistitUnitTestCase.java'
2409--- src/test/java/com/persistit/PersistitUnitTestCase.java 2012-11-20 18:18:02 +0000
2410+++ src/test/java/com/persistit/PersistitUnitTestCase.java 2013-04-29 21:38:25 +0000
2411@@ -136,4 +136,15 @@
2412 _persistit.getCleanupManager().setPollInterval(-1);
2413 _persistit.getJournalManager().setWritePagePruningEnabled(false);
2414 }
2415+
2416+ protected void drainJournal() throws Exception {
2417+ _persistit.flush();
2418+ /*
2419+ * Causes all TreeStatistics to be marked clean so that the subsequent
2420+ * checkpoint will not add another dirty page.
2421+ */
2422+ _persistit.flushStatistics();
2423+ _persistit.checkpoint();
2424+ _persistit.getJournalManager().copyBack();
2425+ }
2426 }
2427
2428=== modified file 'src/test/java/com/persistit/RecoveryTest.java'
2429--- src/test/java/com/persistit/RecoveryTest.java 2012-11-20 17:45:51 +0000
2430+++ src/test/java/com/persistit/RecoveryTest.java 2013-04-29 21:38:25 +0000
2431@@ -83,9 +83,7 @@
2432 store1();
2433 JournalManager jman = _persistit.getJournalManager();
2434 assertTrue(jman.getPageMapSize() > 0);
2435- _persistit.flush();
2436- _persistit.checkpoint();
2437- jman.copyBack();
2438+ drainJournal();
2439 assertEquals(0, jman.getPageMapSize());
2440 _persistit.close();
2441 _persistit = new Persistit(_config);
2442@@ -159,6 +157,11 @@
2443 return true;
2444 }
2445
2446+ @Override
2447+ public boolean createTree(final long timestamp) throws PersistitException {
2448+ return true;
2449+ }
2450+
2451 };
2452 plan.applyAllRecoveredTransactions(actor, plan.getDefaultRollbackListener());
2453 assertEquals(15, recoveryTimestamps.size());
2454@@ -193,8 +196,7 @@
2455 store1();
2456
2457 jman.rollover();
2458- _persistit.checkpoint();
2459- jman.copyBack();
2460+ drainJournal();
2461 assertEquals(0, jman.getPageMapSize());
2462
2463 final Transaction txn = _persistit.getTransaction();
2464@@ -203,8 +205,10 @@
2465 store1();
2466 txn.commit();
2467 txn.end();
2468- // Flush an uncommitted version of this transaction - should
2469- // prevent journal cleanup.
2470+ /*
2471+ * Flush an uncommitted version of this transaction - should prevent
2472+ * journal cleanup.
2473+ */
2474 txn.begin();
2475 store0();
2476 txn.flushTransactionBuffer(true);
2477@@ -212,14 +216,13 @@
2478 txn.end();
2479
2480 jman.rollover();
2481- _persistit.checkpoint();
2482- jman.copyBack();
2483- //
2484- // Because JournalManager thinks there's an open transaction
2485- // it should preserve the journal file containing the TX record
2486- // for the transaction.
2487- //
2488+ drainJournal();
2489 assertEquals(0, jman.getPageMapSize());
2490+ /*
2491+ * Because JournalManager thinks there's an open transaction it should
2492+ * preserve the journal file containing the TX record for the
2493+ * transaction.
2494+ */
2495 assertTrue(jman.getBaseAddress() < jman.getCurrentAddress());
2496 txn.begin();
2497 store1();
2498@@ -229,6 +232,7 @@
2499 jman.unitTestClearTransactionMap();
2500
2501 jman.rollover();
2502+ _persistit.flushStatistics();
2503 _persistit.checkpoint();
2504 /*
2505 * The TI active transaction cache may be a bit out of date, which can
2506@@ -238,8 +242,7 @@
2507 * copier does the right thing.
2508 */
2509 _persistit.getTransactionIndex().updateActiveTransactionCache();
2510- jman.copyBack();
2511-
2512+ drainJournal();
2513 assertEquals(jman.getBaseAddress(), jman.getCurrentAddress());
2514 assertEquals(0, jman.getPageMapSize());
2515
2516@@ -381,9 +384,13 @@
2517 assertTrue(_persistit.getJournalManager().getHandleCount() > updatedHandleValue);
2518 }
2519
2520+ private final static int T1 = 1000;
2521+ private final static int T2 = 2000;
2522+
2523 @Test
2524 public void testIndexHoles() throws Exception {
2525 _persistit.getJournalManager().setAppendOnly(true);
2526+
2527 final Transaction transaction = _persistit.getTransaction();
2528 final StringBuilder sb = new StringBuilder();
2529 while (sb.length() < 1000) {
2530@@ -392,7 +399,7 @@
2531
2532 final String s = sb.toString();
2533 for (int cycle = 0; cycle < 2; cycle++) {
2534- for (int i = 1000; i < 2000; i++) {
2535+ for (int i = T1; i < T2; i++) {
2536 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);
2537 transaction.begin();
2538 try {
2539@@ -407,7 +414,7 @@
2540 }
2541
2542 for (int j = 0; j < 20; j++) {
2543- for (int i = 1000; i < 2000; i++) {
2544+ for (int i = T1; i < T2; i++) {
2545 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);
2546 transaction.begin();
2547 try {
2548@@ -419,7 +426,7 @@
2549 }
2550 }
2551
2552- for (int i = 1000; i < 2000; i += 2) {
2553+ for (int i = T1; i < T2; i += 2) {
2554 transaction.begin();
2555 try {
2556 final Exchange exchange = _persistit.getExchange("persistit", "RecoveryTest" + i, true);
2557@@ -430,6 +437,7 @@
2558 }
2559 }
2560 }
2561+ _persistit.checkAllVolumes();
2562 _persistit.crash();
2563
2564 _persistit = new Persistit();
2565
2566=== modified file 'src/test/java/com/persistit/SplitPolicyTest.java'
2567--- src/test/java/com/persistit/SplitPolicyTest.java 2012-08-24 13:57:19 +0000
2568+++ src/test/java/com/persistit/SplitPolicyTest.java 2013-04-29 21:38:25 +0000
2569@@ -253,7 +253,7 @@
2570 //
2571 // forward sequential
2572 //
2573- for (long page = 2; page < 20; page++) {
2574+ for (long page = ex.clear().append(0).fetchBufferCopy(0).getPageAddress(); page < 20; page++) {
2575 final Buffer buffer = ex.getBufferPool().get(ex.getVolume(), page, false, true);
2576 if (buffer.isDataPage()) {
2577 final int available = buffer.getAvailableSize();
2578
2579=== added file 'src/test/java/com/persistit/TimelyResourceTest.java'
2580--- src/test/java/com/persistit/TimelyResourceTest.java 1970-01-01 00:00:00 +0000
2581+++ src/test/java/com/persistit/TimelyResourceTest.java 2013-04-29 21:38:25 +0000
2582@@ -0,0 +1,290 @@
2583+/**
2584+ * Copyright © 2011-2012 Akiban Technologies, Inc. All rights reserved.
2585+ *
2586+ * This program and the accompanying materials are made available
2587+ * under the terms of the Eclipse Public License v1.0 which
2588+ * accompanies this distribution, and is available at
2589+ * http://www.eclipse.org/legal/epl-v10.html
2590+ *
2591+ * This program may also be available under different license terms.
2592+ * For more information, see www.akiban.com or contact licensing@akiban.com.
2593+ *
2594+ * Contributors:
2595+ * Akiban Technologies, Inc.
2596+ */
2597+
2598+package com.persistit;
2599+
2600+import static com.persistit.TransactionIndex.tss2vh;
2601+import static com.persistit.TransactionStatus.UNCOMMITTED;
2602+import static com.persistit.unit.ConcurrentUtil.assertSuccess;
2603+import static com.persistit.unit.ConcurrentUtil.createThread;
2604+import static com.persistit.unit.ConcurrentUtil.join;
2605+import static com.persistit.unit.ConcurrentUtil.start;
2606+import static com.persistit.util.Util.NS_PER_S;
2607+import static org.junit.Assert.assertEquals;
2608+import static org.junit.Assert.assertTrue;
2609+
2610+import java.util.ArrayList;
2611+import java.util.Iterator;
2612+import java.util.List;
2613+import java.util.Random;
2614+import java.util.concurrent.Semaphore;
2615+import java.util.concurrent.atomic.AtomicInteger;
2616+
2617+import org.junit.Test;
2618+
2619+import com.persistit.Version.PrunableVersion;
2620+import com.persistit.Version.VersionCreator;
2621+import com.persistit.exception.PersistitException;
2622+import com.persistit.exception.RollbackException;
2623+import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
2624+import com.persistit.unit.ConcurrentUtil.UncaughtExceptionHandler;
2625+import com.persistit.util.Util;
2626+
2627+public class TimelyResourceTest extends PersistitUnitTestCase {
2628+
2629+ private int _idCounter;
2630+
2631+ static class TestVersion implements PrunableVersion {
2632+ final int _id;
2633+ final TimelyResourceTest _test;
2634+ final AtomicInteger _pruned = new AtomicInteger();
2635+
2636+ TestVersion(final int id, final TimelyResourceTest test) {
2637+ _id = id;
2638+ _test = test;
2639+ }
2640+
2641+ @Override
2642+ public boolean prune() {
2643+ _pruned.incrementAndGet();
2644+ return true;
2645+ }
2646+
2647+ @Override
2648+ public void vacate() {
2649+ System.out.println("No more versions");
2650+ }
2651+
2652+ @Override
2653+ public String toString() {
2654+ return String.format("<%,d:%,d>", _id, _pruned.get());
2655+ }
2656+
2657+ TimelyResourceTest getContainer() {
2658+ return _test;
2659+ }
2660+ }
2661+
2662+ @Test
2663+ public void testAddAndPruneResources() throws Exception {
2664+ testAddAndPruneResources1(false);
2665+ testAddAndPruneResources1(true);
2666+ }
2667+
2668+ private void testAddAndPruneResources1(final boolean withTransactions) throws Exception {
2669+ final Transaction txn = _persistit.getTransaction();
2670+ final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
2671+ final long[] history = new long[5];
2672+ final TestVersion[] resources = new TestVersion[5];
2673+ for (int i = 0; i < 5; i++) {
2674+ if (withTransactions) {
2675+ txn.begin();
2676+ }
2677+ final TestVersion resource = new TestVersion(i, this);
2678+ resources[i] = resource;
2679+ if (!tr.isEmpty()) {
2680+ tr.delete();
2681+ }
2682+ tr.addVersion(resource, txn);
2683+ if (withTransactions) {
2684+ txn.commit();
2685+ txn.end();
2686+ }
2687+ history[i] = _persistit.getTimestampAllocator().updateTimestamp();
2688+ }
2689+ assertEquals("Incorrect version count " + withTransactions, 9, tr.getVersionCount());
2690+
2691+ for (int i = 0; i < 5; i++) {
2692+ final TestVersion t = tr.getVersion(tss2vh(history[i], 0));
2693+ assertTrue("Missing version " + withTransactions, t != null);
2694+ assertEquals("Wrong version " + withTransactions, i, t._id);
2695+ }
2696+ _persistit.getTransactionIndex().updateActiveTransactionCache();
2697+ tr.prune();
2698+ assertEquals("Should have one version left " + withTransactions, 1, tr.getVersionCount());
2699+ assertEquals("Wrong version " + withTransactions, 4, tr.getVersion(tss2vh(UNCOMMITTED, 0))._id);
2700+
2701+ tr.delete();
2702+
2703+ assertEquals("Should have two versions left " + withTransactions, 2, tr.getVersionCount());
2704+ _persistit.getTransactionIndex().updateActiveTransactionCache();
2705+ tr.prune();
2706+ assertEquals("Should have no versions left " + withTransactions, 0, tr.getVersionCount());
2707+
2708+ for (int i = 0; i < 5; i++) {
2709+ assertEquals("Should have been pruned " + withTransactions, 1, resources[i]._pruned.get());
2710+ }
2711+ }
2712+
2713+ @Test
2714+ public void concurrentAddAndPruneResources() throws Exception {
2715+ final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
2716+ final Random random = new Random(1);
2717+ final long expires = System.nanoTime() + 10 * NS_PER_S;
2718+ final AtomicInteger sequence = new AtomicInteger();
2719+ final AtomicInteger rollbackCount = new AtomicInteger();
2720+ final List<Thread> threads = new ArrayList<Thread>();
2721+ final UncaughtExceptionHandler handler = new UncaughtExceptionHandler();
2722+ int threadCounter = 0;
2723+ while (System.nanoTime() < expires) {
2724+ for (final Iterator<Thread> iter = threads.iterator(); iter.hasNext();) {
2725+ if (!iter.next().isAlive()) {
2726+ iter.remove();
2727+ }
2728+ }
2729+ while (threads.size() < 20) {
2730+ final Thread t = createThread(String.format("Thread_%06d", ++threadCounter), new ThrowingRunnable() {
2731+ @Override
2732+ public void run() throws Exception {
2733+ doConcurrentTransaction(tr, random, sequence, rollbackCount);
2734+ }
2735+ });
2736+ threads.add(t);
2737+ start(handler, t);
2738+ }
2739+ Util.sleep(10);
2740+ tr.prune();
2741+ }
2742+ join(Long.MAX_VALUE, handler.getThrowableMap(), threads.toArray(new Thread[threads.size()]));
2743+ assertSuccess(handler.getThrowableMap());
2744+ assertTrue("Every transaction rolled back", rollbackCount.get() < sequence.get());
2745+ System.out.printf("%,d entries, %,d rollbacks\n", sequence.get(), rollbackCount.get());
2746+ }
2747+
2748+ private void doConcurrentTransaction(final TimelyResource<TestVersion> tr, final Random random,
2749+ final AtomicInteger sequence, final AtomicInteger rollbackCount) throws PersistitException {
2750+ try {
2751+ final Transaction txn = _persistit.getTransaction();
2752+ for (int i = 0; i < 25; i++) {
2753+ txn.begin();
2754+ try {
2755+ final int id = sequence.incrementAndGet();
2756+ tr.addVersion(new TestVersion(id, this), txn);
2757+ final int delay = (1 << random.nextInt(3));
2758+ // 1, 2 or 4 ms, since delay = 1 << nextInt(3)
2759+ Util.sleep(delay);
2760+ final TestVersion mine = tr.getVersion();
2761+ assertEquals("Should not have been pruned yet", 0, mine._pruned.get());
2762+ assertEquals("Wrong resource", id, mine._id);
2763+ if (random.nextInt(10) == 0) {
2764+ txn.rollback();
2765+ } else {
2766+ txn.commit();
2767+ }
2768+ } catch (final RollbackException e) {
2769+ txn.rollback();
2770+ rollbackCount.incrementAndGet();
2771+ } finally {
2772+ txn.end();
2773+ }
2774+ }
2775+ } catch (final RollbackException e) {
2776+ rollbackCount.incrementAndGet();
2777+ }
2778+ }
2779+
2780+ @Test
2781+ public void deleteResource() throws Exception {
2782+ final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
2783+ _idCounter = 0;
2784+ final VersionCreator<TestVersion> creator = new VersionCreator<TestVersion>() {
2785+
2786+ @Override
2787+ public TestVersion createVersion(final TimelyResource<? extends TestVersion> resource)
2788+ throws PersistitException {
2789+ return new TestVersion(++_idCounter, TimelyResourceTest.this);
2790+ }
2791+ };
2792+ final Transaction txn1 = _persistit.getTransaction();
2793+ _persistit.setSessionId(new SessionId());
2794+ final Transaction txn2 = _persistit.getTransaction();
2795+ TestVersion v1;
2796+ txn1.begin();
2797+ v1 = tr.getVersion(creator);
2798+ assertTrue("Version ID mismatch", v1._id == _idCounter);
2799+ txn2.begin();
2800+ txn1.incrementStep();
2801+ tr.delete();
2802+ tr.prune();
2803+ assertEquals("Should still be two versions", 2, tr.getVersionCount());
2804+ txn2.commit();
2805+ txn1.commit();
2806+ _persistit.getTransactionIndex().updateActiveTransactionCache();
2807+ tr.prune();
2808+ assertEquals("Should now have no versions", 0, tr.getVersionCount());
2809+ txn1.end();
2810+ txn2.end();
2811+
2812+ }
2813+
2814+ @Test
2815+ public void versions() throws Exception {
2816+ final TimelyResource<TestVersion> tr = new TimelyResource<TestVersion>(_persistit);
2817+ _idCounter = 0;
2818+ final VersionCreator<TestVersion> creator = new VersionCreator<TestVersion>() {
2819+
2820+ @Override
2821+ public TestVersion createVersion(final TimelyResource<? extends TestVersion> resource)
2822+ throws PersistitException {
2823+ return new TestVersion(++_idCounter, TimelyResourceTest.this);
2824+ }
2825+ };
2826+ final Semaphore semaphore1 = new Semaphore(0);
2827+ final Transaction txn = _persistit.getTransaction();
2828+ final Thread[] threads = new Thread[10];
2829+ for (int i = 0; i < 10; i++) {
2830+ try {
2831+ txn.begin();
2832+ tr.addVersion(creator.createVersion(tr), txn);
2833+ txn.commit();
2834+ } finally {
2835+ txn.end();
2836+ }
2837+ final Semaphore semaphore2 = new Semaphore(0);
2838+ threads[i] = new Thread(new Runnable() {
2839+ @Override
2840+ public void run() {
2841+ final Transaction txn = _persistit.getTransaction();
2842+ try {
2843+ txn.begin();
2844+ final TestVersion t = tr.getVersion();
2845+ assertEquals(t._id, _idCounter);
2846+ semaphore2.release();
2847+ semaphore1.acquire();
2848+ txn.commit();
2849+ } catch (final Exception e) {
2850+ e.printStackTrace();
2851+ } finally {
2852+ txn.end();
2853+ }
2854+ }
2855+ });
2856+ threads[i].start();
2857+ semaphore2.acquire();
2858+ }
2859+ _persistit.getTransactionIndex().updateActiveTransactionCache();
2860+ tr.prune();
2861+ assertEquals(10, tr.getVersionCount());
2862+ semaphore1.release(10);
2863+ for (final Thread thread : threads) {
2864+ thread.join();
2865+ }
2866+ _persistit.getTransactionIndex().updateActiveTransactionCache();
2867+ tr.prune();
2868+ assertEquals(1, tr.getVersionCount());
2869+ assertEquals("Surviving primordial version should be last one committed", 10, tr.getVersion(null)._id);
2870+ }
2871+
2872+}
2873
2874=== modified file 'src/test/java/com/persistit/TreeLifetimeTest.java'
2875--- src/test/java/com/persistit/TreeLifetimeTest.java 2012-08-24 13:57:19 +0000
2876+++ src/test/java/com/persistit/TreeLifetimeTest.java 2013-04-29 21:38:25 +0000
2877@@ -15,20 +15,6 @@
2878
2879 package com.persistit;
2880
2881-import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_A;
2882-import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_B;
2883-import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_C;
2884-import static com.persistit.util.SequencerConstants.TREE_CREATE_REMOVE_SCHEDULE;
2885-import static com.persistit.util.ThreadSequencer.addSchedules;
2886-import static com.persistit.util.ThreadSequencer.array;
2887-import static com.persistit.util.ThreadSequencer.describeHistory;
2888-import static com.persistit.util.ThreadSequencer.describePartialOrdering;
2889-import static com.persistit.util.ThreadSequencer.disableSequencer;
2890-import static com.persistit.util.ThreadSequencer.enableSequencer;
2891-import static com.persistit.util.ThreadSequencer.historyMeetsPartialOrdering;
2892-import static com.persistit.util.ThreadSequencer.out;
2893-import static com.persistit.util.ThreadSequencer.rawSequenceHistoryCopy;
2894-import static com.persistit.util.ThreadSequencer.sequence;
2895 import static org.junit.Assert.assertEquals;
2896 import static org.junit.Assert.assertFalse;
2897 import static org.junit.Assert.assertNotNull;
2898@@ -38,7 +24,6 @@
2899
2900 import java.util.Arrays;
2901 import java.util.List;
2902-import java.util.concurrent.ConcurrentLinkedQueue;
2903
2904 import org.junit.Test;
2905
2906@@ -49,9 +34,6 @@
2907
2908 public class TreeLifetimeTest extends PersistitUnitTestCase {
2909 private static final String TREE_NAME = "tree_one";
2910- final int A = TREE_CREATE_REMOVE_A;
2911- final int B = TREE_CREATE_REMOVE_B;
2912- final int C = TREE_CREATE_REMOVE_C;
2913
2914 private Volume getVolume() {
2915 return _persistit.getVolume(UnitTestProperties.VOLUME_NAME);
2916@@ -62,11 +44,34 @@
2917 }
2918
2919 @Test
2920- public void testRemovedTreeGoesToGarbageChain() throws PersistitException {
2921+ public void testRemovedTreeGoesToGarbageChainNoTxn() throws PersistitException {
2922+ Exchange ex = getExchange(true);
2923+ for (int i = 0; i < 5; ++i) {
2924+ ex.clear().append(i).getValue().clear().put(i);
2925+ ex.store();
2926+ }
2927+ _persistit.releaseExchange(ex);
2928+ ex = null;
2929+
2930+ ex = getExchange(false);
2931+ final long treeRoot = ex.getTree().getRootPageAddr();
2932+ ex.removeTree();
2933+ _persistit.releaseExchange(ex);
2934+ ex = null;
2935+ _persistit.cleanup();
2936+
2937+ final List<Long> garbage = getVolume().getStructure().getGarbageList();
2938+ assertTrue("Expected tree root <" + treeRoot + "> in garbage list <" + garbage.toString() + ">",
2939+ garbage.contains(treeRoot));
2940+ }
2941+
2942+ @Test
2943+ public void testRemovedTreeGoesToGarbageChainTxn() throws PersistitException {
2944 final Transaction txn = _persistit.getTransaction();
2945+ Exchange ex;
2946
2947 txn.begin();
2948- Exchange ex = getExchange(true);
2949+ ex = getExchange(true);
2950 for (int i = 0; i < 5; ++i) {
2951 ex.clear().append(i).getValue().clear().put(i);
2952 ex.store();
2953@@ -84,7 +89,7 @@
2954 txn.end();
2955 _persistit.releaseExchange(ex);
2956 ex = null;
2957-
2958+ _persistit.cleanup();
2959 final List<Long> garbage = getVolume().getStructure().getGarbageList();
2960 assertTrue("Expected tree root <" + treeRoot + "> in garbage list <" + garbage.toString() + ">",
2961 garbage.contains(treeRoot));
2962@@ -143,71 +148,4 @@
2963 assertNull("Tree should not exist after cleanup action", getVolume().getTree(TREE_NAME, false));
2964 }
2965
2966- @Test
2967- public void testReanimatedTreeCreateAndRemoveSynchronization() throws PersistitException, InterruptedException {
2968- enableSequencer(true);
2969- addSchedules(TREE_CREATE_REMOVE_SCHEDULE);
2970-
2971- final ConcurrentLinkedQueue<Throwable> threadErrors = new ConcurrentLinkedQueue<Throwable>();
2972-
2973- final Exchange origEx = getExchange(true);
2974- for (int i = 0; i < 5; ++i) {
2975- origEx.clear().append(i).store();
2976- }
2977- _persistit.releaseExchange(origEx);
2978-
2979- final Thread thread1 = new Thread(new Runnable() {
2980- @Override
2981- public void run() {
2982- Exchange ex = null;
2983- try {
2984- ex = getExchange(false);
2985- ex.removeTree();
2986- } catch (final Throwable t) {
2987- threadErrors.add(t);
2988- }
2989- if (ex != null) {
2990- _persistit.releaseExchange(ex);
2991- }
2992- }
2993- });
2994-
2995- final Thread thread2 = new Thread(new Runnable() {
2996- @Override
2997- public void run() {
2998- sequence(TREE_CREATE_REMOVE_B);
2999- Exchange ex = null;
3000- try {
3001- ex = getExchange(true);
3002- int count = 0;
3003- while (ex.next(true)) {
3004- ++count;
3005- }
3006- sequence(TREE_CREATE_REMOVE_C);
3007- assertEquals("New tree has zero keys in it", 0, count);
3008- } catch (final Throwable t) {
3009- threadErrors.add(t);
3010- }
3011- if (ex != null) {
3012- _persistit.releaseExchange(ex);
3013- }
3014- }
3015- });
3016-
3017- thread1.start();
3018- thread2.start();
3019-
3020- thread1.join();
3021- thread2.join();
3022-
3023- assertEquals("Threads had no exceptions", "[]", threadErrors.toString());
3024-
3025- final int[] actual = rawSequenceHistoryCopy();
3026- final int[][] expectedSequence = { array(A, B), array(out(B)), array(C), array(out(A), out(C)) };
3027- if (!historyMeetsPartialOrdering(actual, expectedSequence)) {
3028- assertEquals("Unexpected sequencing", describePartialOrdering(expectedSequence), describeHistory(actual));
3029- }
3030-
3031- disableSequencer();
3032- }
3033 }
3034
3035=== added file 'src/test/java/com/persistit/TreeTransactionalLifetimeTest.java'
3036--- src/test/java/com/persistit/TreeTransactionalLifetimeTest.java 1970-01-01 00:00:00 +0000
3037+++ src/test/java/com/persistit/TreeTransactionalLifetimeTest.java 2013-04-29 21:38:25 +0000
3038@@ -0,0 +1,277 @@
3039+/**
3040+ * Copyright © 2013 Akiban Technologies, Inc. All rights reserved.
3041+ *
3042+ * This program and the accompanying materials are made available
3043+ * under the terms of the Eclipse Public License v1.0 which
3044+ * accompanies this distribution, and is available at
3045+ * http://www.eclipse.org/legal/epl-v10.html
3046+ *
3047+ * This program may also be available under different license terms.
3048+ * For more information, see www.akiban.com or contact licensing@akiban.com.
3049+ *
3050+ * Contributors:
3051+ * Akiban Technologies, Inc.
3052+ */
3053+
3054+package com.persistit;
3055+
3056+import static com.persistit.unit.ConcurrentUtil.assertSuccess;
3057+import static com.persistit.unit.ConcurrentUtil.createThread;
3058+import static com.persistit.unit.ConcurrentUtil.join;
3059+import static com.persistit.unit.ConcurrentUtil.start;
3060+import static org.junit.Assert.assertEquals;
3061+import static org.junit.Assert.assertNull;
3062+import static org.junit.Assert.assertTrue;
3063+
3064+import java.util.Map;
3065+import java.util.concurrent.Semaphore;
3066+
3067+import org.junit.Test;
3068+
3069+import com.persistit.exception.PersistitException;
3070+import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
3071+
3072+public class TreeTransactionalLifetimeTest extends PersistitUnitTestCase {
3073+ final static int TIMEOUT_MS = 10000;
3074+
3075+ final Semaphore semA = new Semaphore(0);
3076+ final Semaphore semB = new Semaphore(0);
3077+ final Semaphore semT = new Semaphore(0);
3078+ final Semaphore semU = new Semaphore(0);
3079+
3080+ /*
3081+ * This class needs to be in com.persistit because of some package-private
3082+ * methods used in controlling the test.
3083+ */
3084+
3085+ private Tree tree(final String name) throws PersistitException {
3086+ return vstruc().getTree(name, false);
3087+ }
3088+
3089+ private Exchange exchange(final String name) throws PersistitException {
3090+ return _persistit.getExchange("persistit", name, true);
3091+ }
3092+
3093+ private VolumeStructure vstruc() {
3094+ return _persistit.getVolume("persistit").getStructure();
3095+ }
3096+
3097+ @Test
3098+ public void simplePruning() throws Exception {
3099+
3100+ final Transaction txn = _persistit.getTransaction();
3101+ for (int i = 0; i < 2; i++) {
3102+ txn.begin();
3103+ final Exchange ex = exchange("ttlt");
3104+ ex.getValue().put(RED_FOX);
3105+ ex.to(1).store();
3106+ ex.to(2).store();
3107+ txn.rollback();
3108+ txn.end();
3109+ }
3110+ _persistit.cleanup();
3111+ assertEquals("There should be no tree", null, tree("ttlt"));
3112+ assertTrue(vstruc().getGarbageRoot() != 0);
3113+ }
3114+
3115+ @Test
3116+ public void createdTreeIsNotVisibleUntilCommit() throws Exception {
3117+ final Thread t = createThread("ttlt", new TExec() {
3118+ @Override
3119+ void exec(final Transaction txn) throws Exception {
3120+ final Exchange ex1 = exchange("ttlt");
3121+ ex1.getValue().put(RED_FOX);
3122+ ex1.to(1).store();
3123+ semA.release();
3124+ semB.acquire();
3125+ txn.commit();
3126+ }
3127+ });
3128+ final Map<Thread, Throwable> errors = start(t);
3129+ semA.acquire();
3130+ assertEquals(null, _persistit.getVolume("persistit").getTree("ttlt", false));
3131+ semB.release();
3132+ join(TIMEOUT_MS, errors, t);
3133+ assertSuccess(errors);
3134+ final Exchange ex = exchange("ttlt");
3135+ assertTrue(ex.to(Key.BEFORE).next());
3136+ assertEquals(1, ex.getKey().decodeInt());
3137+ }
3138+
3139+ @Test
3140+ public void removeTreeIsNotVisibleUntilCommit() throws Exception {
3141+ final Exchange ex = exchange("ttlt");
3142+ ex.getValue().put(RED_FOX);
3143+ ex.to(1).store();
3144+ final Thread t = createThread("ttlt", new TExec() {
3145+ @Override
3146+ void exec(final Transaction txn) throws Exception {
3147+ final Exchange ex1 = exchange("ttlt");
3148+ ex1.removeTree();
3149+ semA.release();
3150+ semB.acquire();
3151+ txn.commit();
3152+ }
3153+ });
3154+ final Map<Thread, Throwable> errors = start(t);
3155+ semA.acquire();
3156+ assertEquals(ex.getTree(), ex.getVolume().getTree("ttlt", false));
3157+ semB.release();
3158+ join(TIMEOUT_MS, errors, t);
3159+ assertNull(ex.getVolume().getTree("ttlt", false));
3160+ }
3161+
3162+ @Test
3163+ public void removeCreateRemove() throws Exception {
3164+
3165+ final Exchange ex = exchange("ttlt");
3166+ ex.getValue().put(RED_FOX);
3167+ ex.to(1).store();
3168+ final Thread t = createThread("ttlt", new TExec() {
3169+ @Override
3170+ void exec(final Transaction txn) throws Exception {
3171+ final Exchange ex1 = exchange("ttlt");
3172+ ex1.removeTree();
3173+ final Exchange ex2 = exchange("ttlt");
3174+ ex2.getValue().put(RED_FOX);
3175+ ex2.to(2).store();
3176+ ex2.removeTree();
3177+ final Exchange ex3 = exchange("ttlt");
3178+ ex3.getValue().put(RED_FOX);
3179+ ex3.to(3).store();
3180+ final Exchange ex4 = exchange("ttlt");
3181+ ex4.to(Key.BEFORE);
3182+ assertTrue(ex4.next());
3183+ assertEquals(3, ex4.getKey().decodeInt());
3184+ assertTrue(!ex4.next());
3185+ semA.release();
3186+ semB.acquire();
3187+ txn.rollback();
3188+ }
3189+ });
3190+ final Map<Thread, Throwable> errors = start(t);
3191+ semA.acquire();
3192+ assertEquals(ex.getTree(), ex.getVolume().getTree("ttlt", false));
3193+ semB.release();
3194+ join(TIMEOUT_MS, errors, t);
3195+ _persistit.getTransactionIndex().updateActiveTransactionCache();
3196+ _persistit.pruneTimelyResources();
3197+ assertTrue(ex.to(Key.BEFORE).next());
3198+ assertEquals(1, ex.getKey().decodeInt());
3199+ assertTrue(!ex.next());
3200+ assertTrue(ex.getVolume().getStructure().getGarbageRoot() != 0);
3201+ }
3202+
3203+ @Test
3204+ public void createRemoveByStep() throws Exception {
3205+ createRemoveByStepHelper("ttlt1", false, true, false, false, "0,1:,2:a=step2,3,4:b=step4", "0:b=step4");
3206+ createRemoveByStepHelper("ttlt2", false, false, false, false, "0,1:,2:a=step2,3,4:b=step4", "0");
3207+ createRemoveByStepHelper("ttlt3", true, true, false, false, "0:,1:,2:a=step2,3,4:b=step4", "0:b=step4");
3208+ createRemoveByStepHelper("ttlt4", true, false, false, false, "0:,1:,2:a=step2,3,4:b=step4", "0:");
3209+
3210+ createRemoveByStepHelper("ttlt5", false, true, false, true, "0,1:,2:a=step2,3,4:b=step4", "0:b=step4");
3211+ createRemoveByStepHelper("ttlt6", false, false, false, true, "0,1:,2:a=step2,3,4:b=step4", "0");
3212+ createRemoveByStepHelper("ttlt7", true, true, false, true, "0:,1:,2:a=step2,3,4:b=step4", "0:b=step4");
3213+ createRemoveByStepHelper("ttlt8", true, false, false, true, "0:,1:,2:a=step2,3,4:b=step4", "0:");
3214+
3215+ createRemoveByStepHelper("ttlt1cr", false, true, true, false, "0,1:,2:a=step2,3,4:b=step4", "0");
3216+ createRemoveByStepHelper("ttlt2cr", false, false, true, false, "0,1:,2:a=step2,3,4:b=step4", "0");
3217+ createRemoveByStepHelper("ttlt3cr", true, true, true, false, "0:,1:,2:a=step2,3,4:b=step4", "0:");
3218+ createRemoveByStepHelper("ttlt4cr", true, false, false, false, "0:,1:,2:a=step2,3,4:b=step4", "0:");
3219+
3220+ createRemoveByStepHelper("ttlt5cr", false, true, true, true, "0,1:,2:a=step2,3,4:b=step4", "0");
3221+ createRemoveByStepHelper("ttlt6cr", false, false, true, true, "0,1:,2:a=step2,3,4:b=step4", "0");
3222+ createRemoveByStepHelper("ttlt7cr", true, true, true, true, "0:,1:,2:a=step2,3,4:b=step4", "0:");
3223+ createRemoveByStepHelper("ttlt8cr", true, false, true, true, "0:,1:,2:a=step2,3,4:b=step4", "0:");
3224+ }
3225+
3226+ private void createRemoveByStepHelper(final String treeName, final boolean primordial, final boolean commit,
3227+ final boolean crash, final boolean restart, final String expected1, final String expected2)
3228+ throws Exception {
3229+
3230+ final Transaction txn = _persistit.getTransaction();
3231+ final Volume volume = _persistit.getVolume("persistit");
3232+ if (primordial) {
3233+ volume.getTree(treeName, true);
3234+ }
3235+ txn.begin();
3236+ try {
3237+ txn.setStep(1);
3238+ volume.getTree(treeName, true);
3239+
3240+ txn.setStep(2);
3241+ final Exchange ex1 = exchange(treeName);
3242+ ex1.getValue().put("step2");
3243+ ex1.to("a").store();
3244+
3245+ txn.setStep(3);
3246+ ex1.removeTree();
3247+
3248+ txn.setStep(4);
3249+ final Exchange ex2 = exchange(treeName);
3250+ ex2.getValue().put("step4");
3251+ ex2.to("b").store();
3252+
3253+ assertEquals("Expected contents at steps", expected1, computeCreateRemoveState(treeName, 5));
3254+
3255+ if (crash) {
3256+ _persistit.checkpoint();
3257+ _persistit.crash();
3258+ } else {
3259+ if (commit) {
3260+ txn.commit();
3261+ } else {
3262+ txn.rollback();
3263+ }
3264+ }
3265+ } finally {
3266+ if (!crash) {
3267+ txn.end();
3268+ }
3269+ }
3270+ if (restart) {
3271+ _persistit.close();
3272+ }
3273+ if (crash || restart) {
3274+ _persistit = new Persistit(_config);
3275+ _persistit.initialize();
3276+ }
3277+ assertEquals("Expected contents at steps", expected2, computeCreateRemoveState(treeName, 1));
3278+ }
3279+
3280+ private String computeCreateRemoveState(final String treeName, final int steps) throws PersistitException {
3281+ final StringBuilder sb = new StringBuilder();
3282+ for (int step = 0; step < steps; step++) {
3283+ _persistit.getTransaction().setStep(step);
3284+ if (sb.length() > 0) {
3285+ sb.append(",");
3286+ }
3287+ sb.append(step);
3288+ if (_persistit.getVolume("persistit").getTree(treeName, false) != null) {
3289+ sb.append(":");
3290+ final Exchange ex = exchange(treeName);
3291+ ex.append(Key.BEFORE);
3292+ while (ex.next()) {
3293+ sb.append(ex.getKey().decodeString()).append("=").append(ex.getValue().getString());
3294+ }
3295+ }
3296+ }
3297+ return sb.toString();
3298+ }
3299+
3300+ abstract class TExec extends ThrowingRunnable {
3301+
3302+ @Override
3303+ public void run() throws Exception {
3304+ final Transaction txn = _persistit.getTransaction();
3305+ txn.begin();
3306+ try {
3307+ exec(txn);
3308+ } finally {
3309+ txn.end();
3310+ }
3311+ }
3312+
3313+ abstract void exec(final Transaction txn) throws Exception;
3314+ }
3315+}
3316
3317=== modified file 'src/test/java/com/persistit/unit/ConcurrentUtil.java'
3318--- src/test/java/com/persistit/unit/ConcurrentUtil.java 2012-08-24 13:57:19 +0000
3319+++ src/test/java/com/persistit/unit/ConcurrentUtil.java 2013-04-29 21:38:25 +0000
3320@@ -21,11 +21,49 @@
3321 import java.util.HashMap;
3322 import java.util.Map;
3323
3324+/**
3325+ * Helper methods to create, start, join and check error status of test threads.
3326+ * The key element is a Map<Thread, Throwable> that can be checked after threads
3327+ * run for unhandled exceptions. The {@link #assertSuccess(Map)} method is
3328+ * called in the main thread to aggregate and report all exceptions that
3329+ * occurred in other threads.
3330+ */
3331 public class ConcurrentUtil {
3332+
3333+ /**
3334+ * An implementation of {@link Thread.UncaughtExceptionHandler} which
3335+ * records any uncaught errors or exceptions in a map. A test case can pass
3336+ * the map to the {@link ConcurrentUtil#assertSuccess(Map)} method to verify
3337+ * that no exceptions or errors were caught on test threads.
3338+ *
3339+ */
3340+ public static class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
3341+ final Map<Thread, Throwable> throwableMap = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
3342+
3343+ @Override
3344+ public void uncaughtException(final Thread t, final Throwable e) {
3345+ throwableMap.put(t, e);
3346+ }
3347+
3348+ public Map<Thread, Throwable> getThrowableMap() {
3349+ return throwableMap;
3350+ }
3351+ }
3352+
3353+ /**
3354+ * A version of Runnable in which the #run method may throw any Throwable.
3355+ */
3356 public static abstract class ThrowingRunnable {
3357 public abstract void run() throws Throwable;
3358 }
3359
3360+ /**
3361+ * Create a named thread from a ThrowingRunnable.
3362+ *
3363+ * @param name
3364+ * @param runnable
3365+ * @return
3366+ */
3367 public static Thread createThread(final String name, final ThrowingRunnable runnable) {
3368 return new Thread(new Runnable() {
3369 @Override
3370@@ -40,34 +78,49 @@
3371 }
3372
3373 /**
3374- * Start and join on all given threads. Wait on each thread, individually,
3375- * for <code>timeout</code> milliseconds. The {@link Thread#join(long)}
3376- * method is used for this (<code>0</code> means indefinite).
3377+ * Start all given threads. Return a map on which unhandled exceptions will
3378+ * be reported.
3379 *
3380- * @param timeout
3381- * How long to join on each thread for.
3382 * @param threads
3383- * Threads to start and join.
3384+ * Threads to start.
3385 *
3386 * @return A map with an entry for each thread that had an unhandled
3387 * exception or did not complete in the allotted time. This map will
3388 * be empty if all threads completed successfully.
3389 */
3390- public static Map<Thread, Throwable> startAndJoin(final long timeout, final Thread... threads) {
3391- final Map<Thread, Throwable> throwableMap = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
3392-
3393- final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
3394- @Override
3395- public void uncaughtException(final Thread t, final Throwable e) {
3396- throwableMap.put(t, e);
3397- }
3398- };
3399-
3400+ public static Map<Thread, Throwable> start(final Thread... threads) {
3401+ final UncaughtExceptionHandler handler = new UncaughtExceptionHandler();
3402+ start(handler, threads);
3403+ return handler.getThrowableMap();
3404+ }
3405+
3406+ /**
3407+ * Start all given threads with the supplied UncaughtExceptionHandler. The
3408+ * handler will record any uncaught exceptions or errors in a map associated
3409+ * with the handler.
3410+ *
3411+ * @param handler
3412+ * @param threads
3413+ */
3414+ public static void start(final UncaughtExceptionHandler handler, final Thread... threads) {
3415 for (final Thread t : threads) {
3416 t.setUncaughtExceptionHandler(handler);
3417 t.start();
3418 }
3419
3420+ }
3421+
3422+ /**
3423+ * Wait on each thread, individually, for <code>timeout</code> milliseconds.
3424+ * The {@link Thread#join(long)} method is used for this (<code>0</code>
3425+ * means indefinite). Add an Exception to the error map for any thread that
3426+ * did not end within its timeout.
3427+ *
3428+ * @param timeout
3429+ * @param throwableMap
3430+ * @param threads
3431+ */
3432+ public static void join(final long timeout, final Map<Thread, Throwable> throwableMap, final Thread... threads) {
3433 for (final Thread t : threads) {
3434 Throwable error = null;
3435 try {
3436@@ -78,12 +131,42 @@
3437 } catch (final InterruptedException e) {
3438 error = e;
3439 }
3440-
3441 if (error != null) {
3442 throwableMap.put(t, error);
3443 }
3444 }
3445-
3446+ }
3447+
3448+ /**
3449+ * Assert that no thread had any unhandled exceptions or timeouts.
3450+ *
3451+ * @param throwableMap
3452+ * map in which threads accumulated any unhandled Exceptions
3453+ */
3454+ public static void assertSuccess(final Map<Thread, Throwable> throwableMap) {
3455+ String description = "";
3456+ for (final Map.Entry<Thread, Throwable> entry : throwableMap.entrySet()) {
3457+ description += " " + entry.getKey().getName() + "=" + entry.getValue().toString();
3458+ }
3459+ assertEquals("All threads completed successfully", "{}", "{" + description + "}");
3460+ }
3461+
3462+ /**
3463+ * Call {@link #start(Thread...)} for all threads and then
3464+ * {@link #join(long, Map, Thread...)} for all threads.
3465+ *
3466+ * @param timeout
3467+ * How long to join on each thread for.
3468+ * @param threads
3469+ * Threads to start and join.
3470+ *
3471+ * @return A map with an entry for each thread that had an unhandled
3472+ * exception or did not complete in the allotted time. This map will
3473+ * be empty if all threads completed successfully.
3474+ */
3475+ public static Map<Thread, Throwable> startAndJoin(final long timeout, final Thread... threads) {
3476+ final Map<Thread, Throwable> throwableMap = start(threads);
3477+ join(timeout, throwableMap, threads);
3478 return throwableMap;
3479 }
3480
3481@@ -98,11 +181,7 @@
3482 * Threads to start and join.
3483 */
3484 public static void startAndJoinAssertSuccess(final long timeout, final Thread... threads) {
3485- final Map<Thread, Throwable> errors = startAndJoin(timeout, threads);
3486- String description = "";
3487- for (final Map.Entry<Thread, Throwable> entry : errors.entrySet()) {
3488- description += " " + entry.getKey().getName() + "=" + entry.getValue().toString();
3489- }
3490- assertEquals("All threads completed successfully", "{}", "{" + description + "}");
3491+ final Map<Thread, Throwable> throwableMap = startAndJoin(timeout, threads);
3492+ assertSuccess(throwableMap);
3493 }
3494 }
3495
3496=== modified file 'src/test/java/com/persistit/unit/TransactionTest1.java'
3497--- src/test/java/com/persistit/unit/TransactionTest1.java 2012-08-24 13:57:19 +0000
3498+++ src/test/java/com/persistit/unit/TransactionTest1.java 2013-04-29 21:38:25 +0000
3499@@ -88,7 +88,7 @@
3500 } finally {
3501 txn.end();
3502 }
3503- assertTrue(!ex.getTree().isValid());
3504+ assertTrue(ex.getTree().isDeleted());
3505 try {
3506 ex.clear().append("test1").hasChildren();
3507 fail("Should have thrown an exception");
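
Note on the ConcurrentUtil change above: the refactoring splits startAndJoin into separate start, join and assertSuccess steps so that a test can do work (for example, semaphore handshakes) between starting threads and joining them. The following is a minimal, JDK-only sketch of that pattern, not the ConcurrentUtil source itself. The class name StartJoinSketch and the IllegalStateException used to mark a timeout are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch of the start/join pattern; not part of Persistit.
public class StartJoinSketch {

    // Start each thread with a handler that records any uncaught Throwable.
    public static Map<Thread, Throwable> start(final Thread... threads) {
        final Map<Thread, Throwable> errors =
                Collections.synchronizedMap(new HashMap<Thread, Throwable>());
        for (final Thread t : threads) {
            t.setUncaughtExceptionHandler((thread, e) -> errors.put(thread, e));
            t.start();
        }
        return errors;
    }

    // Join each thread for up to timeoutMs (0 means wait indefinitely).
    // A thread that is still alive afterwards, or an interrupt, is recorded as an error.
    public static void join(final long timeoutMs, final Map<Thread, Throwable> errors,
            final Thread... threads) {
        for (final Thread t : threads) {
            try {
                t.join(timeoutMs);
                if (t.isAlive()) {
                    // Assumption: any exception type would do as a timeout marker.
                    errors.put(t, new IllegalStateException("Thread did not complete in "
                            + timeoutMs + " ms"));
                }
            } catch (final InterruptedException e) {
                errors.put(t, e);
            }
        }
    }

    public static void main(final String[] args) {
        final Thread ok = new Thread(() -> { }, "ok");
        final Thread bad = new Thread(() -> {
            throw new IllegalStateException("boom");
        }, "bad");
        final Map<Thread, Throwable> errors = start(ok, bad);
        // The main thread is free to coordinate with the workers here before joining.
        join(10000, errors, ok, bad);
        System.out.println("threads with errors: " + errors.size());
    }
}
```

Because the handler runs on the failing thread before that thread terminates, the map is already populated by the time join returns, which is why a test can check it immediately afterwards.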
