Merge lp:~pbeaman/akiban-persistit/bigload into lp:akiban-persistit
- bigload
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Peter Beaman |
Approved revision: | 387 |
Merged at revision: | 386 |
Proposed branch: | lp:~pbeaman/akiban-persistit/bigload |
Merge into: | lp:akiban-persistit |
Diff against target: |
481 lines (+405/-4) 5 files modified
src/main/java/com/persistit/BufferPool.java (+4/-1) src/main/java/com/persistit/Key.java (+60/-3) src/test/java/com/persistit/stress/InsertBigLoad.java (+55/-0) src/test/java/com/persistit/stress/unit/BigLoad.java (+243/-0) src/test/java/com/persistit/unit/KeyTest1.java (+43/-0) |
To merge this branch: | bzr merge lp:~pbeaman/akiban-persistit/bigload |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Akiban Build User | Needs Fixing | ||
Nathan Williams | Approve | ||
Review via email: mp+132617@code.launchpad.net |
This proposal supersedes a proposal from 2012-11-01.
Commit message
Description of the change
Add a new stress test that sorts random keys into trees in a temporary volume and then inserts them in key-sort order into the main database. This is primarily a demonstration developed to help answer a question from a Persistit evaluator, but since it exercises temporary volumes and potentially very large databases it seems appropriate to have the code available in the stress test suite.
This branch also adds a couple of new methods to the Key class. The original version of BigLoad used these. Another potential user may also benefit from them, and they are generally useful, so they are proposed here. New unit tests cover them.
Peter Beaman (pbeaman) wrote : Posted in a previous version of this proposal | # |
Peter Beaman (pbeaman) wrote : | # |
Note that appendKeySegment does not check the size of the destination key - an ArrayIndexOutOfBoundsException (AIOOBE) can result. However, that's also true of the other append methods, so this new method behaves no worse.
Nathan Williams (nwilliams) wrote : | # |
New test sounds good, though I didn't inspect it too closely.
Mainline additions look good too.
Akiban Build User (build-akiban) wrote : | # |
There were 2 failures during build/test:
* job persistit-build failed at build number 470: http://
* view must-pass failed: persistit-build is yellow
Peter Beaman (pbeaman) wrote : | # |
Known sporadic failure in TransactionLife
Preview Diff
1 | === modified file 'src/main/java/com/persistit/BufferPool.java' |
2 | --- src/main/java/com/persistit/BufferPool.java 2012-10-05 14:21:47 +0000 |
3 | +++ src/main/java/com/persistit/BufferPool.java 2012-11-01 20:16:23 +0000 |
4 | @@ -653,7 +653,10 @@ |
5 | } |
6 | } |
7 | buffer.clearValid(); |
8 | - buffer.clearDirty(); |
9 | + if (buffer.isDirty()) { |
10 | + _dirtyPageCount.decrementAndGet(); |
11 | + buffer.clearDirty(); |
12 | + } |
13 | buffer.setPageAddressAndVolume(0, null); |
14 | } |
15 | |
16 | |
17 | === modified file 'src/main/java/com/persistit/Key.java' |
18 | --- src/main/java/com/persistit/Key.java 2012-08-24 13:57:19 +0000 |
19 | +++ src/main/java/com/persistit/Key.java 2012-11-01 20:16:23 +0000 |
20 | @@ -1146,7 +1146,7 @@ |
21 | } |
22 | |
23 | /** |
24 | - * Compares a bounded subarray of the backing byte array for this |
25 | + * Compare a bounded subarray of the backing byte array for this |
26 | * <code>Key</code> with another <code>Key</code>. This method is intended |
27 | * for use within the library and is generally not useful for applications. |
28 | * |
29 | @@ -1162,8 +1162,9 @@ |
30 | * to the corresponding fragment of the supplied <code>Key</code>. |
31 | */ |
32 | public int compareKeyFragment(final Key key, final int fragmentStart, final int fragmentSize) { |
33 | - if (key == this) |
34 | + if (key == this) { |
35 | return 0; |
36 | + } |
37 | final int size1 = this._size; |
38 | final int size2 = key.getEncodedSize(); |
39 | final byte[] bytes1 = this.getEncodedBytes(); |
40 | @@ -1178,8 +1179,9 @@ |
41 | for (int i = fragmentStart; i < size; i++) { |
42 | final int b1 = bytes1[i] & 0xFF; |
43 | final int b2 = bytes2[i] & 0xFF; |
44 | - if (b1 != b2) |
45 | + if (b1 != b2) { |
46 | return b1 - b2; |
47 | + } |
48 | } |
49 | if (size == fragmentSize + fragmentStart) |
50 | return 0; |
51 | @@ -1191,6 +1193,42 @@ |
52 | } |
53 | |
54 | /** |
55 | + * Compare the next key segment of this key to the next key segment of the |
56 | + * supplied Key. The next key segment is determined by the current index of |
57 | + * the key and can be set using the {@link #setIndex(int)} method. Returns a |
58 | + * positive integer if the next segment of this <code>Key</code> is larger |
59 | + * than the next segment of the supplied <code>Key</code>, a negative |
60 | + * integer if it is smaller, or zero if the segments are equal. |
61 | + * |
62 | + * @param key |
63 | + * @return the comparison result |
64 | + */ |
65 | + public int compareKeySegment(final Key key) { |
66 | + if (key == this) { |
67 | + return 0; |
68 | + } |
69 | + final byte[] bytes1 = this.getEncodedBytes(); |
70 | + final byte[] bytes2 = key.getEncodedBytes(); |
71 | + final int index1 = this.getIndex(); |
72 | + final int index2 = key.getIndex(); |
73 | + final int count1 = this.getEncodedSize() - this.getIndex(); |
74 | + final int count2 = key.getEncodedSize() - key.getIndex(); |
75 | + final int count = Math.min(count1, count2); |
76 | + |
77 | + for (int i = 0; i < count; i++) { |
78 | + final int b1 = bytes1[i + index1] & 0xFF; |
79 | + final int b2 = bytes2[i + index2] & 0xFF; |
80 | + if (b1 != b2) { |
81 | + return b1 - b2; |
82 | + } |
83 | + if (b1 == 0) { |
84 | + return 0; |
85 | + } |
86 | + } |
87 | + return count1 - count2; |
88 | + } |
89 | + |
90 | + /** |
91 | * Returns the index of the first encoded byte of this key that is different |
92 | * than the corresponding byte of the supplied <code>Key</code>. If all |
93 | * bytes match, then returns the encoded length of the shorter key. |
94 | @@ -2037,6 +2075,25 @@ |
95 | } |
96 | |
97 | /** |
98 | + * Append the next key segment of the supplied <code>Key</code> to this |
99 | + * <code>Key</code>. The next key segment is determined by the current index |
100 | + * of the key and can be set using the {@link #setIndex(int)} method. |
101 | + * |
102 | + * @param key |
103 | + */ |
104 | + public void appendKeySegment(final Key key) { |
105 | + int length = 0; |
106 | + for (int index = key.getIndex(); index < key.getEncodedSize(); index++) { |
107 | + length++; |
108 | + if (key.getEncodedBytes()[index] == 0) { |
109 | + break; |
110 | + } |
111 | + } |
112 | + System.arraycopy(key.getEncodedBytes(), key.getIndex(), _bytes, _size, length); |
113 | + _size += length; |
114 | + } |
115 | + |
116 | + /** |
117 | * Replaces the final key segment with the supplied boolean value. If the |
118 | * key is currently empty, this method simply appends the value. This method |
119 | * is equivalent to invoking {@link #cut} and then {@link #append(boolean)}. |
120 | |
121 | === added file 'src/test/java/com/persistit/stress/InsertBigLoad.java' |
122 | --- src/test/java/com/persistit/stress/InsertBigLoad.java 1970-01-01 00:00:00 +0000 |
123 | +++ src/test/java/com/persistit/stress/InsertBigLoad.java 2012-11-01 20:16:23 +0000 |
124 | @@ -0,0 +1,55 @@ |
125 | +/** |
126 | + * Copyright © 2012 Akiban Technologies, Inc. All rights reserved. |
127 | + * |
128 | + * This program and the accompanying materials are made available |
129 | + * under the terms of the Eclipse Public License v1.0 which |
130 | + * accompanies this distribution, and is available at |
131 | + * http://www.eclipse.org/legal/epl-v10.html |
132 | + * |
133 | + * This program may also be available under different license terms. |
134 | + * For more information, see www.akiban.com or contact licensing@akiban.com. |
135 | + * |
136 | + * Contributors: |
137 | + * Akiban Technologies, Inc. |
138 | + */ |
139 | + |
140 | +package com.persistit.stress; |
141 | + |
142 | +import com.persistit.Configuration; |
143 | +import com.persistit.Persistit; |
144 | +import com.persistit.Transaction.CommitPolicy; |
145 | +import com.persistit.stress.unit.BigLoad; |
146 | + |
147 | +public class InsertBigLoad extends AbstractSuite { |
148 | + |
149 | + static String name() { |
150 | + return InsertBigLoad.class.getSimpleName(); |
151 | + } |
152 | + |
153 | + public static void main(final String[] args) throws Exception { |
154 | + new InsertBigLoad(args).runTest(); |
155 | + } |
156 | + |
157 | + public InsertBigLoad(final String[] args) { |
158 | + super(name(), args); |
159 | + } |
160 | + |
161 | + @Override |
162 | + public void runTest() throws Exception { |
163 | + |
164 | + deleteFiles(substitute("$datapath$/persistit*")); |
165 | + |
166 | + add(new BigLoad("records=10000000 buckets=10")); |
167 | + |
168 | + final Configuration config = makeConfiguration(16384, "50000", CommitPolicy.SOFT); |
169 | + config.setTmpVolMaxSize(100000000000l); |
170 | + final Persistit persistit = new Persistit(); |
171 | + persistit.initialize(config); |
172 | + |
173 | + try { |
174 | + execute(persistit); |
175 | + } finally { |
176 | + persistit.close(); |
177 | + } |
178 | + } |
179 | +} |
180 | |
181 | === added file 'src/test/java/com/persistit/stress/unit/BigLoad.java' |
182 | --- src/test/java/com/persistit/stress/unit/BigLoad.java 1970-01-01 00:00:00 +0000 |
183 | +++ src/test/java/com/persistit/stress/unit/BigLoad.java 2012-11-01 20:16:23 +0000 |
184 | @@ -0,0 +1,243 @@ |
185 | +/** |
186 | + * Copyright © 2012 Akiban Technologies, Inc. All rights reserved. |
187 | + * |
188 | + * This program and the accompanying materials are made available |
189 | + * under the terms of the Eclipse Public License v1.0 which |
190 | + * accompanies this distribution, and is available at |
191 | + * http://www.eclipse.org/legal/epl-v10.html |
192 | + * |
193 | + * This program may also be available under different license terms. |
194 | + * For more information, see www.akiban.com or contact licensing@akiban.com. |
195 | + * |
196 | + * Contributors: |
197 | + * Akiban Technologies, Inc. |
198 | + */ |
199 | + |
200 | +package com.persistit.stress.unit; |
201 | + |
202 | +import java.util.Random; |
203 | +import java.util.SortedMap; |
204 | +import java.util.TreeMap; |
205 | + |
206 | +import com.persistit.Exchange; |
207 | +import com.persistit.Key; |
208 | +import com.persistit.Persistit; |
209 | +import com.persistit.Volume; |
210 | +import com.persistit.exception.PersistitException; |
211 | +import com.persistit.stress.AbstractStressTest; |
212 | +import com.persistit.util.ArgParser; |
213 | +import com.persistit.util.Util; |
214 | + |
215 | +/** |
216 | + * <p> |
217 | + * Simulate loading a large set (e.g., 500M) of records with large random keys. |
218 | + * Because straightforward insertion results in highly randomized page access |
35 | + * after the database size has exceeded the amount of buffer pool memory space, |
220 | + * this demo creates smaller sorted sets of keys and then merges them to create |
221 | + * the final Tree in sequential order. As a side-effect, the final tree is also |
38 | + * physically coherent in that the logical and physical order of keys on disk are |
223 | + * closely aligned. |
224 | + * </p> |
225 | + * <p> |
226 | + * This class can be run stand-alone through its static main method, or within |
227 | + * the stress test suite. |
228 | + * </p> |
229 | + * |
230 | + * @author peter |
231 | + */ |
232 | +public class BigLoad extends AbstractStressTest { |
233 | + |
234 | + private static final Random RANDOM = new Random(); |
235 | + |
236 | + private int totalRecords; |
237 | + private int recordsPerBucket; |
238 | + |
239 | + /** |
240 | + * A Comparable wrapper for an Exchange. An instance of this class may be |
241 | + * held in a SortedMap only if the Key of the Exchange does not change. In |
242 | + * this example, the ComparableExchangeHolder is always removed from the |
243 | + * TreeMap before the Key changes and then reinserted into a new location |
244 | + * after the key has changed. |
245 | + */ |
246 | + static class ComparableExchangeHolder implements Comparable<ComparableExchangeHolder> { |
247 | + |
248 | + final Exchange exchange; |
249 | + |
250 | + ComparableExchangeHolder(final Exchange exchange) { |
251 | + this.exchange = exchange; |
252 | + } |
253 | + |
254 | + @Override |
255 | + public int compareTo(final ComparableExchangeHolder ceh) { |
256 | + final Key k1 = exchange.getKey(); |
257 | + final Key k2 = ceh.exchange.getKey(); |
258 | + return k1.compareTo(k2); |
259 | + } |
260 | + } |
261 | + |
262 | + public BigLoad(final int totalRecords, final int buckets) { |
263 | + super(""); |
264 | + this.totalRecords = totalRecords; |
265 | + this.recordsPerBucket = totalRecords / buckets; |
266 | + } |
267 | + |
268 | + public void load(final Persistit db) throws PersistitException { |
269 | + final long startLoadTime = System.nanoTime(); |
270 | + final Volume sortVolume = db.createTemporaryVolume(); |
271 | + final Exchange resultExchange = db.getExchange("persistit", "sorted", true); |
272 | + System.out.printf("Loading %,d records into %,d buckets\n", totalRecords, totalRecords / recordsPerBucket); |
273 | + final int bucketCount = loadBuckets(db, sortVolume); |
274 | + final long endLoadTime = System.nanoTime(); |
275 | + |
276 | + System.out.printf("Merging %,d records from %,d buckets into main database\n", totalRecords, bucketCount); |
277 | + mergeBuckets(db, bucketCount, sortVolume, resultExchange); |
278 | + final long endMergeTime = System.nanoTime(); |
279 | + System.out.printf("Merged %,d records in %,dms\n", totalRecords, (endMergeTime - endLoadTime) / Util.NS_PER_MS); |
280 | + sortVolume.close(); |
281 | + System.out.printf("Counting keys in main database (100M keys per dot) "); |
282 | + resultExchange.clear().append(Key.BEFORE); |
283 | + long count = 0; |
284 | + while (resultExchange.next()) { |
285 | + count++; |
286 | + if ((count % 100000000) == 0) { |
287 | + System.out.print("."); |
288 | + System.out.flush(); |
289 | + } |
290 | + } |
291 | + final long endCountTime = System.nanoTime(); |
292 | + System.out.printf("\nCounted %,d keys in the main database in %,dms\n", count, (endCountTime - endMergeTime) |
293 | + / Util.NS_PER_MS); |
294 | + System.out.printf("Total time to load, merge and count %,d records is %,dms", totalRecords, |
295 | + (endCountTime - startLoadTime) / Util.NS_PER_MS); |
296 | + } |
297 | + |
298 | + private int loadBuckets(final Persistit db, final Volume volume) throws PersistitException { |
299 | + long bucketStartTime = 0; |
300 | + int bucket = 0; |
301 | + Exchange ex = null; |
302 | + for (int i = 0; i < totalRecords; i++) { |
303 | + if ((i % recordsPerBucket) == 0) { |
304 | + final long now = System.nanoTime(); |
305 | + if (i > 0) { |
306 | + System.out.printf("Loaded bucket %,5d in %,12dms\n", bucket, (now - bucketStartTime) |
307 | + / Util.NS_PER_MS); |
308 | + } |
309 | + bucketStartTime = now; |
310 | + bucket++; |
311 | + ex = db.getExchange(volume, "sort" + bucket, true); |
312 | + } |
313 | + ex.clear().append(randomKey()); |
314 | + ex.store(); |
315 | + } |
316 | + return bucket; |
317 | + } |
318 | + |
319 | + private void mergeBuckets(final Persistit db, final int bucketCount, final Volume sortVolume, final Exchange to) |
320 | + throws PersistitException { |
321 | + final long startLoadTime = System.nanoTime(); |
322 | + int loaded = 0; |
323 | + |
324 | + final SortedMap<ComparableExchangeHolder, Integer> sortMap = new TreeMap<ComparableExchangeHolder, Integer>(); |
325 | + /* |
326 | + * Load the sortMap using as keys the first key of each bucket. |
327 | + */ |
328 | + for (int bucket = 1; bucket <= bucketCount; bucket++) { |
329 | + final Exchange ex = db.getExchange(sortVolume, "sort" + bucket, false); |
330 | + if (ex.append(Key.BEFORE).next()) { |
331 | + final Integer duplicate = sortMap.put(new ComparableExchangeHolder(ex), bucket); |
332 | + showDuplicate(duplicate, bucket, ex); |
333 | + } |
334 | + } |
335 | + |
336 | + while (!sortMap.isEmpty()) { |
337 | + final ComparableExchangeHolder ceh = sortMap.firstKey(); |
338 | + final int bucket = sortMap.remove(ceh); |
339 | + ceh.exchange.getKey().copyTo(to.getKey()); |
340 | + if (ceh.exchange.next()) { |
341 | + final Integer duplicate = sortMap.put(ceh, bucket); |
342 | + showDuplicate(duplicate, bucket, ceh.exchange); |
343 | + } |
344 | + to.store(); |
345 | + if ((++loaded % 10000000) == 0) { |
346 | + System.out.printf("Merged %,d records in %,dms\n", loaded, (System.nanoTime() - startLoadTime) |
347 | + / Util.NS_PER_MS); |
348 | + } |
349 | + } |
350 | + } |
351 | + |
352 | + private String randomKey() { |
353 | + return String.format("%020dxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", |
354 | + (RANDOM.nextLong() & Long.MAX_VALUE)); |
355 | + } |
356 | + |
357 | + private void showDuplicate(final Integer bucket1, final int bucket2, final Exchange ex) { |
358 | + if (bucket1 != null) { |
359 | + System.out.printf("Duplicate key %s in buckets %,d and %,d\n", ex.getKey(), bucket1, bucket2); |
360 | + } |
361 | + } |
362 | + |
363 | + /** |
364 | + * Arguments: |
365 | + * |
366 | + * records - total records to load |
367 | + * |
368 | + * buckets - number of subdivisions to create while loading |
369 | + * |
370 | + * propertiesPath - path to properties file for Persistit initialization |
371 | + * |
372 | + * @param args |
373 | + * @throws Exception |
374 | + */ |
375 | + |
376 | + public static void main(final String[] args) throws Exception { |
377 | + final int records = args.length > 0 ? Integer.parseInt(args[0]) : 1000000; |
378 | + final int buckets = args.length > 1 ? Integer.parseInt(args[1]) : 100; |
379 | + final BigLoad bl = new BigLoad(records, buckets); |
380 | + final Persistit db = new Persistit(); |
381 | + if (args.length > 2) { |
382 | + db.initialize(args[2]); |
383 | + } else { |
384 | + db.initialize(); |
385 | + } |
386 | + try { |
387 | + bl.load(db); |
388 | + } finally { |
389 | + db.close(); |
390 | + } |
391 | + } |
392 | + |
393 | + /* |
394 | + * ---------------------------------------------------------------------- |
395 | + * |
396 | + * Stuff below this line is required to run within the stress test suite |
397 | + * |
398 | + * ---------------------------------------------------------------------- |
399 | + */ |
400 | + |
401 | + public BigLoad(final String argsString) { |
402 | + super(argsString); |
403 | + } |
404 | + |
405 | + private final static String[] ARGS_TEMPLATE = { "records|int:1000000:1:1000000000|Total records to create", |
406 | + "buckets|int:100:1:1000000|Number of sort buckets", "tmpdir|string:|Temporary volume path" }; |
407 | + |
408 | + /** |
409 | + * Method to parse stress test arguments passed by the stress test suite. |
410 | + */ |
411 | + @Override |
412 | + public void setUp() throws Exception { |
413 | + super.setUp(); |
414 | + final ArgParser ap = new ArgParser("com.persistit.BigLoad", _args, ARGS_TEMPLATE).strict(); |
415 | + totalRecords = ap.getIntValue("records"); |
416 | + recordsPerBucket = totalRecords / ap.getIntValue("buckets"); |
417 | + final String path = ap.getStringValue("tmpdir"); |
418 | + if (path != null && !path.isEmpty()) { |
419 | + getPersistit().getConfiguration().setTmpVolDir(path); |
420 | + } |
421 | + } |
422 | + |
423 | + @Override |
424 | + protected void executeTest() throws Exception { |
425 | + load(getPersistit()); |
426 | + } |
427 | +} |
428 | |
429 | === modified file 'src/test/java/com/persistit/unit/KeyTest1.java' |
430 | --- src/test/java/com/persistit/unit/KeyTest1.java 2012-08-24 13:57:19 +0000 |
431 | +++ src/test/java/com/persistit/unit/KeyTest1.java 2012-11-01 20:16:23 +0000 |
432 | @@ -945,6 +945,49 @@ |
433 | } |
434 | } |
435 | |
436 | + @Test |
437 | + public void testCompareKeySegment() throws Exception { |
438 | + final Key key1 = newKey(); |
439 | + final Key key2 = newKey(); |
440 | + key1.append(1).append(2).append(3).append("abc"); |
441 | + key2.append(3).append(2).append(1).append("abcd"); |
442 | + key1.indexTo(1); |
443 | + key2.indexTo(1); |
444 | + assertTrue("Should be == 0", 0 == key1.compareKeySegment(key2)); |
445 | + assertTrue("Should be == 0", 0 == key2.compareKeySegment(key1)); |
446 | + key1.indexTo(0); |
447 | + assertTrue("Should be < 0", 0 > key1.compareKeySegment(key2)); |
448 | + assertTrue("Should be > 0", 0 < key2.compareKeySegment(key1)); |
449 | + key1.indexTo(2); |
450 | + assertTrue("Should be > 0", 0 < key1.compareKeySegment(key2)); |
451 | + assertTrue("Should be < 0", 0 > key2.compareKeySegment(key1)); |
452 | + key1.indexTo(3); |
453 | + assertTrue("Should be > 0", 0 < key1.compareKeySegment(key2)); |
454 | + assertTrue("Should be < 0", 0 > key2.compareKeySegment(key1)); |
455 | + key2.indexTo(3); |
456 | + assertTrue("Should be < 0", 0 > key1.compareKeySegment(key2)); |
457 | + assertTrue("Should be > 0", 0 < key2.compareKeySegment(key1)); |
458 | + } |
459 | + |
460 | + @Test |
461 | + public void testAppendKeySegment() throws Exception { |
462 | + final Key key1 = newKey(); |
463 | + final Key key2 = newKey(); |
464 | + key1.append(1).append(2).append(3).append("abc"); |
465 | + key1.indexTo(3); |
466 | + key2.appendKeySegment(key1); |
467 | + key1.indexTo(2); |
468 | + key2.appendKeySegment(key1); |
469 | + key1.indexTo(1); |
470 | + key2.appendKeySegment(key1); |
471 | + key1.indexTo(0); |
472 | + key2.appendKeySegment(key1); |
473 | + assertEquals("Key value incorrect", "{\"abc\",3,2,1}", key2.toString()); |
474 | + key1.indexTo(3); |
475 | + key1.appendKeySegment(key1); |
476 | + assertEquals("Key value incorrect", "{1,2,3,\"abc\",\"abc\"}", key1.toString()); |
477 | + } |
478 | + |
479 | private static boolean doubleEquals(final double f1, final double f2) { |
480 | if (Double.isNaN(f1)) { |
481 | return Double.isNaN(f2); |
Note: removed a change in the Exchange class subsequent to original merge proposal. Exchange is now unchanged.