Merge lp:~pbeaman/akiban-persistit/tree_builder into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 413
Merged at revision: 407
Proposed branch: lp:~pbeaman/akiban-persistit/tree_builder
Merge into: lp:akiban-persistit
Diff against target: 1290 lines (+954/-114)
13 files modified
doc/Miscellaneous.rst (+5/-0)
pom.xml (+1/-1)
src/main/java/com/persistit/BufferPool.java (+10/-3)
src/main/java/com/persistit/Persistit.java (+3/-1)
src/main/java/com/persistit/TreeBuilder.java (+619/-0)
src/main/java/com/persistit/Volume.java (+6/-4)
src/main/java/com/persistit/VolumeStorageT2.java (+5/-5)
src/main/java/com/persistit/exception/DuplicateKeyException.java (+36/-0)
src/test/java/com/persistit/BufferPoolTest.java (+21/-0)
src/test/java/com/persistit/StressRunner.java (+2/-0)
src/test/java/com/persistit/TreeBuilderTest.java (+203/-0)
src/test/java/com/persistit/stress/InsertBigLoad.java (+1/-1)
src/test/java/com/persistit/stress/unit/BigLoad.java (+42/-99)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/tree_builder
Reviewer            Status
Akiban Build User   Needs Fixing
Nathan Williams     Approve
Review via email: mp+141536@code.launchpad.net

Description of the change

Adds new com.persistit.TreeBuilder class. TreeBuilder assists with the process of inserting a large number of records having uncorrelated keys. See the JavaDoc comments in the TreeBuilder class for details.
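In effect, TreeBuilder is an external merge sort. It fills bounded sort trees in temporary volumes, then merges them so that each destination tree receives its keys in ascending order. The following is a minimal, stdlib-only sketch of that idea and is not the TreeBuilder API. `SortMergeSketch`, `runLimit`, and `replaceOnDuplicate` are hypothetical names. In-memory `TreeMap` runs stand in for sort trees, a run is capped by entry count rather than by pages, and a `PriorityQueue` stands in for the `TreeMap` of `Node` cursors that the diff uses during `merge()`. The duplicate policy follows the reading of `duplicateKeyDetected` in the diff, where `true` keeps the newer value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.BiPredicate;

/**
 * Hypothetical in-memory sketch of sort-then-merge: each TreeMap "run" stands in
 * for a bounded sort tree in a temporary volume; a PriorityQueue does the k-way merge.
 */
public class SortMergeSketch {
    private final int runLimit; // stands in for TreeBuilder's page limit (counts entries here)
    // (existing, incoming) -> true means "replace existing with incoming"
    private final BiPredicate<String, String> replaceOnDuplicate;
    private final List<TreeMap<String, String>> runs = new ArrayList<>();
    private TreeMap<String, String> current;

    public SortMergeSketch(int runLimit, BiPredicate<String, String> replaceOnDuplicate) {
        this.runLimit = runLimit;
        this.replaceOnDuplicate = replaceOnDuplicate;
    }

    /** Phase 1: store into the current run; start a new run once it is full. */
    public void store(String key, String value) {
        if (current == null || current.size() >= runLimit) {
            current = new TreeMap<>();
            runs.add(current);
        }
        String existing = current.put(key, value);
        if (existing != null && !replaceOnDuplicate.test(existing, value)) {
            current.put(key, existing); // keep the value that was already there
        }
    }

    public int runCount() {
        return runs.size();
    }

    /** Cursor over one run, positioned on its smallest unconsumed entry. */
    private static final class Cursor {
        final int runIndex;
        final Iterator<Map.Entry<String, String>> it;
        Map.Entry<String, String> head;

        Cursor(int runIndex, Iterator<Map.Entry<String, String>> it) {
            this.runIndex = runIndex;
            this.it = it;
            advance();
        }

        boolean advance() {
            head = it.hasNext() ? it.next() : null;
            return head != null;
        }
    }

    /** Phase 2: k-way merge; returns keys in the order they were first written. */
    public List<String> merge(Map<String, String> destination) {
        // Order by key; on equal keys the earlier run (older record) comes first.
        PriorityQueue<Cursor> heap = new PriorityQueue<>((a, b) -> {
            int c = a.head.getKey().compareTo(b.head.getKey());
            return c != 0 ? c : Integer.compare(a.runIndex, b.runIndex);
        });
        for (int i = 0; i < runs.size(); i++) {
            Cursor cursor = new Cursor(i, runs.get(i).entrySet().iterator());
            if (cursor.head != null) {
                heap.add(cursor);
            }
        }
        List<String> written = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor cursor = heap.poll();
            String key = cursor.head.getKey();
            String value = cursor.head.getValue();
            if (!destination.containsKey(key)) {
                destination.put(key, value);
                written.add(key);
            } else if (replaceOnDuplicate.test(destination.get(key), value)) {
                destination.put(key, value);
            }
            if (cursor.advance()) {
                heap.add(cursor); // re-insert with its new head key
            }
        }
        runs.clear();
        current = null;
        return written;
    }
}
```

For example, with a `runLimit` of 3, storing the keys m, c, x, a, q, c, z, b produces three runs. `merge` then writes a, b, c, m, q, x, z to the destination strictly in ascending order, even though the keys arrived out of order. That sequential write pattern is what avoids the random I/O described in the class Javadoc.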

This branch also:

adds TreeBuilderTest
bumps version number
adds new BufferPool#evict(Volume) method (used by TreeBuilder)
modifies method for setting up temporary volumes so that a directory can be supplied (used by TreeBuilder)
adds reference to TreeBuilder to Miscellaneous.rst

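With the temporary-volume change, a caller can choose where sort files are created, and TreeBuilder rotates through the directories passed to `setSortTreeDirectories`. The following stdlib-only sketch illustrates that pattern. `TempDirRotator` and its methods are hypothetical names, not Persistit API. It mirrors the three validation passes in `setSortTreeDirectories` and the round-robin choice in `createSortVolume` as they appear in the diff. One assumption differs: the sketch falls back directly to `java.io.tmpdir` when no directories are set, whereas TreeBuilder first consults the configured temporary-volume directory (`getConfiguration().getTmpVolDir()`).

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: place temporary files round-robin across directories,
 * in the spirit of TreeBuilder#setSortTreeDirectories and createSortVolume.
 */
public class TempDirRotator {
    static final String PREFIX = "persistit_tempvol_"; // same prefix as VolumeStorageT2
    private final List<File> directories = new ArrayList<>();
    private int next;

    /** Validate every directory before accepting the list; null or empty clears it. */
    public void setDirectories(List<File> dirs) throws IOException {
        if (dirs == null || dirs.isEmpty()) {
            synchronized (this) {
                directories.clear();
            }
            return;
        }
        for (File dir : dirs) { // 1. reject existing non-directories
            if (dir.exists() && !dir.isDirectory()) {
                throw new IllegalArgumentException(dir + " is not a directory");
            }
        }
        for (File dir : dirs) { // 2. create missing directories
            if (!dir.exists() && !dir.mkdirs()) {
                throw new IllegalArgumentException(dir + " could not be created as a new directory");
            }
        }
        for (File dir : dirs) { // 3. prove a file can be created in each one
            File probe = File.createTempFile(PREFIX, null, dir);
            probe.delete();
        }
        synchronized (this) {
            directories.clear();
            directories.addAll(dirs);
            next = 0;
        }
    }

    /** Next directory in rotation, or null to fall back to the default location. */
    public synchronized File nextDirectory() {
        if (directories.isEmpty()) {
            return null;
        }
        File dir = directories.get(next % directories.size());
        next++;
        return dir;
    }

    /** File.createTempFile uses java.io.tmpdir when the directory argument is null. */
    public File createTempFile() throws IOException {
        File file = File.createTempFile(PREFIX, null, nextDirectory());
        file.deleteOnExit();
        return file;
    }
}
```

The Javadoc for `afterSortVolumeEvicted` suggests adjusting this directory list between sort volumes to manage disk space. The rotation is what spreads successive sort volumes across the listed directories.
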
ReleaseNotes.rst has not been updated yet.

Nathan Williams (nwilliams) wrote :

After one pass, this looks pretty good.

My only immediate comment is that DuplicateKeyException could probably use some more information in addition to the detail message. Perhaps direct access to the key contents and tree. I'm thinking of the case where an application doesn't want/need to handle duplicates but would like to report a nice error to the user. I wouldn't block anything on this, it can always be added.

I'll make another pass shortly.
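The reviewer's suggestion (exposing the tree and key on DuplicateKeyException, not only in the detail message) could look roughly like the following sketch. It is hypothetical: the `DuplicateKeyException` added in this branch carries only a message. To keep the sketch self-contained, the hypothetical `DetailedDuplicateKeyException` extends `java.lang.Exception` and stores a tree name and a key string instead of Persistit `Tree` and `Key` objects. A real version would likely need to copy the key, since a `Key` belonging to an `Exchange` may change after the exception is thrown.

```java
/**
 * Hypothetical sketch of the review suggestion: a duplicate-key exception that
 * exposes the tree and key as fields. The message format matches the one
 * TreeBuilder#duplicateKeyDetected uses in this branch.
 */
public class DetailedDuplicateKeyException extends Exception {
    private static final long serialVersionUID = 1L;
    private final String treeName;
    private final String key;

    public DetailedDuplicateKeyException(String treeName, String key) {
        super(String.format("Tree=%s Key=%s", treeName, key));
        this.treeName = treeName;
        this.key = key;
    }

    /** Name of the tree in which the duplicate was detected. */
    public String getTreeName() {
        return treeName;
    }

    /** Printable form of the duplicated key. */
    public String getKey() {
        return key;
    }
}
```

An application could then catch the exception and report `getKey()` and `getTreeName()` to the user without parsing the message text.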

Nathan Williams (nwilliams) wrote :

Nothing jumps out after a second quick pass. A tweak or two after server usage gets fleshed out will probably happen, so full steam ahead.

review: Approve
Akiban Build User (build-akiban) wrote :

There was one failure during build/test:

* unknown exception (check log)

review: Needs Fixing

Preview Diff

1=== modified file 'doc/Miscellaneous.rst'
2--- doc/Miscellaneous.rst 2012-06-11 21:25:37 +0000
3+++ doc/Miscellaneous.rst 2012-12-31 21:06:22 +0000
4@@ -5,6 +5,11 @@
5
6 Following are some short items you may find useful as you explore Akiban Persistit. Follow links to the API documentation for more details.
7
8+TreeBuilder
9+-----------
10+
11+The class ``com.persistit.TreeBuilder`` improves performance of inserting large sets of data when the keys being inserted are non-sequential. TreeBuilder is effective if and only if the size of the data being loaded is significantly larger than the amount of memory available in the buffer pool and keys are inserted in essentially random order.
12+
13 Histograms
14 ----------
15
16
17=== modified file 'pom.xml'
18--- pom.xml 2012-12-05 15:30:55 +0000
19+++ pom.xml 2012-12-31 21:06:22 +0000
20@@ -4,7 +4,7 @@
21
22 <groupId>com.akiban</groupId>
23 <artifactId>akiban-persistit</artifactId>
24- <version>3.2.3-SNAPSHOT</version>
25+ <version>3.2.4-SNAPSHOT</version>
26 <packaging>jar</packaging>
27
28 <parent>
29
30=== modified file 'src/main/java/com/persistit/BufferPool.java'
31--- src/main/java/com/persistit/BufferPool.java 2012-11-28 16:15:34 +0000
32+++ src/main/java/com/persistit/BufferPool.java 2012-12-31 21:06:22 +0000
33@@ -541,16 +541,20 @@
34 * The volume
35 * @throws PersistitInterruptedException
36 */
37- boolean invalidate(final Volume volume) throws PersistitInterruptedException {
38+ boolean invalidate(final Volume volume) throws PersistitException {
39 final float ratio = (float) volume.getStorage().getNextAvailablePage() / (float) _bufferCount;
40 if (ratio < SMALL_VOLUME_RATIO) {
41- return invalidateSmallVolume(volume);
42+ return invalidateSmallVolume(volume, false);
43 } else {
44 return invalidateLargeVolume(volume);
45 }
46 }
47
48- boolean invalidateSmallVolume(final Volume volume) throws PersistitInterruptedException {
49+ boolean evict(final Volume volume) throws PersistitException {
50+ return invalidateSmallVolume(volume, true);
51+ }
52+
53+ boolean invalidateSmallVolume(final Volume volume, final boolean mustWrite) throws PersistitException {
54 boolean result = true;
55 int markedAvailable = 0;
56 for (long page = 1; page < volume.getStorage().getNextAvailablePage(); page++) {
57@@ -565,6 +569,9 @@
58 try {
59 if ((buffer.getVolume() == volume || volume == null) && !buffer.isFixed()
60 && buffer.isValid()) {
61+ if (buffer.isDirty()) {
62+ buffer.writePage();
63+ }
64 invalidate(buffer);
65 invalidated = true;
66 }
67
68=== modified file 'src/main/java/com/persistit/Persistit.java'
69--- src/main/java/com/persistit/Persistit.java 2012-11-30 20:33:30 +0000
70+++ src/main/java/com/persistit/Persistit.java 2012-12-31 21:06:22 +0000
71@@ -1105,7 +1105,9 @@
72 if (!Volume.isValidPageSize(pageSize)) {
73 throw new IllegalArgumentException("Invalid page size " + pageSize);
74 }
75- return Volume.createTemporaryVolume(this, pageSize);
76+ final String directoryName = getConfiguration().getTmpVolDir();
77+ final File directory = directoryName == null ? null : new File(directoryName);
78+ return Volume.createTemporaryVolume(this, pageSize, directory);
79 }
80
81 /**
82
83=== added file 'src/main/java/com/persistit/TreeBuilder.java'
84--- src/main/java/com/persistit/TreeBuilder.java 1970-01-01 00:00:00 +0000
85+++ src/main/java/com/persistit/TreeBuilder.java 2012-12-31 21:06:22 +0000
86@@ -0,0 +1,619 @@
87+/**
88+ * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
89+ *
90+ * This program and the accompanying materials are made available
91+ * under the terms of the Eclipse Public License v1.0 which
92+ * accompanies this distribution, and is available at
93+ * http://www.eclipse.org/legal/epl-v10.html
94+ *
95+ * This program may also be available under different license terms.
96+ * For more information, see www.akiban.com or contact licensing@akiban.com.
97+ *
98+ * Contributors:
99+ * Akiban Technologies, Inc.
100+ */
101+
102+package com.persistit;
103+
104+import java.io.File;
105+import java.util.ArrayList;
106+import java.util.Collections;
107+import java.util.Comparator;
108+import java.util.HashMap;
109+import java.util.HashSet;
110+import java.util.List;
111+import java.util.Map;
112+import java.util.Set;
113+import java.util.SortedMap;
114+import java.util.TreeMap;
115+import java.util.concurrent.atomic.AtomicLong;
116+
117+import com.persistit.exception.DuplicateKeyException;
118+import com.persistit.exception.PersistitException;
119+import com.persistit.util.Util;
120+
121+/**
122+ * <p>
123+ * A mechanism for optimizing the process of loading large sets of records with
124+ * non-sequential keys. This class speeds up the process of inserting records
125+ * into a set of Persistit <code>Tree</code>s by sorting them before inserting
126+ * them. The sort process uses multiple "sort trees" in multiple temporary
127+ * <code>Volume</code>s to hold copies of the data. These are then merged into
128+ * the final "destination trees." Each sort tree is constrained to be small
129+ * enough to fit in the {@link BufferPool}.
130+ * </p>
131+ * <h3>Background</h3>
132+ * <p>
133+ * In general, Persistit can store records very quickly, even when the keys of
134+ * those records arrive in random order, as long as all the pages of the
135+ * destination tree or trees are resident in the buffer pool. However, the
136+ * situation changes dramatically as soon as the the destination tree or trees
137+ * exceed the size of the buffer pool. Once that happens, insert performance
138+ * degrades because the ratio of records inserted per disk I/O operation
139+ * performed decreases. In a worst-case scenario, inserting each new key may
140+ * require two or more disk I/O operations. These may occur because Persistit
141+ * performs the following steps:
142+ * <ul>
143+ * <li>Look up the key requires reading the page containing that key from disk
144+ * into the BufferPool.</li>
145+ * <li>Reading the page requires a Buffer containing some other page to be
146+ * evicted.</li>
147+ * <li>The page being evicted is likely to be dirty and therefore Persistit must
148+ * write its contents to disk before reusing the Buffer.</li>
149+ * </ul>
150+ * Further, these disk I/O operations are are usually at unrelated file
151+ * positions and therefore may each require random seeks. As a result, inserting
152+ * one key can take orders of magnitude longer once the tree no longer fits in
153+ * the buffer pool.
154+ * </p>
155+ * <p>
156+ * <code>TreeBuilder</code> mitigates that degradation by sorting the keys
157+ * before inserting them into their final destination trees. To do so it builds
158+ * a collection of bounded-size sort trees in temporary volumes. Then it
159+ * performs a merge sort from those trees into the final destination tree or
160+ * trees. This mechanism eliminates the problem that every key insertion
161+ * requires two (or more) random disk I/O operations. However, it is still the
162+ * case that every sort tree page must be written and read once, and every
163+ * destination tree page must be written at least once. Therefore the I/O
164+ * associated with TreeBuilder is reduced but not eliminated.
165+ * </p>
166+ * <p>
167+ * TreeBuilder is effective if and only if (a) the keys arrive in random order,
168+ * and (b) the data is significantly larger than available memory in the buffer
169+ * pool. In general it is faster to insert the keys directly into the
170+ * destination trees unless both of these conditions are true.
171+ * </p>
172+ * <h3>Using TreeBuilder</h3>
173+ * <p>
174+ * The following example demonstrates the fundamental operation of
175+ * <code>TreeBuilder</code>: <code><pre>
176+ * Exchange exchange = db.getExchange("myVolume", "myTree", true);
177+ * TreeBuilder tb = new TreeBuilder(db);
178+ * //
179+ * // Insert the data into sort trees
180+ * //
181+ * while (<i>source has more data</i>) {
182+ * exchange.to(<i>next key</i>).getValue().put(<i>next value</i>);
183+ * tb.store(exchange);
184+ * }
185+ * //
186+ * // Merge the data into myTree
187+ * //
188+ * tb.merge();
189+ * </pre></code> Note that a TreeBuilder can pre-sort data for multiple
190+ * destination trees. For example, it is possible to load and merge records for
191+ * a table and its corresponding indexes in one pass using TreeBuilder. During
192+ * the merge operation the final destination <code>Tree</code> are built in
193+ * sequence. By default that sequence is by alphabetical order of tree name, but
194+ * it is possible to customize TreeBuilder to change that order.
195+ * </p>
196+ * <p>
197+ * Loading a large data set may take a long time under the best of
198+ * circumstances. Therefore this class is designed to be extended by
199+ * applications to support progress reporting, to control disk space allocation,
200+ * to handle attempts to insert conflicting records with duplicate keys, etc.
201+ * See the following methods which may be overridden to provide custom behavior:
202+ * <ul>
203+ * <li>{@link #reportSorted(long)} - report completion of N records inserts into
204+ * sort trees</li>
205+ * <li>{@link #reportMerged(long)} - report completion of N records merged</li>
206+ * <li>{@link #duplicateKeyDetected(Tree, Key, Value, Value)} - handle detection
207+ * of records inserted with duplicate keys</li>
208+ * <li>{@link #beforeMergeKey(Exchange)} - allowing filtering or custom handling
209+ * per record while merging</li>
210+ * <li>{@link #afterMergeKey(Exchange)} - behavior after merging one record</li>
211+ * <li>{@link #beforeSortVolumeEvicted(Volume)} - behavior before evicting a
212+ * sort volume when full</li>
213+ * <li>{@link #afterSortVolumeEvicted(Volume)} - behavior after evicting a sort
214+ * volume when full</li>
215+ * <li>{@link #getTreeComparator()} - return a custom Comparator to determine
216+ * sequence in which trees are populated within the {@link #merge()} method
217+ * </ul>
218+ * </p>
219+ *
220+ * @author peter
221+ *
222+ */
223+public class TreeBuilder {
224+ private final static float DEFAULT_BUFFER_POOL_FRACTION = 0.5f;
225+ private final static long REPORT_REPORT_MULTIPLE = 1000000;
226+ private final static String DEFAULT_NAME = "TreeBuilder";
227+
228+ private final String _name;
229+ private final Persistit _persistit;
230+ private final List<File> _directories = new ArrayList<File>();
231+ private final List<Volume> _sortVolumes = new ArrayList<Volume>();
232+ private final int _pageSize;
233+ private final int _pageLimit;
234+ private final AtomicLong _keyCount = new AtomicLong();
235+ private volatile long _reportKeyCountMultiple = REPORT_REPORT_MULTIPLE;
236+
237+ private final Set<Tree> _allTrees = new HashSet<Tree>();
238+ private final List<Tree> _sortedTrees = new ArrayList<Tree>();
239+
240+ private final ThreadLocal<Map<Tree, Exchange>> _sortExchangeMapThreadLocal = new ThreadLocal<Map<Tree, Exchange>>() {
241+ @Override
242+ public Map<Tree, Exchange> initialValue() {
243+ return new HashMap<Tree, Exchange>();
244+ }
245+ };
246+
247+ private Volume _currentSortVolume;
248+ private int _nextDirectoryIndex;
249+
250+ private final Comparator<Tree> _defaultTreeComparator = new Comparator<Tree>() {
251+ @Override
252+ public int compare(final Tree a, final Tree b) {
253+ if (a == b) {
254+ return 0;
255+ }
256+ if (a.getVolume() == b.getVolume()) {
257+ return a.getName().compareTo(b.getName());
258+ } else {
259+ return a.getVolume().getName().compareTo(b.getVolume().getName());
260+ }
261+ }
262+
263+ @Override
264+ public boolean equals(final Object obj) {
265+ return this == obj;
266+ }
267+ };
268+
269+ private class Node implements Comparable<Node> {
270+ final Volume _volume;
271+ int _treeListIndex = -1;
272+ Exchange _exchange = null;
273+ Tree _currentTree = null;
274+ Node _duplicate;
275+
276+ private Node(final Volume volume) {
277+ _volume = volume;
278+ }
279+
280+ private boolean next() throws PersistitException {
281+ for (;;) {
282+ if (_exchange == null) {
283+ _treeListIndex++;
284+ if (_treeListIndex >= _sortedTrees.size()) {
285+ _volume.close();
286+ return false;
287+ }
288+ final String tempTreeName = "_" + _sortedTrees.get(_treeListIndex).getHandle();
289+ final Tree sortTree = _volume.getTree(tempTreeName, false);
290+ if (sortTree == null) {
291+ continue;
292+ }
293+ _exchange = new Exchange(sortTree);
294+ _currentTree = _sortedTrees.get(_treeListIndex);
295+ }
296+ if (_exchange.next(true)) {
297+ return true;
298+ }
299+ _exchange = null;
300+ }
301+ }
302+
303+ @Override
304+ public int compareTo(final Node node) {
305+ if (_exchange == null) {
306+ return node._exchange == null ? 0 : -1;
307+ }
308+ final int treeComparison = getTreeComparator().compare(_currentTree, node._currentTree);
309+ if (treeComparison != 0) {
310+ return treeComparison;
311+ }
312+ final Key k1 = _exchange.getKey();
313+ final Key k2 = node._exchange.getKey();
314+ return k1.compareTo(k2);
315+ }
316+
317+ @Override
318+ public String toString() {
319+ Node n = this;
320+ final StringBuilder sb = new StringBuilder();
321+ while (n != null) {
322+ if (sb.length() > 0) {
323+ sb.append(",");
324+ }
325+ if (n._exchange == null) {
326+ sb.append("<null>");
327+ } else {
328+ sb.append("<"
329+ + (n._currentTree == null ? "?" : n._currentTree.getName() + n._exchange.getKey() + "="
330+ + n._exchange.getValue()) + ">");
331+ }
332+ n = n._duplicate;
333+ }
334+ return sb.toString();
335+ }
336+ }
337+
338+ public TreeBuilder(final Persistit persistit) {
339+ this(persistit, DEFAULT_NAME, -1, DEFAULT_BUFFER_POOL_FRACTION);
340+ }
341+
342+ public TreeBuilder(final Persistit persistit, final String name, final int pageSize, final float bufferPoolFraction) {
343+ _name = name;
344+ _persistit = persistit;
345+ _pageSize = pageSize == -1 ? computePageSize(persistit) : pageSize;
346+ final int bufferCount = _persistit.getBufferPool(_pageSize).getBufferCount();
347+ _pageLimit = (int) (bufferCount * bufferPoolFraction);
348+ }
349+
350+ private int computePageSize(final Persistit persistit) {
351+ int pageSize = persistit.getConfiguration().getTmpVolPageSize();
352+ if (pageSize == 0) {
353+ for (final int size : persistit.getBufferPoolHashMap().keySet()) {
354+ if (size > pageSize) {
355+ pageSize = size;
356+ }
357+ }
358+ }
359+ return pageSize;
360+ }
361+
362+ public final String getName() {
363+ return _name;
364+ }
365+
366+ public final void setReportKeyCountMultiple(final long multiple) {
367+ _reportKeyCountMultiple = Util.rangeCheck(multiple, 1, Long.MAX_VALUE);
368+ }
369+
370+ public final long getReportKeyCountMultiple() {
371+ return _reportKeyCountMultiple;
372+ }
373+
374+ public final synchronized int getSortVolumeCount() {
375+ return _sortVolumes.size();
376+ }
377+
378+ public final List<Tree> getTrees() {
379+ final List<Tree> list = new ArrayList<Tree>();
380+ list.addAll(_allTrees);
381+ Collections.sort(list, getTreeComparator());
382+ return list;
383+ }
384+
385+ public final void setSortTreeDirectories(final List<File> directories) throws Exception {
386+ if (directories == null || directories.isEmpty()) {
387+ synchronized (this) {
388+ _directories.clear();
389+ }
390+ } else {
391+ /*
392+ * Make sure all supplied items are directories
393+ */
394+ for (final File file : directories) {
395+ if (file.exists() && !file.isDirectory()) {
396+ throw new IllegalArgumentException(file + " is not a directory");
397+ }
398+ }
399+ /*
400+ * Make sure all directories exist
401+ */
402+ for (final File file : directories) {
403+ if (!file.exists() && !file.mkdirs()) {
404+ throw new IllegalArgumentException(file + " could not be created as a new directory");
405+ }
406+ }
407+ /*
408+ * Make sure all directories permit creation of a new file
409+ */
410+ for (final File file : directories) {
411+ final File temp = File.createTempFile(VolumeStorageT2.TEMP_FILE_PREFIX, null, file);
412+ temp.delete();
413+ }
414+ synchronized (this) {
415+ _directories.clear();
416+ _directories.addAll(directories);
417+ _nextDirectoryIndex = 0;
418+ }
419+ }
420+ }
421+
422+ public final List<File> getSortTreeDirectories() {
423+ return Collections.unmodifiableList(_directories);
424+ }
425+
426+ public final void store(final Exchange exchange) throws Exception {
427+ store(exchange.getTree(), exchange.getKey(), exchange.getValue());
428+ }
429+
430+ public final void store(final Tree tree, final Key key, final Value value) throws Exception {
431+ final Map<Tree, Exchange> map = _sortExchangeMapThreadLocal.get();
432+ Exchange ex = map.get(tree);
433+ if (ex == null || ex.getTree().getVolume().getNextAvailablePage() > _pageLimit) {
434+ final Volume newSortVolume = getSortVolume();
435+ final String tempTreeName = "_" + _persistit.getJournalManager().handleForTree(tree);
436+ ex = _persistit.getExchange(newSortVolume, tempTreeName, true);
437+ map.put(tree, ex);
438+ synchronized (this) {
439+ _allTrees.add(tree);
440+ }
441+ }
442+ key.copyTo(ex.getKey());
443+ value.copyTo(ex.getValue());
444+
445+ ex.fetchAndStore();
446+ boolean stored = true;
447+ if (ex.getValue().isDefined()) {
448+ if (!duplicateKeyDetected(ex.getTree(), ex.getKey(), ex.getValue(), value)) {
449+ stored = false;
450+ ex.store();
451+ }
452+ }
453+ if (stored) {
454+ final long count = _keyCount.incrementAndGet();
455+ if ((count % _reportKeyCountMultiple) == 0) {
456+ reportSorted(count);
457+ }
458+ }
459+ }
460+
461+ private void insertNode(final Map<Node, Node> sorted, final Node node) throws Exception {
462+ final Node other = sorted.put(node, node);
463+ if (other != null) {
464+ if (!duplicateKeyDetected(node._currentTree, node._exchange.getKey(), other._exchange.getValue(),
465+ node._exchange.getValue())) {
466+ sorted.put(node, other);
467+ final Node p = other._duplicate;
468+ other._duplicate = node;
469+ node._duplicate = p;
470+ } else {
471+ node._duplicate = other;
472+ }
473+ }
474+ }
475+
476+ /**
477+ * Merge the record previously stored in sort volumes into their destination
478+ * <code>Tree</code>s.
479+ *
480+ * @throws Exception
481+ */
482+ public synchronized void merge() throws Exception {
483+ if ((_keyCount.get() % _reportKeyCountMultiple) != 0) {
484+ reportSorted(_keyCount.get());
485+ }
486+ _keyCount.set(0);
487+ _sortedTrees.clear();
488+ _sortedTrees.addAll(_allTrees);
489+ Tree currentTree = null;
490+ Exchange ex = null;
491+ Collections.sort(_sortedTrees, getTreeComparator());
492+ final SortedMap<Node, Node> sorted = new TreeMap<Node, Node>();
493+ for (final Volume volume : _sortVolumes) {
494+ final Node node = new Node(volume);
495+ if (node.next()) {
496+ insertNode(sorted, node);
497+ }
498+ }
499+
500+ for (;;) {
501+ if (sorted.isEmpty()) {
502+ break;
503+ }
504+ Node node = sorted.firstKey();
505+ node = sorted.remove(node);
506+ if (node._currentTree != currentTree) {
507+ ex = new Exchange(node._currentTree);
508+ currentTree = node._currentTree;
509+ }
510+
511+ node._exchange.getKey().copyTo(ex.getKey());
512+ node._exchange.getValue().copyTo(ex.getValue());
513+ if (beforeMergeKey(ex)) {
514+ ex.fetchAndStore();
515+ boolean stored = true;
516+ if (ex.getValue().isDefined()) {
517+ if (!duplicateKeyDetected(ex.getTree(), ex.getKey(), ex.getValue(), node._exchange.getValue())) {
518+ ex.store();
519+ stored = false;
520+ }
521+ }
522+ if (stored) {
523+ afterMergeKey(ex);
524+ if ((_keyCount.incrementAndGet() % _reportKeyCountMultiple) == 0) {
525+ reportMerged(_keyCount.get());
526+ }
527+ }
528+ }
529+ while (node != null) {
530+ final Node next = node._duplicate;
531+ node._duplicate = null;
532+ if (node.next()) {
533+ insertNode(sorted, node);
534+ }
535+ node = next;
536+ }
537+ }
538+ if ((_keyCount.get() % _reportKeyCountMultiple) != 0) {
539+ reportMerged(_keyCount.get());
540+ }
541+ _keyCount.set(0);
542+ _sortExchangeMapThreadLocal.get().clear();
543+ _allTrees.clear();
544+ _sortedTrees.clear();
545+ }
546+
547+ private synchronized Volume getSortVolume() throws Exception {
548+ final boolean full = _currentSortVolume != null && _currentSortVolume.getNextAvailablePage() > _pageLimit;
549+ if (full) {
550+ if (beforeSortVolumeEvicted(_currentSortVolume)) {
551+ _persistit.getBufferPool(_pageSize).evict(_currentSortVolume);
552+ }
553+ afterSortVolumeEvicted(_currentSortVolume);
554+ }
555+ if (full || _currentSortVolume == null) {
556+ _currentSortVolume = createSortVolume();
557+ _sortVolumes.add(_currentSortVolume);
558+ }
559+ return _currentSortVolume;
560+ }
561+
562+ private Volume createSortVolume() throws Exception {
563+ final File directory;
564+ if (_directories.isEmpty()) {
565+ final String directoryName = _persistit.getConfiguration().getTmpVolDir();
566+ directory = directoryName == null ? null : new File(directoryName);
567+ } else {
568+ directory = _directories.get(_nextDirectoryIndex % _directories.size());
569+ _nextDirectoryIndex++;
570+ }
571+ return Volume.createTemporaryVolume(_persistit, _pageSize, directory);
572+ }
573+
574+ /**
575+ * This method may be extended to provide an application-specific ordering
576+ * on <code>Tree</code>s. This ordering determines the sequence in which
577+ * destination trees are built from the sort data. By default trees are
578+ * build in alphabetical order by volume and tree name. However, an
579+ * application may choose a different order to ensure invariants for
580+ * concurrent use.
581+ *
582+ * @return a <code>java.util.Comparator</code> on <code>Tree</code>
583+ */
584+ protected Comparator<Tree> getTreeComparator() {
585+ return _defaultTreeComparator;
586+ }
587+
588+ /**
589+ * This method may be extended to provide application-specific behavior when
590+ * a sort volume has been filled to capacity. The default implementation
591+ * return <code>true</code>. If this method returns <code>true</code>,
592+ * <code>TreeBuilder</code> evicts the <code>Volume</code> to avoid
593+ * over-running the <code>BufferPool</code> and then starts a new sort tree
594+ * if a new record is subsequently stored.
595+ *
596+ * @param volume
597+ * The temporary <code>Volume</code> that has been filled
598+ * @return <code>true</code> to cause the current sort volume to be evicted
599+ * from the <code>BufferPool</code>
600+ * @throws Exception
601+ */
602+ protected boolean beforeSortVolumeEvicted(final Volume volume) throws Exception {
603+ return true;
604+ }
605+
606+ /**
607+ * This method may be extended to provide application-specific reporting
608+ * functionality after a sort volume has been filled to capacity and has
609+ * been evicted. An application may also modify the temporary directory set
610+ * via {@link #setSortTreeDirectories(List)} within this method if necessary
611+ * to adjust disk space utilization, for example. The default behavior of
612+ * this method is to do nothing.
613+ *
614+ * @param volume
615+ * The temporary <code>Volume</code> that has been filled
616+ * @throws Exception
617+ */
618+ protected void afterSortVolumeEvicted(final Volume volume) throws Exception {
619+
620+ }
621+
622+ /**
623+ * This method may be extended to provide application-specific behavior when
624+ * an attempt is made to merge records with duplicate keys. The default
625+ * behavior is to throw a {@link DuplicateKeyException}.
626+ *
627+ * @param tree
628+ * @param key
629+ * @param v1
630+ * @param v2
631+ * @return
632+ * @throws DuplicateKeyException
633+ * if a key being inserted or merged matches a key already
634+ * exists
635+ * @throws Exception
636+ */
637+ protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2)
638+ throws Exception {
639+ throw new DuplicateKeyException(String.format("Tree=%s Key=%s", tree, key));
640+ }
641+
642+ /**
643+ * This method may be extended to provide alternative functionality. The
644+ * default implementation merely returns <code>true</code> which signifies
645+ * that the key-value pair represented in the <code>Exchange</code> should
646+ * be merged into the destination <code>Tree</code>. A custom implementation
647+ * could be used to filter out unwanted records or to emit records to a
648+ * different destination.
649+ *
650+ * @param exchange
651+ * represents the key-value pair proposed for merging
652+ * @return <code>true</code> to allow the record to be merged
653+ * @throws Exception
654+ */
655+ protected boolean beforeMergeKey(final Exchange exchange) throws Exception {
656+ return true;
657+ }
658+
659+ /**
660+ * This method may be extended to provide custom behavior after merging one
661+ * record. The default implementation does nothing. This method is called
662+ * only if the corresponding call to {@link #beforeMergeKey(Exchange)}
663+ * returned <code>true</code>.
664+ *
665+ * @param exchange
666+ * represents the key-value pair that was merged.
667+ * @throws Exception
668+ */
669+ protected void afterMergeKey(final Exchange exchange) throws Exception {
670+
671+ }
672+
673+ /**
674+ * This method may be extended to provide application-specific progress
675+ * reports. By default it does nothing. This method is called after
676+ * inserting a number of records into sort trees. The method
677+ * {@link #setReportKeyCountMultiple(long)} determines the frequency at
678+ * which this method is called.
679+ *
680+ * @param count
681+ * The total number of recirds that has been merged so far.
682+ */
683+ protected void reportSorted(final long count) {
684+
685+ }
686+
687+ /**
688+ * This method may be extended to provide application-specific progress
689+ * reports. By default it does nothing. This method is called after merging
690+ * a number of records into destination trees. The method
691+ * {@link #setReportKeyCountMultiple(long)} determines the frequency at
692+ * which this method is called.
693+ *
694+ * @param count
695+ * The total number of recirds that has been merged so far.
696+ */
697+ protected void reportMerged(final long count) {
698+
699+ }
700+
701+ void unitTestNextSortVolume() {
702+ _sortExchangeMapThreadLocal.get().clear();
703+ _currentSortVolume = null;
704+ }
705+}
706
707=== modified file 'src/main/java/com/persistit/Volume.java'
708--- src/main/java/com/persistit/Volume.java 2012-10-06 02:44:20 +0000
709+++ src/main/java/com/persistit/Volume.java 2012-12-31 21:06:22 +0000
710@@ -78,11 +78,12 @@
711 return false;
712 }
713
714- static Volume createTemporaryVolume(final Persistit persistit, final int pageSize) throws PersistitException {
715+ static Volume createTemporaryVolume(final Persistit persistit, final int pageSize, final File tempDirectory)
716+ throws PersistitException {
717 final Volume volume = new Volume(
718 Thread.currentThread().getName() + TEMP_VOLUME_NAME_SUFFIX_FOR_FIXUP_DETECTION,
719 TEMP_VOLUME_ID_FOR_FIXUP_DETECTION);
720- volume.openTemporary(persistit, pageSize);
721+ volume.openTemporary(persistit, pageSize, tempDirectory);
722 return volume;
723 }
724
725@@ -473,7 +474,8 @@
726 persistit.addVolume(this);
727 }
728
729- void openTemporary(final Persistit persistit, final int pageSize) throws PersistitException {
730+ void openTemporary(final Persistit persistit, final int pageSize, final File tempDirectory)
731+ throws PersistitException {
732 checkClosing();
733 if (_storage != null) {
734 throw new IllegalStateException("This volume has already been opened");
735@@ -483,7 +485,7 @@
736 }
737
738 _structure = new VolumeStructure(persistit, this, pageSize);
739- _storage = new VolumeStorageT2(persistit, this);
740+ _storage = new VolumeStorageT2(persistit, this, tempDirectory);
741 _statistics = new VolumeStatistics();
742
743 _storage.create();
744
745=== modified file 'src/main/java/com/persistit/VolumeStorageT2.java'
746--- src/main/java/com/persistit/VolumeStorageT2.java 2012-08-24 13:57:19 +0000
747+++ src/main/java/com/persistit/VolumeStorageT2.java 2012-12-31 21:06:22 +0000
748@@ -43,8 +43,9 @@
749 */
750 class VolumeStorageT2 extends VolumeStorage {
751
752- private final static String TEMP_FILE_PREFIX = "persistit_tempvol_";
753+ final static String TEMP_FILE_PREFIX = "persistit_tempvol_";
754 private final static String TEMP_FILE_UNCREATED_NAME = "temp_volume_file_not_created_yet";
755+ private final File _tempDirectory;
756 private long _maxPages;
757 private volatile String _path;
758 private volatile FileChannel _channel;
759@@ -53,8 +54,9 @@
760 private volatile boolean _opened;
761 private volatile boolean _closed;
762
763- VolumeStorageT2(final Persistit persistit, final Volume volume) {
764+ VolumeStorageT2(final Persistit persistit, final Volume volume, final File tempDirectory) {
765 super(persistit, volume);
766+ _tempDirectory = tempDirectory;
767 }
768
769 /**
770@@ -94,9 +96,7 @@
771 synchronized FileChannel getChannel() throws PersistitIOException {
772 if (_channel == null) {
773 try {
774- final String directoryName = _persistit.getConfiguration().getTmpVolDir();
775- final File directory = directoryName == null ? null : new File(directoryName);
776- final File file = File.createTempFile(TEMP_FILE_PREFIX, null, directory);
777+ final File file = File.createTempFile(TEMP_FILE_PREFIX, null, _tempDirectory);
778 _path = file.getPath();
779 _channel = new MediatedFileChannel(_path, "rw");
780 } catch (final IOException ioe) {
781
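With this change the `getTmpVolDir()` lookup moves out of `VolumeStorageT2`, and the caller passes the directory in. The sketch below is a minimal standalone illustration of that behavior, not Persistit code. The class name `TempVolumeFiles` is hypothetical. It relies on the documented JDK behavior of `File.createTempFile`: a `null` directory means the default temporary directory (the `java.io.tmpdir` system property), and a `null` suffix means `".tmp"`.

```java
import java.io.File;
import java.io.IOException;

/**
 * Hypothetical sketch: create a temporary volume file in a caller-supplied
 * directory, falling back to the JVM default temp directory when the
 * directory is null.
 */
class TempVolumeFiles {
    // Same prefix as TEMP_FILE_PREFIX in VolumeStorageT2.
    static final String TEMP_FILE_PREFIX = "persistit_tempvol_";

    static File createTempVolumeFile(final File tempDirectory) throws IOException {
        // tempDirectory == null -> java.io.tmpdir; null suffix -> ".tmp".
        return File.createTempFile(TEMP_FILE_PREFIX, null, tempDirectory);
    }
}
```

Passing the directory in means a caller such as `TreeBuilder` can put its sort volumes somewhere other than the configured default, while existing callers can keep passing the configured value.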
782=== added file 'src/main/java/com/persistit/exception/DuplicateKeyException.java'
783--- src/main/java/com/persistit/exception/DuplicateKeyException.java 1970-01-01 00:00:00 +0000
784+++ src/main/java/com/persistit/exception/DuplicateKeyException.java 2012-12-31 21:06:22 +0000
785@@ -0,0 +1,36 @@
786+/**
787+ * Copyright © 2005-2012 Akiban Technologies, Inc. All rights reserved.
788+ *
789+ * This program and the accompanying materials are made available
790+ * under the terms of the Eclipse Public License v1.0 which
791+ * accompanies this distribution, and is available at
792+ * http://www.eclipse.org/legal/epl-v10.html
793+ *
794+ * This program may also be available under different license terms.
795+ * For more information, see www.akiban.com or contact licensing@akiban.com.
796+ *
797+ * Contributors:
798+ * Akiban Technologies, Inc.
799+ */
800+
801+package com.persistit.exception;
802+
803+/**
804+ * Thrown by {@link com.persistit.TreeBuilder} on an attempt to insert duplicate
805+ * keys.
806+ *
807+ * @version 1.0
808+ */
809+public class DuplicateKeyException extends PersistitException {
810+
811+ private static final long serialVersionUID = 3002304732225440553L;
812+
813+ public DuplicateKeyException() {
814+ super();
815+ }
816+
817+ public DuplicateKeyException(final String msg) {
818+ super(msg);
819+ }
820+
821+}
822
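The review comment suggests that `DuplicateKeyException` carry the tree and the key, not only a detail message. The following is a hypothetical sketch of that idea. It extends `Exception` only so that it compiles on its own; the class in this branch extends `PersistitException`. It also stores the key as a string snapshot, on the assumption that a live `Key` held by an `Exchange` is reused and mutated (as `clear().append(...)` in the tests shows), so the exception should not hold a reference to it.

```java
/**
 * Hypothetical sketch of a duplicate-key exception that exposes the tree
 * name and a snapshot of the offending key for error reporting.
 */
class DuplicateKeySketchException extends Exception {
    private static final long serialVersionUID = 1L;

    private final String treeName;
    // Copied at construction time; the caller's Key object may change later.
    private final String keyString;

    DuplicateKeySketchException(final String treeName, final String keyString) {
        super("Duplicate key " + keyString + " in tree " + treeName);
        this.treeName = treeName;
        this.keyString = keyString;
    }

    String getTreeName() {
        return treeName;
    }

    String getKeyString() {
        return keyString;
    }
}
```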
823=== modified file 'src/test/java/com/persistit/BufferPoolTest.java'
824--- src/test/java/com/persistit/BufferPoolTest.java 2012-08-24 13:57:19 +0000
825+++ src/test/java/com/persistit/BufferPoolTest.java 2012-12-31 21:06:22 +0000
826@@ -163,4 +163,25 @@
827 }
828 }
829
830+ @Test
831+ public void testEvictVolume() throws Exception {
832+ final Volume vol = _persistit.createTemporaryVolume();
833+ final Exchange ex = _persistit.getExchange(vol, "BufferPoolTest", true);
834+ _persistit.flush();
835+ ex.getValue().put(RED_FOX);
836+ int i;
837+ for (i = 1;; i++) {
838+ ex.to(i).store();
839+ if (vol.getNextAvailablePage() >= 10) {
840+ break;
841+ }
842+ }
843+ vol.getPool().evict(vol);
844+ assertTrue("Should be no remaining dirty buffers", vol.getPool().getDirtyPageCount() == 0);
845+ for (int j = 0; j < i + 100; j++) {
846+ ex.to(j).fetch();
847+ assertEquals(j >= 1 && j <= i, ex.getValue().isDefined());
848+ }
849+ }
850+
851 }
852
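`testEvictVolume` fills a temporary volume, evicts it from the pool, and then expects no dirty pages plus every stored key still readable. Those assertions suggest that `evict(Volume)` writes a volume's dirty pages back before dropping them. The toy model below illustrates that contract. It is an assumption-labeled simplification, not Persistit's `BufferPool`: pages are strings, "disk" is a map, and the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of volume eviction; NOT Persistit's BufferPool. */
class ToyBufferPool {
    private static final class Page {
        final String volume;
        final long address;
        String contents;
        boolean dirty;

        Page(final String volume, final long address, final String contents) {
            this.volume = volume;
            this.address = address;
            this.contents = contents;
            this.dirty = true; // a newly written page has not reached disk yet
        }
    }

    // Simulated volume files: volume name -> (page address -> contents).
    private final Map<String, Map<Long, String>> disk = new HashMap<>();
    // Pages currently resident in the pool.
    private final List<Page> resident = new ArrayList<>();

    private Page find(final String volume, final long address) {
        for (final Page p : resident) {
            if (p.volume.equals(volume) && p.address == address) {
                return p;
            }
        }
        return null;
    }

    void write(final String volume, final long address, final String contents) {
        final Page p = find(volume, address);
        if (p == null) {
            resident.add(new Page(volume, address, contents));
        } else {
            p.contents = contents;
            p.dirty = true;
        }
    }

    String read(final String volume, final long address) {
        final Page p = find(volume, address);
        if (p != null) {
            return p.contents;
        }
        final Map<Long, String> file = disk.get(volume);
        return file == null ? null : file.get(address);
    }

    /** Write back this volume's dirty pages, then drop all of its pages from the pool. */
    void evict(final String volume) {
        for (final Iterator<Page> it = resident.iterator(); it.hasNext();) {
            final Page p = it.next();
            if (!p.volume.equals(volume)) {
                continue; // pages of other volumes are untouched
            }
            if (p.dirty) {
                disk.computeIfAbsent(volume, v -> new HashMap<>()).put(p.address, p.contents);
            }
            it.remove();
        }
    }

    int dirtyPageCount() {
        int n = 0;
        for (final Page p : resident) {
            if (p.dirty) {
                n++;
            }
        }
        return n;
    }
}
```

In the toy, eviction is scoped to one volume, so dirty pages belonging to other volumes stay in the pool. The real test calls `_persistit.flush()` first, so the temporary volume is the only source of dirty pages there.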
853=== modified file 'src/test/java/com/persistit/StressRunner.java'
854--- src/test/java/com/persistit/StressRunner.java 2012-11-18 16:28:15 +0000
855+++ src/test/java/com/persistit/StressRunner.java 2012-12-31 21:06:22 +0000
856@@ -25,6 +25,7 @@
857
858 import com.persistit.stress.AbstractSuite;
859 import com.persistit.stress.AccumulatorRestartSuite;
860+import com.persistit.stress.InsertBigLoad;
861 import com.persistit.stress.InsertUUIDs;
862 import com.persistit.stress.Mixture1;
863 import com.persistit.stress.Mixture2;
864@@ -71,6 +72,7 @@
865 static {
866 _classes.add(AccumulatorRestartSuite.class);
867 _classes.add(InsertUUIDs.class);
868+ _classes.add(InsertBigLoad.class);
869 _classes.add(Mixture1.class);
870 _classes.add(Mixture2.class);
871 _classes.add(Mixture3.class);
872
873=== added file 'src/test/java/com/persistit/TreeBuilderTest.java'
874--- src/test/java/com/persistit/TreeBuilderTest.java 1970-01-01 00:00:00 +0000
875+++ src/test/java/com/persistit/TreeBuilderTest.java 2012-12-31 21:06:22 +0000
876@@ -0,0 +1,203 @@
877+/**
878+ * Copyright © 2011-2012 Akiban Technologies, Inc. All rights reserved.
879+ *
880+ * This program and the accompanying materials are made available
881+ * under the terms of the Eclipse Public License v1.0 which
882+ * accompanies this distribution, and is available at
883+ * http://www.eclipse.org/legal/epl-v10.html
884+ *
885+ * This program may also be available under different license terms.
886+ * For more information, see www.akiban.com or contact licensing@akiban.com.
887+ *
888+ * Contributors:
889+ * Akiban Technologies, Inc.
890+ */
891+
892+package com.persistit;
893+
894+import static com.persistit.unit.UnitTestProperties.VOLUME_NAME;
895+import static org.junit.Assert.assertEquals;
896+
897+import java.util.ArrayList;
898+import java.util.Collections;
899+import java.util.List;
900+import java.util.concurrent.atomic.AtomicBoolean;
901+import java.util.concurrent.atomic.AtomicInteger;
902+
903+import org.junit.Test;
904+
905+public class TreeBuilderTest extends PersistitUnitTestCase {
906+ private final static int COUNT = 10000;
907+
908+ @Test
909+ public void basicTest() throws Exception {
910+
911+ final TreeBuilder tb = new TreeBuilder(_persistit) {
912+ @Override
913+ protected void reportSorted(final long count) {
914+ System.out.println("Sorted " + count);
915+ }
916+
917+ @Override
918+ protected void reportMerged(final long count) {
919+ System.out.println("Merged " + count);
920+ }
921+
922+ };
923+ tb.setReportKeyCountMultiple(COUNT / 2);
924+
925+ final List<Integer> shuffled = new ArrayList<Integer>(COUNT);
926+ for (int i = 0; i < COUNT; i++) {
927+ shuffled.add(i);
928+ }
929+ Collections.shuffle(shuffled);
930+ final Exchange a = _persistit.getExchange(VOLUME_NAME, "a", true);
931+ final Exchange b = _persistit.getExchange(VOLUME_NAME, "b", true);
932+ final Exchange c = _persistit.getExchange(VOLUME_NAME, "c", true);
933+
934+ for (int i = 0; i < COUNT; i++) {
935+ final int ka = shuffled.get(i);
936+ final int kb = shuffled.get((i + COUNT / 3) % COUNT);
937+ final int kc = shuffled.get((i + (2 * COUNT) / 3) % COUNT);
938+ a.clear().append(ka).append("a");
939+ a.getValue().put(i);
940+ b.clear().append(kb).append("b");
941+ b.getValue().put(i);
942+ c.clear().append(kc).append("c");
943+ c.getValue().put(i);
944+ tb.store(a);
945+ tb.store(b);
946+ tb.store(c);
947+
948+ }
949+
950+ tb.merge();
951+
952+ a.clear();
953+ b.clear();
954+ c.clear();
955+ int count = 0;
956+ while (a.next(true) && b.next(true) && c.next(true)) {
957+ assertEquals("Expect correct key value", count, a.getKey().decodeInt());
958+ assertEquals("Expect correct key value", count, b.getKey().decodeInt());
959+ assertEquals("Expect correct key value", count, c.getKey().decodeInt());
960+ count++;
961+ }
962+ assertEquals("Expect every key value", COUNT, count);
963+
964+ }
965+
966+ @Test
967+ public void customizationMethods() throws Exception {
968+ final AtomicBoolean doReplace = new AtomicBoolean();
969+ final AtomicInteger duplicateCount = new AtomicInteger();
970+ final AtomicInteger beforeMergeCount = new AtomicInteger();
971+ final AtomicInteger afterMergeCount = new AtomicInteger();
972+
973+ final TreeBuilder tb = new TreeBuilder(_persistit) {
974+
975+ @Override
976+ protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
977+ duplicateCount.incrementAndGet();
978+ return doReplace.get();
979+ }
980+
981+ @Override
982+ protected boolean beforeMergeKey(final Exchange ex) throws Exception {
983+ beforeMergeCount.incrementAndGet();
984+ return ex.getKey().decodeInt() != 3;
985+ }
986+
987+ @Override
988+ protected void afterMergeKey(final Exchange ex) throws Exception {
989+ afterMergeCount.incrementAndGet();
990+ }
991+
992+ };
993+ final Exchange ex = _persistit.getExchange(VOLUME_NAME, "a", true);
994+
995+ doReplace.set(true);
996+ ex.to(1).getValue().put("abc");
997+ tb.store(ex);
998+ ex.to(1).getValue().put("def");
999+ tb.store(ex);
1000+ assertEquals("Should have registered a dup", 1, duplicateCount.get());
1001+
1002+ doReplace.set(false);
1003+ ex.to(2).getValue().put("abc");
1004+ tb.store(ex);
1005+ ex.to(2).getValue().put("def");
1006+ tb.store(ex);
1007+ assertEquals("Should have registered a dup", 2, duplicateCount.get());
1008+
1009+ tb.unitTestNextSortVolume();
1010+
1011+ ex.to(1).getValue().put("ghi");
1012+ tb.store(ex);
1013+ ex.to(2).getValue().put("ghi");
1014+ tb.store(ex);
1015+ assertEquals("Should not have registered a dup yet", 2, duplicateCount.get());
1016+
1017+ ex.to(3).getValue().put("abc");
1018+ tb.store(ex);
1019+
1020+ doReplace.set(false);
1021+ tb.merge();
1022+
1023+ assertEquals("Should have registered two more dups", 4, duplicateCount.get());
1024+ assertEquals("beforeMergeKey should be called three times", 3, beforeMergeCount.get());
1025+ assertEquals("afterMergeKey should be called twice", 2, afterMergeCount.get());
1026+
1027+ final StringBuilder result = new StringBuilder();
1028+ ex.clear().append(Key.BEFORE);
1029+ while (ex.next(true)) {
1030+ result.append(String.format("%s=%s,", ex.getKey(), ex.getValue()));
1031+ }
1032+ assertEquals("Expected result", "{1}=\"def\",{2}=\"abc\",", result.toString());
1033+ }
1034+
1035+ @Test
1036+ public void duplicatePriority() throws Exception {
1037+ final TreeBuilder tb = new TreeBuilder(_persistit) {
1038+
1039+ @Override
1040+ protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
1041+ final String s1 = v1.getString();
1042+ final String s2 = v2.getString();
1043+ return s1.compareTo(s2) < 0;
1044+ }
1045+
1046+ };
1047+ final Exchange ex = _persistit.getExchange(VOLUME_NAME, "a", true);
1048+ final String nul = null;
1049+
1050+ insertKeys(ex, tb, "x", "m", "n", nul, "a", nul, "q");
1051+ tb.unitTestNextSortVolume();
1052+ insertKeys(ex, tb, nul, "t", "o", "r", nul, nul, nul);
1053+ tb.unitTestNextSortVolume();
1054+ insertKeys(ex, tb, nul, "u", "m", "j", "c", "x", "l");
1055+ tb.unitTestNextSortVolume();
1056+ insertKeys(ex, tb, nul, "m", nul, nul, "a", nul, "q");
1057+
1058+ tb.merge();
1059+
1060+ final StringBuilder result = new StringBuilder();
1061+ for (int i = 0; i < 10; i++) {
1062+ ex.to(i).fetch();
1063+ if (ex.isValueDefined()) {
1064+ result.append(ex.getValue().getString());
1065+ }
1066+ }
1067+ assertEquals("xuorcxq", result.toString());
1068+ }
1069+
1070+ private void insertKeys(final Exchange ex, final TreeBuilder tb, final String... args) throws Exception {
1071+ for (int i = 0; i < args.length; i++) {
1072+ if (args[i] != null) {
1073+ ex.to(i).getValue().put(args[i]);
1074+ tb.store(ex);
1075+ }
1076+ }
1077+ }
1078+
1079+}
1080
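The tests exercise the sort-then-merge idea behind `TreeBuilder`. Keys go into a series of sorted runs (the temporary sort volumes), and `merge()` combines the runs in key order. `duplicateKeyDetected` decides whether the value seen later replaces the one already held, and `beforeMergeKey` can veto a key. The sketch below reproduces that flow with in-memory `TreeMap` runs and a `PriorityQueue` k-way merge. It is a hypothetical model for reading the tests, not `TreeBuilder`'s implementation or API. Its assumptions are `int` keys, `String` values, a `BiPredicate` standing in for `duplicateKeyDetected` (return `true` to replace v1 with v2), and a `Predicate` standing in for `beforeMergeKey`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

/** Hypothetical in-memory model of sorted runs plus a k-way merge. */
class MergeSketch {
    private final BiPredicate<String, String> replaceOnDuplicate; // (v1 held, v2 incoming)
    private final Predicate<Integer> beforeMergeKey;              // false -> skip the key
    private final List<TreeMap<Integer, String>> runs = new ArrayList<>();
    int duplicateCount;

    MergeSketch(final BiPredicate<String, String> replaceOnDuplicate, final Predicate<Integer> beforeMergeKey) {
        this.replaceOnDuplicate = replaceOnDuplicate;
        this.beforeMergeKey = beforeMergeKey;
        nextRun();
    }

    /** Start a new sorted run, standing in for a new temporary sort volume. */
    void nextRun() {
        runs.add(new TreeMap<>());
    }

    /** Insert into the current run; a duplicate within one run is resolved immediately. */
    void store(final int key, final String value) {
        final TreeMap<Integer, String> run = runs.get(runs.size() - 1);
        final String held = run.get(key);
        if (held != null) {
            duplicateCount++;
            if (!replaceOnDuplicate.test(held, value)) {
                return;
            }
        }
        run.put(key, value);
    }

    /** Cursor over one run; `current` is the run's smallest unconsumed entry. */
    private static final class Cursor {
        final int run;
        final Iterator<Map.Entry<Integer, String>> it;
        Map.Entry<Integer, String> current;

        Cursor(final int run, final Iterator<Map.Entry<Integer, String>> it) {
            this.run = run;
            this.it = it;
            this.current = it.next();
        }

        boolean advance() {
            if (!it.hasNext()) {
                return false;
            }
            current = it.next();
            return true;
        }
    }

    /** Merge all runs in key order; ties are taken in run order, earliest first. */
    TreeMap<Integer, String> merge() {
        final PriorityQueue<Cursor> heap = new PriorityQueue<>((a, b) -> {
            final int c = Integer.compare(a.current.getKey(), b.current.getKey());
            return c != 0 ? c : Integer.compare(a.run, b.run);
        });
        for (int r = 0; r < runs.size(); r++) {
            final Iterator<Map.Entry<Integer, String>> it = runs.get(r).entrySet().iterator();
            if (it.hasNext()) {
                heap.add(new Cursor(r, it));
            }
        }
        final TreeMap<Integer, String> result = new TreeMap<>();
        while (!heap.isEmpty()) {
            final Cursor first = heap.poll();
            final int key = first.current.getKey();
            String value = first.current.getValue();
            if (first.advance()) {
                heap.add(first); // its next key is strictly greater than `key`
            }
            // Every other run positioned at the same key is a cross-run duplicate.
            while (!heap.isEmpty() && heap.peek().current.getKey() == key) {
                final Cursor dup = heap.poll();
                duplicateCount++;
                if (replaceOnDuplicate.test(value, dup.current.getValue())) {
                    value = dup.current.getValue();
                }
                if (dup.advance()) {
                    heap.add(dup);
                }
            }
            if (beforeMergeKey.test(key)) {
                result.put(key, value);
            }
        }
        return result;
    }
}
```

Feeding this model the four runs from `duplicatePriority`, with a resolver that replaces when `s1.compareTo(s2) < 0`, keeps the largest string per key and yields `xuorcxq`, the same string the test asserts. The heap holds one cursor per run, so each merge step costs O(log k) for k runs regardless of how many keys each run holds.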
1081=== modified file 'src/test/java/com/persistit/stress/InsertBigLoad.java'
1082--- src/test/java/com/persistit/stress/InsertBigLoad.java 2012-11-20 17:45:51 +0000
1083+++ src/test/java/com/persistit/stress/InsertBigLoad.java 2012-12-31 21:06:22 +0000
1084@@ -39,7 +39,7 @@
1085
1086 deleteFiles(substitute("$datapath$/persistit*"));
1087
1088- add(new BigLoad("records=10000000 buckets=10"));
1089+ add(new BigLoad("records=10000000"));
1090
1091 final Configuration config = makeConfiguration(16384, "50000", CommitPolicy.SOFT);
1092 config.setTmpVolMaxSize(100000000000l);
1093
1094=== modified file 'src/test/java/com/persistit/stress/unit/BigLoad.java'
1095--- src/test/java/com/persistit/stress/unit/BigLoad.java 2012-11-20 17:45:51 +0000
1096+++ src/test/java/com/persistit/stress/unit/BigLoad.java 2012-12-31 21:06:22 +0000
1097@@ -16,14 +16,13 @@
1098 package com.persistit.stress.unit;
1099
1100 import java.util.Random;
1101-import java.util.SortedMap;
1102-import java.util.TreeMap;
1103
1104 import com.persistit.Exchange;
1105 import com.persistit.Key;
1106 import com.persistit.Persistit;
1107-import com.persistit.Volume;
1108-import com.persistit.exception.PersistitException;
1109+import com.persistit.Tree;
1110+import com.persistit.TreeBuilder;
1111+import com.persistit.Value;
1112 import com.persistit.stress.AbstractStressTest;
1113 import com.persistit.util.ArgParser;
1114 import com.persistit.util.Util;
1115@@ -50,50 +49,48 @@
1116 private static final Random RANDOM = new Random();
1117
1118 private int totalRecords;
1119- private int recordsPerBucket;
1120-
1121- /**
1122- * A Comparable wrapper for an Exchange. An instance of this class may be
1123- * held in a SortedMap only if the Key of the Exchange does not change. In
1124- * this example, the ComparableExchangeHolder is always removed from the
1125- * TreeMap before the Key changes and then reinserted into a new location
1126- * after the key has changed.
1127- */
1128- static class ComparableExchangeHolder implements Comparable<ComparableExchangeHolder> {
1129-
1130- final Exchange exchange;
1131-
1132- ComparableExchangeHolder(final Exchange exchange) {
1133- this.exchange = exchange;
1134- }
1135-
1136- @Override
1137- public int compareTo(final ComparableExchangeHolder ceh) {
1138- final Key k1 = exchange.getKey();
1139- final Key k2 = ceh.exchange.getKey();
1140- return k1.compareTo(k2);
1141- }
1142- }
1143
1144 public BigLoad(final int totalRecords, final int buckets) {
1145 super("");
1146 this.totalRecords = totalRecords;
1147- this.recordsPerBucket = totalRecords / buckets;
1148 }
1149
1150- public void load(final Persistit db) throws PersistitException {
1151+ public void load(final Persistit db) throws Exception {
1152 final long startLoadTime = System.nanoTime();
1153- final Volume sortVolume = db.createTemporaryVolume();
1154+ final TreeBuilder tb = new TreeBuilder(db) {
1155+ @Override
1156+ protected void reportSorted(final long count) {
1157+ System.out.printf("Sorted %,15d records\n", count);
1158+ }
1159+
1160+ @Override
1161+ protected void reportMerged(final long count) {
1162+ System.out.printf("Merged %,15d records\n", count);
1163+ }
1164+
1165+ @Override
1166+ protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
1167+ System.out.println("Duplicate key detected: " + key);
1168+ return true;
1169+ }
1170+ };
1171 final Exchange resultExchange = db.getExchange("persistit", "sorted", true);
1172- System.out.printf("Loading %,d records into %,d buckets\n", totalRecords, totalRecords / recordsPerBucket);
1173- final int bucketCount = loadBuckets(db, sortVolume);
1174+ System.out.printf("Loading %,d records\n", totalRecords);
1175+
1176+ for (int i = 0; i < totalRecords; i++) {
1177+ resultExchange.clear().append(randomKey());
1178+ tb.store(resultExchange);
1179+ }
1180 final long endLoadTime = System.nanoTime();
1181-
1182- System.out.printf("Merging %,d records from %,d buckets into main database\n", totalRecords, bucketCount);
1183- mergeBuckets(db, bucketCount, sortVolume, resultExchange);
1184+ System.out.printf("Loaded %,d records into %,d sort volumes in %,dms\n", totalRecords, tb.getSortVolumeCount(),
1185+ (endLoadTime - startLoadTime) / Util.NS_PER_MS);
1186+
1187+ System.out.printf("Merging %,d records into main database\n", totalRecords);
1188+
1189+ tb.merge();
1190 final long endMergeTime = System.nanoTime();
1191 System.out.printf("Merged %,d records in %,dms\n", totalRecords, (endMergeTime - endLoadTime) / Util.NS_PER_MS);
1192- sortVolume.close();
1193+
1194 System.out.printf("Counting keys in main database (100M keys per dot) ");
1195 resultExchange.clear().append(Key.BEFORE);
1196 long count = 0;
1197@@ -111,69 +108,16 @@
1198 (endCountTime - startLoadTime) / Util.NS_PER_MS);
1199 }
1200
1201- private int loadBuckets(final Persistit db, final Volume volume) throws PersistitException {
1202- long bucketStartTime = 0;
1203- int bucket = 0;
1204- Exchange ex = null;
1205- for (int i = 0; i < totalRecords; i++) {
1206- if ((i % recordsPerBucket) == 0) {
1207- final long now = System.nanoTime();
1208- if (i > 0) {
1209- System.out.printf("Loaded bucket %,5d in %,12dms\n", bucket, (now - bucketStartTime)
1210- / Util.NS_PER_MS);
1211- }
1212- bucketStartTime = now;
1213- bucket++;
1214- ex = db.getExchange(volume, "sort" + bucket, true);
1215- }
1216- ex.clear().append(randomKey());
1217- ex.store();
1218- }
1219- return bucket;
1220- }
1221-
1222- private void mergeBuckets(final Persistit db, final int bucketCount, final Volume sortVolume, final Exchange to)
1223- throws PersistitException {
1224- final long startLoadTime = System.nanoTime();
1225- int loaded = 0;
1226-
1227- final SortedMap<ComparableExchangeHolder, Integer> sortMap = new TreeMap<ComparableExchangeHolder, Integer>();
1228- /*
1229- * Load the sortMap using as keys the first key of each bucket.
1230- */
1231- for (int bucket = 1; bucket <= bucketCount; bucket++) {
1232- final Exchange ex = db.getExchange(sortVolume, "sort" + bucket, false);
1233- if (ex.append(Key.BEFORE).next()) {
1234- final Integer duplicate = sortMap.put(new ComparableExchangeHolder(ex), bucket);
1235- showDuplicate(duplicate, bucket, ex);
1236- }
1237- }
1238-
1239- while (!sortMap.isEmpty()) {
1240- final ComparableExchangeHolder ceh = sortMap.firstKey();
1241- final int bucket = sortMap.remove(ceh);
1242- ceh.exchange.getKey().copyTo(to.getKey());
1243- if (ceh.exchange.next()) {
1244- final Integer duplicate = sortMap.put(ceh, bucket);
1245- showDuplicate(duplicate, bucket, ceh.exchange);
1246- }
1247- to.store();
1248- if ((++loaded % 10000000) == 0) {
1249- System.out.printf("Merged %,d records in %,dms\n", loaded, (System.nanoTime() - startLoadTime)
1250- / Util.NS_PER_MS);
1251- }
1252- }
1253- }
1254+ final StringBuilder sb = new StringBuilder(
1255+ "00000000000000000000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
1256
1257 private String randomKey() {
1258- return String.format("%020dxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
1259- (RANDOM.nextLong() & Long.MAX_VALUE));
1260- }
1261-
1262- private void showDuplicate(final Integer bucket1, final int bucket2, final Exchange ex) {
1263- if (bucket1 != null) {
1264- System.out.printf("Duplicate key %s in buckets %,d and %,d\n", ex.getKey(), bucket1, bucket2);
1265+ long n = RANDOM.nextLong() & Long.MAX_VALUE;
1266+ for (int i = 20; --i >= 0;) {
1267+ sb.setCharAt(i, (char) ((n % 10) + '0'));
1268+ n /= 10;
1269 }
1270+ return sb.toString();
1271 }
1272
1273 /**
1274@@ -220,7 +164,7 @@
1275 }
1276
1277 private final static String[] ARGS_TEMPLATE = { "records|int:1000000:1:1000000000|Total records to create",
1278- "buckets|int:100:1:1000000|Number of sort buckets", "tmpdir|string:|Temporary volume path" };
1279+ "tmpdir|string:|Temporary volume path" };
1280
1281 /**
1282 * Method to parse stress test arguments passed by the stress test suite.
1283@@ -230,7 +174,6 @@
1284 super.setUp();
1285 final ArgParser ap = new ArgParser("com.persistit.BigLoad", _args, ARGS_TEMPLATE).strict();
1286 totalRecords = ap.getIntValue("records");
1287- recordsPerBucket = totalRecords / ap.getIntValue("buckets");
1288 final String path = ap.getStringValue("tmpdir");
1289 if (path != null && !path.isEmpty()) {
1290 getPersistit().getConfiguration().setTmpVolDir(path);
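The new `randomKey()` in `BigLoad` replaces a `String.format("%020d…")` call per record with a reused `StringBuilder` whose first 20 characters are overwritten digit by digit. This avoids parsing a format string 10,000,000 times. The standalone sketch below shows the technique and can be checked against `String.format`. The class name and the shortened `PADDING` literal are hypothetical; the branch uses a longer run of `x` characters. Masking with `Long.MAX_VALUE` clears the sign bit, so `n` is never negative, and its at most 19 digits always fit in 20 positions.

```java
import java.util.Random;

/** Hypothetical sketch of BigLoad's allocation-light key generator. */
class RandomKeySketch {
    static final String PADDING = "xxxxxxxxxxxxxxxxxxxx"; // shortened for the sketch
    // Reused buffer: not thread-safe, so each thread needs its own instance.
    private final StringBuilder sb = new StringBuilder("00000000000000000000" + PADDING);

    /** Zero-padded 20-digit decimal form of n followed by PADDING; requires n >= 0. */
    String keyFor(long n) {
        for (int i = 20; --i >= 0;) {
            sb.setCharAt(i, (char) ((n % 10) + '0')); // least significant digit last
            n /= 10;
        }
        return sb.toString();
    }

    String randomKey(final Random random) {
        return keyFor(random.nextLong() & Long.MAX_VALUE); // clear the sign bit
    }
}
```

Because every position from 19 down to 0 is rewritten on each call, stale digits from a previous, longer number cannot leak into a shorter one.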
