Merge lp:~pbeaman/akiban-persistit/tree_builder into lp:akiban-persistit
Proposed by
Peter Beaman
Status: | Merged |
---|---|
Approved by: | Nathan Williams |
Approved revision: | 413 |
Merged at revision: | 407 |
Proposed branch: | lp:~pbeaman/akiban-persistit/tree_builder |
Merge into: | lp:akiban-persistit |
Diff against target: |
1290 lines (+954/-114) 13 files modified
doc/Miscellaneous.rst (+5/-0) pom.xml (+1/-1) src/main/java/com/persistit/BufferPool.java (+10/-3) src/main/java/com/persistit/Persistit.java (+3/-1) src/main/java/com/persistit/TreeBuilder.java (+619/-0) src/main/java/com/persistit/Volume.java (+6/-4) src/main/java/com/persistit/VolumeStorageT2.java (+5/-5) src/main/java/com/persistit/exception/DuplicateKeyException.java (+36/-0) src/test/java/com/persistit/BufferPoolTest.java (+21/-0) src/test/java/com/persistit/StressRunner.java (+2/-0) src/test/java/com/persistit/TreeBuilderTest.java (+203/-0) src/test/java/com/persistit/stress/InsertBigLoad.java (+1/-1) src/test/java/com/persistit/stress/unit/BigLoad.java (+42/-99) |
To merge this branch: | bzr merge lp:~pbeaman/akiban-persistit/tree_builder |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Akiban Build User | Needs Fixing | ||
Nathan Williams | Approve | ||
Review via email: mp+141536@code.launchpad.net |
Commit message
Description of the change
Adds new com.persistit.
This branch also:
- adds TreeBuilderTest
- bumps version number
- adds new BufferPool#
- modifies method for setting up temporary volumes so that a directory can be supplied (used by TreeBuilder)
- adds reference to TreeBuilder to Miscellaneous.rst
ReleaseNotes.rst is not modified yet.
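The description above summarizes TreeBuilder's approach: records are first stored into bounded sort trees, then merged in key order into the destination trees. As a rough, hypothetical illustration only, the self-contained Java sketch below mimics that two-phase flow in memory. `SortMergeSketch`, `runLimit`, and the string-keyed `TreeMap` runs are stand-ins invented for this example. TreeBuilder itself bounds each sort volume by page count rather than entry count, and it works on Persistit `Key`/`Value` objects. The `duplicate` chain mirrors the `_duplicate` field of TreeBuilder's inner `Node` class. On an equal key, this sketch keeps the value from the earliest run. That appears to correspond to a `duplicateKeyDetected` override that returns `false`, not to the default, which throws `DuplicateKeyException`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory sketch of TreeBuilder's sort-then-merge flow.
// Bounded TreeMap "runs" stand in for temporary sort volumes, and a TreeMap of
// cursors stands in for the SortedMap<Node, Node> used by TreeBuilder#merge().
class SortMergeSketch {

    // Cursor over one sorted run, loosely modeled on TreeBuilder's inner Node class.
    static final class Cursor implements Comparable<Cursor> {
        final Iterator<Map.Entry<String, String>> it;
        final int runIndex;               // creation order of the run
        Map.Entry<String, String> current;
        Cursor duplicate;                 // other cursors positioned on the same key

        Cursor(TreeMap<String, String> run, int runIndex) {
            this.it = run.entrySet().iterator();
            this.runIndex = runIndex;
        }

        boolean next() {
            current = it.hasNext() ? it.next() : null;
            return current != null;
        }

        @Override
        public int compareTo(Cursor other) {
            return current.getKey().compareTo(other.current.getKey());
        }
    }

    private final int runLimit; // assumed entry cap per run; TreeBuilder caps pages instead
    private final List<TreeMap<String, String>> runs = new ArrayList<>();

    SortMergeSketch(int runLimit) {
        this.runLimit = runLimit;
    }

    // Phase 1: add to the current run, starting a new run once it is full.
    public void store(String key, String value) {
        if (runs.isEmpty() || runs.get(runs.size() - 1).size() >= runLimit) {
            runs.add(new TreeMap<>());
        }
        runs.get(runs.size() - 1).putIfAbsent(key, value); // first value in a run wins
    }

    public int runCount() {
        return runs.size();
    }

    // Phase 2: k-way merge of all runs, emitting "key=value" in key order.
    public List<String> merge() {
        TreeMap<Cursor, Cursor> heads = new TreeMap<>();
        for (int i = 0; i < runs.size(); i++) {
            Cursor c = new Cursor(runs.get(i), i);
            if (c.next()) {
                insert(heads, c);
            }
        }
        List<String> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            Cursor c = heads.remove(heads.firstKey());
            out.add(c.current.getKey() + "=" + c.current.getValue());
            // Advance the emitted cursor and every duplicate chained behind it.
            while (c != null) {
                Cursor nextDup = c.duplicate;
                c.duplicate = null;
                if (c.next()) {
                    insert(heads, c);
                }
                c = nextDup;
            }
        }
        return out;
    }

    // On an equal key, the cursor from the earliest run stays primary; the rest are chained.
    private static void insert(TreeMap<Cursor, Cursor> heads, Cursor c) {
        Cursor existing = heads.remove(c);
        if (existing == null) {
            heads.put(c, c);
        } else if (existing.runIndex < c.runIndex) {
            c.duplicate = existing.duplicate;
            existing.duplicate = c;
            heads.put(existing, existing);
        } else {
            c.duplicate = existing;
            heads.put(c, c);
        }
    }
}
```

For example, with `runLimit` set to 2, storing the keys d, b, a, b (a second time, with a different value) and c produces three runs. `merge()` then emits each key once, in sorted order, and keeps the first-stored value for b. Advancing every chained cursor after a key is emitted is what collapses duplicate keys from different runs into a single output record.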
Nathan Williams (nwilliams) wrote:
Nathan Williams (nwilliams) wrote:
Nothing jumps out after a second quick pass. A tweak or two after server usage gets fleshed out will probably happen, so full steam ahead.
review: Approve
Akiban Build User (build-akiban) wrote:
There was one failure during build/test:
* unknown exception (check log)
review: Needs Fixing
Preview Diff
1 | === modified file 'doc/Miscellaneous.rst' |
2 | --- doc/Miscellaneous.rst 2012-06-11 21:25:37 +0000 |
3 | +++ doc/Miscellaneous.rst 2012-12-31 21:06:22 +0000 |
4 | @@ -5,6 +5,11 @@ |
5 | |
6 | Following are some short items you may find useful as you explore Akiban Persistit. Follow links to the API documentation for more details. |
7 | |
8 | +TreeBuilder |
9 | +----------- |
10 | + |
11 | +The class ``com.persistit.TreeBuilder`` improves the performance of inserting large sets of data when the keys being inserted are non-sequential. TreeBuilder is effective if and only if the size of the data being loaded is significantly larger than the amount of memory available in the buffer pool and keys are inserted in essentially random order. |
12 | + |
13 | Histograms |
14 | ---------- |
15 | |
16 | |
17 | === modified file 'pom.xml' |
18 | --- pom.xml 2012-12-05 15:30:55 +0000 |
19 | +++ pom.xml 2012-12-31 21:06:22 +0000 |
20 | @@ -4,7 +4,7 @@ |
21 | |
22 | <groupId>com.akiban</groupId> |
23 | <artifactId>akiban-persistit</artifactId> |
24 | - <version>3.2.3-SNAPSHOT</version> |
25 | + <version>3.2.4-SNAPSHOT</version> |
26 | <packaging>jar</packaging> |
27 | |
28 | <parent> |
29 | |
30 | === modified file 'src/main/java/com/persistit/BufferPool.java' |
31 | --- src/main/java/com/persistit/BufferPool.java 2012-11-28 16:15:34 +0000 |
32 | +++ src/main/java/com/persistit/BufferPool.java 2012-12-31 21:06:22 +0000 |
33 | @@ -541,16 +541,20 @@ |
34 | * The volume |
35 | * @throws PersistitInterruptedException |
36 | */ |
37 | - boolean invalidate(final Volume volume) throws PersistitInterruptedException { |
38 | + boolean invalidate(final Volume volume) throws PersistitException { |
39 | final float ratio = (float) volume.getStorage().getNextAvailablePage() / (float) _bufferCount; |
40 | if (ratio < SMALL_VOLUME_RATIO) { |
41 | - return invalidateSmallVolume(volume); |
42 | + return invalidateSmallVolume(volume, false); |
43 | } else { |
44 | return invalidateLargeVolume(volume); |
45 | } |
46 | } |
47 | |
48 | - boolean invalidateSmallVolume(final Volume volume) throws PersistitInterruptedException { |
49 | + boolean evict(final Volume volume) throws PersistitException { |
50 | + return invalidateSmallVolume(volume, true); |
51 | + } |
52 | + |
53 | + boolean invalidateSmallVolume(final Volume volume, final boolean mustWrite) throws PersistitException { |
54 | boolean result = true; |
55 | int markedAvailable = 0; |
56 | for (long page = 1; page < volume.getStorage().getNextAvailablePage(); page++) { |
57 | @@ -565,6 +569,9 @@ |
58 | try { |
59 | if ((buffer.getVolume() == volume || volume == null) && !buffer.isFixed() |
60 | && buffer.isValid()) { |
61 | + if (buffer.isDirty()) { |
62 | + buffer.writePage(); |
63 | + } |
64 | invalidate(buffer); |
65 | invalidated = true; |
66 | } |
67 | |
68 | === modified file 'src/main/java/com/persistit/Persistit.java' |
69 | --- src/main/java/com/persistit/Persistit.java 2012-11-30 20:33:30 +0000 |
70 | +++ src/main/java/com/persistit/Persistit.java 2012-12-31 21:06:22 +0000 |
71 | @@ -1105,7 +1105,9 @@ |
72 | if (!Volume.isValidPageSize(pageSize)) { |
73 | throw new IllegalArgumentException("Invalid page size " + pageSize); |
74 | } |
75 | - return Volume.createTemporaryVolume(this, pageSize); |
76 | + final String directoryName = getConfiguration().getTmpVolDir(); |
77 | + final File directory = directoryName == null ? null : new File(directoryName); |
78 | + return Volume.createTemporaryVolume(this, pageSize, directory); |
79 | } |
80 | |
81 | /** |
82 | |
83 | === added file 'src/main/java/com/persistit/TreeBuilder.java' |
84 | --- src/main/java/com/persistit/TreeBuilder.java 1970-01-01 00:00:00 +0000 |
85 | +++ src/main/java/com/persistit/TreeBuilder.java 2012-12-31 21:06:22 +0000 |
86 | @@ -0,0 +1,619 @@ |
87 | +/** |
88 | + * Copyright © 2012 Akiban Technologies, Inc. All rights reserved. |
89 | + * |
90 | + * This program and the accompanying materials are made available |
91 | + * under the terms of the Eclipse Public License v1.0 which |
92 | + * accompanies this distribution, and is available at |
93 | + * http://www.eclipse.org/legal/epl-v10.html |
94 | + * |
95 | + * This program may also be available under different license terms. |
96 | + * For more information, see www.akiban.com or contact licensing@akiban.com. |
97 | + * |
98 | + * Contributors: |
99 | + * Akiban Technologies, Inc. |
100 | + */ |
101 | + |
102 | +package com.persistit; |
103 | + |
104 | +import java.io.File; |
105 | +import java.util.ArrayList; |
106 | +import java.util.Collections; |
107 | +import java.util.Comparator; |
108 | +import java.util.HashMap; |
109 | +import java.util.HashSet; |
110 | +import java.util.List; |
111 | +import java.util.Map; |
112 | +import java.util.Set; |
113 | +import java.util.SortedMap; |
114 | +import java.util.TreeMap; |
115 | +import java.util.concurrent.atomic.AtomicLong; |
116 | + |
117 | +import com.persistit.exception.DuplicateKeyException; |
118 | +import com.persistit.exception.PersistitException; |
119 | +import com.persistit.util.Util; |
120 | + |
121 | +/** |
122 | + * <p> |
123 | + * A mechanism for optimizing the process of loading large sets of records with |
124 | + * non-sequential keys. This class speeds up the process of inserting records |
125 | + * into a set of Persistit <code>Tree</code>s by sorting them before inserting |
126 | + * them. The sort process uses multiple "sort trees" in multiple temporary |
127 | + * <code>Volume</code>s to hold copies of the data. These are then merged into |
128 | + * the final "destination trees." Each sort tree is constrained to be small |
129 | + * enough to fit in the {@link BufferPool}. |
130 | + * </p> |
131 | + * <h3>Background</h3> |
132 | + * <p> |
133 | + * In general, Persistit can store records very quickly, even when the keys of |
134 | + * those records arrive in random order, as long as all the pages of the |
135 | + * destination tree or trees are resident in the buffer pool. However, the |
136 | + * situation changes dramatically as soon as the destination tree or trees |
137 | + * exceed the size of the buffer pool. Once that happens, insert performance |
138 | + * degrades because the ratio of records inserted per disk I/O operation |
139 | + * performed decreases. In a worst-case scenario, inserting each new key may |
140 | + * require two or more disk I/O operations. These may occur because Persistit |
141 | + * performs the following steps: |
142 | + * <ul> |
143 | + * <li>Looking up the key requires reading the page containing that key from disk |
144 | + * into the BufferPool.</li> |
145 | + * <li>Reading the page requires a Buffer containing some other page to be |
146 | + * evicted.</li> |
147 | + * <li>The page being evicted is likely to be dirty and therefore Persistit must |
148 | + * write its contents to disk before reusing the Buffer.</li> |
149 | + * </ul> |
150 | + * Further, these disk I/O operations are usually at unrelated file |
151 | + * positions and therefore may each require random seeks. As a result, inserting |
152 | + * one key can take orders of magnitude longer once the tree no longer fits in |
153 | + * the buffer pool. |
154 | + * </p> |
155 | + * <p> |
156 | + * <code>TreeBuilder</code> mitigates that degradation by sorting the keys |
157 | + * before inserting them into their final destination trees. To do so it builds |
158 | + * a collection of bounded-size sort trees in temporary volumes. Then it |
159 | + * performs a merge sort from those trees into the final destination tree or |
160 | + * trees. This mechanism eliminates the problem that every key insertion |
161 | + * requires two (or more) random disk I/O operations. However, it is still the |
162 | + * case that every sort tree page must be written and read once, and every |
163 | + * destination tree page must be written at least once. Therefore the I/O |
164 | + * associated with TreeBuilder is reduced but not eliminated. |
165 | + * </p> |
166 | + * <p> |
167 | + * TreeBuilder is effective if and only if (a) the keys arrive in random order, |
168 | + * and (b) the data is significantly larger than available memory in the buffer |
169 | + * pool. In general it is faster to insert the keys directly into the |
170 | + * destination trees unless both of these conditions are true. |
171 | + * </p> |
172 | + * <h3>Using TreeBuilder</h3> |
173 | + * <p> |
174 | + * The following example demonstrates the fundamental operation of |
175 | + * <code>TreeBuilder</code>: <code><pre> |
176 | + * Exchange exchange = db.getExchange("myVolume", "myTree", true); |
177 | + * TreeBuilder tb = new TreeBuilder(db); |
178 | + * // |
179 | + * // Insert the data into sort trees |
180 | + * // |
181 | + * while (<i>source has more data</i>) { |
182 | + * exchange.to(<i>next key</i>).getValue().put(<i>next value</i>); |
183 | + * tb.store(exchange); |
184 | + * } |
185 | + * // |
186 | + * // Merge the data into myTree |
187 | + * // |
188 | + * tb.merge(); |
189 | + * </pre></code> Note that a TreeBuilder can pre-sort data for multiple |
190 | + * destination trees. For example, it is possible to load and merge records for |
191 | + * a table and its corresponding indexes in one pass using TreeBuilder. During |
192 | + * the merge operation, the final destination <code>Tree</code>s are built in |
193 | + * sequence. By default that sequence is by alphabetical order of tree name, but |
194 | + * it is possible to customize TreeBuilder to change that order. |
195 | + * </p> |
196 | + * <p> |
197 | + * Loading a large data set may take a long time under the best of |
198 | + * circumstances. Therefore this class is designed to be extended by |
199 | + * applications to support progress reporting, to control disk space allocation, |
200 | + * to handle attempts to insert conflicting records with duplicate keys, etc. |
201 | + * See the following methods which may be overridden to provide custom behavior: |
202 | + * <ul> |
203 | + * <li>{@link #reportSorted(long)} - report completion of N record inserts into |
204 | + * sort trees</li> |
205 | + * <li>{@link #reportMerged(long)} - report completion of N records merged</li> |
206 | + * <li>{@link #duplicateKeyDetected(Tree, Key, Value, Value)} - handle detection |
207 | + * of records inserted with duplicate keys</li> |
208 | + * <li>{@link #beforeMergeKey(Exchange)} - allow filtering or custom handling |
209 | + * per record while merging</li> |
210 | + * <li>{@link #afterMergeKey(Exchange)} - behavior after merging one record</li> |
211 | + * <li>{@link #beforeSortVolumeEvicted(Volume)} - behavior before evicting a |
212 | + * sort volume when full</li> |
213 | + * <li>{@link #afterSortVolumeEvicted(Volume)} - behavior after evicting a sort |
214 | + * volume when full</li> |
215 | + * <li>{@link #getTreeComparator()} - return a custom Comparator to determine the |
216 | + * sequence in which trees are populated within the {@link #merge()} method</li> |
217 | + * </ul> |
218 | + * </p> |
219 | + * |
220 | + * @author peter |
221 | + * |
222 | + */ |
223 | +public class TreeBuilder { |
224 | + private final static float DEFAULT_BUFFER_POOL_FRACTION = 0.5f; |
225 | + private final static long REPORT_REPORT_MULTIPLE = 1000000; |
226 | + private final static String DEFAULT_NAME = "TreeBuilder"; |
227 | + |
228 | + private final String _name; |
229 | + private final Persistit _persistit; |
230 | + private final List<File> _directories = new ArrayList<File>(); |
231 | + private final List<Volume> _sortVolumes = new ArrayList<Volume>(); |
232 | + private final int _pageSize; |
233 | + private final int _pageLimit; |
234 | + private final AtomicLong _keyCount = new AtomicLong(); |
235 | + private volatile long _reportKeyCountMultiple = REPORT_REPORT_MULTIPLE; |
236 | + |
237 | + private final Set<Tree> _allTrees = new HashSet<Tree>(); |
238 | + private final List<Tree> _sortedTrees = new ArrayList<Tree>(); |
239 | + |
240 | + private final ThreadLocal<Map<Tree, Exchange>> _sortExchangeMapThreadLocal = new ThreadLocal<Map<Tree, Exchange>>() { |
241 | + @Override |
242 | + public Map<Tree, Exchange> initialValue() { |
243 | + return new HashMap<Tree, Exchange>(); |
244 | + } |
245 | + }; |
246 | + |
247 | + private Volume _currentSortVolume; |
248 | + private int _nextDirectoryIndex; |
249 | + |
250 | + private final Comparator<Tree> _defaultTreeComparator = new Comparator<Tree>() { |
251 | + @Override |
252 | + public int compare(final Tree a, final Tree b) { |
253 | + if (a == b) { |
254 | + return 0; |
255 | + } |
256 | + if (a.getVolume() == b.getVolume()) { |
257 | + return a.getName().compareTo(b.getName()); |
258 | + } else { |
259 | + return a.getVolume().getName().compareTo(b.getVolume().getName()); |
260 | + } |
261 | + } |
262 | + |
263 | + @Override |
264 | + public boolean equals(final Object obj) { |
265 | + return this == obj; |
266 | + } |
267 | + }; |
268 | + |
269 | + private class Node implements Comparable<Node> { |
270 | + final Volume _volume; |
271 | + int _treeListIndex = -1; |
272 | + Exchange _exchange = null; |
273 | + Tree _currentTree = null; |
274 | + Node _duplicate; |
275 | + |
276 | + private Node(final Volume volume) { |
277 | + _volume = volume; |
278 | + } |
279 | + |
280 | + private boolean next() throws PersistitException { |
281 | + for (;;) { |
282 | + if (_exchange == null) { |
283 | + _treeListIndex++; |
284 | + if (_treeListIndex >= _sortedTrees.size()) { |
285 | + _volume.close(); |
286 | + return false; |
287 | + } |
288 | + final String tempTreeName = "_" + _sortedTrees.get(_treeListIndex).getHandle(); |
289 | + final Tree sortTree = _volume.getTree(tempTreeName, false); |
290 | + if (sortTree == null) { |
291 | + continue; |
292 | + } |
293 | + _exchange = new Exchange(sortTree); |
294 | + _currentTree = _sortedTrees.get(_treeListIndex); |
295 | + } |
296 | + if (_exchange.next(true)) { |
297 | + return true; |
298 | + } |
299 | + _exchange = null; |
300 | + } |
301 | + } |
302 | + |
303 | + @Override |
304 | + public int compareTo(final Node node) { |
305 | + if (_exchange == null) { |
306 | + return node._exchange == null ? 0 : -1; |
307 | + } |
308 | + final int treeComparison = getTreeComparator().compare(_currentTree, node._currentTree); |
309 | + if (treeComparison != 0) { |
310 | + return treeComparison; |
311 | + } |
312 | + final Key k1 = _exchange.getKey(); |
313 | + final Key k2 = node._exchange.getKey(); |
314 | + return k1.compareTo(k2); |
315 | + } |
316 | + |
317 | + @Override |
318 | + public String toString() { |
319 | + Node n = this; |
320 | + final StringBuilder sb = new StringBuilder(); |
321 | + while (n != null) { |
322 | + if (sb.length() > 0) { |
323 | + sb.append(","); |
324 | + } |
325 | + if (n._exchange == null) { |
326 | + sb.append("<null>"); |
327 | + } else { |
328 | + sb.append("<" |
329 | + + (n._currentTree == null ? "?" : n._currentTree.getName() + n._exchange.getKey() + "=" |
330 | + + n._exchange.getValue()) + ">"); |
331 | + } |
332 | + n = n._duplicate; |
333 | + } |
334 | + return sb.toString(); |
335 | + } |
336 | + } |
337 | + |
338 | + public TreeBuilder(final Persistit persistit) { |
339 | + this(persistit, DEFAULT_NAME, -1, DEFAULT_BUFFER_POOL_FRACTION); |
340 | + } |
341 | + |
342 | + public TreeBuilder(final Persistit persistit, final String name, final int pageSize, final float bufferPoolFraction) { |
343 | + _name = name; |
344 | + _persistit = persistit; |
345 | + _pageSize = pageSize == -1 ? computePageSize(persistit) : pageSize; |
346 | + final int bufferCount = _persistit.getBufferPool(_pageSize).getBufferCount(); |
347 | + _pageLimit = (int) (bufferCount * bufferPoolFraction); |
348 | + } |
349 | + |
350 | + private int computePageSize(final Persistit persistit) { |
351 | + int pageSize = persistit.getConfiguration().getTmpVolPageSize(); |
352 | + if (pageSize == 0) { |
353 | + for (final int size : persistit.getBufferPoolHashMap().keySet()) { |
354 | + if (size > pageSize) { |
355 | + pageSize = size; |
356 | + } |
357 | + } |
358 | + } |
359 | + return pageSize; |
360 | + } |
361 | + |
362 | + public final String getName() { |
363 | + return _name; |
364 | + } |
365 | + |
366 | + public final void setReportKeyCountMultiple(final long multiple) { |
367 | + _reportKeyCountMultiple = Util.rangeCheck(multiple, 1, Long.MAX_VALUE); |
368 | + } |
369 | + |
370 | + public final long getReportKeyCountMultiple() { |
371 | + return _reportKeyCountMultiple; |
372 | + } |
373 | + |
374 | + public final synchronized int getSortVolumeCount() { |
375 | + return _sortVolumes.size(); |
376 | + } |
377 | + |
378 | + public final List<Tree> getTrees() { |
379 | + final List<Tree> list = new ArrayList<Tree>(); |
380 | + list.addAll(_allTrees); |
381 | + Collections.sort(list, getTreeComparator()); |
382 | + return list; |
383 | + } |
384 | + |
385 | + public final void setSortTreeDirectories(final List<File> directories) throws Exception { |
386 | + if (directories == null || directories.isEmpty()) { |
387 | + synchronized (this) { |
388 | + _directories.clear(); |
389 | + } |
390 | + } else { |
391 | + /* |
392 | + * Make sure all supplied items are directories |
393 | + */ |
394 | + for (final File file : directories) { |
395 | + if (file.exists() && !file.isDirectory()) { |
396 | + throw new IllegalArgumentException(file + " is not a directory"); |
397 | + } |
398 | + } |
399 | + /* |
400 | + * Make sure all directories exist |
401 | + */ |
402 | + for (final File file : directories) { |
403 | + if (!file.exists() && !file.mkdirs()) { |
404 | + throw new IllegalArgumentException(file + " could not be created as a new directory"); |
405 | + } |
406 | + } |
407 | + /* |
408 | + * Make sure all directories permit creation of a new file |
409 | + */ |
410 | + for (final File file : directories) { |
411 | + final File temp = File.createTempFile(VolumeStorageT2.TEMP_FILE_PREFIX, null, file); |
412 | + temp.delete(); |
413 | + } |
414 | + synchronized (this) { |
415 | + _directories.clear(); |
416 | + _directories.addAll(directories); |
417 | + _nextDirectoryIndex = 0; |
418 | + } |
419 | + } |
420 | + } |
421 | + |
422 | + public final List<File> getSortTreeDirectories() { |
423 | + return Collections.unmodifiableList(_directories); |
424 | + } |
425 | + |
426 | + public final void store(final Exchange exchange) throws Exception { |
427 | + store(exchange.getTree(), exchange.getKey(), exchange.getValue()); |
428 | + } |
429 | + |
430 | + public final void store(final Tree tree, final Key key, final Value value) throws Exception { |
431 | + final Map<Tree, Exchange> map = _sortExchangeMapThreadLocal.get(); |
432 | + Exchange ex = map.get(tree); |
433 | + if (ex == null || ex.getTree().getVolume().getNextAvailablePage() > _pageLimit) { |
434 | + final Volume newSortVolume = getSortVolume(); |
435 | + final String tempTreeName = "_" + _persistit.getJournalManager().handleForTree(tree); |
436 | + ex = _persistit.getExchange(newSortVolume, tempTreeName, true); |
437 | + map.put(tree, ex); |
438 | + synchronized (this) { |
439 | + _allTrees.add(tree); |
440 | + } |
441 | + } |
442 | + key.copyTo(ex.getKey()); |
443 | + value.copyTo(ex.getValue()); |
444 | + |
445 | + ex.fetchAndStore(); |
446 | + boolean stored = true; |
447 | + if (ex.getValue().isDefined()) { |
448 | + if (!duplicateKeyDetected(tree, ex.getKey(), ex.getValue(), value)) { |
449 | + stored = false; |
450 | + ex.store(); |
451 | + } |
452 | + } |
453 | + if (stored) { |
454 | + final long count = _keyCount.incrementAndGet(); |
455 | + if ((count % _reportKeyCountMultiple) == 0) { |
456 | + reportSorted(count); |
457 | + } |
458 | + } |
459 | + } |
460 | + |
461 | + private void insertNode(final Map<Node, Node> sorted, final Node node) throws Exception { |
462 | + final Node other = sorted.put(node, node); |
463 | + if (other != null) { |
464 | + if (!duplicateKeyDetected(node._currentTree, node._exchange.getKey(), other._exchange.getValue(), |
465 | + node._exchange.getValue())) { |
466 | + sorted.put(node, other); |
467 | + final Node p = other._duplicate; |
468 | + other._duplicate = node; |
469 | + node._duplicate = p; |
470 | + } else { |
471 | + node._duplicate = other; |
472 | + } |
473 | + } |
474 | + } |
475 | + |
476 | + /** |
477 | + * Merge the records previously stored in sort volumes into their destination |
478 | + * <code>Tree</code>s. |
479 | + * |
480 | + * @throws Exception |
481 | + */ |
482 | + public synchronized void merge() throws Exception { |
483 | + if ((_keyCount.get() % _reportKeyCountMultiple) != 0) { |
484 | + reportSorted(_keyCount.get()); |
485 | + } |
486 | + _keyCount.set(0); |
487 | + _sortedTrees.clear(); |
488 | + _sortedTrees.addAll(_allTrees); |
489 | + Tree currentTree = null; |
490 | + Exchange ex = null; |
491 | + Collections.sort(_sortedTrees, getTreeComparator()); |
492 | + final SortedMap<Node, Node> sorted = new TreeMap<Node, Node>(); |
493 | + for (final Volume volume : _sortVolumes) { |
494 | + final Node node = new Node(volume); |
495 | + if (node.next()) { |
496 | + insertNode(sorted, node); |
497 | + } |
498 | + } |
499 | + |
500 | + for (;;) { |
501 | + if (sorted.isEmpty()) { |
502 | + break; |
503 | + } |
504 | + Node node = sorted.firstKey(); |
505 | + node = sorted.remove(node); |
506 | + if (node._currentTree != currentTree) { |
507 | + ex = new Exchange(node._currentTree); |
508 | + currentTree = node._currentTree; |
509 | + } |
510 | + |
511 | + node._exchange.getKey().copyTo(ex.getKey()); |
512 | + node._exchange.getValue().copyTo(ex.getValue()); |
513 | + if (beforeMergeKey(ex)) { |
514 | + ex.fetchAndStore(); |
515 | + boolean stored = true; |
516 | + if (ex.getValue().isDefined()) { |
517 | + if (!duplicateKeyDetected(ex.getTree(), ex.getKey(), ex.getValue(), node._exchange.getValue())) { |
518 | + ex.store(); |
519 | + stored = false; |
520 | + } |
521 | + } |
522 | + if (stored) { |
523 | + afterMergeKey(ex); |
524 | + if ((_keyCount.incrementAndGet() % _reportKeyCountMultiple) == 0) { |
525 | + reportMerged(_keyCount.get()); |
526 | + } |
527 | + } |
528 | + } |
529 | + while (node != null) { |
530 | + final Node next = node._duplicate; |
531 | + node._duplicate = null; |
532 | + if (node.next()) { |
533 | + insertNode(sorted, node); |
534 | + } |
535 | + node = next; |
536 | + } |
537 | + } |
538 | + if ((_keyCount.get() % _reportKeyCountMultiple) != 0) { |
539 | + reportMerged(_keyCount.get()); |
540 | + } |
541 | + _keyCount.set(0); |
542 | + _sortExchangeMapThreadLocal.get().clear(); |
543 | + _allTrees.clear(); |
544 | + _sortedTrees.clear(); |
545 | + } |
546 | + |
547 | + private synchronized Volume getSortVolume() throws Exception { |
548 | + final boolean full = _currentSortVolume != null && _currentSortVolume.getNextAvailablePage() > _pageLimit; |
549 | + if (full) { |
550 | + if (beforeSortVolumeEvicted(_currentSortVolume)) { |
551 | + _persistit.getBufferPool(_pageSize).evict(_currentSortVolume); |
552 | + } |
553 | + afterSortVolumeEvicted(_currentSortVolume); |
554 | + } |
555 | + if (full || _currentSortVolume == null) { |
556 | + _currentSortVolume = createSortVolume(); |
557 | + _sortVolumes.add(_currentSortVolume); |
558 | + } |
559 | + return _currentSortVolume; |
560 | + } |
561 | + |
562 | + private Volume createSortVolume() throws Exception { |
563 | + final File directory; |
564 | + if (_directories.isEmpty()) { |
565 | + final String directoryName = _persistit.getConfiguration().getTmpVolDir(); |
566 | + directory = directoryName == null ? null : new File(directoryName); |
567 | + } else { |
568 | + directory = _directories.get(_nextDirectoryIndex % _directories.size()); |
569 | + _nextDirectoryIndex++; |
570 | + } |
571 | + return Volume.createTemporaryVolume(_persistit, _pageSize, directory); |
572 | + } |
573 | + |
574 | + /** |
575 | + * This method may be extended to provide an application-specific ordering |
576 | + * on <code>Tree</code>s. This ordering determines the sequence in which |
577 | + * destination trees are built from the sort data. By default trees are |
578 | + * built in alphabetical order by volume and tree name. However, an |
579 | + * application may choose a different order to ensure invariants for |
580 | + * concurrent use. |
581 | + * |
582 | + * @return a <code>java.util.Comparator</code> on <code>Tree</code> |
583 | + */ |
584 | + protected Comparator<Tree> getTreeComparator() { |
585 | + return _defaultTreeComparator; |
586 | + } |
587 | + |
588 | + /** |
589 | + * This method may be extended to provide application-specific behavior when |
590 | + * a sort volume has been filled to capacity. The default implementation |
591 | + * returns <code>true</code>. If this method returns <code>true</code>, |
592 | + * <code>TreeBuilder</code> evicts the <code>Volume</code> to avoid |
593 | + * over-running the <code>BufferPool</code> and then starts a new sort tree |
594 | + * if a new record is subsequently stored. |
595 | + * |
596 | + * @param volume |
597 | + * The temporary <code>Volume</code> that has been filled |
598 | + * @return <code>true</code> to cause the current sort volume to be evicted |
599 | + * from the <code>BufferPool</code> |
600 | + * @throws Exception |
601 | + */ |
602 | + protected boolean beforeSortVolumeEvicted(final Volume volume) throws Exception { |
603 | + return true; |
604 | + } |
605 | + |
606 | + /** |
607 | + * This method may be extended to provide application-specific reporting |
608 | + * functionality after a sort volume has been filled to capacity and has |
609 | + * been evicted. An application may also modify the temporary directory set |
610 | + * via {@link #setSortTreeDirectories(List)} within this method if necessary |
611 | + * to adjust disk space utilization, for example. The default behavior of |
612 | + * this method is to do nothing. |
613 | + * |
614 | + * @param volume |
615 | + * The temporary <code>Volume</code> that has been filled |
616 | + * @throws Exception |
617 | + */ |
618 | + protected void afterSortVolumeEvicted(final Volume volume) throws Exception { |
619 | + |
620 | + } |
621 | + |
622 | + /** |
623 | + * This method may be extended to provide application-specific behavior when |
624 | + * an attempt is made to merge records with duplicate keys. The default |
625 | + * behavior is to throw a {@link DuplicateKeyException}. |
626 | + * |
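627 | + * @param tree the <code>Tree</code> in which the duplicate key was found |
628 | + * @param key the duplicate <code>Key</code> |
629 | + * @param v1 the value already stored under <code>key</code> |
630 | + * @param v2 the value being inserted or merged |
631 | + * @return <code>true</code> to replace v1 with v2, <code>false</code> to keep v1 |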
632 | + * @throws DuplicateKeyException |
633 | + * if a key being inserted or merged matches a key that already |
634 | + * exists |
635 | + * @throws Exception |
636 | + */ |
637 | + protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) |
638 | + throws Exception { |
639 | + throw new DuplicateKeyException(String.format("Tree=%s Key=%s", tree, key)); |
640 | + } |
641 | + |
642 | + /** |
643 | + * This method may be extended to provide alternative functionality. The |
644 | + * default implementation merely returns <code>true</code> which signifies |
645 | + * that the key-value pair represented in the <code>Exchange</code> should |
646 | + * be merged into the destination <code>Tree</code>. A custom implementation |
647 | + * could be used to filter out unwanted records or to emit records to a |
648 | + * different destination. |
649 | + * |
650 | + * @param exchange |
651 | + * represents the key-value pair proposed for merging |
652 | + * @return <code>true</code> to allow the record to be merged |
653 | + * @throws Exception |
654 | + */ |
655 | + protected boolean beforeMergeKey(final Exchange exchange) throws Exception { |
656 | + return true; |
657 | + } |
658 | + |
659 | + /** |
660 | + * This method may be extended to provide custom behavior after merging one |
661 | + * record. The default implementation does nothing. This method is called |
662 | + * only if the corresponding call to {@link #beforeMergeKey(Exchange)} |
663 | + * returned <code>true</code>. |
664 | + * |
665 | + * @param exchange |
666 | + * represents the key-value pair that was merged. |
667 | + * @throws Exception |
668 | + */ |
669 | + protected void afterMergeKey(final Exchange exchange) throws Exception { |
670 | + |
671 | + } |
672 | + |
673 | + /** |
674 | + * This method may be extended to provide application-specific progress |
675 | + * reports. By default it does nothing. This method is called after |
676 | + * inserting a number of records into sort trees. The method |
677 | + * {@link #setReportKeyCountMultiple(long)} determines the frequency at |
678 | + * which this method is called. |
679 | + * |
680 | + * @param count |
681 | + * The total number of records that have been inserted into sort trees so far. |
682 | + */ |
683 | + protected void reportSorted(final long count) { |
684 | + |
685 | + } |
686 | + |
687 | + /** |
688 | + * This method may be extended to provide application-specific progress |
689 | + * reports. By default it does nothing. This method is called after merging |
690 | + * a number of records into destination trees. The method |
691 | + * {@link #setReportKeyCountMultiple(long)} determines the frequency at |
692 | + * which this method is called. |
693 | + * |
694 | + * @param count |
695 | + * The total number of records that have been merged so far. |
696 | + */ |
697 | + protected void reportMerged(final long count) { |
698 | + |
699 | + } |
700 | + |
701 | + void unitTestNextSortVolume() { |
702 | + _sortExchangeMapThreadLocal.get().clear(); |
703 | + _currentSortVolume = null; |
704 | + } |
705 | +} |
706 | |
707 | === modified file 'src/main/java/com/persistit/Volume.java' |
708 | --- src/main/java/com/persistit/Volume.java 2012-10-06 02:44:20 +0000 |
709 | +++ src/main/java/com/persistit/Volume.java 2012-12-31 21:06:22 +0000 |
710 | @@ -78,11 +78,12 @@ |
711 | return false; |
712 | } |
713 | |
714 | - static Volume createTemporaryVolume(final Persistit persistit, final int pageSize) throws PersistitException { |
715 | + static Volume createTemporaryVolume(final Persistit persistit, final int pageSize, final File tempDirectory) |
716 | + throws PersistitException { |
717 | final Volume volume = new Volume( |
718 | Thread.currentThread().getName() + TEMP_VOLUME_NAME_SUFFIX_FOR_FIXUP_DETECTION, |
719 | TEMP_VOLUME_ID_FOR_FIXUP_DETECTION); |
720 | - volume.openTemporary(persistit, pageSize); |
721 | + volume.openTemporary(persistit, pageSize, tempDirectory); |
722 | return volume; |
723 | } |
724 | |
725 | @@ -473,7 +474,8 @@ |
726 | persistit.addVolume(this); |
727 | } |
728 | |
729 | - void openTemporary(final Persistit persistit, final int pageSize) throws PersistitException { |
730 | + void openTemporary(final Persistit persistit, final int pageSize, final File tempDirectory) |
731 | + throws PersistitException { |
732 | checkClosing(); |
733 | if (_storage != null) { |
734 | throw new IllegalStateException("This volume has already been opened"); |
735 | @@ -483,7 +485,7 @@ |
736 | } |
737 | |
738 | _structure = new VolumeStructure(persistit, this, pageSize); |
739 | - _storage = new VolumeStorageT2(persistit, this); |
740 | + _storage = new VolumeStorageT2(persistit, this, tempDirectory); |
741 | _statistics = new VolumeStatistics(); |
742 | |
743 | _storage.create(); |
744 | |
745 | === modified file 'src/main/java/com/persistit/VolumeStorageT2.java' |
746 | --- src/main/java/com/persistit/VolumeStorageT2.java 2012-08-24 13:57:19 +0000 |
747 | +++ src/main/java/com/persistit/VolumeStorageT2.java 2012-12-31 21:06:22 +0000 |
748 | @@ -43,8 +43,9 @@ |
749 | */ |
750 | class VolumeStorageT2 extends VolumeStorage { |
751 | |
752 | - private final static String TEMP_FILE_PREFIX = "persistit_tempvol_"; |
753 | + final static String TEMP_FILE_PREFIX = "persistit_tempvol_"; |
754 | private final static String TEMP_FILE_UNCREATED_NAME = "temp_volume_file_not_created_yet"; |
755 | + private final File _tempDirectory; |
756 | private long _maxPages; |
757 | private volatile String _path; |
758 | private volatile FileChannel _channel; |
759 | @@ -53,8 +54,9 @@ |
760 | private volatile boolean _opened; |
761 | private volatile boolean _closed; |
762 | |
763 | - VolumeStorageT2(final Persistit persistit, final Volume volume) { |
764 | + VolumeStorageT2(final Persistit persistit, final Volume volume, final File tempDirectory) { |
765 | super(persistit, volume); |
766 | + _tempDirectory = tempDirectory; |
767 | } |
768 | |
769 | /** |
770 | @@ -94,9 +96,7 @@ |
771 | synchronized FileChannel getChannel() throws PersistitIOException { |
772 | if (_channel == null) { |
773 | try { |
774 | - final String directoryName = _persistit.getConfiguration().getTmpVolDir(); |
775 | - final File directory = directoryName == null ? null : new File(directoryName); |
776 | - final File file = File.createTempFile(TEMP_FILE_PREFIX, null, directory); |
777 | + final File file = File.createTempFile(TEMP_FILE_PREFIX, null, _tempDirectory); |
778 | _path = file.getPath(); |
779 | _channel = new MediatedFileChannel(_path, "rw"); |
780 | } catch (final IOException ioe) { |
781 | |
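This change replaces the configuration lookup inside `getChannel()` with a directory supplied by the caller. `File.createTempFile(prefix, suffix, directory)` already treats a `null` directory as the platform default (`java.io.tmpdir`) and a `null` suffix as `.tmp`. A caller that passes `null` therefore gets the same behavior the old code had when `getTmpVolDir()` returned `null`. The following is a small standalone illustration, built around a hypothetical helper named `createTempVolumeFile`:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper mirroring the createTempFile call in getChannel(). */
class TempFileDemo {
    static final String TEMP_FILE_PREFIX = "persistit_tempvol_";

    /**
     * A null tempDirectory falls back to java.io.tmpdir; the null suffix
     * makes createTempFile use ".tmp".
     */
    static File createTempVolumeFile(final File tempDirectory) {
        try {
            final File file = File.createTempFile(TEMP_FILE_PREFIX, null, tempDirectory);
            file.deleteOnExit(); // demo only: do not leave files behind
            return file;
        } catch (final IOException e) {
            // getChannel() catches IOException and declares PersistitIOException;
            // this demo rethrows it unchecked instead.
            throw new UncheckedIOException(e);
        }
    }
}
```

When the directory is passed in rather than read from the configuration in `getChannel()`, a caller such as TreeBuilder can choose where each temporary volume is created.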
782 | === added file 'src/main/java/com/persistit/exception/DuplicateKeyException.java' |
783 | --- src/main/java/com/persistit/exception/DuplicateKeyException.java 1970-01-01 00:00:00 +0000 |
784 | +++ src/main/java/com/persistit/exception/DuplicateKeyException.java 2012-12-31 21:06:22 +0000 |
785 | @@ -0,0 +1,36 @@ |
786 | +/** |
787 | + * Copyright © 2005-2012 Akiban Technologies, Inc. All rights reserved. |
788 | + * |
789 | + * This program and the accompanying materials are made available |
790 | + * under the terms of the Eclipse Public License v1.0 which |
791 | + * accompanies this distribution, and is available at |
792 | + * http://www.eclipse.org/legal/epl-v10.html |
793 | + * |
794 | + * This program may also be available under different license terms. |
795 | + * For more information, see www.akiban.com or contact licensing@akiban.com. |
796 | + * |
797 | + * Contributors: |
798 | + * Akiban Technologies, Inc. |
799 | + */ |
800 | + |
801 | +package com.persistit.exception; |
802 | + |
803 | +/** |
804 | + * Thrown by {@link com.persistit.TreeBuilder} on an attempt to insert duplicate |
805 | + * keys. |
806 | + * |
807 | + * @version 1.0 |
808 | + */ |
809 | +public class DuplicateKeyException extends PersistitException { |
810 | + |
811 | + private static final long serialVersionUID = 3002304732225440553L; |
812 | + |
813 | + public DuplicateKeyException() { |
814 | + super(); |
815 | + } |
816 | + |
817 | + public DuplicateKeyException(final String msg) { |
818 | + super(msg); |
819 | + } |
820 | + |
821 | +} |
822 | |
823 | === modified file 'src/test/java/com/persistit/BufferPoolTest.java' |
824 | --- src/test/java/com/persistit/BufferPoolTest.java 2012-08-24 13:57:19 +0000 |
825 | +++ src/test/java/com/persistit/BufferPoolTest.java 2012-12-31 21:06:22 +0000 |
826 | @@ -163,4 +163,25 @@ |
827 | } |
828 | } |
829 | |
830 | + @Test |
831 | +    public void testEvictVolume() throws Exception { |
832 | + final Volume vol = _persistit.createTemporaryVolume(); |
833 | + final Exchange ex = _persistit.getExchange(vol, "BufferPoolTest", true); |
834 | + _persistit.flush(); |
835 | + ex.getValue().put(RED_FOX); |
836 | + int i; |
837 | + for (i = 1;; i++) { |
838 | + ex.to(i).store(); |
839 | + if (vol.getNextAvailablePage() >= 10) { |
840 | + break; |
841 | + } |
842 | + } |
843 | + vol.getPool().evict(vol); |
844 | + assertTrue("Should be no remaining dirty buffers", vol.getPool().getDirtyPageCount() == 0); |
845 | + for (int j = 0; j < i + 100; j++) { |
846 | + ex.to(j).fetch(); |
847 | + assertEquals(j >= 1 && j <= i, ex.getValue().isDefined()); |
848 | + } |
849 | + } |
850 | + |
851 | } |
852 | |
853 | === modified file 'src/test/java/com/persistit/StressRunner.java' |
854 | --- src/test/java/com/persistit/StressRunner.java 2012-11-18 16:28:15 +0000 |
855 | +++ src/test/java/com/persistit/StressRunner.java 2012-12-31 21:06:22 +0000 |
856 | @@ -25,6 +25,7 @@ |
857 | |
858 | import com.persistit.stress.AbstractSuite; |
859 | import com.persistit.stress.AccumulatorRestartSuite; |
860 | +import com.persistit.stress.InsertBigLoad; |
861 | import com.persistit.stress.InsertUUIDs; |
862 | import com.persistit.stress.Mixture1; |
863 | import com.persistit.stress.Mixture2; |
864 | @@ -71,6 +72,7 @@ |
865 | static { |
866 | _classes.add(AccumulatorRestartSuite.class); |
867 | _classes.add(InsertUUIDs.class); |
868 | + _classes.add(InsertBigLoad.class); |
869 | _classes.add(Mixture1.class); |
870 | _classes.add(Mixture2.class); |
871 | _classes.add(Mixture3.class); |
872 | |
873 | === added file 'src/test/java/com/persistit/TreeBuilderTest.java' |
874 | --- src/test/java/com/persistit/TreeBuilderTest.java 1970-01-01 00:00:00 +0000 |
875 | +++ src/test/java/com/persistit/TreeBuilderTest.java 2012-12-31 21:06:22 +0000 |
876 | @@ -0,0 +1,203 @@ |
877 | +/** |
878 | + * Copyright © 2011-2012 Akiban Technologies, Inc. All rights reserved. |
879 | + * |
880 | + * This program and the accompanying materials are made available |
881 | + * under the terms of the Eclipse Public License v1.0 which |
882 | + * accompanies this distribution, and is available at |
883 | + * http://www.eclipse.org/legal/epl-v10.html |
884 | + * |
885 | + * This program may also be available under different license terms. |
886 | + * For more information, see www.akiban.com or contact licensing@akiban.com. |
887 | + * |
888 | + * Contributors: |
889 | + * Akiban Technologies, Inc. |
890 | + */ |
891 | + |
892 | +package com.persistit; |
893 | + |
894 | +import static com.persistit.unit.UnitTestProperties.VOLUME_NAME; |
895 | +import static org.junit.Assert.assertEquals; |
896 | + |
897 | +import java.util.ArrayList; |
898 | +import java.util.Collections; |
899 | +import java.util.List; |
900 | +import java.util.concurrent.atomic.AtomicBoolean; |
901 | +import java.util.concurrent.atomic.AtomicInteger; |
902 | + |
903 | +import org.junit.Test; |
904 | + |
905 | +public class TreeBuilderTest extends PersistitUnitTestCase { |
906 | + private final static int COUNT = 10000; |
907 | + |
908 | + @Test |
909 | + public void basicTest() throws Exception { |
910 | + |
911 | + final TreeBuilder tb = new TreeBuilder(_persistit) { |
912 | + @Override |
913 | + protected void reportSorted(final long count) { |
914 | + System.out.println("Sorted " + count); |
915 | + } |
916 | + |
917 | + @Override |
918 | + protected void reportMerged(final long count) { |
919 | + System.out.println("Merged " + count); |
920 | + } |
921 | + |
922 | + }; |
923 | + tb.setReportKeyCountMultiple(COUNT / 2); |
924 | + |
925 | + final List<Integer> shuffled = new ArrayList<Integer>(COUNT); |
926 | + for (int i = 0; i < COUNT; i++) { |
927 | + shuffled.add(i); |
928 | + } |
929 | + Collections.shuffle(shuffled); |
930 | + final Exchange a = _persistit.getExchange(VOLUME_NAME, "a", true); |
931 | + final Exchange b = _persistit.getExchange(VOLUME_NAME, "b", true); |
932 | + final Exchange c = _persistit.getExchange(VOLUME_NAME, "c", true); |
933 | + |
934 | + for (int i = 0; i < COUNT; i++) { |
935 | + final int ka = shuffled.get(i); |
936 | + final int kb = shuffled.get((i + COUNT / 3) % COUNT); |
937 | + final int kc = shuffled.get((i + (2 * COUNT) / 3) % COUNT); |
938 | + a.clear().append(ka).append("a"); |
939 | + a.getValue().put(i); |
940 | + b.clear().append(kb).append("b"); |
941 | + b.getValue().put(i); |
942 | + c.clear().append(kc).append("c"); |
943 | + c.getValue().put(i); |
944 | + tb.store(a); |
945 | + tb.store(b); |
946 | + tb.store(c); |
947 | + |
948 | + } |
949 | + |
950 | + tb.merge(); |
951 | + |
952 | + a.clear(); |
953 | + b.clear(); |
954 | + c.clear(); |
955 | + int count = 0; |
956 | + while (a.next(true) && b.next(true) && c.next(true)) { |
957 | + assertEquals("Expect correct key value", count, a.getKey().decodeInt()); |
958 | + assertEquals("Expect correct key value", count, b.getKey().decodeInt()); |
959 | + assertEquals("Expect correct key value", count, c.getKey().decodeInt()); |
960 | + count++; |
961 | + } |
962 | + assertEquals("Expect every key value", COUNT, count); |
963 | + |
964 | + } |
965 | + |
966 | + @Test |
967 | + public void customizationMethods() throws Exception { |
968 | + final AtomicBoolean doReplace = new AtomicBoolean(); |
969 | + final AtomicInteger duplicateCount = new AtomicInteger(); |
970 | + final AtomicInteger beforeMergeCount = new AtomicInteger(); |
971 | + final AtomicInteger afterMergeCount = new AtomicInteger(); |
972 | + |
973 | + final TreeBuilder tb = new TreeBuilder(_persistit) { |
974 | + |
975 | + @Override |
976 | + protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) { |
977 | + duplicateCount.incrementAndGet(); |
978 | + return doReplace.get(); |
979 | + } |
980 | + |
981 | + @Override |
982 | + protected boolean beforeMergeKey(final Exchange ex) throws Exception { |
983 | + beforeMergeCount.incrementAndGet(); |
984 | + return ex.getKey().decodeInt() != 3; |
985 | + } |
986 | + |
987 | + @Override |
988 | + protected void afterMergeKey(final Exchange ex) throws Exception { |
989 | + afterMergeCount.incrementAndGet(); |
990 | + } |
991 | + |
992 | + }; |
993 | + final Exchange ex = _persistit.getExchange(VOLUME_NAME, "a", true); |
994 | + |
995 | + doReplace.set(true); |
996 | + ex.to(1).getValue().put("abc"); |
997 | + tb.store(ex); |
998 | + ex.to(1).getValue().put("def"); |
999 | + tb.store(ex); |
1000 | + assertEquals("Should have registered a dup", 1, duplicateCount.get()); |
1001 | + |
1002 | + doReplace.set(false); |
1003 | + ex.to(2).getValue().put("abc"); |
1004 | + tb.store(ex); |
1005 | + ex.to(2).getValue().put("def"); |
1006 | + tb.store(ex); |
1007 | + assertEquals("Should have registered a dup", 2, duplicateCount.get()); |
1008 | + |
1009 | + tb.unitTestNextSortVolume(); |
1010 | + |
1011 | + ex.to(1).getValue().put("ghi"); |
1012 | + tb.store(ex); |
1013 | + ex.to(2).getValue().put("ghi"); |
1014 | + tb.store(ex); |
1015 | + assertEquals("Should not have registered a dup yet", 2, duplicateCount.get()); |
1016 | + |
1017 | + ex.to(3).getValue().put("abc"); |
1018 | + tb.store(ex); |
1019 | + |
1020 | + doReplace.set(false); |
1021 | + tb.merge(); |
1022 | + |
1023 | +        assertEquals("Should have registered two more dups during merge", 4, duplicateCount.get()); |
1024 | +        assertEquals("beforeMergeKey should be called three times", 3, beforeMergeCount.get()); |
1025 | + assertEquals("afterMergeKey should be called twice", 2, afterMergeCount.get()); |
1026 | + |
1027 | + final StringBuilder result = new StringBuilder(); |
1028 | + ex.clear().append(Key.BEFORE); |
1029 | + while (ex.next(true)) { |
1030 | + result.append(String.format("%s=%s,", ex.getKey(), ex.getValue())); |
1031 | + } |
1032 | + assertEquals("Expected result", "{1}=\"def\",{2}=\"abc\",", result.toString()); |
1033 | + } |
1034 | + |
1035 | + @Test |
1036 | + public void duplicatePriority() throws Exception { |
1037 | + final TreeBuilder tb = new TreeBuilder(_persistit) { |
1038 | + |
1039 | + @Override |
1040 | + protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) { |
1041 | + final String s1 = v1.getString(); |
1042 | + final String s2 = v2.getString(); |
1043 | + return s1.compareTo(s2) < 0; |
1044 | + } |
1045 | + |
1046 | + }; |
1047 | + final Exchange ex = _persistit.getExchange(VOLUME_NAME, "a", true); |
1048 | + final String nul = null; |
1049 | + |
1050 | + insertKeys(ex, tb, "x", "m", "n", nul, "a", nul, "q"); |
1051 | + tb.unitTestNextSortVolume(); |
1052 | + insertKeys(ex, tb, nul, "t", "o", "r", nul, nul, nul); |
1053 | + tb.unitTestNextSortVolume(); |
1054 | + insertKeys(ex, tb, nul, "u", "m", "j", "c", "x", "l"); |
1055 | + tb.unitTestNextSortVolume(); |
1056 | + insertKeys(ex, tb, nul, "m", nul, nul, "a", nul, "q"); |
1057 | + |
1058 | + tb.merge(); |
1059 | + |
1060 | + final StringBuilder result = new StringBuilder(); |
1061 | + for (int i = 0; i < 10; i++) { |
1062 | + ex.to(i).fetch(); |
1063 | + if (ex.isValueDefined()) { |
1064 | + result.append(ex.getValue().getString()); |
1065 | + } |
1066 | + } |
1067 | + assertEquals("xuorcxq", result.toString()); |
1068 | + } |
1069 | + |
1070 | + private void insertKeys(final Exchange ex, final TreeBuilder tb, final String... args) throws Exception { |
1071 | + for (int i = 0; i < args.length; i++) { |
1072 | + if (args[i] != null) { |
1073 | + ex.to(i).getValue().put(args[i]); |
1074 | + tb.store(ex); |
1075 | + } |
1076 | + } |
1077 | + } |
1078 | + |
1079 | +} |
1080 | |
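The expected string `xuorcxq` in `duplicatePriority` follows from the override returning `true` (replace) when the existing value sorts before the incoming one. The lexicographically largest value therefore survives at each key. For example, key 1 holds `m`, `t`, `u`, `m` across the four sort volumes and ends up as `u`. The argument order assumed here (existing value first, incoming second) is inferred from this test and from `customizationMethods`, where returning `true` keeps the later `"def"`. The model below uses a hypothetical `RunMerger` that performs a k-way merge of in-memory sorted runs with a `PriorityQueue`. It is not TreeBuilder's algorithm, which merges from temporary volumes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.BiPredicate;

/** Hypothetical model of merging sorted runs with a duplicate-resolution rule. */
class RunMerger {
    /** Cursor over one sorted run; current is null once the run is exhausted. */
    private static final class Cursor {
        private final Iterator<Map.Entry<Integer, String>> it;
        Map.Entry<Integer, String> current;

        Cursor(final Iterator<Map.Entry<Integer, String>> it) {
            this.it = it;
            advance();
        }

        boolean advance() {
            current = it.hasNext() ? it.next() : null;
            return current != null;
        }
    }

    /** Builds a run the way insertKeys() does: index i holds values[i]; nulls are skipped. */
    static TreeMap<Integer, String> run(final String... values) {
        final TreeMap<Integer, String> r = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            if (values[i] != null) {
                r.put(i, values[i]);
            }
        }
        return r;
    }

    /** k-way merge; replace.test(existing, incoming) == true means the incoming value wins. */
    static TreeMap<Integer, String> merge(final List<TreeMap<Integer, String>> runs,
            final BiPredicate<String, String> replace) {
        final PriorityQueue<Cursor> heap = new PriorityQueue<>(
                (a, b) -> Integer.compare(a.current.getKey(), b.current.getKey()));
        for (final TreeMap<Integer, String> r : runs) {
            final Cursor c = new Cursor(r.entrySet().iterator());
            if (c.current != null) {
                heap.add(c);
            }
        }
        final TreeMap<Integer, String> out = new TreeMap<>();
        while (!heap.isEmpty()) {
            final Cursor c = heap.poll(); // run whose head has the smallest key
            final int key = c.current.getKey();
            final String incoming = c.current.getValue();
            final String existing = out.get(key);
            if (existing == null || replace.test(existing, incoming)) {
                out.put(key, incoming);
            }
            if (c.advance()) {
                heap.add(c); // re-insert with its new head key
            }
        }
        return out;
    }
}
```

Because all runs feed one heap, entries with equal keys may arrive in any order. That does not matter here, since a "keep the maximum" rule (or its "keep the minimum" mirror) picks the same winner regardless of arrival order.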
1081 | === modified file 'src/test/java/com/persistit/stress/InsertBigLoad.java' |
1082 | --- src/test/java/com/persistit/stress/InsertBigLoad.java 2012-11-20 17:45:51 +0000 |
1083 | +++ src/test/java/com/persistit/stress/InsertBigLoad.java 2012-12-31 21:06:22 +0000 |
1084 | @@ -39,7 +39,7 @@ |
1085 | |
1086 | deleteFiles(substitute("$datapath$/persistit*")); |
1087 | |
1088 | - add(new BigLoad("records=10000000 buckets=10")); |
1089 | + add(new BigLoad("records=10000000")); |
1090 | |
1091 | final Configuration config = makeConfiguration(16384, "50000", CommitPolicy.SOFT); |
1092 | config.setTmpVolMaxSize(100000000000l); |
1093 | |
1094 | === modified file 'src/test/java/com/persistit/stress/unit/BigLoad.java' |
1095 | --- src/test/java/com/persistit/stress/unit/BigLoad.java 2012-11-20 17:45:51 +0000 |
1096 | +++ src/test/java/com/persistit/stress/unit/BigLoad.java 2012-12-31 21:06:22 +0000 |
1097 | @@ -16,14 +16,13 @@ |
1098 | package com.persistit.stress.unit; |
1099 | |
1100 | import java.util.Random; |
1101 | -import java.util.SortedMap; |
1102 | -import java.util.TreeMap; |
1103 | |
1104 | import com.persistit.Exchange; |
1105 | import com.persistit.Key; |
1106 | import com.persistit.Persistit; |
1107 | -import com.persistit.Volume; |
1108 | -import com.persistit.exception.PersistitException; |
1109 | +import com.persistit.Tree; |
1110 | +import com.persistit.TreeBuilder; |
1111 | +import com.persistit.Value; |
1112 | import com.persistit.stress.AbstractStressTest; |
1113 | import com.persistit.util.ArgParser; |
1114 | import com.persistit.util.Util; |
1115 | @@ -50,50 +49,48 @@ |
1116 | private static final Random RANDOM = new Random(); |
1117 | |
1118 | private int totalRecords; |
1119 | - private int recordsPerBucket; |
1120 | - |
1121 | - /** |
1122 | - * A Comparable wrapper for an Exchange. An instance of this class may be |
1123 | - * held in a SortedMap only if the Key of the Exchange does not change. In |
1124 | - * this example, the ComparableExchangeHolder is always removed from the |
1125 | - * TreeMap before the Key changes and then reinserted into a new location |
1126 | - * after the key has changed. |
1127 | - */ |
1128 | - static class ComparableExchangeHolder implements Comparable<ComparableExchangeHolder> { |
1129 | - |
1130 | - final Exchange exchange; |
1131 | - |
1132 | - ComparableExchangeHolder(final Exchange exchange) { |
1133 | - this.exchange = exchange; |
1134 | - } |
1135 | - |
1136 | - @Override |
1137 | - public int compareTo(final ComparableExchangeHolder ceh) { |
1138 | - final Key k1 = exchange.getKey(); |
1139 | - final Key k2 = ceh.exchange.getKey(); |
1140 | - return k1.compareTo(k2); |
1141 | - } |
1142 | - } |
1143 | |
1144 | public BigLoad(final int totalRecords, final int buckets) { |
1145 | super(""); |
1146 | this.totalRecords = totalRecords; |
1147 | - this.recordsPerBucket = totalRecords / buckets; |
1148 | } |
1149 | |
1150 | - public void load(final Persistit db) throws PersistitException { |
1151 | + public void load(final Persistit db) throws Exception { |
1152 | final long startLoadTime = System.nanoTime(); |
1153 | - final Volume sortVolume = db.createTemporaryVolume(); |
1154 | + final TreeBuilder tb = new TreeBuilder(db) { |
1155 | + @Override |
1156 | + protected void reportSorted(final long count) { |
1157 | + System.out.printf("Sorted %,15d records\n", count); |
1158 | + } |
1159 | + |
1160 | + @Override |
1161 | + protected void reportMerged(final long count) { |
1162 | + System.out.printf("Merged %,15d records\n", count); |
1163 | + } |
1164 | + |
1165 | + @Override |
1166 | + protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) { |
1167 | + System.out.println("Duplicate key detected: " + key); |
1168 | + return true; |
1169 | + } |
1170 | + }; |
1171 | final Exchange resultExchange = db.getExchange("persistit", "sorted", true); |
1172 | - System.out.printf("Loading %,d records into %,d buckets\n", totalRecords, totalRecords / recordsPerBucket); |
1173 | - final int bucketCount = loadBuckets(db, sortVolume); |
1174 | + System.out.printf("Loading %,d records\n", totalRecords); |
1175 | + |
1176 | + for (int i = 0; i < totalRecords; i++) { |
1177 | + resultExchange.clear().append(randomKey()); |
1178 | + tb.store(resultExchange); |
1179 | + } |
1180 | final long endLoadTime = System.nanoTime(); |
1181 | - |
1182 | - System.out.printf("Merging %,d records from %,d buckets into main database\n", totalRecords, bucketCount); |
1183 | - mergeBuckets(db, bucketCount, sortVolume, resultExchange); |
1184 | + System.out.printf("Loaded %,d records into %,d buckets in %,dms\n", totalRecords, tb.getSortVolumeCount(), |
1185 | + (endLoadTime - startLoadTime) / Util.NS_PER_MS); |
1186 | + |
1187 | + System.out.printf("Merging %,d records into main database\n", totalRecords); |
1188 | + |
1189 | + tb.merge(); |
1190 | final long endMergeTime = System.nanoTime(); |
1191 | System.out.printf("Merged %,d records in %,dms\n", totalRecords, (endMergeTime - endLoadTime) / Util.NS_PER_MS); |
1192 | - sortVolume.close(); |
1193 | + |
1194 | System.out.printf("Counting keys in main database (100M keys per dot) "); |
1195 | resultExchange.clear().append(Key.BEFORE); |
1196 | long count = 0; |
1197 | @@ -111,69 +108,16 @@ |
1198 | (endCountTime - startLoadTime) / Util.NS_PER_MS); |
1199 | } |
1200 | |
1201 | - private int loadBuckets(final Persistit db, final Volume volume) throws PersistitException { |
1202 | - long bucketStartTime = 0; |
1203 | - int bucket = 0; |
1204 | - Exchange ex = null; |
1205 | - for (int i = 0; i < totalRecords; i++) { |
1206 | - if ((i % recordsPerBucket) == 0) { |
1207 | - final long now = System.nanoTime(); |
1208 | - if (i > 0) { |
1209 | - System.out.printf("Loaded bucket %,5d in %,12dms\n", bucket, (now - bucketStartTime) |
1210 | - / Util.NS_PER_MS); |
1211 | - } |
1212 | - bucketStartTime = now; |
1213 | - bucket++; |
1214 | - ex = db.getExchange(volume, "sort" + bucket, true); |
1215 | - } |
1216 | - ex.clear().append(randomKey()); |
1217 | - ex.store(); |
1218 | - } |
1219 | - return bucket; |
1220 | - } |
1221 | - |
1222 | - private void mergeBuckets(final Persistit db, final int bucketCount, final Volume sortVolume, final Exchange to) |
1223 | - throws PersistitException { |
1224 | - final long startLoadTime = System.nanoTime(); |
1225 | - int loaded = 0; |
1226 | - |
1227 | - final SortedMap<ComparableExchangeHolder, Integer> sortMap = new TreeMap<ComparableExchangeHolder, Integer>(); |
1228 | - /* |
1229 | - * Load the sortMap using as keys the first key of each bucket. |
1230 | - */ |
1231 | - for (int bucket = 1; bucket <= bucketCount; bucket++) { |
1232 | - final Exchange ex = db.getExchange(sortVolume, "sort" + bucket, false); |
1233 | - if (ex.append(Key.BEFORE).next()) { |
1234 | - final Integer duplicate = sortMap.put(new ComparableExchangeHolder(ex), bucket); |
1235 | - showDuplicate(duplicate, bucket, ex); |
1236 | - } |
1237 | - } |
1238 | - |
1239 | - while (!sortMap.isEmpty()) { |
1240 | - final ComparableExchangeHolder ceh = sortMap.firstKey(); |
1241 | - final int bucket = sortMap.remove(ceh); |
1242 | - ceh.exchange.getKey().copyTo(to.getKey()); |
1243 | - if (ceh.exchange.next()) { |
1244 | - final Integer duplicate = sortMap.put(ceh, bucket); |
1245 | - showDuplicate(duplicate, bucket, ceh.exchange); |
1246 | - } |
1247 | - to.store(); |
1248 | - if ((++loaded % 10000000) == 0) { |
1249 | - System.out.printf("Merged %,d records in %,dms\n", loaded, (System.nanoTime() - startLoadTime) |
1250 | - / Util.NS_PER_MS); |
1251 | - } |
1252 | - } |
1253 | - } |
1254 | + final StringBuilder sb = new StringBuilder( |
1255 | + "00000000000000000000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); |
1256 | |
1257 | private String randomKey() { |
1258 | - return String.format("%020dxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", |
1259 | - (RANDOM.nextLong() & Long.MAX_VALUE)); |
1260 | - } |
1261 | - |
1262 | - private void showDuplicate(final Integer bucket1, final int bucket2, final Exchange ex) { |
1263 | - if (bucket1 != null) { |
1264 | - System.out.printf("Duplicate key %s in buckets %,d and %,d\n", ex.getKey(), bucket1, bucket2); |
1265 | + long n = RANDOM.nextLong() & Long.MAX_VALUE; |
1266 | + for (int i = 20; --i >= 0;) { |
1267 | + sb.setCharAt(i, (char) ((n % 10) + '0')); |
1268 | + n /= 10; |
1269 | } |
1270 | + return sb.toString(); |
1271 | } |
1272 | |
1273 | /** |
1274 | @@ -220,7 +164,7 @@ |
1275 | } |
1276 | |
1277 | private final static String[] ARGS_TEMPLATE = { "records|int:1000000:1:1000000000|Total records to create", |
1278 | - "buckets|int:100:1:1000000|Number of sort buckets", "tmpdir|string:|Temporary volume path" }; |
1279 | + "tmpdir|string:|Temporary volume path" }; |
1280 | |
1281 | /** |
1282 | * Method to parse stress test arguments passed by the stress test suite. |
1283 | @@ -230,7 +174,6 @@ |
1284 | super.setUp(); |
1285 | final ArgParser ap = new ArgParser("com.persistit.BigLoad", _args, ARGS_TEMPLATE).strict(); |
1286 | totalRecords = ap.getIntValue("records"); |
1287 | - recordsPerBucket = totalRecords / ap.getIntValue("buckets"); |
1288 | final String path = ap.getStringValue("tmpdir"); |
1289 | if (path != null && !path.isEmpty()) { |
1290 | getPersistit().getConfiguration().setTmpVolDir(path); |
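The rewritten `randomKey()` avoids a `String.format` call per record. Instead, it overwrites the first 20 characters of a reusable `StringBuilder` with the decimal digits of a non-negative `long`, filling the slots from right to left. `Long.MAX_VALUE` has 19 digits, so 20 slots always suffice and any unused leading slots become `'0'`. The output should therefore be identical to the removed `%020d` format. The following is a quick equivalence check that uses a hypothetical `KeyFormat` class and a shortened run of `x` filler:

```java
import java.util.Locale;

/** Hypothetical comparison of the old and new BigLoad key formatting. */
class KeyFormat {
    // Shortened filler; BigLoad appends a much longer run of 'x' characters.
    private static final String SUFFIX = "xxxxxxxxxx";
    // Reused buffer: 20 digit slots followed by the filler.
    private final StringBuilder sb = new StringBuilder("00000000000000000000" + SUFFIX);

    /** Same loop as the new randomKey(), applied to a caller-supplied n >= 0. */
    String fastKey(long n) {
        for (int i = 20; --i >= 0;) {
            // Fill slots right to left, least significant digit first.
            sb.setCharAt(i, (char) ((n % 10) + '0'));
            n /= 10;
        }
        return sb.toString();
    }

    /** The removed String.format version; Locale.ROOT pins ASCII digits. */
    static String formattedKey(final long n) {
        return String.format(Locale.ROOT, "%020d" + SUFFIX, n);
    }
}
```

Because `sb` is an instance field, `randomKey()` is not safe to call concurrently on a single `BigLoad` instance. That appears acceptable here, since `load()` stores its records from one thread.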
After one pass, this looks pretty good.
My only immediate comment is that DuplicateKeyException could probably use some more information in addition to the detail message, perhaps direct access to the key contents and the tree. I'm thinking of the case where an application doesn't want or need to handle duplicates but would like to report a nice error to the user. I wouldn't block anything on this; it can always be added.
I'll make another pass shortly.
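One possible shape for the extra information, sketched with hypothetical names: the exception carries the tree name and a display form of the key as fields, so an application can report the conflict without parsing the message. The sketch extends `Exception` only to stay self-contained (the real class extends `PersistitException`). It stores the key as a string because the `Key` held by an `Exchange` changes as the `Exchange` moves, so the exception would need its own copy.

```java
/** Hypothetical richer variant of DuplicateKeyException; not part of this branch. */
class DuplicateKeySketchException extends Exception {
    private static final long serialVersionUID = 1L;

    private final String treeName;
    private final String keyString;

    DuplicateKeySketchException(final String treeName, final String keyString) {
        super("Duplicate key " + keyString + " in tree " + treeName);
        this.treeName = treeName;
        this.keyString = keyString;
    }

    /** Name of the tree in which the duplicate was detected. */
    String getTreeName() {
        return treeName;
    }

    /** Display form of the duplicated key, captured when the exception was created. */
    String getKeyString() {
        return keyString;
    }
}
```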