Merge lp:~pbeaman/akiban-persistit/tree_builder2 into lp:akiban-persistit
| Status | Merged |
|---|---|
| Approved by | Nathan Williams |
| Approved revision | 421 |
| Merged at revision | 417 |
| Proposed branch | lp:~pbeaman/akiban-persistit/tree_builder2 |
| Merge into | lp:akiban-persistit |
| Prerequisite | lp:~pbeaman/akiban-persistit/add-lock |
| Diff against target | 1328 lines (+538/-324), 8 files modified (listed below) |
| To merge this branch | bzr merge lp:~pbeaman/akiban-persistit/tree_builder2 |
| Related bugs | |

Files modified:
- src/main/java/com/persistit/BufferPool.java (+1/-4)
- src/main/java/com/persistit/Persistit.java (+7/-0)
- src/main/java/com/persistit/StreamLoader.java (+150/-148)
- src/main/java/com/persistit/StreamSaver.java (+1/-1)
- src/main/java/com/persistit/TreeBuilder.java (+269/-153)
- src/test/java/com/persistit/TreeBuilderTest.java (+98/-17)
- src/test/java/com/persistit/stress/InsertBigLoad.java (+4/-0)
- src/test/java/com/persistit/stress/unit/BigLoad.java (+8/-1)
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Akiban Build User | | | Needs Fixing |
| Nathan Williams | | | Approve |

Review via email: mp+144404@code.launchpad.net
Commit message
Description of the change
A revised version of TreeBuilder that merges data much faster than the original. The original version persisted the sorted tree data by writing temporary volumes to disk; the pages of those volumes end up in effectively random order, so the merge phase had to read them non-sequentially.
This new version uses StreamSaver and StreamLoader to persist and re-read the sorted records in sequential file order, significantly reducing random disk reads during the merge process.
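A minimal sketch of the idea, not Persistit's code: each sorted run is written once as a sequential stream, and the merge phase pulls one record at a time from every run through a priority queue, so every file is read front to back. The class names (`RunWriter`, `RunCursor`, `KWayMerge`) are hypothetical, in-memory byte arrays stand in for the sort files, and the record format (one `writeUTF` string per record) is a simplifying assumption. The pull-style `RunCursor.next()` plays the role of the `StreamLoader.next(ImportHandler)` method this branch adds.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical: writes one already-sorted run as a sequential stream of records. */
final class RunWriter {
    static byte[] write(List<String> sortedKeys) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String key : sortedKeys) {
                out.writeUTF(key); // records are appended in order: purely sequential I/O
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}

/** Hypothetical pull-style cursor over one run: next() reads exactly one record in file order. */
final class RunCursor {
    final int runIndex; // position of this run; lower index = written earlier
    private final DataInputStream in;
    String current;

    RunCursor(int runIndex, byte[] data) {
        this.runIndex = runIndex;
        this.in = new DataInputStream(new ByteArrayInputStream(data));
    }

    boolean next() {
        try {
            current = in.readUTF();
            return true;
        } catch (EOFException eof) {
            // End of the run. A truncated record would also land here; a real
            // loader should report that as corruption instead.
            current = null;
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class KWayMerge {
    static List<String> merge(List<byte[]> runs) {
        // Order cursors by their head record; ties go to the earlier run.
        PriorityQueue<RunCursor> heap = new PriorityQueue<RunCursor>((a, b) -> {
            int c = a.current.compareTo(b.current);
            return c != 0 ? c : Integer.compare(a.runIndex, b.runIndex);
        });
        for (int i = 0; i < runs.size(); i++) {
            RunCursor cursor = new RunCursor(i, runs.get(i));
            if (cursor.next()) {
                heap.add(cursor);
            }
        }
        List<String> merged = new ArrayList<String>();
        while (!heap.isEmpty()) {
            RunCursor smallest = heap.poll(); // remove before advancing so the heap stays valid
            merged.add(smallest.current);
            if (smallest.next()) {
                heap.add(smallest); // re-insert with its new head record
            }
        }
        return merged;
    }
}
```

Because each cursor only ever reads forward, the disk sees a small number of sequential streams rather than scattered page reads, which is the property the description credits for the speedup.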
In one large test, the merge phase dropped from nearly 5 hours to about 1 hour 40 minutes, close to a 3x speedup.
The add-lock branch is a prerequisite due to the version bump.
Note that the names and semantics of some protected methods in TreeBuilder have changed. In particular, this version provides a deterministic way to know which value was loaded first when a duplicate key is detected.
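A hedged sketch of the deterministic ordering, modeled loosely on the `_precedence` field and the `insertNode` logic in the TreeBuilder diff: each candidate value carries the index of the sort file it came from, and the duplicate callback always receives the earlier-loaded value first, whichever candidate the merge happened to encounter first. `DuplicateResolver`, `Candidate`, and the `keepSecond` callback are hypothetical stand-ins for `duplicateKeyDetected`; the convention that returning `true` keeps the second value is an assumption read from that diff, not a documented contract.

```java
import java.util.function.BiPredicate;

final class DuplicateResolver {
    /** A value for one key, tagged with the index of the sort file it came from. */
    static final class Candidate {
        final int precedence; // lower = written to an earlier sort file = loaded first
        final String value;

        Candidate(int precedence, String value) {
            this.precedence = precedence;
            this.value = value;
        }
    }

    /**
     * Returns the surviving candidate. The callback always sees
     * (firstLoaded, secondLoaded), independent of argument order;
     * returning true keeps the second value, false keeps the first.
     */
    static Candidate resolve(Candidate a, Candidate b, BiPredicate<String, String> keepSecond) {
        Candidate first = a.precedence < b.precedence ? a : b;
        Candidate second = (first == a) ? b : a;
        return keepSecond.test(first.value, second.value) ? second : first;
    }
}
```

Normalizing the order before invoking the callback is what makes a policy such as "first value wins" reproducible across runs.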
Peter Beaman (pbeaman) wrote:
Nathan Williams (nwilliams) wrote:
The Persistit trunk and the server dependency are now at 3.2.5, so we'll probably want to bump the version again.
The unique helper on Persistit isn't wrong, but it seems far away from its only usage. Constructing a TreeBuilder and then calling getName() would be mildly surprising with the unique ID appended. Maybe TreeBuilder doesn't even need an external name? Or the name written to disk could be an internal ID instead of the name?
There's quite a bit of shuffling here, but otherwise it all looks plausible.
Peter Beaman (pbeaman) wrote:
Thanks.
Yes, unique() seems far away, but Persistit is our closest singleton and I prefer not to use a static. I suppose we could have multiple counters in TimestampAllocator if you prefer.
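A small sketch of why an instance-scoped counter differs from a static one, mirroring the `Persistit.unique()` method and the `_uniqueId = persistit.unique()` assignment in the diff. `SketchEngine` and `SketchTreeBuilder` are hypothetical stand-ins for `Persistit` and `TreeBuilder`: each engine instance in the same JVM gets its own thread-safe sequence, which a static field would instead share across all instances.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Persistit: the counter lives on the instance, not in a static. */
final class SketchEngine {
    private final AtomicLong uniqueCounter = new AtomicLong();

    /** Thread-safe; the first call on a given instance returns 1. */
    long unique() {
        return uniqueCounter.incrementAndGet();
    }
}

/** Hypothetical stand-in for TreeBuilder: captures an id once, at construction. */
final class SketchTreeBuilder {
    final long uniqueId;

    SketchTreeBuilder(SketchEngine engine) {
        this.uniqueId = engine.unique();
    }
}
```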
Changed setName/getName to have the expected behavior.
I kind of like the external name as the file name, to help someone figure out what the saved files are if the process runs for a long time or crashes. I'm not wedded to it, but that's what I had in mind.
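A sketch of how a human-readable name could be carried into sort-file names. The default-name pattern `yyyyMMddHHmm` comes from the `SDF` constant in the TreeBuilder diff; the file layout `<name>_<uniqueId>_<index>.sort` is purely hypothetical, since the diff shown here does not include how the sort-file path is built. `Locale.ROOT` is an addition of this sketch so the digits stay ASCII regardless of the default locale.

```java
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

final class SortFileNames {
    // Same pattern the branch uses for TreeBuilder's default name.
    static final String SDF = "yyyyMMddHHmm";

    static String defaultName() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        return new SimpleDateFormat(SDF, Locale.ROOT).format(new Date());
    }

    /** Hypothetical layout: <name>_<uniqueId>_<index>.sort inside the given directory. */
    static File sortFile(File directory, String name, long uniqueId, int index) {
        return new File(directory, String.format(Locale.ROOT, "%s_%d_%d.sort", name, uniqueId, index));
    }
}
```

With a layout like this, a file left behind after a crash identifies the load that created it and its position in the sequence of sort files.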
Found and fixed a couple of other issues, notably one in which closing a temporary volume left all its dirty pages in the buffer pool (fixed in BufferPool by replacing the isDirty() check with an unconditional clearDirty() call).
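The BufferPool hunk replaces a guarded `isDirty()`/decrement/`clearDirty()` sequence with a single unconditional `clearDirty()` call. Buffer.java is not among the modified files, so how Persistit's real `clearDirty()` maintains the pool's dirty-page count is not shown here. The sketch below only illustrates the general pattern under that assumption: if the counter update lives inside `clearDirty()` and fires only on the dirty-to-clean transition, callers can invoke it unconditionally without double-counting. `PageBuffer` and its fields are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical buffer whose dirty flag and the pool-wide dirty count stay in sync. */
final class PageBuffer {
    private static final int DIRTY = 1;
    private final AtomicInteger status = new AtomicInteger();
    private final AtomicLong poolDirtyCount; // shared counter owned by the (hypothetical) pool

    PageBuffer(AtomicLong poolDirtyCount) {
        this.poolDirtyCount = poolDirtyCount;
    }

    void setDirty() {
        // Count only the clean -> dirty transition.
        if ((status.getAndUpdate(s -> s | DIRTY) & DIRTY) == 0) {
            poolDirtyCount.incrementAndGet();
        }
    }

    boolean isDirty() {
        return (status.get() & DIRTY) != 0;
    }

    void clearDirty() {
        // Count only the dirty -> clean transition, so calling this on a clean buffer is a no-op.
        if ((status.getAndUpdate(s -> s & ~DIRTY) & DIRTY) != 0) {
            poolDirtyCount.decrementAndGet();
        }
    }
}
```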
Note that this branch now has 3.2.6-upgrade as a prerequisite, but I did not know how to add that.
Nathan Williams (nwilliams) wrote:
The external name for the file makes sense and I'm fine with the rest.
Akiban Build User (build-akiban) wrote:
There was one failure during build/test:
* during merge: merge conflicts while merging lp:~pbeaman/akiban-persistit/tree_builder2
Peter Beaman (pbeaman) wrote:
Resolved conflicts. Approving again because nothing substantive changed.
Akiban Build User (build-akiban) wrote:
There were 2 failures during build/test:
* job system-tests-mtr failed at build number 3175: http://
* view must-pass failed: system-tests-mtr is red
Nathan Williams (nwilliams) wrote:
Unrelated.
Preview Diff
1 | === modified file 'src/main/java/com/persistit/BufferPool.java' |
2 | --- src/main/java/com/persistit/BufferPool.java 2013-01-24 18:12:23 +0000 |
3 | +++ src/main/java/com/persistit/BufferPool.java 2013-01-28 10:44:23 +0000 |
4 | @@ -659,10 +659,7 @@ |
5 | } |
6 | } |
7 | buffer.clearValid(); |
8 | - if (buffer.isDirty()) { |
9 | - _dirtyPageCount.decrementAndGet(); |
10 | - buffer.clearDirty(); |
11 | - } |
12 | + buffer.clearDirty(); |
13 | buffer.setPageAddressAndVolume(0, null); |
14 | } |
15 | |
16 | |
17 | === modified file 'src/main/java/com/persistit/Persistit.java' |
18 | --- src/main/java/com/persistit/Persistit.java 2013-01-24 18:19:19 +0000 |
19 | +++ src/main/java/com/persistit/Persistit.java 2013-01-28 10:44:23 +0000 |
20 | @@ -43,6 +43,7 @@ |
21 | import java.util.TreeMap; |
22 | import java.util.WeakHashMap; |
23 | import java.util.concurrent.atomic.AtomicBoolean; |
24 | +import java.util.concurrent.atomic.AtomicLong; |
25 | import java.util.concurrent.atomic.AtomicReference; |
26 | |
27 | import javax.management.InstanceNotFoundException; |
28 | @@ -285,6 +286,8 @@ |
29 | |
30 | private final ThreadLocal<SoftReference<Value>> _valueThreadLocal = new ThreadLocal<SoftReference<Value>>(); |
31 | |
32 | + private final AtomicLong _uniqueCounter = new AtomicLong(); |
33 | + |
34 | private volatile Volume _lockVolume; |
35 | |
36 | /** |
37 | @@ -2513,6 +2516,10 @@ |
38 | return value; |
39 | } |
40 | |
41 | + long unique() { |
42 | + return _uniqueCounter.incrementAndGet(); |
43 | + } |
44 | + |
45 | private final static String[] ARG_TEMPLATE = { "_flag|g|Start AdminUI", |
46 | "_flag|i|Perform IntegrityCheck on all volumes", "_flag|w|Wait until AdminUI exists", |
47 | "_flag|c|Perform copy-back", "properties|string|Property file name", |
48 | |
49 | === modified file 'src/main/java/com/persistit/StreamLoader.java' |
50 | --- src/main/java/com/persistit/StreamLoader.java 2012-08-24 13:57:19 +0000 |
51 | +++ src/main/java/com/persistit/StreamLoader.java 2013-01-28 10:44:23 +0000 |
52 | @@ -110,126 +110,126 @@ |
53 | } |
54 | |
55 | public void load(final ImportHandler handler) throws IOException, PersistitException { |
56 | - String volumeName = null; |
57 | - String treeName = null; |
58 | - for (;;) { |
59 | - final int b1 = _dis.read(); |
60 | - if (b1 == -1) { |
61 | - break; |
62 | - } |
63 | - final int b2 = _dis.read(); |
64 | - final int recordType = ((b1 & 0xFF) << 8) + (b2 & 0xFF); |
65 | - |
66 | - switch (recordType) { |
67 | - case StreamSaver.RECORD_TYPE_FILL: { |
68 | - handler.handleFillRecord(); |
69 | - _otherRecordCount++; |
70 | - break; |
71 | - } |
72 | - case StreamSaver.RECORD_TYPE_DATA: { |
73 | - final int keySize = _dis.readShort(); |
74 | - final int elisionCount = _dis.readShort(); |
75 | - final int valueSize = _dis.readInt(); |
76 | - _value.ensureFit(valueSize); |
77 | - _dis.read(_key.getEncodedBytes(), elisionCount, keySize - elisionCount); |
78 | - _key.setEncodedSize(keySize); |
79 | - _dis.read(_value.getEncodedBytes(), 0, valueSize); |
80 | - _value.setEncodedSize(valueSize); |
81 | - handler.handleDataRecord(_key, _value); |
82 | - _dataRecordCount++; |
83 | - break; |
84 | - } |
85 | - case StreamSaver.RECORD_TYPE_KEY_FILTER: { |
86 | - final String filterString = _dis.readUTF(); |
87 | - handler.handleKeyFilterRecord(filterString); |
88 | - _otherRecordCount++; |
89 | - break; |
90 | - } |
91 | - case StreamSaver.RECORD_TYPE_VOLUME_ID: { |
92 | - final long id = _dis.readLong(); |
93 | - final long initialPages = _dis.readLong(); |
94 | - final long extensionPages = _dis.readLong(); |
95 | - final long maximumPages = _dis.readLong(); |
96 | - final int bufferSize = _dis.readInt(); |
97 | - final String path = _dis.readUTF(); |
98 | - volumeName = _dis.readUTF(); |
99 | - handler.handleVolumeIdRecord(id, initialPages, extensionPages, maximumPages, bufferSize, path, |
100 | - volumeName); |
101 | - _otherRecordCount++; |
102 | - break; |
103 | - } |
104 | - case StreamSaver.RECORD_TYPE_TREE_ID: { |
105 | - treeName = _dis.readUTF(); |
106 | - handler.handleTreeIdRecord(treeName); |
107 | - _otherRecordCount++; |
108 | - postMessage("Loading Tree " + treeName, Task.LOG_VERBOSE); |
109 | - break; |
110 | - } |
111 | - case StreamSaver.RECORD_TYPE_HOSTNAME: { |
112 | - final String hostName = _dis.readUTF(); |
113 | - handler.handleHostNameRecord(hostName); |
114 | - _otherRecordCount++; |
115 | - break; |
116 | - } |
117 | - case StreamSaver.RECORD_TYPE_USER: { |
118 | - final String hostName = _dis.readUTF(); |
119 | - handler.handleUserRecord(hostName); |
120 | - _otherRecordCount++; |
121 | - break; |
122 | - } |
123 | - case StreamSaver.RECORD_TYPE_COMMENT: { |
124 | - final String comment = _dis.readUTF(); |
125 | - handler.handleCommentRecord(comment); |
126 | - _otherRecordCount++; |
127 | - break; |
128 | - } |
129 | - case StreamSaver.RECORD_TYPE_COUNT: { |
130 | - final long dataRecordCount = _dis.readLong(); |
131 | - final long otherRecordCount = _dis.readLong(); |
132 | - handler.handleCountRecord(dataRecordCount, otherRecordCount); |
133 | - _otherRecordCount++; |
134 | - break; |
135 | - } |
136 | - case StreamSaver.RECORD_TYPE_START: { |
137 | - handler.handleStartRecord(); |
138 | - _otherRecordCount++; |
139 | - break; |
140 | - } |
141 | - case StreamSaver.RECORD_TYPE_END: { |
142 | - handler.handleEndRecord(); |
143 | - _otherRecordCount++; |
144 | - break; |
145 | - } |
146 | - case StreamSaver.RECORD_TYPE_TIMESTAMP: { |
147 | - final long timeStamp = _dis.readLong(); |
148 | - handler.handleTimeStampRecord(timeStamp); |
149 | - _otherRecordCount++; |
150 | - break; |
151 | - } |
152 | - case StreamSaver.RECORD_TYPE_EXCEPTION: { |
153 | - final String exceptionString = _dis.readUTF(); |
154 | - handler.handleExceptionRecord(exceptionString); |
155 | - _otherRecordCount++; |
156 | - break; |
157 | - } |
158 | - case StreamSaver.RECORD_TYPE_COMPLETION: { |
159 | - handler.handleCompletionRecord(); |
160 | - _otherRecordCount++; |
161 | - break; |
162 | - } |
163 | - default: { |
164 | - throw new CorruptImportStreamException("Invalid record type " + recordType + " (" |
165 | - + Util.bytesToHex(new byte[] { (byte) (recordType >>> 8), (byte) recordType }) |
166 | - + " after reading " + _dataRecordCount + " data records" + " and " + _otherRecordCount |
167 | - + " other records"); |
168 | - } |
169 | - } |
170 | - |
171 | + while (next(handler)) { |
172 | } |
173 | postMessage(String.format("DONE - processed %,d data records and %,d other records", _dataRecordCount, |
174 | _otherRecordCount), Task.LOG_NORMAL); |
175 | } |
176 | |
177 | + public boolean next(final ImportHandler handler) throws IOException, PersistitException { |
178 | + final int b1 = _dis.read(); |
179 | + if (b1 == -1) { |
180 | + return false; |
181 | + } |
182 | + final int b2 = _dis.read(); |
183 | + final int recordType = ((b1 & 0xFF) << 8) + (b2 & 0xFF); |
184 | + |
185 | + switch (recordType) { |
186 | + case StreamSaver.RECORD_TYPE_FILL: { |
187 | + handler.handleFillRecord(); |
188 | + _otherRecordCount++; |
189 | + break; |
190 | + } |
191 | + case StreamSaver.RECORD_TYPE_DATA: { |
192 | + final int keySize = _dis.readShort(); |
193 | + final int elisionCount = _dis.readShort(); |
194 | + final int valueSize = _dis.readInt(); |
195 | + _value.ensureFit(valueSize); |
196 | + _dis.read(_key.getEncodedBytes(), elisionCount, keySize - elisionCount); |
197 | + _key.setEncodedSize(keySize); |
198 | + _dis.read(_value.getEncodedBytes(), 0, valueSize); |
199 | + _value.setEncodedSize(valueSize); |
200 | + handler.handleDataRecord(_key, _value); |
201 | + _dataRecordCount++; |
202 | + break; |
203 | + } |
204 | + case StreamSaver.RECORD_TYPE_KEY_FILTER: { |
205 | + final String filterString = _dis.readUTF(); |
206 | + handler.handleKeyFilterRecord(filterString); |
207 | + _otherRecordCount++; |
208 | + break; |
209 | + } |
210 | + case StreamSaver.RECORD_TYPE_VOLUME_ID: { |
211 | + final long id = _dis.readLong(); |
212 | + final long initialPages = _dis.readLong(); |
213 | + final long extensionPages = _dis.readLong(); |
214 | + final long maximumPages = _dis.readLong(); |
215 | + final int bufferSize = _dis.readInt(); |
216 | + final String path = _dis.readUTF(); |
217 | + handler._volumeName = _dis.readUTF(); |
218 | + handler.handleVolumeIdRecord(id, initialPages, extensionPages, maximumPages, bufferSize, path, |
219 | + handler._volumeName); |
220 | + _otherRecordCount++; |
221 | + break; |
222 | + } |
223 | + case StreamSaver.RECORD_TYPE_TREE_ID: { |
224 | + handler._treeName = _dis.readUTF(); |
225 | + handler.handleTreeIdRecord(handler._treeName); |
226 | + _otherRecordCount++; |
227 | + postMessage("Loading Tree " + handler._treeName, Task.LOG_VERBOSE); |
228 | + break; |
229 | + } |
230 | + case StreamSaver.RECORD_TYPE_HOSTNAME: { |
231 | + final String hostName = _dis.readUTF(); |
232 | + handler.handleHostNameRecord(hostName); |
233 | + _otherRecordCount++; |
234 | + break; |
235 | + } |
236 | + case StreamSaver.RECORD_TYPE_USER: { |
237 | + final String hostName = _dis.readUTF(); |
238 | + handler.handleUserRecord(hostName); |
239 | + _otherRecordCount++; |
240 | + break; |
241 | + } |
242 | + case StreamSaver.RECORD_TYPE_COMMENT: { |
243 | + final String comment = _dis.readUTF(); |
244 | + handler.handleCommentRecord(comment); |
245 | + _otherRecordCount++; |
246 | + break; |
247 | + } |
248 | + case StreamSaver.RECORD_TYPE_COUNT: { |
249 | + final long dataRecordCount = _dis.readLong(); |
250 | + final long otherRecordCount = _dis.readLong(); |
251 | + handler.handleCountRecord(dataRecordCount, otherRecordCount); |
252 | + _otherRecordCount++; |
253 | + break; |
254 | + } |
255 | + case StreamSaver.RECORD_TYPE_START: { |
256 | + handler.handleStartRecord(); |
257 | + _otherRecordCount++; |
258 | + break; |
259 | + } |
260 | + case StreamSaver.RECORD_TYPE_END: { |
261 | + handler.handleEndRecord(); |
262 | + _otherRecordCount++; |
263 | + break; |
264 | + } |
265 | + case StreamSaver.RECORD_TYPE_TIMESTAMP: { |
266 | + final long timeStamp = _dis.readLong(); |
267 | + handler.handleTimeStampRecord(timeStamp); |
268 | + _otherRecordCount++; |
269 | + break; |
270 | + } |
271 | + case StreamSaver.RECORD_TYPE_EXCEPTION: { |
272 | + final String exceptionString = _dis.readUTF(); |
273 | + handler.handleExceptionRecord(exceptionString); |
274 | + _otherRecordCount++; |
275 | + break; |
276 | + } |
277 | + case StreamSaver.RECORD_TYPE_COMPLETION: { |
278 | + handler.handleCompletionRecord(); |
279 | + _otherRecordCount++; |
280 | + break; |
281 | + } |
282 | + default: { |
283 | + throw new CorruptImportStreamException("Invalid record type " + recordType + " (" |
284 | + + Util.bytesToHex(new byte[] { (byte) (recordType >>> 8), (byte) recordType }) + " after reading " |
285 | + + _dataRecordCount + " data records" + " and " + _otherRecordCount + " other records"); |
286 | + } |
287 | + } |
288 | + return true; |
289 | + } |
290 | + |
291 | /** |
292 | * Handler for various record types in stream being loaded. |
293 | * |
294 | @@ -237,7 +237,7 @@ |
295 | * |
296 | */ |
297 | |
298 | - public static class ImportHandler { |
299 | + protected static class ImportHandler { |
300 | protected Persistit _persistit; |
301 | protected TreeSelector _treeSelector; |
302 | protected Exchange _exchange; |
303 | @@ -246,12 +246,14 @@ |
304 | protected KeyFilter _keyFilter; |
305 | protected boolean _createMissingVolumes; |
306 | protected boolean _createMissingTrees; |
307 | + protected String _volumeName = null; |
308 | + protected String _treeName = null; |
309 | |
310 | - public ImportHandler(final Persistit persistit) { |
311 | + protected ImportHandler(final Persistit persistit) { |
312 | this(persistit, new TreeSelector(), true, true); |
313 | } |
314 | |
315 | - public ImportHandler(final Persistit persistit, final TreeSelector treeSelector, |
316 | + protected ImportHandler(final Persistit persistit, final TreeSelector treeSelector, |
317 | final boolean createMissingVolumes, final boolean createMissingTrees) { |
318 | _persistit = persistit; |
319 | _treeSelector = treeSelector == null ? new TreeSelector() : treeSelector; |
320 | @@ -259,10 +261,10 @@ |
321 | _createMissingVolumes = createMissingVolumes; |
322 | } |
323 | |
324 | - public void handleFillRecord() throws PersistitException { |
325 | + protected void handleFillRecord() throws PersistitException { |
326 | } |
327 | |
328 | - public void handleDataRecord(final Key key, final Value value) throws PersistitException { |
329 | + protected void handleDataRecord(final Key key, final Value value) throws PersistitException { |
330 | if (_keyFilter == null || _keyFilter.selected(key)) { |
331 | if (_volume == null || _tree == null) |
332 | return; |
333 | @@ -277,10 +279,10 @@ |
334 | } |
335 | } |
336 | |
337 | - public void handleKeyFilterRecord(final String keyFilterString) throws PersistitException { |
338 | + protected void handleKeyFilterRecord(final String keyFilterString) throws PersistitException { |
339 | } |
340 | |
341 | - public void handleVolumeIdRecord(final long volumeId, final long initialPages, final long extensionPages, |
342 | + protected void handleVolumeIdRecord(final long volumeId, final long initialPages, final long extensionPages, |
343 | final long maximumPages, final int bufferSize, final String path, final String name) |
344 | throws PersistitException { |
345 | final Exchange oldExchange = _exchange; |
346 | @@ -306,7 +308,7 @@ |
347 | } |
348 | } |
349 | |
350 | - public void handleTreeIdRecord(final String treeName) throws PersistitException { |
351 | + protected void handleTreeIdRecord(final String treeName) throws PersistitException { |
352 | final Exchange oldExchange = _exchange; |
353 | _exchange = null; |
354 | _tree = null; |
355 | @@ -328,31 +330,31 @@ |
356 | |
357 | } |
358 | |
359 | - public void handleTimeStampRecord(final long timeStamp) throws PersistitException { |
360 | - } |
361 | - |
362 | - public void handleHostNameRecord(final String hostName) throws PersistitException { |
363 | - } |
364 | - |
365 | - public void handleUserRecord(final String userName) throws PersistitException { |
366 | - } |
367 | - |
368 | - public void handleCommentRecord(final String comment) throws PersistitException { |
369 | - } |
370 | - |
371 | - public void handleCountRecord(final long keyValueRecords, final long otherRecords) throws PersistitException { |
372 | - } |
373 | - |
374 | - public void handleStartRecord() throws PersistitException { |
375 | - } |
376 | - |
377 | - public void handleEndRecord() throws PersistitException { |
378 | - } |
379 | - |
380 | - public void handleExceptionRecord(final String exceptionString) throws PersistitException { |
381 | - } |
382 | - |
383 | - public void handleCompletionRecord() throws PersistitException { |
384 | + protected void handleTimeStampRecord(final long timeStamp) throws PersistitException { |
385 | + } |
386 | + |
387 | + protected void handleHostNameRecord(final String hostName) throws PersistitException { |
388 | + } |
389 | + |
390 | + protected void handleUserRecord(final String userName) throws PersistitException { |
391 | + } |
392 | + |
393 | + protected void handleCommentRecord(final String comment) throws PersistitException { |
394 | + } |
395 | + |
396 | + protected void handleCountRecord(final long keyValueRecords, final long otherRecords) throws PersistitException { |
397 | + } |
398 | + |
399 | + protected void handleStartRecord() throws PersistitException { |
400 | + } |
401 | + |
402 | + protected void handleEndRecord() throws PersistitException { |
403 | + } |
404 | + |
405 | + protected void handleExceptionRecord(final String exceptionString) throws PersistitException { |
406 | + } |
407 | + |
408 | + protected void handleCompletionRecord() throws PersistitException { |
409 | } |
410 | |
411 | } |
412 | |
413 | === modified file 'src/main/java/com/persistit/StreamSaver.java' |
414 | --- src/main/java/com/persistit/StreamSaver.java 2012-08-24 13:57:19 +0000 |
415 | +++ src/main/java/com/persistit/StreamSaver.java 2013-01-28 10:44:23 +0000 |
416 | @@ -483,7 +483,7 @@ |
417 | } |
418 | |
419 | /** |
420 | - * Saves on or more trees in the specified <code>Volume</code>. |
421 | + * Saves one or more trees in the specified <code>Volume</code>. |
422 | * |
423 | * @param volume |
424 | * The <code>Volume</code> |
425 | |
426 | === modified file 'src/main/java/com/persistit/TreeBuilder.java' |
427 | --- src/main/java/com/persistit/TreeBuilder.java 2013-01-19 04:11:48 +0000 |
428 | +++ src/main/java/com/persistit/TreeBuilder.java 2013-01-28 10:44:23 +0000 |
429 | @@ -15,16 +15,22 @@ |
430 | |
431 | package com.persistit; |
432 | |
433 | +import java.io.BufferedInputStream; |
434 | +import java.io.BufferedOutputStream; |
435 | +import java.io.DataInputStream; |
436 | +import java.io.DataOutputStream; |
437 | import java.io.File; |
438 | +import java.io.FileInputStream; |
439 | +import java.io.FileOutputStream; |
440 | import java.io.IOException; |
441 | +import java.text.SimpleDateFormat; |
442 | import java.util.ArrayList; |
443 | import java.util.Collections; |
444 | import java.util.Comparator; |
445 | +import java.util.Date; |
446 | import java.util.HashMap; |
447 | -import java.util.HashSet; |
448 | import java.util.List; |
449 | import java.util.Map; |
450 | -import java.util.Set; |
451 | import java.util.SortedMap; |
452 | import java.util.TreeMap; |
453 | import java.util.concurrent.atomic.AtomicLong; |
454 | @@ -122,11 +128,12 @@ |
455 | * of records inserted with duplicate keys</li> |
456 | * <li>{@link #beforeMergeKey(Exchange)} - allowing filtering or custom handling |
457 | * per record while merging</li> |
458 | - * <li>{@link #afterMergeKey(Exchange)} - behavior after merging one record</li> |
459 | - * <li>{@link #beforeSortVolumeEvicted(Volume)} - behavior before evicting a |
460 | - * sort volume when full</li> |
461 | - * <li>{@link #afterSortVolumeEvicted(Volume)} - behavior after evicting a sort |
462 | - * volume when full</li> |
463 | + * <li>{@link #afterMergeKey(Exchange)} - customizable behavior after merging |
464 | + * one record</li> |
465 | + * <li>{@link #beforeSortVolumeClosed(Volume)} - customizable behavior before |
466 | + * closing a sort volume when full</li> |
467 | + * <li>{@link #afterSortVolumeClose(Volume)} - customizable behavior after |
468 | + * closing a sort volume when full</li> |
469 | * <li>{@link #getTreeComparator()} - return a custom Comparator to determine |
470 | * sequence in which trees are populated within the {@link #merge()} method |
471 | * </ul> |
472 | @@ -138,22 +145,26 @@ |
473 | public class TreeBuilder { |
474 | private final static float DEFAULT_BUFFER_POOL_FRACTION = 0.5f; |
475 | private final static long REPORT_REPORT_MULTIPLE = 1000000; |
476 | - private final static String DEFAULT_NAME = "TreeBuilder"; |
477 | + private final static String SDF = "yyyyMMddHHmm"; |
478 | + private final static int STREAM_SIZE = 1024 * 1024; |
479 | |
480 | private final String _name; |
481 | + private final long _uniqueId; |
482 | private final Persistit _persistit; |
483 | private final List<File> _directories = new ArrayList<File>(); |
484 | - private final List<Volume> _sortVolumes = new ArrayList<Volume>(); |
485 | private final int _pageSize; |
486 | private final int _pageLimit; |
487 | private final AtomicLong _sortedKeyCount = new AtomicLong(); |
488 | private final AtomicLong _mergedKeyCount = new AtomicLong(); |
489 | private volatile long _reportKeyCountMultiple = REPORT_REPORT_MULTIPLE; |
490 | - private Volume _currentSortVolume; |
491 | - private int _nextDirectoryIndex; |
492 | - |
493 | - private final Set<Tree> _allTrees = new HashSet<Tree>(); |
494 | - private final List<Tree> _sortedTrees = new ArrayList<Tree>(); |
495 | + private Volume _sortVolume; |
496 | + private File _sortFile; |
497 | + |
498 | + private final List<Tree> _allTrees = new ArrayList<Tree>(); |
499 | + private final Map<String, Tree> _sortTreeMap = new HashMap<String, Tree>(); |
500 | + |
501 | + private int _sortFileIndex; |
502 | + private final List<Node> _sortNodes = new ArrayList<Node>(); |
503 | |
504 | private final ThreadLocal<Map<Tree, Exchange>> _sortExchangeMapThreadLocal = new ThreadLocal<Map<Tree, Exchange>>() { |
505 | @Override |
506 | @@ -163,16 +174,21 @@ |
507 | }; |
508 | |
509 | private final Comparator<Tree> _defaultTreeComparator = new Comparator<Tree>() { |
510 | + /** |
511 | + * Default implementation returns trees sorted in the order they were |
512 | + * added to the _allTrees list - in other words, sorting should leave |
513 | + * the list unchanged. |
514 | + * |
515 | + * @param a |
516 | + * @param b |
517 | + * @return |
518 | + */ |
519 | @Override |
520 | public int compare(final Tree a, final Tree b) { |
521 | if (a == b) { |
522 | return 0; |
523 | } |
524 | - if (a.getVolume() == b.getVolume()) { |
525 | - return a.getName().compareTo(b.getName()); |
526 | - } else { |
527 | - return a.getVolume().getName().compareTo(b.getVolume().getName()); |
528 | - } |
529 | + return _allTrees.indexOf(a) - _allTrees.indexOf(b); |
530 | } |
531 | |
532 | @Override |
533 | @@ -182,51 +198,54 @@ |
534 | }; |
535 | |
536 | private class Node implements Comparable<Node> { |
537 | - final Volume _volume; |
538 | - int _treeListIndex = -1; |
539 | - Exchange _exchange = null; |
540 | - Tree _currentTree = null; |
541 | - Node _duplicate; |
542 | - |
543 | - private Node(final Volume volume) { |
544 | - _volume = volume; |
545 | - } |
546 | - |
547 | - private boolean next() throws PersistitException { |
548 | - for (;;) { |
549 | - if (_exchange == null) { |
550 | - _treeListIndex++; |
551 | - if (_treeListIndex >= _sortedTrees.size()) { |
552 | - _volume.close(); |
553 | - return false; |
554 | - } |
555 | - final String tempTreeName = "_" + _sortedTrees.get(_treeListIndex).getHandle(); |
556 | - final Tree sortTree = _volume.getTree(tempTreeName, false); |
557 | - if (sortTree == null) { |
558 | - continue; |
559 | - } |
560 | - _exchange = new Exchange(sortTree); |
561 | - _currentTree = _sortedTrees.get(_treeListIndex); |
562 | - } |
563 | - if (_exchange.next(true)) { |
564 | - return true; |
565 | - } |
566 | - _exchange = null; |
567 | - } |
568 | + |
569 | + private Tree _tree; |
570 | + private Key _key; |
571 | + private Value _value; |
572 | + private Node _duplicate; |
573 | + private final int _precedence; |
574 | + |
575 | + private final File _file; |
576 | + private StreamLoader _loader; |
577 | + private Handler _handler; |
578 | + private boolean _next; |
579 | + |
580 | + private class Handler extends StreamLoader.ImportHandler { |
581 | + |
582 | + private Handler(final Persistit persistit) { |
583 | + super(persistit); |
584 | + } |
585 | + |
586 | + @Override |
587 | + protected void handleDataRecord(final Key key, final Value value) throws PersistitException { |
588 | + Node.this._tree = super._tree; |
589 | + _key = key; |
590 | + _value = value; |
591 | + _next = true; |
592 | + } |
593 | + } |
594 | + |
595 | + private File getFile() { |
596 | + return _file; |
597 | + } |
598 | + |
599 | + private Node(final File file, final int index) { |
600 | + _file = file; |
601 | + _precedence = index; |
602 | } |
603 | |
604 | @Override |
605 | public int compareTo(final Node node) { |
606 | - if (_exchange == null) { |
607 | - return node._exchange == null ? 0 : -1; |
608 | - } |
609 | - final int treeComparison = getTreeComparator().compare(_currentTree, node._currentTree); |
610 | - if (treeComparison != 0) { |
611 | - return treeComparison; |
612 | - } |
613 | - final Key k1 = _exchange.getKey(); |
614 | - final Key k2 = node._exchange.getKey(); |
615 | - return k1.compareTo(k2); |
616 | + if (_tree == null) { |
617 | + return node._tree == null ? 0 : 1; |
618 | + } |
619 | + if (node._tree == null) { |
620 | + return -1; |
621 | + } |
622 | + if (_tree != node._tree) { |
623 | + return _allTrees.indexOf(_tree) - _allTrees.indexOf(node._tree); |
624 | + } else |
625 | + return _key.compareTo(node._key); |
626 | } |
627 | |
628 | @Override |
629 | @@ -237,25 +256,67 @@ |
630 | if (sb.length() > 0) { |
631 | sb.append(","); |
632 | } |
633 | - if (n._exchange == null) { |
634 | - sb.append("<null>"); |
635 | + if (n._tree == null) { |
636 | + sb.append("<end>"); |
637 | } else { |
638 | - sb.append("<" |
639 | - + (n._currentTree == null ? "?" : n._currentTree.getName() + n._exchange.getKey() + "=" |
640 | - + n._exchange.getValue()) + ">"); |
641 | + sb.append("<" + (n._tree.getName() + n._key + "=" + n._value) + ">"); |
642 | } |
643 | n = n._duplicate; |
644 | } |
645 | return sb.toString(); |
646 | } |
647 | + |
648 | + private void createStreamLoader() throws Exception { |
649 | + _loader = new StreamLoader(_persistit, new DataInputStream(new BufferedInputStream(new FileInputStream( |
650 | + _file), STREAM_SIZE))); |
651 | + _handler = new Handler(_persistit); |
652 | + } |
653 | + |
654 | + private boolean next() throws Exception { |
655 | + _next = false; |
656 | + while (_loader.next(_handler) && !_next) |
657 | + ; |
658 | + if (!_next) { |
659 | + _loader.close(); |
660 | + } |
661 | + return _next; |
662 | + } |
663 | + |
664 | + } |
665 | + |
666 | + private class SortStreamSaver extends StreamSaver { |
667 | + |
668 | + Tree _sortTree = null; |
669 | + |
670 | + SortStreamSaver(final Persistit persistit, final DataOutputStream stream) { |
671 | + super(persistit, stream); |
672 | + } |
673 | + |
674 | + @Override |
675 | + protected void writeData(final Exchange exchange) throws IOException { |
676 | + if (exchange.getTree() != _sortTree) { |
677 | + final Tree source = _sortTreeMap.get(exchange.getTree().getName()); |
678 | + if (_lastVolume != source.getVolume()) { |
679 | + writeVolumeInfo(source.getVolume()); |
680 | + _lastVolume = source.getVolume(); |
681 | + } |
682 | + if (_lastTree != source) { |
683 | + writeTreeInfo(source); |
684 | + _lastTree = source; |
685 | + } |
686 | + } |
687 | + writeData(exchange.getKey(), exchange.getValue()); |
688 | + _recordCount++; |
689 | + } |
690 | } |
691 | |
692 | public TreeBuilder(final Persistit persistit) { |
693 | - this(persistit, DEFAULT_NAME, -1, DEFAULT_BUFFER_POOL_FRACTION); |
694 | + this(persistit, new SimpleDateFormat(SDF).format(new Date()), -1, DEFAULT_BUFFER_POOL_FRACTION); |
695 | } |
696 | |
697 | public TreeBuilder(final Persistit persistit, final String name, final int pageSize, final float bufferPoolFraction) { |
698 | _name = name; |
699 | + _uniqueId = persistit.unique(); |
700 | _persistit = persistit; |
701 | _pageSize = pageSize == -1 ? computePageSize(persistit) : pageSize; |
702 | final int bufferCount = _persistit.getBufferPool(_pageSize).getBufferCount(); |
703 | @@ -302,10 +363,10 @@ |
704 | } |
705 | |
706 | /** |
707 | - * @return Count of sort volumes that have been created while sorting keys |
708 | + * @return Count of sort trees that have been created while sorting keys |
709 | */ |
710 | - public final synchronized int getSortVolumeCount() { |
711 | - return _sortVolumes.size(); |
712 | + public final synchronized int getSortFileCount() { |
713 | + return _sortFileIndex; |
714 | } |
715 | |
716 | /** |
717 | @@ -326,10 +387,8 @@ |
718 | * @return List of destination |
719 | * <code>Tree<code> instances. This list is built as keys are stored. |
720 | */ |
721 | - public final List<Tree> getTrees() { |
722 | - final List<Tree> list = new ArrayList<Tree>(); |
723 | - list.addAll(_allTrees); |
724 | - Collections.sort(list, getTreeComparator()); |
725 | + public synchronized final List<Tree> getTrees() { |
726 | + final List<Tree> list = new ArrayList<Tree>(_allTrees); |
727 | return list; |
728 | } |
729 | |
730 | @@ -394,7 +453,7 @@ |
731 | synchronized (this) { |
732 | _directories.clear(); |
733 | _directories.addAll(directories); |
734 | - _nextDirectoryIndex = 0; |
735 | + _sortFileIndex = 0; |
736 | } |
737 | } |
738 | } |
739 | @@ -402,9 +461,9 @@ |
740 | /** |
741 | * |
742 | * @return List of directories set via the |
743 | - * {@link #setSortTreeDirectories(List)} method. |
744 | + * {@link #setSortFileDirectories(List)} method. |
745 | */ |
746 | - public final List<File> getSortTreeDirectories() { |
747 | + public final List<File> getSortFileDirectories() { |
748 | return Collections.unmodifiableList(_directories); |
749 | } |
750 | |
751 | @@ -441,7 +500,10 @@ |
752 | ex = _persistit.getExchange(newSortVolume, tempTreeName, true); |
753 | map.put(tree, ex); |
754 | synchronized (this) { |
755 | - _allTrees.add(tree); |
756 | + if (!_allTrees.contains(tree)) { |
757 | + _allTrees.add(tree); |
758 | + _sortTreeMap.put(tempTreeName, tree); |
759 | + } |
760 | } |
761 | } |
762 | key.copyTo(ex.getKey()); |
763 | @@ -466,8 +528,13 @@ |
764 | private void insertNode(final Map<Node, Node> sorted, final Node node) throws Exception { |
765 | final Node other = sorted.put(node, node); |
766 | if (other != null) { |
767 | - if (!duplicateKeyDetected(node._currentTree, node._exchange.getKey(), other._exchange.getValue(), |
768 | - node._exchange.getValue())) { |
769 | + final boolean reverse; |
770 | + if (node._precedence < other._precedence) { |
771 | + reverse = duplicateKeyDetected(node._tree, node._key, node._value, other._value); |
772 | + } else { |
773 | + reverse = !duplicateKeyDetected(node._tree, node._key, other._value, node._value); |
774 | + } |
775 | + if (reverse) { |
776 | sorted.put(node, other); |
777 | final Node p = other._duplicate; |
778 | other._duplicate = node; |
779 | @@ -485,17 +552,16 @@ |
780 | * @throws Exception |
781 | */ |
782 | public synchronized void merge() throws Exception { |
783 | + finishSortVolume(); |
784 | if ((_mergedKeyCount.get() % _reportKeyCountMultiple) != 0) { |
785 | reportSorted(_mergedKeyCount.get()); |
786 | } |
787 | - _sortedTrees.clear(); |
788 | - _sortedTrees.addAll(_allTrees); |
789 | Tree currentTree = null; |
790 | Exchange ex = null; |
791 | - Collections.sort(_sortedTrees, getTreeComparator()); |
792 | final SortedMap<Node, Node> sorted = new TreeMap<Node, Node>(); |
793 | - for (final Volume volume : _sortVolumes) { |
794 | - final Node node = new Node(volume); |
795 | + |
796 | + for (final Node node : _sortNodes) { |
797 | + node.createStreamLoader(); |
798 | if (node.next()) { |
799 | insertNode(sorted, node); |
800 | } |
801 | @@ -507,18 +573,18 @@ |
802 | } |
803 | Node node = sorted.firstKey(); |
804 | node = sorted.remove(node); |
805 | - if (node._currentTree != currentTree) { |
806 | - ex = new Exchange(node._currentTree); |
807 | - currentTree = node._currentTree; |
808 | + if (node._tree != currentTree) { |
809 | + ex = new Exchange(node._tree); |
810 | + currentTree = node._tree; |
811 | } |
812 | + node._key.copyTo(ex.getKey()); |
813 | + node._value.copyTo(ex.getValue()); |
814 | |
815 | - node._exchange.getKey().copyTo(ex.getKey()); |
816 | - node._exchange.getValue().copyTo(ex.getValue()); |
817 | if (beforeMergeKey(ex)) { |
818 | ex.fetchAndStore(); |
819 | boolean stored = true; |
820 | if (ex.getValue().isDefined()) { |
821 | - if (!duplicateKeyDetected(ex.getTree(), ex.getKey(), ex.getValue(), node._exchange.getValue())) { |
822 | + if (!duplicateKeyDetected(ex.getTree(), ex.getKey(), ex.getValue(), node._value)) { |
823 | ex.store(); |
824 | stored = false; |
825 | } |
826 | @@ -547,21 +613,32 @@ |
827 | |
828 | private synchronized void reset() throws Exception { |
829 | Exception exception = null; |
830 | - for (final Volume volume : _sortVolumes) { |
831 | + try { |
832 | + if (_sortVolume != null) { |
833 | + _sortVolume.close(); |
834 | + } |
835 | + } catch (final PersistitException e) { |
836 | + if (exception == null) { |
837 | + exception = e; |
838 | + } |
839 | + } |
840 | + |
841 | + for (final Node node : _sortNodes) { |
842 | try { |
843 | - volume.close(); |
844 | - } catch (final PersistitException e) { |
845 | + if (node.getFile() != null) { |
846 | + node.getFile().delete(); |
847 | + } |
848 | + } catch (final Exception e) { |
849 | if (exception == null) { |
850 | exception = e; |
851 | } |
852 | } |
853 | } |
854 | - _sortVolumes.clear(); |
855 | - _currentSortVolume = null; |
856 | - _nextDirectoryIndex = 0; |
857 | - _sortExchangeMapThreadLocal.get().clear(); |
858 | _allTrees.clear(); |
859 | - _sortedTrees.clear(); |
860 | + _sortNodes.clear(); |
861 | + _sortVolume = null; |
862 | + _sortFileIndex = 0; |
863 | + _sortExchangeMapThreadLocal.get().clear(); |
864 | if (exception != null) { |
865 | throw exception; |
866 | } |
867 | @@ -574,91 +651,115 @@ |
868 | } |
869 | |
870 | private synchronized Volume getSortVolume() throws Exception { |
871 | - final boolean full = _currentSortVolume != null && _currentSortVolume.getNextAvailablePage() > _pageLimit; |
872 | - if (full) { |
873 | - if (beforeSortVolumeEvicted(_currentSortVolume)) { |
874 | - _persistit.getBufferPool(_pageSize).evict(_currentSortVolume); |
875 | - } |
876 | - afterSortVolumeEvicted(_currentSortVolume); |
877 | - } |
878 | - if (full || _currentSortVolume == null) { |
879 | - _currentSortVolume = createSortVolume(); |
880 | - _sortVolumes.add(_currentSortVolume); |
881 | - } |
882 | - return _currentSortVolume; |
883 | - } |
884 | - |
885 | - private Volume createSortVolume() throws Exception { |
886 | - final File directory; |
887 | - if (_directories.isEmpty()) { |
888 | - final String directoryName = _persistit.getConfiguration().getTmpVolDir(); |
889 | - directory = directoryName == null ? null : new File(directoryName); |
890 | - } else { |
891 | - directory = _directories.get(_nextDirectoryIndex % _directories.size()); |
892 | - _nextDirectoryIndex++; |
893 | - } |
894 | - return Volume.createTemporaryVolume(_persistit, _pageSize, directory); |
895 | - } |
896 | - |
897 | - /** |
898 | - * This method may be extended to provide an application-specific ordering |
899 | - * on <code>Tree</code>s. This ordering determines the sequence in which |
900 | - * destination trees are built from the sort data. By default trees are |
901 | - * build in alphabetical order by volume and tree name. However, an |
902 | - * application may choose a different order to ensure invariants for |
903 | - * concurrent use. |
904 | - * |
905 | - * @return a <code>java.util.Comparator</code> on <code>Tree</code> |
906 | - */ |
907 | - protected Comparator<Tree> getTreeComparator() { |
908 | - return _defaultTreeComparator; |
909 | + if (_sortVolume != null && _sortVolume.getNextAvailablePage() > _pageLimit) { |
910 | + finishSortVolume(); |
911 | + } |
912 | + if (_sortVolume == null) { |
913 | + final File directory; |
914 | + if (_directories.isEmpty()) { |
915 | + String directoryName = _persistit.getConfiguration().getTmpVolDir(); |
916 | + if (directoryName == null) { |
917 | + directoryName = System.getProperty("java.io.tmpdir"); |
918 | + } |
919 | + directory = new File(directoryName); |
920 | + if (!directory.exists()) { |
921 | + directory.mkdirs(); |
922 | + } |
923 | + _directories.add(directory); |
924 | + } else { |
925 | + directory = _directories.get(_sortFileIndex % _directories.size()); |
926 | + } |
927 | + _sortVolume = Volume.createTemporaryVolume(_persistit, _pageSize, directory); |
928 | + _sortFile = new File(directory, String.format("%s_%d.%06d", _name, _uniqueId, _sortFileIndex)); |
929 | + final Node node = new Node(_sortFile, _sortFileIndex); |
930 | + _sortNodes.add(node); |
931 | + _sortFileIndex++; |
932 | + } |
933 | + return _sortVolume; |
934 | + } |
935 | + |
936 | + private void finishSortVolume() throws Exception { |
937 | + if (_sortVolume != null) { |
938 | + beforeSortVolumeClosed(_sortVolume, _sortFile); |
939 | + saveSortVolume(_sortVolume, _sortFile); |
940 | + afterSortVolumeClose(_sortVolume, _sortFile); |
941 | + _sortVolume.close(); |
942 | + _sortVolume = null; |
943 | + } |
944 | + } |
945 | + |
946 | + private void saveSortVolume(final Volume volume, final File file) throws Exception { |
947 | + final DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), |
948 | + STREAM_SIZE)); |
949 | + final List<Tree> sorted = new ArrayList<Tree>(_allTrees); |
950 | + Collections.sort(sorted, getTreeComparator()); |
951 | + final StreamSaver saver = new SortStreamSaver(_persistit, dos); |
952 | + for (final Tree tree : sorted) { |
953 | + final String sortTreeName = "_" + tree.getHandle(); |
954 | + final Tree sortTree = volume.getTree(sortTreeName, false); |
955 | + if (sortTree != null) { |
956 | + final Exchange exchange = new Exchange(sortTree); |
957 | + saver.save(exchange, null); |
958 | + } |
959 | + } |
960 | + file.deleteOnExit(); |
961 | + dos.close(); |
962 | } |
963 | |
964 | /** |
965 | * This method may be extended to provide application-specific behavior when |
966 | - * a sort volume has been filled to capacity. The default implementation |
967 | - * return <code>true</code>. If this method returns <code>true</code>, |
968 | - * <code>TreeBuilder</code> evicts the <code>Volume</code> to avoid |
969 | - * over-running the <code>BufferPool</code> and then starts a new sort tree |
970 | - * if a new record is subsequently stored. |
971 | + * a sort volume has been filled to capacity. Subsequent to this call, the |
972 | + * sort volume is streamed to a sort file and then its pages in the |
973 | + * <code>BufferPool</code> are invalidated to allow their immediate reuse. |
974 | * |
975 | * @param volume |
976 | * The temporary <code>Volume</code> that has been filled |
977 | - * @return <code>true</code> to cause the current sort volume to be evicted |
978 | - * from the <code>BufferPool</code> |
979 | + * @param file |
980 | + * the file to which the sorted key-value pairs will be written |
981 | * @throws Exception |
982 | */ |
983 | - protected boolean beforeSortVolumeEvicted(final Volume volume) throws Exception { |
984 | - return true; |
985 | + protected void beforeSortVolumeClosed(final Volume volume, final File file) throws Exception { |
986 | + |
987 | } |
988 | |
989 | /** |
990 | * This method may be extended to provide application-specific reporting |
991 | * functionality after a sort volume has been filled to capacity and has |
992 | * been evicted. An application may also modify the temporary directory set |
993 | - * via {@link #setSortTreeDirectories(List)} within this method if necessary |
994 | + * via {@link #setSortFileDirectories(List)} within this method if necessary |
995 | * to adjust disk space utilization, for example. The default behavior of |
996 | * this method is to do nothing. |
997 | * |
998 | * @param volume |
999 | * The temporary <code>Volume</code> that has been filled |
1000 | + * @param file |
1001 | + * the file to which the sorted key-value pairs have been written |
1002 | * @throws Exception |
1003 | */ |
1004 | - protected void afterSortVolumeEvicted(final Volume volume) throws Exception { |
1005 | + protected void afterSortVolumeClose(final Volume volume, final File file) throws Exception { |
1006 | |
1007 | } |
1008 | |
1009 | /** |
1010 | + * <p> |
1011 | * This method may be extended to provide application-specific behavior when |
1012 | - * an attempt is made to merge records with duplicate keys. The default |
1013 | - * behavior is to throw a {@link DuplicateKeyException}. |
1014 | + * an attempt is made to merge records with duplicate keys. The two |
1015 | + * <code>Value</code>s v1 and v2 are provided in the order they were |
1016 | + * inserted into the <code>TreeBuilder</code>. behavior is to write a |
1017 | + * warning to the log and retain the first value.. |
1018 | + * </p> |
1019 | * |
1020 | * @param tree |
1021 | + * the <code>Tree</code> to which a key is being merged |
1022 | * @param key |
1023 | + * the <code>Key</code> |
1024 | * @param v1 |
1025 | + * the <code>Value</code> previously inserted |
1026 | * @param v2 |
1027 | - * @return If <code>true</code>, the resulting value is v2. If |
1028 | - * <code>false</code>, the resulting value is v1. |
1029 | + * the conflicting <code>Value</code> |
1030 | + * @return <code>true</code> to replace the value previously stored, |
1031 | + * <code>false</code> to leave the value first inserted and ignore |
1032 | + * the new value. |
1033 | * @throws DuplicateKeyException |
1034 | * if a key being inserted or merged matches a key already |
1035 | * exists |
1036 | @@ -728,8 +829,23 @@ |
1037 | |
1038 | } |
1039 | |
1040 | - void unitTestNextSortVolume() { |
1041 | + /** |
1042 | + * This method may be extended to provide an application-specific ordering |
1043 | + * on <code>Tree</code>s. This ordering determines the sequence in which |
1044 | + * destination trees are built from the sort data. By default trees are |
1045 | + * build in alphabetical order by volume and tree name. However, an |
1046 | + * application may choose a different order to ensure invariants for |
1047 | + * concurrent use. |
1048 | + * |
1049 | + * @return a <code>java.util.Comparator</code> on <code>Tree</code> |
1050 | + */ |
1051 | + protected Comparator<Tree> getTreeComparator() { |
1052 | + return _defaultTreeComparator; |
1053 | + } |
1054 | + |
1055 | + void unitTestNextSortFile() throws Exception { |
1056 | + finishSortVolume(); |
1057 | _sortExchangeMapThreadLocal.get().clear(); |
1058 | - _currentSortVolume = null; |
1059 | + _sortVolume = null; |
1060 | } |
1061 | } |
1062 | |
1063 | === modified file 'src/test/java/com/persistit/TreeBuilderTest.java' |
1064 | --- src/test/java/com/persistit/TreeBuilderTest.java 2012-12-31 21:03:59 +0000 |
1065 | +++ src/test/java/com/persistit/TreeBuilderTest.java 2013-01-28 10:44:23 +0000 |
1066 | @@ -1,5 +1,5 @@ |
1067 | /** |
1068 | - * Copyright © 2011-2012 Akiban Technologies, Inc. All rights reserved. |
1069 | + * Copyright © 2011-2013 Akiban Technologies, Inc. All rights reserved. |
1070 | * |
1071 | * This program and the accompanying materials are made available |
1072 | * under the terms of the Eclipse Public License v1.0 which |
1073 | @@ -17,21 +17,25 @@ |
1074 | |
1075 | import static com.persistit.unit.UnitTestProperties.VOLUME_NAME; |
1076 | import static org.junit.Assert.assertEquals; |
1077 | +import static org.junit.Assert.assertTrue; |
1078 | |
1079 | +import java.io.File; |
1080 | import java.util.ArrayList; |
1081 | import java.util.Collections; |
1082 | import java.util.List; |
1083 | +import java.util.Random; |
1084 | import java.util.concurrent.atomic.AtomicBoolean; |
1085 | import java.util.concurrent.atomic.AtomicInteger; |
1086 | |
1087 | import org.junit.Test; |
1088 | |
1089 | +import com.persistit.unit.UnitTestProperties; |
1090 | + |
1091 | public class TreeBuilderTest extends PersistitUnitTestCase { |
1092 | - private final static int COUNT = 10000; |
1093 | - |
1094 | - @Test |
1095 | - public void basicTest() throws Exception { |
1096 | - |
1097 | + private final static int COUNT = 100000; |
1098 | + private final AtomicInteger _duplicates = new AtomicInteger(); |
1099 | + |
1100 | + private TreeBuilder getBasicTreeBuilder() { |
1101 | final TreeBuilder tb = new TreeBuilder(_persistit) { |
1102 | @Override |
1103 | protected void reportSorted(final long count) { |
1104 | @@ -43,8 +47,21 @@ |
1105 | System.out.println("Merged " + count); |
1106 | } |
1107 | |
1108 | + @Override |
1109 | + protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) { |
1110 | + System.out.println("Duplicate key " + key); |
1111 | + _duplicates.incrementAndGet(); |
1112 | + return false; |
1113 | + } |
1114 | }; |
1115 | tb.setReportKeyCountMultiple(COUNT / 2); |
1116 | + return tb; |
1117 | + } |
1118 | + |
1119 | + @Test |
1120 | + public void basicTest() throws Exception { |
1121 | + |
1122 | + final TreeBuilder tb = getBasicTreeBuilder(); |
1123 | |
1124 | final List<Integer> shuffled = new ArrayList<Integer>(COUNT); |
1125 | for (int i = 0; i < COUNT; i++) { |
1126 | @@ -68,7 +85,6 @@ |
1127 | tb.store(a); |
1128 | tb.store(b); |
1129 | tb.store(c); |
1130 | - |
1131 | } |
1132 | |
1133 | tb.merge(); |
1134 | @@ -84,7 +100,8 @@ |
1135 | count++; |
1136 | } |
1137 | assertEquals("Expect every key value", COUNT, count); |
1138 | - |
1139 | + _persistit.flush(); |
1140 | + assertEquals(0, a.getBufferPool().getDirtyPageCount()); |
1141 | } |
1142 | |
1143 | @Test |
1144 | @@ -130,7 +147,7 @@ |
1145 | tb.store(ex); |
1146 | assertEquals("Should have registered a dup", 2, duplicateCount.get()); |
1147 | |
1148 | - tb.unitTestNextSortVolume(); |
1149 | + tb.unitTestNextSortFile(); |
1150 | |
1151 | ex.to(1).getValue().put("ghi"); |
1152 | tb.store(ex); |
1153 | @@ -157,9 +174,9 @@ |
1154 | } |
1155 | |
1156 | @Test |
1157 | - public void duplicatePriority() throws Exception { |
1158 | - final TreeBuilder tb = new TreeBuilder(_persistit) { |
1159 | - |
1160 | + public void duplicatePriority1() throws Exception { |
1161 | + duplicatePriorityCheck(new TreeBuilder(_persistit) { |
1162 | + // Larger value wins |
1163 | @Override |
1164 | protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) { |
1165 | final String s1 = v1.getString(); |
1166 | @@ -167,16 +184,31 @@ |
1167 | return s1.compareTo(s2) < 0; |
1168 | } |
1169 | |
1170 | - }; |
1171 | + }, "xuorcxq"); |
1172 | + } |
1173 | + |
1174 | + @Test |
1175 | + public void duplicatePriority2() throws Exception { |
1176 | + duplicatePriorityCheck(new TreeBuilder(_persistit) { |
1177 | + // First value wins |
1178 | + @Override |
1179 | + protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) { |
1180 | + return false; |
1181 | + } |
1182 | + |
1183 | + }, "xmnraxq"); |
1184 | + } |
1185 | + |
1186 | + private void duplicatePriorityCheck(final TreeBuilder tb, final String expected) throws Exception { |
1187 | final Exchange ex = _persistit.getExchange(VOLUME_NAME, "a", true); |
1188 | final String nul = null; |
1189 | |
1190 | insertKeys(ex, tb, "x", "m", "n", nul, "a", nul, "q"); |
1191 | - tb.unitTestNextSortVolume(); |
1192 | + tb.unitTestNextSortFile(); |
1193 | insertKeys(ex, tb, nul, "t", "o", "r", nul, nul, nul); |
1194 | - tb.unitTestNextSortVolume(); |
1195 | + tb.unitTestNextSortFile(); |
1196 | insertKeys(ex, tb, nul, "u", "m", "j", "c", "x", "l"); |
1197 | - tb.unitTestNextSortVolume(); |
1198 | + tb.unitTestNextSortFile(); |
1199 | insertKeys(ex, tb, nul, "m", nul, nul, "a", nul, "q"); |
1200 | |
1201 | tb.merge(); |
1202 | @@ -188,7 +220,8 @@ |
1203 | result.append(ex.getValue().getString()); |
1204 | } |
1205 | } |
1206 | - assertEquals("xuorcxq", result.toString()); |
1207 | + assertEquals(expected, result.toString()); |
1208 | + |
1209 | } |
1210 | |
1211 | private void insertKeys(final Exchange ex, final TreeBuilder tb, final String... args) throws Exception { |
1212 | @@ -200,4 +233,52 @@ |
1213 | } |
1214 | } |
1215 | |
1216 | + @Test |
1217 | + public void multipleDirectories() throws Exception { |
1218 | + final TreeBuilder tb = getBasicTreeBuilder(); |
1219 | + final List<File> directories = new ArrayList<File>(); |
1220 | + final Random random = new Random(); |
1221 | + try { |
1222 | + for (int i = 0; i < 3; i++) { |
1223 | + final File file = File.createTempFile("TreeBuilderTest", ""); |
1224 | + file.delete(); |
1225 | + assertTrue("Expect to make directory", file.mkdir()); |
1226 | + directories.add(file); |
1227 | + } |
1228 | + tb.setSortTreeDirectories(directories); |
1229 | + final Exchange ex = _persistit.getExchange(VOLUME_NAME, "TreeBuilderTest", true); |
1230 | + for (int i = 0; i < COUNT; i++) { |
1231 | + final int k = random.nextInt(); |
1232 | + ex.to(k); |
1233 | + ex.getValue().put(RED_FOX + "," + k); |
1234 | + tb.store(ex); |
1235 | + if (((i + 1) % (COUNT / 10)) == 0) { |
1236 | + tb.unitTestNextSortFile(); |
1237 | + } |
1238 | + } |
1239 | + for (final File file : directories) { |
1240 | + assertTrue("Expect some files in each directory", file.list().length > 0); |
1241 | + } |
1242 | + tb.merge(); |
1243 | + |
1244 | + for (final File file : directories) { |
1245 | + assertTrue("Expect no remaining files", file.list().length == 0); |
1246 | + } |
1247 | + |
1248 | + ex.to(Key.BEFORE); |
1249 | + int count = 0; |
1250 | + while (ex.next()) { |
1251 | + count++; |
1252 | + final int k = ex.getKey().decodeInt(); |
1253 | + assertEquals(RED_FOX + "," + k, ex.getValue().getString()); |
1254 | + } |
1255 | + assert count + _duplicates.get() == COUNT; |
1256 | + |
1257 | + } finally { |
1258 | + for (final File file : directories) { |
1259 | + UnitTestProperties.cleanUpDirectory(file); |
1260 | + } |
1261 | + } |
1262 | + } |
1263 | + |
1264 | } |
1265 | |
1266 | === modified file 'src/test/java/com/persistit/stress/InsertBigLoad.java' |
1267 | --- src/test/java/com/persistit/stress/InsertBigLoad.java 2013-01-04 00:16:13 +0000 |
1268 | +++ src/test/java/com/persistit/stress/InsertBigLoad.java 2013-01-28 10:44:23 +0000 |
1269 | @@ -21,6 +21,7 @@ |
1270 | import com.persistit.TreeBuilder; |
1271 | import com.persistit.stress.unit.BigLoad; |
1272 | import com.persistit.stress.unit.BigLoad.BigLoadTreeBuilder; |
1273 | +import com.persistit.util.Util; |
1274 | |
1275 | public class InsertBigLoad extends AbstractSuite { |
1276 | |
1277 | @@ -54,7 +55,10 @@ |
1278 | |
1279 | try { |
1280 | execute(persistit); |
1281 | + final long start = System.nanoTime(); |
1282 | tb.merge(); |
1283 | + final long elapsed = System.nanoTime() - start; |
1284 | + System.out.printf("Merge took %,dms", elapsed / Util.NS_PER_MS); |
1285 | |
1286 | } finally { |
1287 | persistit.close(); |
1288 | |
1289 | === modified file 'src/test/java/com/persistit/stress/unit/BigLoad.java' |
1290 | --- src/test/java/com/persistit/stress/unit/BigLoad.java 2013-01-04 00:16:13 +0000 |
1291 | +++ src/test/java/com/persistit/stress/unit/BigLoad.java 2013-01-28 10:44:23 +0000 |
1292 | @@ -15,6 +15,7 @@ |
1293 | |
1294 | package com.persistit.stress.unit; |
1295 | |
1296 | +import java.io.File; |
1297 | import java.util.Random; |
1298 | |
1299 | import com.persistit.Exchange; |
1300 | @@ -23,6 +24,7 @@ |
1301 | import com.persistit.Tree; |
1302 | import com.persistit.TreeBuilder; |
1303 | import com.persistit.Value; |
1304 | +import com.persistit.Volume; |
1305 | import com.persistit.stress.AbstractStressTest; |
1306 | import com.persistit.util.ArgParser; |
1307 | import com.persistit.util.Util; |
1308 | @@ -68,7 +70,7 @@ |
1309 | tb.store(resultExchange); |
1310 | } |
1311 | final long endLoadTime = System.nanoTime(); |
1312 | - System.out.printf("Loaded %,d records into %,d buckets in %,dms\n", totalRecords, tb.getSortVolumeCount(), |
1313 | + System.out.printf("Loaded %,d records into %,d buckets in %,dms\n", totalRecords, tb.getSortFileCount(), |
1314 | (endLoadTime - startLoadTime) / Util.NS_PER_MS); |
1315 | return endLoadTime - startLoadTime; |
1316 | } |
1317 | @@ -157,6 +159,11 @@ |
1318 | } |
1319 | |
1320 | @Override |
1321 | + protected void beforeSortVolumeClosed(final Volume volume, final File file) { |
1322 | + System.out.printf("Saving sort volume %s to file %s\n", volume, file); |
1323 | + } |
1324 | + |
1325 | + @Override |
1326 | protected void reportSorted(final long count) { |
1327 | System.out.printf("Sorted %,15d records\n", count); |
1328 | } |
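The revised `merge()` in the diff keeps one `Node` per sort file. When two nodes hold the same key, it calls `duplicateKeyDetected` with the two values in insertion order, lower `_precedence` first. The code below is a minimal, self-contained sketch of that idea. It is not Persistit code: in-memory `TreeMap`s stand in for the sort files that `StreamLoader` re-reads, and a `PriorityQueue` replaces the `TreeMap<Node, Node>` and its duplicate chain. The names `KWayMergeSketch`, `DuplicateResolver`, `run` and `mergeRuns` are hypothetical. The resolver follows the documented contract: `v1` was inserted first, and returning `true` replaces it with `v2`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical sketch, not Persistit API: k-way merge of sorted runs in which
// equal keys are resolved in the order the runs were created.
public class KWayMergeSketch {

    // Hypothetical callback modeled on the documented duplicateKeyDetected contract:
    // v1 was inserted before v2; return true to keep v2 instead of v1.
    interface DuplicateResolver {
        boolean replace(Integer key, String v1, String v2);
    }

    // One sorted run (a stand-in for one sort file) and its creation index.
    static final class Cursor {
        final List<Map.Entry<Integer, String>> entries;
        final int precedence; // lower value = run created earlier
        int position;

        Cursor(List<Map.Entry<Integer, String>> entries, int precedence) {
            this.entries = entries;
            this.precedence = precedence;
        }

        boolean hasCurrent() { return position < entries.size(); }
        Integer key() { return entries.get(position).getKey(); }
        String value() { return entries.get(position).getValue(); }
    }

    // Assumed test-helper behavior: argument i is stored under key i, nulls are skipped.
    static TreeMap<Integer, String> run(String... values) {
        TreeMap<Integer, String> result = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            if (values[i] != null) {
                result.put(i, values[i]);
            }
        }
        return result;
    }

    static TreeMap<Integer, String> mergeRuns(List<TreeMap<Integer, String>> runs, DuplicateResolver resolver) {
        // Order cursors by current key, then by precedence, so equal keys surface in insertion order.
        PriorityQueue<Cursor> heap = new PriorityQueue<Cursor>((a, b) -> {
            int c = a.key().compareTo(b.key());
            return c != 0 ? c : Integer.compare(a.precedence, b.precedence);
        });
        for (int i = 0; i < runs.size(); i++) {
            Cursor cursor = new Cursor(new ArrayList<Map.Entry<Integer, String>>(runs.get(i).entrySet()), i);
            if (cursor.hasCurrent()) {
                heap.add(cursor);
            }
        }
        TreeMap<Integer, String> merged = new TreeMap<>();
        while (!heap.isEmpty()) {
            Cursor first = heap.poll();
            Integer key = first.key();
            String value = first.value();
            advance(first, heap);
            // Any other run positioned on the same key holds a duplicate; visit them earliest-first.
            while (!heap.isEmpty() && heap.peek().key().equals(key)) {
                Cursor dup = heap.poll();
                if (resolver.replace(key, value, dup.value())) {
                    value = dup.value();
                }
                advance(dup, heap);
            }
            merged.put(key, value);
        }
        return merged;
    }

    // Moves a cursor to its next entry and returns it to the heap unless it is exhausted.
    private static void advance(Cursor cursor, PriorityQueue<Cursor> heap) {
        cursor.position++;
        if (cursor.hasCurrent()) {
            heap.add(cursor);
        }
    }

    public static void main(String[] args) {
        List<TreeMap<Integer, String>> runs = Arrays.asList(
                run("x", "m", "n", null, "a", null, "q"),
                run(null, "t", "o", "r", null, null, null),
                run(null, "u", "m", "j", "c", "x", "l"),
                run(null, "m", null, null, "a", null, "q"));
        // Larger value wins: prints xuorcxq
        System.out.println(String.join("", mergeRuns(runs, (k, v1, v2) -> v1.compareTo(v2) < 0).values()));
        // First value wins: prints xmnraxq
        System.out.println(String.join("", mergeRuns(runs, (k, v1, v2) -> false).values()));
    }
}
```

The `main` method uses the four runs from `duplicatePriorityCheck` in `TreeBuilderTest`. This assumes `insertKeys` stores argument *i* under key *i* and skips nulls. Under that assumption, the "larger value wins" resolver gives `xuorcxq` and the "first value wins" resolver gives `xmnraxq`. These match the strings expected by `duplicatePriority1` and `duplicatePriority2`. Breaking ties on precedence is what makes the result deterministic when a duplicate key spans several sort files.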
Fixed bug 1100038, which appears when using TreeBuilder. The bug is a redundant decrement in BufferPool#invalidate.
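The revised `getSortVolume()` in the diff uses one counter, `_sortFileIndex`, for two jobs. It picks the directory for each new sort file with `_sortFileIndex % _directories.size()`. It also names the file with `String.format("%s_%d.%06d", _name, _uniqueId, _sortFileIndex)`. This rotation is why the `multipleDirectories` test can expect at least one file in every directory. The code below is a minimal sketch of only that naming and rotation rule, written outside Persistit. `SortFilePlanSketch` and `planSortFiles` are hypothetical names, and the sketch only computes paths; it creates no files.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Persistit API: round-robin directory choice plus
// <name>_<uniqueId>.<6-digit index> file naming, mirroring getSortVolume() in the diff.
public class SortFilePlanSketch {

    static List<File> planSortFiles(List<File> directories, String name, long uniqueId, int fileCount) {
        if (directories.isEmpty()) {
            throw new IllegalArgumentException("at least one directory is required");
        }
        List<File> files = new ArrayList<>();
        for (int index = 0; index < fileCount; index++) {
            // The same counter selects the directory and numbers the file.
            File directory = directories.get(index % directories.size());
            files.add(new File(directory, String.format("%s_%d.%06d", name, uniqueId, index)));
        }
        return files;
    }

    public static void main(String[] args) {
        List<File> directories = Arrays.asList(new File("sortA"), new File("sortB"), new File("sortC"));
        for (File file : planSortFiles(directories, "builder", 42L, 4)) {
            System.out.println(file.getPath());
        }
    }
}
```

Consecutive sort files land in different directories, so the files and the disk space they use are spread across the configured directories. The zero-padded index also keeps file names unique within a single `TreeBuilder` instance.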