Merge lp:~pbeaman/akiban-persistit/tree_builder2 into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 421
Merged at revision: 417
Proposed branch: lp:~pbeaman/akiban-persistit/tree_builder2
Merge into: lp:akiban-persistit
Prerequisite: lp:~pbeaman/akiban-persistit/add-lock
Diff against target: 1328 lines (+538/-324)
8 files modified
src/main/java/com/persistit/BufferPool.java (+1/-4)
src/main/java/com/persistit/Persistit.java (+7/-0)
src/main/java/com/persistit/StreamLoader.java (+150/-148)
src/main/java/com/persistit/StreamSaver.java (+1/-1)
src/main/java/com/persistit/TreeBuilder.java (+269/-153)
src/test/java/com/persistit/TreeBuilderTest.java (+98/-17)
src/test/java/com/persistit/stress/InsertBigLoad.java (+4/-0)
src/test/java/com/persistit/stress/unit/BigLoad.java (+8/-1)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/tree_builder2
Reviewer             Review Type   Date Requested   Status
Akiban Build User                                   Needs Fixing
Nathan Williams                                     Approve
Review via email: mp+144404@code.launchpad.net

Description of the change

A revised version of TreeBuilder that merges data much faster than the original version. The original version persisted the sorted tree data by writing temp volumes to disk. The resulting randomly ordered pages then had to be read non-sequentially during the merge phase.

This new version uses StreamSaver and StreamLoader to persist and re-read the sorted records in sequential file order, significantly reducing random disk reads during the merge process.
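
To illustrate the general idea (sorted runs read front to back and merged by repeatedly taking the smallest head record), here is a minimal sketch. It is not the TreeBuilder code: the class and method names are hypothetical, in-memory lists stand in for the sequential sort files, and a PriorityQueue is used where the diff uses a TreeMap of Node objects. Duplicate keys are simply kept in this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of a k-way merge over sorted runs.
class SortedRunMerge {

    /** One cursor per sorted run; it only ever moves forward, like a sequential file read. */
    private static final class Cursor implements Comparable<Cursor> {
        private final Iterator<String> source;
        private String head;

        Cursor(final Iterator<String> source) {
            this.source = source;
        }

        /** Loads the next record of the run into head; returns false at end of run. */
        boolean advance() {
            if (source.hasNext()) {
                head = source.next();
                return true;
            }
            return false;
        }

        @Override
        public int compareTo(final Cursor other) {
            return head.compareTo(other.head);
        }
    }

    /** Merges runs that are each already sorted into one sorted list. */
    static List<String> merge(final List<List<String>> runs) {
        final PriorityQueue<Cursor> queue = new PriorityQueue<Cursor>();
        for (final List<String> run : runs) {
            final Cursor cursor = new Cursor(run.iterator());
            if (cursor.advance()) {
                queue.add(cursor);
            }
        }
        final List<String> merged = new ArrayList<String>();
        while (!queue.isEmpty()) {
            // Take the run whose head record is smallest, emit it, then re-queue the run.
            final Cursor cursor = queue.poll();
            merged.add(cursor.head);
            if (cursor.advance()) {
                queue.add(cursor);
            }
        }
        return merged;
    }
}
```

Because each run is consumed strictly in order, a file-backed version of the cursor would only need buffered sequential reads, which is the property the description above relies on.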

In one large test the merge process was reduced from nearly 5 hours to about 1 hour 40 minutes.

The add-lock branch is a prerequisite due to the version bump.

Note that names and semantics of some of the protected methods in TreeBuilder changed. In particular, this version provides a deterministic way to know which value was loaded first when a duplicate key is detected.
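
A rough sketch of what "deterministic" can mean here, assuming each sort file carries an index recording the order in which it was written (the diff calls this field _precedence). The Record class, the DuplicatePolicy interface and the resolve method below are hypothetical names for illustration, not the TreeBuilder API.

```java
// Hypothetical illustration: decide which of two duplicate records was loaded first.
class DuplicateResolution {

    /** Hypothetical policy: return true to replace the earlier value with the later one. */
    interface DuplicatePolicy {
        boolean replaceWithLater(String key, String earlier, String later);
    }

    /** A record plus the index of the sort file it came from (lower index = loaded earlier). */
    static final class Record {
        final String key;
        final String value;
        final int precedence;

        Record(final String key, final String value, final int precedence) {
            this.key = key;
            this.value = value;
            this.precedence = precedence;
        }
    }

    /**
     * Orders the two records by precedence before consulting the policy, so the
     * policy always sees (earlier, later) regardless of the order in which the
     * merge happened to encounter them. On equal precedence, b is treated as earlier.
     */
    static Record resolve(final Record a, final Record b, final DuplicatePolicy policy) {
        final Record earlier = a.precedence < b.precedence ? a : b;
        final Record later = earlier == a ? b : a;
        return policy.replaceWithLater(earlier.key, earlier.value, later.value) ? later : earlier;
    }
}
```

With this shape, swapping the argument order of resolve does not change the outcome, which is the property the note above describes.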

Peter Beaman (pbeaman) wrote :

Fixed bug 1100038, which appears when using TreeBuilder. The bug is a redundant decrement in BufferPool#invalidate.
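
The following toy model shows how such a redundant decrement can drive a dirty-page counter negative. It assumes that clearDirty() already adjusts the counter itself, which the BufferPool hunk in the diff suggests but this page does not show. Pool, Buffer, invalidateOld and invalidateNew are hypothetical stand-ins, not the Persistit classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a double decrement of a dirty-page counter.
class DirtyCountSketch {

    static final class Pool {
        final AtomicInteger dirtyPageCount = new AtomicInteger();
    }

    static final class Buffer {
        private final Pool pool;
        private boolean dirty;

        Buffer(final Pool pool) {
            this.pool = pool;
        }

        boolean isDirty() {
            return dirty;
        }

        void setDirty() {
            if (!dirty) {
                dirty = true;
                pool.dirtyPageCount.incrementAndGet();
            }
        }

        /** Assumption: clearing the flag also maintains the pool's counter. */
        void clearDirty() {
            if (dirty) {
                dirty = false;
                pool.dirtyPageCount.decrementAndGet();
            }
        }
    }

    /** Shape of the old code path: decrements once here and once more inside clearDirty(). */
    static void invalidateOld(final Pool pool, final Buffer buffer) {
        if (buffer.isDirty()) {
            pool.dirtyPageCount.decrementAndGet();
            buffer.clearDirty();
        }
    }

    /** Shape of the fixed code path: clearDirty() alone keeps the counter consistent. */
    static void invalidateNew(final Buffer buffer) {
        buffer.clearDirty();
    }
}
```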

Nathan Williams (nwilliams) wrote :

Persistit trunk and server dep is now at 3.2.5 so we'll probably want to bump it again.

The unique helper on Persistit isn't wrong, but seems far away from its only usage. Constructing a TreeBuilder and then calling getName() would be mildly surprising with this appended. Maybe TreeBuilder doesn't even need an external name? Or the name written to disk could be an internal ID instead of the name?

There's quite a bit of shuffling here, but otherwise it all looks plausible.

review: Needs Fixing
Peter Beaman (pbeaman) wrote :

Thanks.

Yes, unique() seems far away, but Persistit is our closest singleton and I prefer not to use a static. I suppose we could have multiple counters in TimestampAllocator if you prefer.
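
For readers weighing the instance counter against a static one, a small sketch of the difference: a per-instance AtomicLong gives each owning object its own sequence, so two engines in one JVM do not share state. EngineSketch is a hypothetical stand-in; only the incrementAndGet() pattern is taken from the unique() method in the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an object that hands out unique ids per instance.
class EngineSketch {

    // Instance field rather than static: each EngineSketch has its own sequence.
    private final AtomicLong uniqueCounter = new AtomicLong();

    /** Returns 1, 2, 3, ... for this instance; safe to call from multiple threads. */
    long unique() {
        return uniqueCounter.incrementAndGet();
    }
}
```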

Changed setName/getName to have expected behavior.

I kind of like the external name as file name to help someone figure out what the saved files might be if the process is running for a long time or crashes. I'm not wedded to it, but that's what I had in mind.

Found and fixed a couple of other issues - especially one in which closing a temporary volume leaves all its dirty pages in the buffer pool. (isDirty -> clearDirty).

Note that this branch now has 3.2.6-upgrade as a prerequisite, but I did not know how to add that.

Nathan Williams (nwilliams) wrote :

The external name for the file makes sense and I'm fine with the rest.

review: Approve
Akiban Build User (build-akiban) wrote :

There was one failure during build/test:

* during merge: merge conflicts while merging lp:~pbeaman/akiban-persistit/tree_builder2

review: Needs Fixing
Peter Beaman (pbeaman) wrote :

Resolved conflicts. Approving again because nothing substantive changed.

Akiban Build User (build-akiban) wrote :

There were 2 failures during build/test:

* job system-tests-mtr failed at build number 3175: http://172.16.20.104:8080/job/system-tests-mtr/3175/

* view must-pass failed: system-tests-mtr is red

review: Needs Fixing
Nathan Williams (nwilliams) wrote :

Unrelated.

Preview Diff

1=== modified file 'src/main/java/com/persistit/BufferPool.java'
2--- src/main/java/com/persistit/BufferPool.java 2013-01-24 18:12:23 +0000
3+++ src/main/java/com/persistit/BufferPool.java 2013-01-28 10:44:23 +0000
4@@ -659,10 +659,7 @@
5 }
6 }
7 buffer.clearValid();
8- if (buffer.isDirty()) {
9- _dirtyPageCount.decrementAndGet();
10- buffer.clearDirty();
11- }
12+ buffer.clearDirty();
13 buffer.setPageAddressAndVolume(0, null);
14 }
15
16
17=== modified file 'src/main/java/com/persistit/Persistit.java'
18--- src/main/java/com/persistit/Persistit.java 2013-01-24 18:19:19 +0000
19+++ src/main/java/com/persistit/Persistit.java 2013-01-28 10:44:23 +0000
20@@ -43,6 +43,7 @@
21 import java.util.TreeMap;
22 import java.util.WeakHashMap;
23 import java.util.concurrent.atomic.AtomicBoolean;
24+import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import javax.management.InstanceNotFoundException;
28@@ -285,6 +286,8 @@
29
30 private final ThreadLocal<SoftReference<Value>> _valueThreadLocal = new ThreadLocal<SoftReference<Value>>();
31
32+ private final AtomicLong _uniqueCounter = new AtomicLong();
33+
34 private volatile Volume _lockVolume;
35
36 /**
37@@ -2513,6 +2516,10 @@
38 return value;
39 }
40
41+ long unique() {
42+ return _uniqueCounter.incrementAndGet();
43+ }
44+
45 private final static String[] ARG_TEMPLATE = { "_flag|g|Start AdminUI",
46 "_flag|i|Perform IntegrityCheck on all volumes", "_flag|w|Wait until AdminUI exists",
47 "_flag|c|Perform copy-back", "properties|string|Property file name",
48
49=== modified file 'src/main/java/com/persistit/StreamLoader.java'
50--- src/main/java/com/persistit/StreamLoader.java 2012-08-24 13:57:19 +0000
51+++ src/main/java/com/persistit/StreamLoader.java 2013-01-28 10:44:23 +0000
52@@ -110,126 +110,126 @@
53 }
54
55 public void load(final ImportHandler handler) throws IOException, PersistitException {
56- String volumeName = null;
57- String treeName = null;
58- for (;;) {
59- final int b1 = _dis.read();
60- if (b1 == -1) {
61- break;
62- }
63- final int b2 = _dis.read();
64- final int recordType = ((b1 & 0xFF) << 8) + (b2 & 0xFF);
65-
66- switch (recordType) {
67- case StreamSaver.RECORD_TYPE_FILL: {
68- handler.handleFillRecord();
69- _otherRecordCount++;
70- break;
71- }
72- case StreamSaver.RECORD_TYPE_DATA: {
73- final int keySize = _dis.readShort();
74- final int elisionCount = _dis.readShort();
75- final int valueSize = _dis.readInt();
76- _value.ensureFit(valueSize);
77- _dis.read(_key.getEncodedBytes(), elisionCount, keySize - elisionCount);
78- _key.setEncodedSize(keySize);
79- _dis.read(_value.getEncodedBytes(), 0, valueSize);
80- _value.setEncodedSize(valueSize);
81- handler.handleDataRecord(_key, _value);
82- _dataRecordCount++;
83- break;
84- }
85- case StreamSaver.RECORD_TYPE_KEY_FILTER: {
86- final String filterString = _dis.readUTF();
87- handler.handleKeyFilterRecord(filterString);
88- _otherRecordCount++;
89- break;
90- }
91- case StreamSaver.RECORD_TYPE_VOLUME_ID: {
92- final long id = _dis.readLong();
93- final long initialPages = _dis.readLong();
94- final long extensionPages = _dis.readLong();
95- final long maximumPages = _dis.readLong();
96- final int bufferSize = _dis.readInt();
97- final String path = _dis.readUTF();
98- volumeName = _dis.readUTF();
99- handler.handleVolumeIdRecord(id, initialPages, extensionPages, maximumPages, bufferSize, path,
100- volumeName);
101- _otherRecordCount++;
102- break;
103- }
104- case StreamSaver.RECORD_TYPE_TREE_ID: {
105- treeName = _dis.readUTF();
106- handler.handleTreeIdRecord(treeName);
107- _otherRecordCount++;
108- postMessage("Loading Tree " + treeName, Task.LOG_VERBOSE);
109- break;
110- }
111- case StreamSaver.RECORD_TYPE_HOSTNAME: {
112- final String hostName = _dis.readUTF();
113- handler.handleHostNameRecord(hostName);
114- _otherRecordCount++;
115- break;
116- }
117- case StreamSaver.RECORD_TYPE_USER: {
118- final String hostName = _dis.readUTF();
119- handler.handleUserRecord(hostName);
120- _otherRecordCount++;
121- break;
122- }
123- case StreamSaver.RECORD_TYPE_COMMENT: {
124- final String comment = _dis.readUTF();
125- handler.handleCommentRecord(comment);
126- _otherRecordCount++;
127- break;
128- }
129- case StreamSaver.RECORD_TYPE_COUNT: {
130- final long dataRecordCount = _dis.readLong();
131- final long otherRecordCount = _dis.readLong();
132- handler.handleCountRecord(dataRecordCount, otherRecordCount);
133- _otherRecordCount++;
134- break;
135- }
136- case StreamSaver.RECORD_TYPE_START: {
137- handler.handleStartRecord();
138- _otherRecordCount++;
139- break;
140- }
141- case StreamSaver.RECORD_TYPE_END: {
142- handler.handleEndRecord();
143- _otherRecordCount++;
144- break;
145- }
146- case StreamSaver.RECORD_TYPE_TIMESTAMP: {
147- final long timeStamp = _dis.readLong();
148- handler.handleTimeStampRecord(timeStamp);
149- _otherRecordCount++;
150- break;
151- }
152- case StreamSaver.RECORD_TYPE_EXCEPTION: {
153- final String exceptionString = _dis.readUTF();
154- handler.handleExceptionRecord(exceptionString);
155- _otherRecordCount++;
156- break;
157- }
158- case StreamSaver.RECORD_TYPE_COMPLETION: {
159- handler.handleCompletionRecord();
160- _otherRecordCount++;
161- break;
162- }
163- default: {
164- throw new CorruptImportStreamException("Invalid record type " + recordType + " ("
165- + Util.bytesToHex(new byte[] { (byte) (recordType >>> 8), (byte) recordType })
166- + " after reading " + _dataRecordCount + " data records" + " and " + _otherRecordCount
167- + " other records");
168- }
169- }
170-
171+ while (next(handler)) {
172 }
173 postMessage(String.format("DONE - processed %,d data records and %,d other records", _dataRecordCount,
174 _otherRecordCount), Task.LOG_NORMAL);
175 }
176
177+ public boolean next(final ImportHandler handler) throws IOException, PersistitException {
178+ final int b1 = _dis.read();
179+ if (b1 == -1) {
180+ return false;
181+ }
182+ final int b2 = _dis.read();
183+ final int recordType = ((b1 & 0xFF) << 8) + (b2 & 0xFF);
184+
185+ switch (recordType) {
186+ case StreamSaver.RECORD_TYPE_FILL: {
187+ handler.handleFillRecord();
188+ _otherRecordCount++;
189+ break;
190+ }
191+ case StreamSaver.RECORD_TYPE_DATA: {
192+ final int keySize = _dis.readShort();
193+ final int elisionCount = _dis.readShort();
194+ final int valueSize = _dis.readInt();
195+ _value.ensureFit(valueSize);
196+ _dis.read(_key.getEncodedBytes(), elisionCount, keySize - elisionCount);
197+ _key.setEncodedSize(keySize);
198+ _dis.read(_value.getEncodedBytes(), 0, valueSize);
199+ _value.setEncodedSize(valueSize);
200+ handler.handleDataRecord(_key, _value);
201+ _dataRecordCount++;
202+ break;
203+ }
204+ case StreamSaver.RECORD_TYPE_KEY_FILTER: {
205+ final String filterString = _dis.readUTF();
206+ handler.handleKeyFilterRecord(filterString);
207+ _otherRecordCount++;
208+ break;
209+ }
210+ case StreamSaver.RECORD_TYPE_VOLUME_ID: {
211+ final long id = _dis.readLong();
212+ final long initialPages = _dis.readLong();
213+ final long extensionPages = _dis.readLong();
214+ final long maximumPages = _dis.readLong();
215+ final int bufferSize = _dis.readInt();
216+ final String path = _dis.readUTF();
217+ handler._volumeName = _dis.readUTF();
218+ handler.handleVolumeIdRecord(id, initialPages, extensionPages, maximumPages, bufferSize, path,
219+ handler._volumeName);
220+ _otherRecordCount++;
221+ break;
222+ }
223+ case StreamSaver.RECORD_TYPE_TREE_ID: {
224+ handler._treeName = _dis.readUTF();
225+ handler.handleTreeIdRecord(handler._treeName);
226+ _otherRecordCount++;
227+ postMessage("Loading Tree " + handler._treeName, Task.LOG_VERBOSE);
228+ break;
229+ }
230+ case StreamSaver.RECORD_TYPE_HOSTNAME: {
231+ final String hostName = _dis.readUTF();
232+ handler.handleHostNameRecord(hostName);
233+ _otherRecordCount++;
234+ break;
235+ }
236+ case StreamSaver.RECORD_TYPE_USER: {
237+ final String hostName = _dis.readUTF();
238+ handler.handleUserRecord(hostName);
239+ _otherRecordCount++;
240+ break;
241+ }
242+ case StreamSaver.RECORD_TYPE_COMMENT: {
243+ final String comment = _dis.readUTF();
244+ handler.handleCommentRecord(comment);
245+ _otherRecordCount++;
246+ break;
247+ }
248+ case StreamSaver.RECORD_TYPE_COUNT: {
249+ final long dataRecordCount = _dis.readLong();
250+ final long otherRecordCount = _dis.readLong();
251+ handler.handleCountRecord(dataRecordCount, otherRecordCount);
252+ _otherRecordCount++;
253+ break;
254+ }
255+ case StreamSaver.RECORD_TYPE_START: {
256+ handler.handleStartRecord();
257+ _otherRecordCount++;
258+ break;
259+ }
260+ case StreamSaver.RECORD_TYPE_END: {
261+ handler.handleEndRecord();
262+ _otherRecordCount++;
263+ break;
264+ }
265+ case StreamSaver.RECORD_TYPE_TIMESTAMP: {
266+ final long timeStamp = _dis.readLong();
267+ handler.handleTimeStampRecord(timeStamp);
268+ _otherRecordCount++;
269+ break;
270+ }
271+ case StreamSaver.RECORD_TYPE_EXCEPTION: {
272+ final String exceptionString = _dis.readUTF();
273+ handler.handleExceptionRecord(exceptionString);
274+ _otherRecordCount++;
275+ break;
276+ }
277+ case StreamSaver.RECORD_TYPE_COMPLETION: {
278+ handler.handleCompletionRecord();
279+ _otherRecordCount++;
280+ break;
281+ }
282+ default: {
283+ throw new CorruptImportStreamException("Invalid record type " + recordType + " ("
284+ + Util.bytesToHex(new byte[] { (byte) (recordType >>> 8), (byte) recordType }) + " after reading "
285+ + _dataRecordCount + " data records" + " and " + _otherRecordCount + " other records");
286+ }
287+ }
288+ return true;
289+ }
290+
291 /**
292 * Handler for various record types in stream being loaded.
293 *
294@@ -237,7 +237,7 @@
295 *
296 */
297
298- public static class ImportHandler {
299+ protected static class ImportHandler {
300 protected Persistit _persistit;
301 protected TreeSelector _treeSelector;
302 protected Exchange _exchange;
303@@ -246,12 +246,14 @@
304 protected KeyFilter _keyFilter;
305 protected boolean _createMissingVolumes;
306 protected boolean _createMissingTrees;
307+ protected String _volumeName = null;
308+ protected String _treeName = null;
309
310- public ImportHandler(final Persistit persistit) {
311+ protected ImportHandler(final Persistit persistit) {
312 this(persistit, new TreeSelector(), true, true);
313 }
314
315- public ImportHandler(final Persistit persistit, final TreeSelector treeSelector,
316+ protected ImportHandler(final Persistit persistit, final TreeSelector treeSelector,
317 final boolean createMissingVolumes, final boolean createMissingTrees) {
318 _persistit = persistit;
319 _treeSelector = treeSelector == null ? new TreeSelector() : treeSelector;
320@@ -259,10 +261,10 @@
321 _createMissingVolumes = createMissingVolumes;
322 }
323
324- public void handleFillRecord() throws PersistitException {
325+ protected void handleFillRecord() throws PersistitException {
326 }
327
328- public void handleDataRecord(final Key key, final Value value) throws PersistitException {
329+ protected void handleDataRecord(final Key key, final Value value) throws PersistitException {
330 if (_keyFilter == null || _keyFilter.selected(key)) {
331 if (_volume == null || _tree == null)
332 return;
333@@ -277,10 +279,10 @@
334 }
335 }
336
337- public void handleKeyFilterRecord(final String keyFilterString) throws PersistitException {
338+ protected void handleKeyFilterRecord(final String keyFilterString) throws PersistitException {
339 }
340
341- public void handleVolumeIdRecord(final long volumeId, final long initialPages, final long extensionPages,
342+ protected void handleVolumeIdRecord(final long volumeId, final long initialPages, final long extensionPages,
343 final long maximumPages, final int bufferSize, final String path, final String name)
344 throws PersistitException {
345 final Exchange oldExchange = _exchange;
346@@ -306,7 +308,7 @@
347 }
348 }
349
350- public void handleTreeIdRecord(final String treeName) throws PersistitException {
351+ protected void handleTreeIdRecord(final String treeName) throws PersistitException {
352 final Exchange oldExchange = _exchange;
353 _exchange = null;
354 _tree = null;
355@@ -328,31 +330,31 @@
356
357 }
358
359- public void handleTimeStampRecord(final long timeStamp) throws PersistitException {
360- }
361-
362- public void handleHostNameRecord(final String hostName) throws PersistitException {
363- }
364-
365- public void handleUserRecord(final String userName) throws PersistitException {
366- }
367-
368- public void handleCommentRecord(final String comment) throws PersistitException {
369- }
370-
371- public void handleCountRecord(final long keyValueRecords, final long otherRecords) throws PersistitException {
372- }
373-
374- public void handleStartRecord() throws PersistitException {
375- }
376-
377- public void handleEndRecord() throws PersistitException {
378- }
379-
380- public void handleExceptionRecord(final String exceptionString) throws PersistitException {
381- }
382-
383- public void handleCompletionRecord() throws PersistitException {
384+ protected void handleTimeStampRecord(final long timeStamp) throws PersistitException {
385+ }
386+
387+ protected void handleHostNameRecord(final String hostName) throws PersistitException {
388+ }
389+
390+ protected void handleUserRecord(final String userName) throws PersistitException {
391+ }
392+
393+ protected void handleCommentRecord(final String comment) throws PersistitException {
394+ }
395+
396+ protected void handleCountRecord(final long keyValueRecords, final long otherRecords) throws PersistitException {
397+ }
398+
399+ protected void handleStartRecord() throws PersistitException {
400+ }
401+
402+ protected void handleEndRecord() throws PersistitException {
403+ }
404+
405+ protected void handleExceptionRecord(final String exceptionString) throws PersistitException {
406+ }
407+
408+ protected void handleCompletionRecord() throws PersistitException {
409 }
410
411 }
412
413=== modified file 'src/main/java/com/persistit/StreamSaver.java'
414--- src/main/java/com/persistit/StreamSaver.java 2012-08-24 13:57:19 +0000
415+++ src/main/java/com/persistit/StreamSaver.java 2013-01-28 10:44:23 +0000
416@@ -483,7 +483,7 @@
417 }
418
419 /**
420- * Saves on or more trees in the specified <code>Volume</code>.
421+ * Saves one or more trees in the specified <code>Volume</code>.
422 *
423 * @param volume
424 * The <code>Volume</code>
425
426=== modified file 'src/main/java/com/persistit/TreeBuilder.java'
427--- src/main/java/com/persistit/TreeBuilder.java 2013-01-19 04:11:48 +0000
428+++ src/main/java/com/persistit/TreeBuilder.java 2013-01-28 10:44:23 +0000
429@@ -15,16 +15,22 @@
430
431 package com.persistit;
432
433+import java.io.BufferedInputStream;
434+import java.io.BufferedOutputStream;
435+import java.io.DataInputStream;
436+import java.io.DataOutputStream;
437 import java.io.File;
438+import java.io.FileInputStream;
439+import java.io.FileOutputStream;
440 import java.io.IOException;
441+import java.text.SimpleDateFormat;
442 import java.util.ArrayList;
443 import java.util.Collections;
444 import java.util.Comparator;
445+import java.util.Date;
446 import java.util.HashMap;
447-import java.util.HashSet;
448 import java.util.List;
449 import java.util.Map;
450-import java.util.Set;
451 import java.util.SortedMap;
452 import java.util.TreeMap;
453 import java.util.concurrent.atomic.AtomicLong;
454@@ -122,11 +128,12 @@
455 * of records inserted with duplicate keys</li>
456 * <li>{@link #beforeMergeKey(Exchange)} - allowing filtering or custom handling
457 * per record while merging</li>
458- * <li>{@link #afterMergeKey(Exchange)} - behavior after merging one record</li>
459- * <li>{@link #beforeSortVolumeEvicted(Volume)} - behavior before evicting a
460- * sort volume when full</li>
461- * <li>{@link #afterSortVolumeEvicted(Volume)} - behavior after evicting a sort
462- * volume when full</li>
463+ * <li>{@link #afterMergeKey(Exchange)} - customizable behavior after merging
464+ * one record</li>
465+ * <li>{@link #beforeSortVolumeClosed(Volume)} - customizable behavior before
466+ * closing a sort volume when full</li>
467+ * <li>{@link #afterSortVolumeClose(Volume)} - customizable behavior after
468+ * closing a sort volume when full</li>
469 * <li>{@link #getTreeComparator()} - return a custom Comparator to determine
470 * sequence in which trees are populated within the {@link #merge()} method
471 * </ul>
472@@ -138,22 +145,26 @@
473 public class TreeBuilder {
474 private final static float DEFAULT_BUFFER_POOL_FRACTION = 0.5f;
475 private final static long REPORT_REPORT_MULTIPLE = 1000000;
476- private final static String DEFAULT_NAME = "TreeBuilder";
477+ private final static String SDF = "yyyyMMddHHmm";
478+ private final static int STREAM_SIZE = 1024 * 1024;
479
480 private final String _name;
481+ private final long _uniqueId;
482 private final Persistit _persistit;
483 private final List<File> _directories = new ArrayList<File>();
484- private final List<Volume> _sortVolumes = new ArrayList<Volume>();
485 private final int _pageSize;
486 private final int _pageLimit;
487 private final AtomicLong _sortedKeyCount = new AtomicLong();
488 private final AtomicLong _mergedKeyCount = new AtomicLong();
489 private volatile long _reportKeyCountMultiple = REPORT_REPORT_MULTIPLE;
490- private Volume _currentSortVolume;
491- private int _nextDirectoryIndex;
492-
493- private final Set<Tree> _allTrees = new HashSet<Tree>();
494- private final List<Tree> _sortedTrees = new ArrayList<Tree>();
495+ private Volume _sortVolume;
496+ private File _sortFile;
497+
498+ private final List<Tree> _allTrees = new ArrayList<Tree>();
499+ private final Map<String, Tree> _sortTreeMap = new HashMap<String, Tree>();
500+
501+ private int _sortFileIndex;
502+ private final List<Node> _sortNodes = new ArrayList<Node>();
503
504 private final ThreadLocal<Map<Tree, Exchange>> _sortExchangeMapThreadLocal = new ThreadLocal<Map<Tree, Exchange>>() {
505 @Override
506@@ -163,16 +174,21 @@
507 };
508
509 private final Comparator<Tree> _defaultTreeComparator = new Comparator<Tree>() {
510+ /**
511+ * Default implementation returns trees sorted in the order they were
512+ * added to the _allTrees list - in other words, sorting should leave
513+ * the list unchanged.
514+ *
515+ * @param a
516+ * @param b
517+ * @return
518+ */
519 @Override
520 public int compare(final Tree a, final Tree b) {
521 if (a == b) {
522 return 0;
523 }
524- if (a.getVolume() == b.getVolume()) {
525- return a.getName().compareTo(b.getName());
526- } else {
527- return a.getVolume().getName().compareTo(b.getVolume().getName());
528- }
529+ return _allTrees.indexOf(a) - _allTrees.indexOf(b);
530 }
531
532 @Override
533@@ -182,51 +198,54 @@
534 };
535
536 private class Node implements Comparable<Node> {
537- final Volume _volume;
538- int _treeListIndex = -1;
539- Exchange _exchange = null;
540- Tree _currentTree = null;
541- Node _duplicate;
542-
543- private Node(final Volume volume) {
544- _volume = volume;
545- }
546-
547- private boolean next() throws PersistitException {
548- for (;;) {
549- if (_exchange == null) {
550- _treeListIndex++;
551- if (_treeListIndex >= _sortedTrees.size()) {
552- _volume.close();
553- return false;
554- }
555- final String tempTreeName = "_" + _sortedTrees.get(_treeListIndex).getHandle();
556- final Tree sortTree = _volume.getTree(tempTreeName, false);
557- if (sortTree == null) {
558- continue;
559- }
560- _exchange = new Exchange(sortTree);
561- _currentTree = _sortedTrees.get(_treeListIndex);
562- }
563- if (_exchange.next(true)) {
564- return true;
565- }
566- _exchange = null;
567- }
568+
569+ private Tree _tree;
570+ private Key _key;
571+ private Value _value;
572+ private Node _duplicate;
573+ private final int _precedence;
574+
575+ private final File _file;
576+ private StreamLoader _loader;
577+ private Handler _handler;
578+ private boolean _next;
579+
580+ private class Handler extends StreamLoader.ImportHandler {
581+
582+ private Handler(final Persistit persistit) {
583+ super(persistit);
584+ }
585+
586+ @Override
587+ protected void handleDataRecord(final Key key, final Value value) throws PersistitException {
588+ Node.this._tree = super._tree;
589+ _key = key;
590+ _value = value;
591+ _next = true;
592+ }
593+ }
594+
595+ private File getFile() {
596+ return _file;
597+ }
598+
599+ private Node(final File file, final int index) {
600+ _file = file;
601+ _precedence = index;
602 }
603
604 @Override
605 public int compareTo(final Node node) {
606- if (_exchange == null) {
607- return node._exchange == null ? 0 : -1;
608- }
609- final int treeComparison = getTreeComparator().compare(_currentTree, node._currentTree);
610- if (treeComparison != 0) {
611- return treeComparison;
612- }
613- final Key k1 = _exchange.getKey();
614- final Key k2 = node._exchange.getKey();
615- return k1.compareTo(k2);
616+ if (_tree == null) {
617+ return node._tree == null ? 0 : 1;
618+ }
619+ if (node._tree == null) {
620+ return -1;
621+ }
622+ if (_tree != node._tree) {
623+ return _allTrees.indexOf(_tree) - _allTrees.indexOf(node._tree);
624+ } else
625+ return _key.compareTo(node._key);
626 }
627
628 @Override
629@@ -237,25 +256,67 @@
630 if (sb.length() > 0) {
631 sb.append(",");
632 }
633- if (n._exchange == null) {
634- sb.append("<null>");
635+ if (n._tree == null) {
636+ sb.append("<end>");
637 } else {
638- sb.append("<"
639- + (n._currentTree == null ? "?" : n._currentTree.getName() + n._exchange.getKey() + "="
640- + n._exchange.getValue()) + ">");
641+ sb.append("<" + (n._tree.getName() + n._key + "=" + n._value) + ">");
642 }
643 n = n._duplicate;
644 }
645 return sb.toString();
646 }
647+
648+ private void createStreamLoader() throws Exception {
649+ _loader = new StreamLoader(_persistit, new DataInputStream(new BufferedInputStream(new FileInputStream(
650+ _file), STREAM_SIZE)));
651+ _handler = new Handler(_persistit);
652+ }
653+
654+ private boolean next() throws Exception {
655+ _next = false;
656+ while (_loader.next(_handler) && !_next)
657+ ;
658+ if (!_next) {
659+ _loader.close();
660+ }
661+ return _next;
662+ }
663+
664+ }
665+
666+ private class SortStreamSaver extends StreamSaver {
667+
668+ Tree _sortTree = null;
669+
670+ SortStreamSaver(final Persistit persistit, final DataOutputStream stream) {
671+ super(persistit, stream);
672+ }
673+
674+ @Override
675+ protected void writeData(final Exchange exchange) throws IOException {
676+ if (exchange.getTree() != _sortTree) {
677+ final Tree source = _sortTreeMap.get(exchange.getTree().getName());
678+ if (_lastVolume != source.getVolume()) {
679+ writeVolumeInfo(source.getVolume());
680+ _lastVolume = source.getVolume();
681+ }
682+ if (_lastTree != source) {
683+ writeTreeInfo(source);
684+ _lastTree = source;
685+ }
686+ }
687+ writeData(exchange.getKey(), exchange.getValue());
688+ _recordCount++;
689+ }
690 }
691
692 public TreeBuilder(final Persistit persistit) {
693- this(persistit, DEFAULT_NAME, -1, DEFAULT_BUFFER_POOL_FRACTION);
694+ this(persistit, new SimpleDateFormat(SDF).format(new Date()), -1, DEFAULT_BUFFER_POOL_FRACTION);
695 }
696
697 public TreeBuilder(final Persistit persistit, final String name, final int pageSize, final float bufferPoolFraction) {
698 _name = name;
699+ _uniqueId = persistit.unique();
700 _persistit = persistit;
701 _pageSize = pageSize == -1 ? computePageSize(persistit) : pageSize;
702 final int bufferCount = _persistit.getBufferPool(_pageSize).getBufferCount();
703@@ -302,10 +363,10 @@
704 }
705
706 /**
707- * @return Count of sort volumes that have been created while sorting keys
708+ * @return Count of sort trees that have been created while sorting keys
709 */
710- public final synchronized int getSortVolumeCount() {
711- return _sortVolumes.size();
712+ public final synchronized int getSortFileCount() {
713+ return _sortFileIndex;
714 }
715
716 /**
717@@ -326,10 +387,8 @@
718 * @return List of destination
719 * <code>Tree<code> instances. This list is built as keys are stored.
720 */
721- public final List<Tree> getTrees() {
722- final List<Tree> list = new ArrayList<Tree>();
723- list.addAll(_allTrees);
724- Collections.sort(list, getTreeComparator());
725+ public synchronized final List<Tree> getTrees() {
726+ final List<Tree> list = new ArrayList<Tree>(_allTrees);
727 return list;
728 }
729
730@@ -394,7 +453,7 @@
731 synchronized (this) {
732 _directories.clear();
733 _directories.addAll(directories);
734- _nextDirectoryIndex = 0;
735+ _sortFileIndex = 0;
736 }
737 }
738 }
739@@ -402,9 +461,9 @@
740 /**
741 *
742 * @return List of directories set via the
743- * {@link #setSortTreeDirectories(List)} method.
744+ * {@link #setSortFileDirectories(List)} method.
745 */
746- public final List<File> getSortTreeDirectories() {
747+ public final List<File> getSortFileDirectories() {
748 return Collections.unmodifiableList(_directories);
749 }
750
751@@ -441,7 +500,10 @@
752 ex = _persistit.getExchange(newSortVolume, tempTreeName, true);
753 map.put(tree, ex);
754 synchronized (this) {
755- _allTrees.add(tree);
756+ if (!_allTrees.contains(tree)) {
757+ _allTrees.add(tree);
758+ _sortTreeMap.put(tempTreeName, tree);
759+ }
760 }
761 }
762 key.copyTo(ex.getKey());
763@@ -466,8 +528,13 @@
764 private void insertNode(final Map<Node, Node> sorted, final Node node) throws Exception {
765 final Node other = sorted.put(node, node);
766 if (other != null) {
767- if (!duplicateKeyDetected(node._currentTree, node._exchange.getKey(), other._exchange.getValue(),
768- node._exchange.getValue())) {
769+ final boolean reverse;
770+ if (node._precedence < other._precedence) {
771+ reverse = duplicateKeyDetected(node._tree, node._key, node._value, other._value);
772+ } else {
773+ reverse = !duplicateKeyDetected(node._tree, node._key, other._value, node._value);
774+ }
775+ if (reverse) {
776 sorted.put(node, other);
777 final Node p = other._duplicate;
778 other._duplicate = node;
779@@ -485,17 +552,16 @@
780 * @throws Exception
781 */
782 public synchronized void merge() throws Exception {
783+ finishSortVolume();
784 if ((_mergedKeyCount.get() % _reportKeyCountMultiple) != 0) {
785 reportSorted(_mergedKeyCount.get());
786 }
787- _sortedTrees.clear();
788- _sortedTrees.addAll(_allTrees);
789 Tree currentTree = null;
790 Exchange ex = null;
791- Collections.sort(_sortedTrees, getTreeComparator());
792 final SortedMap<Node, Node> sorted = new TreeMap<Node, Node>();
793- for (final Volume volume : _sortVolumes) {
794- final Node node = new Node(volume);
795+
796+ for (final Node node : _sortNodes) {
797+ node.createStreamLoader();
798 if (node.next()) {
799 insertNode(sorted, node);
800 }
801@@ -507,18 +573,18 @@
802 }
803 Node node = sorted.firstKey();
804 node = sorted.remove(node);
805- if (node._currentTree != currentTree) {
806- ex = new Exchange(node._currentTree);
807- currentTree = node._currentTree;
808+ if (node._tree != currentTree) {
809+ ex = new Exchange(node._tree);
810+ currentTree = node._tree;
811 }
812+ node._key.copyTo(ex.getKey());
813+ node._value.copyTo(ex.getValue());
814
815- node._exchange.getKey().copyTo(ex.getKey());
816- node._exchange.getValue().copyTo(ex.getValue());
817 if (beforeMergeKey(ex)) {
818 ex.fetchAndStore();
819 boolean stored = true;
820 if (ex.getValue().isDefined()) {
821- if (!duplicateKeyDetected(ex.getTree(), ex.getKey(), ex.getValue(), node._exchange.getValue())) {
822+ if (!duplicateKeyDetected(ex.getTree(), ex.getKey(), ex.getValue(), node._value)) {
823 ex.store();
824 stored = false;
825 }
826@@ -547,21 +613,32 @@
827
828 private synchronized void reset() throws Exception {
829 Exception exception = null;
830- for (final Volume volume : _sortVolumes) {
831+ try {
832+ if (_sortVolume != null) {
833+ _sortVolume.close();
834+ }
835+ } catch (final PersistitException e) {
836+ if (exception == null) {
837+ exception = e;
838+ }
839+ }
840+
841+ for (final Node node : _sortNodes) {
842 try {
843- volume.close();
844- } catch (final PersistitException e) {
845+ if (node.getFile() != null) {
846+ node.getFile().delete();
847+ }
848+ } catch (final Exception e) {
849 if (exception == null) {
850 exception = e;
851 }
852 }
853 }
854- _sortVolumes.clear();
855- _currentSortVolume = null;
856- _nextDirectoryIndex = 0;
857- _sortExchangeMapThreadLocal.get().clear();
858 _allTrees.clear();
859- _sortedTrees.clear();
860+ _sortNodes.clear();
861+ _sortVolume = null;
862+ _sortFileIndex = 0;
863+ _sortExchangeMapThreadLocal.get().clear();
864 if (exception != null) {
865 throw exception;
866 }
867@@ -574,91 +651,115 @@
868 }
869
870 private synchronized Volume getSortVolume() throws Exception {
871- final boolean full = _currentSortVolume != null && _currentSortVolume.getNextAvailablePage() > _pageLimit;
872- if (full) {
873- if (beforeSortVolumeEvicted(_currentSortVolume)) {
874- _persistit.getBufferPool(_pageSize).evict(_currentSortVolume);
875- }
876- afterSortVolumeEvicted(_currentSortVolume);
877- }
878- if (full || _currentSortVolume == null) {
879- _currentSortVolume = createSortVolume();
880- _sortVolumes.add(_currentSortVolume);
881- }
882- return _currentSortVolume;
883- }
884-
885- private Volume createSortVolume() throws Exception {
886- final File directory;
887- if (_directories.isEmpty()) {
888- final String directoryName = _persistit.getConfiguration().getTmpVolDir();
889- directory = directoryName == null ? null : new File(directoryName);
890- } else {
891- directory = _directories.get(_nextDirectoryIndex % _directories.size());
892- _nextDirectoryIndex++;
893- }
894- return Volume.createTemporaryVolume(_persistit, _pageSize, directory);
895- }
896-
897- /**
898- * This method may be extended to provide an application-specific ordering
899- * on <code>Tree</code>s. This ordering determines the sequence in which
900- * destination trees are built from the sort data. By default trees are
901- * build in alphabetical order by volume and tree name. However, an
902- * application may choose a different order to ensure invariants for
903- * concurrent use.
904- *
905- * @return a <code>java.util.Comparator</code> on <code>Tree</code>
906- */
907- protected Comparator<Tree> getTreeComparator() {
908- return _defaultTreeComparator;
909+ if (_sortVolume != null && _sortVolume.getNextAvailablePage() > _pageLimit) {
910+ finishSortVolume();
911+ }
912+ if (_sortVolume == null) {
913+ final File directory;
914+ if (_directories.isEmpty()) {
915+ String directoryName = _persistit.getConfiguration().getTmpVolDir();
916+ if (directoryName == null) {
917+ directoryName = System.getProperty("java.io.tmpdir");
918+ }
919+ directory = new File(directoryName);
920+ if (!directory.exists()) {
921+ directory.mkdirs();
922+ }
923+ _directories.add(directory);
924+ } else {
925+ directory = _directories.get(_sortFileIndex % _directories.size());
926+ }
927+ _sortVolume = Volume.createTemporaryVolume(_persistit, _pageSize, directory);
928+ _sortFile = new File(directory, String.format("%s_%d.%06d", _name, _uniqueId, _sortFileIndex));
929+ final Node node = new Node(_sortFile, _sortFileIndex);
930+ _sortNodes.add(node);
931+ _sortFileIndex++;
932+ }
933+ return _sortVolume;
934+ }
935+
936+ private void finishSortVolume() throws Exception {
937+ if (_sortVolume != null) {
938+ beforeSortVolumeClosed(_sortVolume, _sortFile);
939+ saveSortVolume(_sortVolume, _sortFile);
940+ afterSortVolumeClose(_sortVolume, _sortFile);
941+ _sortVolume.close();
942+ _sortVolume = null;
943+ }
944+ }
945+
946+ private void saveSortVolume(final Volume volume, final File file) throws Exception {
947+ final DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
948+ STREAM_SIZE));
949+ final List<Tree> sorted = new ArrayList<Tree>(_allTrees);
950+ Collections.sort(sorted, getTreeComparator());
951+ final StreamSaver saver = new SortStreamSaver(_persistit, dos);
952+ for (final Tree tree : sorted) {
953+ final String sortTreeName = "_" + tree.getHandle();
954+ final Tree sortTree = volume.getTree(sortTreeName, false);
955+ if (sortTree != null) {
956+ final Exchange exchange = new Exchange(sortTree);
957+ saver.save(exchange, null);
958+ }
959+ }
960+ file.deleteOnExit();
961+ dos.close();
962 }
963
964 /**
965 * This method may be extended to provide application-specific behavior when
966- * a sort volume has been filled to capacity. The default implementation
967- * return <code>true</code>. If this method returns <code>true</code>,
968- * <code>TreeBuilder</code> evicts the <code>Volume</code> to avoid
969- * over-running the <code>BufferPool</code> and then starts a new sort tree
970- * if a new record is subsequently stored.
971+ * a sort volume has been filled to capacity. Subsequent to this call, the
972+ * sort volume is streamed to a sort file and then its pages in the
973+ * <code>BufferPool</code> are invalidated to allow their immediate reuse.
974 *
975 * @param volume
976 * The temporary <code>Volume</code> that has been filled
977- * @return <code>true</code> to cause the current sort volume to be evicted
978- * from the <code>BufferPool</code>
979+ * @param file
980+ * the file to which the sorted key-value pairs will be written
981 * @throws Exception
982 */
983- protected boolean beforeSortVolumeEvicted(final Volume volume) throws Exception {
984- return true;
985+ protected void beforeSortVolumeClosed(final Volume volume, final File file) throws Exception {
986+
987 }
988
989 /**
990 * This method may be extended to provide application-specific reporting
991 * functionality after a sort volume has been filled to capacity and has
992 * been evicted. An application may also modify the temporary directory set
993- * via {@link #setSortTreeDirectories(List)} within this method if necessary
994+ * via {@link #setSortFileDirectories(List)} within this method if necessary
995 * to adjust disk space utilization, for example. The default behavior of
996 * this method is to do nothing.
997 *
998 * @param volume
999 * The temporary <code>Volume</code> that has been filled
1000+ * @param file
1001+ * the file to which the sorted key-value pairs have been written
1002 * @throws Exception
1003 */
1004- protected void afterSortVolumeEvicted(final Volume volume) throws Exception {
1005+ protected void afterSortVolumeClose(final Volume volume, final File file) throws Exception {
1006
1007 }
1008
1009 /**
1010+ * <p>
1011 * This method may be extended to provide application-specific behavior when
1012- * an attempt is made to merge records with duplicate keys. The default
1013- * behavior is to throw a {@link DuplicateKeyException}.
1014+ * an attempt is made to merge records with duplicate keys. The two
1015+ * <code>Value</code>s v1 and v2 are provided in the order they were
1016+ * inserted into the <code>TreeBuilder</code>. behavior is to write a
1017+ * warning to the log and retain the first value..
1018+ * </p>
1019 *
1020 * @param tree
1021+ * the <code>Tree</code> to which a key is being merged
1022 * @param key
1023+ * the <code>Key</code>
1024 * @param v1
1025+ * the <code>Value</code> previously inserted
1026 * @param v2
1027- * @return If <code>true</code>, the resulting value is v2. If
1028- * <code>false</code>, the resulting value is v1.
1029+ * the conflicting <code>Value</code>
1030+ * @return <code>true</code> to replace the value previously stored,
1031+ * <code>false</code> to leave the value first inserted and ignore
1032+ * the new value.
1033 * @throws DuplicateKeyException
1034 * if a key being inserted or merged matches a key already
1035 * exists
1036@@ -728,8 +829,23 @@
1037
1038 }
1039
1040- void unitTestNextSortVolume() {
1041+ /**
1042+ * This method may be extended to provide an application-specific ordering
1043+ * on <code>Tree</code>s. This ordering determines the sequence in which
1044+ * destination trees are built from the sort data. By default trees are
1045+ * built in alphabetical order by volume and tree name. However, an
1046+ * application may choose a different order to ensure invariants for
1047+ * concurrent use.
1048+ *
1049+ * @return a <code>java.util.Comparator</code> on <code>Tree</code>
1050+ */
1051+ protected Comparator<Tree> getTreeComparator() {
1052+ return _defaultTreeComparator;
1053+ }
1054+
1055+ void unitTestNextSortFile() throws Exception {
1056+ finishSortVolume();
1057 _sortExchangeMapThreadLocal.get().clear();
1058- _currentSortVolume = null;
1059+ _sortVolume = null;
1060 }
1061 }
1062
1063=== modified file 'src/test/java/com/persistit/TreeBuilderTest.java'
1064--- src/test/java/com/persistit/TreeBuilderTest.java 2012-12-31 21:03:59 +0000
1065+++ src/test/java/com/persistit/TreeBuilderTest.java 2013-01-28 10:44:23 +0000
1066@@ -1,5 +1,5 @@
1067 /**
1068- * Copyright © 2011-2012 Akiban Technologies, Inc. All rights reserved.
1069+ * Copyright © 2011-2013 Akiban Technologies, Inc. All rights reserved.
1070 *
1071 * This program and the accompanying materials are made available
1072 * under the terms of the Eclipse Public License v1.0 which
1073@@ -17,21 +17,25 @@
1074
1075 import static com.persistit.unit.UnitTestProperties.VOLUME_NAME;
1076 import static org.junit.Assert.assertEquals;
1077+import static org.junit.Assert.assertTrue;
1078
1079+import java.io.File;
1080 import java.util.ArrayList;
1081 import java.util.Collections;
1082 import java.util.List;
1083+import java.util.Random;
1084 import java.util.concurrent.atomic.AtomicBoolean;
1085 import java.util.concurrent.atomic.AtomicInteger;
1086
1087 import org.junit.Test;
1088
1089+import com.persistit.unit.UnitTestProperties;
1090+
1091 public class TreeBuilderTest extends PersistitUnitTestCase {
1092- private final static int COUNT = 10000;
1093-
1094- @Test
1095- public void basicTest() throws Exception {
1096-
1097+ private final static int COUNT = 100000;
1098+ private final AtomicInteger _duplicates = new AtomicInteger();
1099+
1100+ private TreeBuilder getBasicTreeBuilder() {
1101 final TreeBuilder tb = new TreeBuilder(_persistit) {
1102 @Override
1103 protected void reportSorted(final long count) {
1104@@ -43,8 +47,21 @@
1105 System.out.println("Merged " + count);
1106 }
1107
1108+ @Override
1109+ protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
1110+ System.out.println("Duplicate key " + key);
1111+ _duplicates.incrementAndGet();
1112+ return false;
1113+ }
1114 };
1115 tb.setReportKeyCountMultiple(COUNT / 2);
1116+ return tb;
1117+ }
1118+
1119+ @Test
1120+ public void basicTest() throws Exception {
1121+
1122+ final TreeBuilder tb = getBasicTreeBuilder();
1123
1124 final List<Integer> shuffled = new ArrayList<Integer>(COUNT);
1125 for (int i = 0; i < COUNT; i++) {
1126@@ -68,7 +85,6 @@
1127 tb.store(a);
1128 tb.store(b);
1129 tb.store(c);
1130-
1131 }
1132
1133 tb.merge();
1134@@ -84,7 +100,8 @@
1135 count++;
1136 }
1137 assertEquals("Expect every key value", COUNT, count);
1138-
1139+ _persistit.flush();
1140+ assertEquals(0, a.getBufferPool().getDirtyPageCount());
1141 }
1142
1143 @Test
1144@@ -130,7 +147,7 @@
1145 tb.store(ex);
1146 assertEquals("Should have registered a dup", 2, duplicateCount.get());
1147
1148- tb.unitTestNextSortVolume();
1149+ tb.unitTestNextSortFile();
1150
1151 ex.to(1).getValue().put("ghi");
1152 tb.store(ex);
1153@@ -157,9 +174,9 @@
1154 }
1155
1156 @Test
1157- public void duplicatePriority() throws Exception {
1158- final TreeBuilder tb = new TreeBuilder(_persistit) {
1159-
1160+ public void duplicatePriority1() throws Exception {
1161+ duplicatePriorityCheck(new TreeBuilder(_persistit) {
1162+ // Larger value wins
1163 @Override
1164 protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
1165 final String s1 = v1.getString();
1166@@ -167,16 +184,31 @@
1167 return s1.compareTo(s2) < 0;
1168 }
1169
1170- };
1171+ }, "xuorcxq");
1172+ }
1173+
1174+ @Test
1175+ public void duplicatePriority2() throws Exception {
1176+ duplicatePriorityCheck(new TreeBuilder(_persistit) {
1177+ // First value wins
1178+ @Override
1179+ protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
1180+ return false;
1181+ }
1182+
1183+ }, "xmnraxq");
1184+ }
1185+
1186+ private void duplicatePriorityCheck(final TreeBuilder tb, final String expected) throws Exception {
1187 final Exchange ex = _persistit.getExchange(VOLUME_NAME, "a", true);
1188 final String nul = null;
1189
1190 insertKeys(ex, tb, "x", "m", "n", nul, "a", nul, "q");
1191- tb.unitTestNextSortVolume();
1192+ tb.unitTestNextSortFile();
1193 insertKeys(ex, tb, nul, "t", "o", "r", nul, nul, nul);
1194- tb.unitTestNextSortVolume();
1195+ tb.unitTestNextSortFile();
1196 insertKeys(ex, tb, nul, "u", "m", "j", "c", "x", "l");
1197- tb.unitTestNextSortVolume();
1198+ tb.unitTestNextSortFile();
1199 insertKeys(ex, tb, nul, "m", nul, nul, "a", nul, "q");
1200
1201 tb.merge();
1202@@ -188,7 +220,8 @@
1203 result.append(ex.getValue().getString());
1204 }
1205 }
1206- assertEquals("xuorcxq", result.toString());
1207+ assertEquals(expected, result.toString());
1208+
1209 }
1210
1211 private void insertKeys(final Exchange ex, final TreeBuilder tb, final String... args) throws Exception {
1212@@ -200,4 +233,52 @@
1213 }
1214 }
1215
1216+ @Test
1217+ public void multipleDirectories() throws Exception {
1218+ final TreeBuilder tb = getBasicTreeBuilder();
1219+ final List<File> directories = new ArrayList<File>();
1220+ final Random random = new Random();
1221+ try {
1222+ for (int i = 0; i < 3; i++) {
1223+ final File file = File.createTempFile("TreeBuilderTest", "");
1224+ file.delete();
1225+ assertTrue("Expect to make directory", file.mkdir());
1226+ directories.add(file);
1227+ }
1228+ tb.setSortTreeDirectories(directories);
1229+ final Exchange ex = _persistit.getExchange(VOLUME_NAME, "TreeBuilderTest", true);
1230+ for (int i = 0; i < COUNT; i++) {
1231+ final int k = random.nextInt();
1232+ ex.to(k);
1233+ ex.getValue().put(RED_FOX + "," + k);
1234+ tb.store(ex);
1235+ if (((i + 1) % (COUNT / 10)) == 0) {
1236+ tb.unitTestNextSortFile();
1237+ }
1238+ }
1239+ for (final File file : directories) {
1240+ assertTrue("Expect some files in each directory", file.list().length > 0);
1241+ }
1242+ tb.merge();
1243+
1244+ for (final File file : directories) {
1245+ assertTrue("Expect no remaining files", file.list().length == 0);
1246+ }
1247+
1248+ ex.to(Key.BEFORE);
1249+ int count = 0;
1250+ while (ex.next()) {
1251+ count++;
1252+ final int k = ex.getKey().decodeInt();
1253+ assertEquals(RED_FOX + "," + k, ex.getValue().getString());
1254+ }
1255+ assert count + _duplicates.get() == COUNT;
1256+
1257+ } finally {
1258+ for (final File file : directories) {
1259+ UnitTestProperties.cleanUpDirectory(file);
1260+ }
1261+ }
1262+ }
1263+
1264 }
1265
1266=== modified file 'src/test/java/com/persistit/stress/InsertBigLoad.java'
1267--- src/test/java/com/persistit/stress/InsertBigLoad.java 2013-01-04 00:16:13 +0000
1268+++ src/test/java/com/persistit/stress/InsertBigLoad.java 2013-01-28 10:44:23 +0000
1269@@ -21,6 +21,7 @@
1270 import com.persistit.TreeBuilder;
1271 import com.persistit.stress.unit.BigLoad;
1272 import com.persistit.stress.unit.BigLoad.BigLoadTreeBuilder;
1273+import com.persistit.util.Util;
1274
1275 public class InsertBigLoad extends AbstractSuite {
1276
1277@@ -54,7 +55,10 @@
1278
1279 try {
1280 execute(persistit);
1281+ final long start = System.nanoTime();
1282 tb.merge();
1283+ final long elapsed = System.nanoTime() - start;
1284+ System.out.printf("Merge took %,dms", elapsed / Util.NS_PER_MS);
1285
1286 } finally {
1287 persistit.close();
1288
1289=== modified file 'src/test/java/com/persistit/stress/unit/BigLoad.java'
1290--- src/test/java/com/persistit/stress/unit/BigLoad.java 2013-01-04 00:16:13 +0000
1291+++ src/test/java/com/persistit/stress/unit/BigLoad.java 2013-01-28 10:44:23 +0000
1292@@ -15,6 +15,7 @@
1293
1294 package com.persistit.stress.unit;
1295
1296+import java.io.File;
1297 import java.util.Random;
1298
1299 import com.persistit.Exchange;
1300@@ -23,6 +24,7 @@
1301 import com.persistit.Tree;
1302 import com.persistit.TreeBuilder;
1303 import com.persistit.Value;
1304+import com.persistit.Volume;
1305 import com.persistit.stress.AbstractStressTest;
1306 import com.persistit.util.ArgParser;
1307 import com.persistit.util.Util;
1308@@ -68,7 +70,7 @@
1309 tb.store(resultExchange);
1310 }
1311 final long endLoadTime = System.nanoTime();
1312- System.out.printf("Loaded %,d records into %,d buckets in %,dms\n", totalRecords, tb.getSortVolumeCount(),
1313+ System.out.printf("Loaded %,d records into %,d buckets in %,dms\n", totalRecords, tb.getSortFileCount(),
1314 (endLoadTime - startLoadTime) / Util.NS_PER_MS);
1315 return endLoadTime - startLoadTime;
1316 }
1317@@ -157,6 +159,11 @@
1318 }
1319
1320 @Override
1321+ protected void beforeSortVolumeClosed(final Volume volume, final File file) {
1322+ System.out.printf("Saving sort volume %s to file %s\n", volume, file);
1323+ }
1324+
1325+ @Override
1326 protected void reportSorted(final long count) {
1327 System.out.printf("Sorted %,15d records\n", count);
1328 }
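
The duplicate-handling contract exercised by duplicatePriority1 and duplicatePriority2 above can be illustrated without Persistit. The following is a minimal sketch under stated assumptions, not the TreeBuilder implementation: it merges several sorted runs through a priority queue and, when runs share a key, hands the values to a callback in run order (lower run index first), which mirrors the v1/v2 ordering documented for duplicateKeyDetected. The names RunMergeSketch, DuplicatePolicy, Cursor, run and sampleRuns are hypothetical, and in-memory TreeMaps stand in for the sort files.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

public class RunMergeSketch {

    /** Hypothetical stand-in for duplicateKeyDetected: return true to let v2 replace v1. */
    public interface DuplicatePolicy {
        boolean replace(Integer key, String v1, String v2);
    }

    /** Cursor over one sorted run; precedence is the order in which the run was written. */
    private static final class Cursor implements Comparable<Cursor> {
        final int precedence;
        final Iterator<Map.Entry<Integer, String>> entries;
        Map.Entry<Integer, String> current;

        Cursor(final int precedence, final TreeMap<Integer, String> run) {
            this.precedence = precedence;
            this.entries = run.entrySet().iterator();
        }

        boolean next() {
            current = entries.hasNext() ? entries.next() : null;
            return current != null;
        }

        @Override
        public int compareTo(final Cursor other) {
            // Order by key first, then by run order so that duplicates surface oldest-first.
            final int byKey = current.getKey().compareTo(other.current.getKey());
            return byKey != 0 ? byKey : Integer.compare(precedence, other.precedence);
        }
    }

    public static Map<Integer, String> merge(final List<TreeMap<Integer, String>> runs,
            final DuplicatePolicy policy) {
        final PriorityQueue<Cursor> queue = new PriorityQueue<Cursor>();
        for (int i = 0; i < runs.size(); i++) {
            final Cursor cursor = new Cursor(i, runs.get(i));
            if (cursor.next()) {
                queue.add(cursor);
            }
        }
        final Map<Integer, String> merged = new LinkedHashMap<Integer, String>();
        while (!queue.isEmpty()) {
            final Cursor cursor = queue.poll();
            final Integer key = cursor.current.getKey();
            final String value = cursor.current.getValue();
            // v1 is the value kept so far (from an earlier run), v2 the newly read one.
            if (!merged.containsKey(key) || policy.replace(key, merged.get(key), value)) {
                merged.put(key, value);
            }
            if (cursor.next()) {
                queue.add(cursor);
            }
        }
        return merged;
    }

    /** Builds a run from values for keys 1..n; a null argument means "no record for this key". */
    public static TreeMap<Integer, String> run(final String... values) {
        final TreeMap<Integer, String> map = new TreeMap<Integer, String>();
        for (int i = 0; i < values.length; i++) {
            if (values[i] != null) {
                map.put(i + 1, values[i]);
            }
        }
        return map;
    }

    /** The same four batches that duplicatePriorityCheck inserts. */
    public static List<TreeMap<Integer, String>> sampleRuns() {
        return Arrays.asList(
                run("x", "m", "n", null, "a", null, "q"),
                run(null, "t", "o", "r", null, null, null),
                run(null, "u", "m", "j", "c", "x", "l"),
                run(null, "m", null, null, "a", null, "q"));
    }

    public static void main(final String[] args) {
        // First value wins: prints xmnraxq
        System.out.println(String.join("", merge(sampleRuns(), (k, v1, v2) -> false).values()));
        // Larger value wins: prints xuorcxq
        System.out.println(String.join("", merge(sampleRuns(), (k, v1, v2) -> v1.compareTo(v2) < 0).values()));
    }
}
```

Breaking ties on run order is the design choice that makes the outcome deterministic: without it, which of two equal keys reaches the callback as v1 would depend on queue internals.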
