Merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Peter Beaman
Approved revision: 381
Merged at revision: 368
Proposed branch: lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Merge into: lp:akiban-persistit
Diff against target: 1050 lines (+237/-196)
12 files modified
src/main/java/com/persistit/BufferPool.java (+13/-5)
src/main/java/com/persistit/CleanupManager.java (+32/-11)
src/main/java/com/persistit/Exchange.java (+11/-4)
src/main/java/com/persistit/IOMeter.java (+14/-4)
src/main/java/com/persistit/IntegrityCheck.java (+20/-0)
src/main/java/com/persistit/JournalManager.java (+102/-118)
src/main/java/com/persistit/Persistit.java (+12/-10)
src/main/java/com/persistit/TransactionPlayer.java (+3/-1)
src/main/java/com/persistit/logging/LogBase.java (+2/-2)
src/main/java/com/persistit/mxbeans/IOMeterMXBean.java (+2/-2)
src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java (+20/-15)
src/test/java/com/persistit/JournalManagerTest.java (+6/-24)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Reviewer           Review Type  Date Requested  Status
Akiban Build User                               Needs Fixing
Nathan Williams                                 Approve
Review via email: mp+123174@code.launchpad.net

Description of the change

Modifies the JournalManager#waitForDurability loop to improve performance on the 4-hour TPCC test, and adds several smaller changes to fix problems seen while running that test extensively on perf02.

* Modify BufferPool preload warmup logging to show elapsed time

* CleanupManager no longer calls pruneObsoleteTransactions and other periodic maintenance actions on every polling cycle (doing so wasted CPU)

* JournalManager: added UrgentFileCountThreshold value settable from MXBean to control journal file count. Revised the urgency() method and friends to work more smoothly with the settable threshold.

* JournalManager: Moved the calls that force Volume updates back into the copier thread, because deferring them until checkpoints causes massive I/O

* JournalManager: Removed ReentrantReadWriteLock from waitForDurability. It turned out that the lock wasted a great deal of CPU with little benefit. Changed to a purely polling protocol and ensured that every path through the loop sleeps for at least one ms. Removed the detection of loops without waits, and the corresponding tests, since by inspection there is a sleep on every conditional branch.

* JournalManager: Corrected time interval calculations in waitForDurability

* JournalManager: Changed calculation of expected IO time to use average rather than max. Actual behavior is very erratic, and our best effort to delay commit() until completion of a force cycle is very inaccurate; we may want to remove the behavior and certainly should modify or remove the documentation.

* JournalManager: Relaxed the leadTime and flusher poll times from 100ms to 500ms. (This change may no longer be necessary; will test and revise as appropriate.)

* Persistit: Corrected a problem in copyBackPages() in which the less frequent pruneObsoleteTransactions() cycle exposed a race that broke unit tests.
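
The polling protocol described for waitForDurability can be illustrated with a minimal sketch. This is not the Persistit implementation: the class name, the LongSupplier parameter, and the timeout are assumptions made so that the example is self-contained and always terminates. It only shows the shape of the loop, in which every pass that does not find the commit durable sleeps for about one millisecond instead of blocking on a lock.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical illustration only; names and the timeout are not from Persistit.
final class PollingDurabilitySketch {

    private static final long NS_PER_MS = 1_000_000L;

    /**
     * Polls until the flusher reports a timestamp at or beyond the caller's
     * commit timestamp. Returns false on timeout or interruption.
     */
    static boolean waitForDurability(final LongSupplier flushedTimestamp, final long commitTimestamp,
            final long timeoutMs) {
        final long start = System.nanoTime();
        while (flushedTimestamp.getAsLong() < commitTimestamp) {
            if (System.nanoTime() - start >= timeoutMs * NS_PER_MS) {
                return false;
            }
            try {
                // Every pass that is not yet durable sleeps, so the loop never spins hot.
                Thread.sleep(1);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) throws InterruptedException {
        final AtomicLong flushed = new AtomicLong(0);
        // Stand-in for the flusher thread: publishes a timestamp after a short delay.
        final Thread flusher = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            flushed.set(42);
        });
        flusher.start();
        System.out.println(waitForDurability(flushed::get, 42, 1000));
        flusher.join();
    }
}
```

The trade-off in a loop like this is up to roughly one millisecond of extra commit latency per pass in exchange for no lock traffic between the committing threads and the flusher.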

Peter Beaman (pbeaman) wrote :

Pushed a couple of additional changes inspired by observing tests:

- cleanupPageList is now O(N) rather than O(N*N)
- non-transactional store/remove operations now throttle
- the throttle() method itself is now faster - the throttle interval is computed every few seconds by the JOURNAL_COPIER thread.
- a couple of minor code cleanups
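
The O(N) cleanup mentioned in the first item can be sketched as a single-pass compaction: surviving entries are shifted toward the front of the list and the tail is truncated once. The diff's cleanupPageList works on Persistit's own page list of PageNode objects; in this sketch a generic List and a Predicate stand in for them, and all names are hypothetical. Linear time assumes a random-access list such as ArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration only; not the Persistit page list.
final class PageListCompactionSketch {

    /**
     * Removes every element for which isInvalid is true, preserving the order
     * of the remaining elements. Returns the number of removed elements.
     */
    static <T> int removeInvalid(final List<T> list, final Predicate<? super T> isInvalid) {
        final int size = list.size();
        int to = 0;
        for (int from = 0; from < size; from++) {
            final T item = list.get(from);
            if (!isInvalid.test(item)) {
                if (to != from) {
                    list.set(to, item); // shift the survivor left over removed slots
                }
                to++;
            }
        }
        // One truncation at the end instead of one range removal per invalid run.
        list.subList(to, size).clear();
        return size - to;
    }

    public static void main(final String[] args) {
        final List<Integer> pages = new ArrayList<Integer>(Arrays.asList(1, -2, 3, -4, -5, 6));
        final int removed = removeInvalid(pages, (Integer x) -> x < 0);
        System.out.println(removed + " removed, remaining " + pages);
    }
}
```

Removing each invalid run separately shifts the tail of an array-backed list once per run, which is where the quadratic cost comes from when invalid entries are scattered.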

Nathan Williams (nwilliams) wrote :

This looks pretty good.

Just a few questions:
IOMeter#DEFAULT_QUIESCENT_IO_THRESHOLD was reduced by a factor of 1000. Its usage in JournalManager was also scaled by 1024. I'm not entirely sure what it is used for; can you elaborate?

JournalManager#selectForCopy() went from asserting the pageNodes were valid to checking it. Can invalid nodes be in the list now?

Minor:
Probably just my preference, but constants with units in the name make one less thing to check (new intervals in CleanupManager).

review: Needs Information
Peter Beaman (pbeaman) wrote :

Thanks, good questions.

DEFAULT_QUIESCENT_IO_THRESHOLD is used to detect when little I/O is
currently being done; when that happens, the copier interval goes
down. I'm not wedded to that heuristic, but it has been there since
the beginning. The change normalizes the units in IOMeter: the IORate
field attempts to show KBytes per second, whereas the threshold used
to be in bytes per second. Now the threshold is expressed in KBytes
per second too. Clearly there's a neglected documentation task - I
will fix that.
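
A small sketch of the unit normalization: the threshold is kept in KBytes per second and scaled by 1024 at the point of comparison, as the diff does with KILO in getPollInterval(). The class and method names are hypothetical, and treating the recent charge as a byte count is an assumption based on the threshold having previously been expressed in bytes per second.

```java
// Hypothetical illustration only; the constants mirror values visible in the diff.
final class QuiescentThresholdSketch {

    static final long KILO = 1024;
    static final long DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 100;

    /**
     * Returns true when recent I/O, assumed here to be measured in bytes,
     * is below a threshold expressed in KBytes per second.
     */
    static boolean isQuiescent(final long recentChargeBytes, final long thresholdKBytesPerSec) {
        return recentChargeBytes < thresholdKBytesPerSec * KILO;
    }

    public static void main(final String[] args) {
        final long threshold = DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC;
        System.out.println(isQuiescent(50_000, threshold));
        System.out.println(isQuiescent(200_000, threshold));
    }
}
```

Carrying the unit in the constant name, as the revised IOMeter constants do, makes the scaling at the comparison site easy to verify.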

selectForCopy used to call cleanupPageList before looping, and
the assert made sense. However, because the cleanup operation can be
expensive (even with the new algorithm) I reduced the number of
times it is called, and therefore the selectForCopy method can
now see and skip invalid PageNodes.
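
The skip-instead-of-assert behaviour can be sketched as follows. The Node class is a hypothetical stand-in for PageNode, and the method mirrors only the loop shape shown in the diff: each version chain is walked from newest to oldest, the walk stops at the first invalid node, and at most one node older than the timestamp bound is selected per chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; Node is not the Persistit PageNode class.
final class SelectForCopySketch {

    /** One version of a page, linked to the previous (older) version. */
    static final class Node {
        final long timestamp;
        final boolean invalid;
        final Node previous;

        Node(final long timestamp, final boolean invalid, final Node previous) {
            this.timestamp = timestamp;
            this.invalid = invalid;
            this.previous = previous;
        }
    }

    /** Selects, per chain, the newest valid node older than the bound. */
    static List<Node> selectForCopy(final List<Node> heads, final long timestampUpperBound) {
        final List<Node> selected = new ArrayList<Node>();
        for (final Node head : heads) {
            // Stop walking a chain as soon as an invalid node is reached.
            for (Node n = head; n != null && !n.invalid; n = n.previous) {
                if (n.timestamp < timestampUpperBound) {
                    selected.add(n);
                    break;
                }
            }
        }
        return selected;
    }

    public static void main(final String[] args) {
        final Node older = new Node(5, false, null);
        final Node newer = new Node(20, false, older);
        final Node stale = new Node(5, true, null);
        System.out.println(selectForCopy(Arrays.asList(newer, stale), 10).size());
    }
}
```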

I agree about units in the constant names and will change.

On Mon, Sep 10, 2012 at 10:29 AM, Nathan Williams <email address hidden> wrote:
> Review: Needs Information
>
> This looks pretty good.
>
> Just a few questions:
> IOMeter#DEFAULT_QUIESCENT_IO_THRESHOLD was reduced by a factor of 1000. Its usage in JournalManager was also scaled by 1024. I'm not entirely sure what it is used for; can you elaborate?
>
> JournalManager#selectForCopy() went from asserting the pageNodes were valid to checking it. Can invalid nodes be in the list now?
>
> Minor:
> Probably just my preference, but constants with units in the name make one less thing to check (new intervals in CleanupManager).
> --
> https://code.launchpad.net/~pbeaman/akiban-persistit/fix-wait-for-durability/+merge/123174
> You are the owner of lp:~pbeaman/akiban-persistit/fix-wait-for-durability.

Peter Beaman (pbeaman) wrote :

Revised as requested. The CleanupManager time constants are now named XXX_MS, as are a number of other constants.

I also reverted the changes from the original proposal to the JOURNAL_FLUSHER poll time and SOFT commit lead time, restoring their former values. I believe these changes didn't help and are not needed for good performance, and I want to benchmark again with the original values.

Nathan Williams (nwilliams) wrote :

Thanks for the tweaks. Looks good to me.

review: Approve
Akiban Build User (build-akiban) wrote :

There was one failure during build/test:

* unknown exception (check log)

review: Needs Fixing
Peter Beaman (pbeaman) wrote :

Jenkins glitch. Reapproving.

Preview Diff

1=== modified file 'src/main/java/com/persistit/BufferPool.java'
2--- src/main/java/com/persistit/BufferPool.java 2012-08-24 14:00:17 +0000
3+++ src/main/java/com/persistit/BufferPool.java 2012-09-12 22:04:20 +0000
4@@ -97,10 +97,11 @@
5 */
6 private final static int INVENTORY_VERSIONS = 3;
7
8+ private final static long NS_PER_SEC = 1000000000;
9 /**
10- * Preload log multiple
11+ * Preload log message interval, in seconds
12 */
13- private final static int INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE = 10000;
14+ private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC;
15
16 /**
17 * The Persistit instance that references this BufferPool.
18@@ -1451,6 +1452,9 @@
19 void preloadBufferInventory() {
20 int count = 0;
21 int total = 0;
22+ final long startTime = System.nanoTime();
23+ long reportTime = startTime;
24+
25 try {
26 final JournalManager jman = _persistit.getJournalManager();
27 final Exchange exchange = getBufferInventoryExchange();
28@@ -1496,8 +1500,11 @@
29 final Buffer buff = get(vol, pn.getPageAddress(), false, true);
30 buff.release();
31 count++;
32- if ((count % INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE) == 0) {
33- _persistit.getLogBase().bufferInventoryProgress.log(count, total);
34+ final long now = System.nanoTime();
35+ if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) {
36+ _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime)
37+ / NS_PER_SEC);
38+ reportTime = now;
39 }
40 if (count >= _bufferCount) {
41 //
42@@ -1513,7 +1520,8 @@
43 } catch (final PersistitException e) {
44 _persistit.getLogBase().bufferInventoryException.log(e);
45 } finally {
46- _persistit.getLogBase().bufferInventoryProgress.log(count, total);
47+ final long now = System.nanoTime();
48+ _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC);
49 }
50 }
51
52
53=== modified file 'src/main/java/com/persistit/CleanupManager.java'
54--- src/main/java/com/persistit/CleanupManager.java 2012-08-24 13:57:19 +0000
55+++ src/main/java/com/persistit/CleanupManager.java 2012-09-12 22:04:20 +0000
56@@ -36,15 +36,19 @@
57 void performAction(Persistit persistit) throws PersistitException;
58 }
59
60- final static long DEFAULT_CLEANUP_INTERVAL = 1000;
61+ final static long DEFAULT_CLEANUP_INTERVAL_MS = 1000;
62
63 final static int DEFAULT_QUEUE_SIZE = 50000;
64
65- final static int WORKLIST_LENGTH = 500;
66-
67- private final static long DEFAULT_MINIMUM_PRUNING_DELAY = 1000;
68-
69- final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE);
70+ private final static int WORKLIST_LENGTH = 500;
71+
72+ private final static long MINIMUM_MAINTENANCE_INTERVAL_NS = 1000000000L;
73+
74+ private final static long MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS = 50000000000L;
75+
76+ private final static long DEFAULT_MINIMUM_PRUNING_DELAY_NS = 1000;
77+
78+ private final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE);
79
80 private final AtomicBoolean _closed = new AtomicBoolean();
81
82@@ -56,7 +60,11 @@
83
84 private final AtomicLong _errors = new AtomicLong();
85
86- private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY);
87+ private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY_NS);
88+
89+ private long _lastMaintenance;
90+
91+ private long _lastPruneObsoleteTransactions;
92
93 CleanupManager(final Persistit persistit) {
94 super(persistit);
95@@ -64,7 +72,10 @@
96
97 public void start() {
98 _closed.set(false);
99- start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL);
100+ final long now = System.nanoTime();
101+ _lastMaintenance = now;
102+ _lastPruneObsoleteTransactions = now;
103+ start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL_MS);
104 }
105
106 public void close(final boolean flush) throws PersistitException {
107@@ -138,9 +149,19 @@
108
109 @Override
110 public void poll() throws Exception {
111- _persistit.getIOMeter().poll();
112- _persistit.cleanup();
113- _persistit.getJournalManager().pruneObsoleteTransactions();
114+
115+ final long now = System.nanoTime();
116+ if (now - _lastMaintenance > MINIMUM_MAINTENANCE_INTERVAL_NS) {
117+ _persistit.getIOMeter().poll();
118+ _persistit.cleanup();
119+ _lastMaintenance = now;
120+ }
121+
122+ if (now - _lastPruneObsoleteTransactions > MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS) {
123+ _persistit.getJournalManager().pruneObsoleteTransactions();
124+ _lastPruneObsoleteTransactions = now;
125+ }
126+
127 final List<CleanupAction> workList = new ArrayList<CleanupAction>(WORKLIST_LENGTH);
128 synchronized (this) {
129 while (workList.size() < WORKLIST_LENGTH) {
130
131=== modified file 'src/main/java/com/persistit/Exchange.java'
132--- src/main/java/com/persistit/Exchange.java 2012-08-28 14:29:50 +0000
133+++ src/main/java/com/persistit/Exchange.java 2012-09-12 22:04:20 +0000
134@@ -1290,6 +1290,9 @@
135 if (!isDirectoryExchange()) {
136 _persistit.checkSuspended();
137 }
138+ if (!_ignoreTransactions && !_transaction.isActive()) {
139+ _persistit.getJournalManager().throttle();
140+ }
141 // TODO: directoryExchange, and lots of tests, don't use transactions.
142 // Skip MVCC for now.
143 int options = StoreOptions.WAIT;
144@@ -2990,13 +2993,9 @@
145 }
146
147 private boolean removeInternal(final Direction selection, final boolean fetchFirst) throws PersistitException {
148- assertCorrectThread(true);
149- _persistit.checkClosed();
150-
151 if (selection != EQ && selection != GTEQ && selection != GT) {
152 throw new IllegalArgumentException("Invalid mode " + selection);
153 }
154-
155 final int keySize = _key.getEncodedSize();
156
157 _key.copyTo(_spareKey3);
158@@ -3005,6 +3004,7 @@
159 // Special case for empty key
160 if (keySize == 0) {
161 if (selection == EQ) {
162+ assertCorrectThread(true);
163 return false;
164 }
165 _spareKey3.append(BEFORE);
166@@ -3094,6 +3094,13 @@
167 assertCorrectThread(true);
168 _persistit.checkClosed();
169
170+ if (!isDirectoryExchange()) {
171+ _persistit.checkSuspended();
172+ }
173+ if (!_ignoreTransactions && !_transaction.isActive()) {
174+ _persistit.getJournalManager().throttle();
175+ }
176+
177 if (_ignoreTransactions || !_transaction.isActive()) {
178 return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false);
179 }
180
181=== modified file 'src/main/java/com/persistit/IOMeter.java'
182--- src/main/java/com/persistit/IOMeter.java 2012-08-24 13:57:19 +0000
183+++ src/main/java/com/persistit/IOMeter.java 2012-09-12 22:04:20 +0000
184@@ -32,6 +32,7 @@
185
186 import com.persistit.mxbeans.IOMeterMXBean;
187 import com.persistit.util.ArgParser;
188+import com.persistit.util.Util;
189
190 /**
191 *
192@@ -57,7 +58,9 @@
193 private final static String DUMP_FORMAT = "time=%,12d op=%2s vol=%4s page=%,16d addr=%,16d size=%,8d index=%,7d";
194 private final static int DUMP_RECORD_LENGTH = 37;
195
196- private final static int DEFAULT_QUIESCENT_IO_THRESHOLD = 100000;
197+ private final static int DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 100;
198+ private final static int MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 0;
199+ private final static int MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 1000000;
200
201 private final static int READ_PAGE_FROM_VOLUME = 1;
202 private final static int READ_PAGE_FROM_JOURNAL = 2;
203@@ -72,7 +75,7 @@
204
205 private final static int ITEM_COUNT = 11;
206
207- private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD;
208+ private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC;
209
210 private final AtomicReference<DataOutputStream> _logStream = new AtomicReference<DataOutputStream>();
211
212@@ -149,6 +152,7 @@
213
214 /**
215 * @return the quiescentIOthreshold
216+ * @see #setQuiescentIOthreshold(long)
217 */
218 @Override
219 public synchronized long getQuiescentIOthreshold() {
220@@ -156,12 +160,18 @@
221 }
222
223 /**
224+ * Persistit monitors the rate at which new I/O operations are created. When
225+ * the IORate falls below the quiescentIOthreshold, expressed in KBytes per
226+ * second, the JOURNAL_COPIER accelerates its work to try to clean up older
227+ * journal files.
228+ *
229 * @param quiescentIOthreshold
230 * the quiescentIOthreshold to set
231 */
232 @Override
233- public synchronized void setQuiescentIOthreshold(final long quiescentIO) {
234- _quiescentIOthreshold = quiescentIO;
235+ public synchronized void setQuiescentIOthreshold(final long quiescentIOthreshold) {
236+ _quiescentIOthreshold = Util.rangeCheck(quiescentIOthreshold, MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC,
237+ MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC);
238 }
239
240 /**
241
242=== modified file 'src/main/java/com/persistit/IntegrityCheck.java'
243--- src/main/java/com/persistit/IntegrityCheck.java 2012-09-11 14:14:55 +0000
244+++ src/main/java/com/persistit/IntegrityCheck.java 2012-09-12 22:04:20 +0000
245@@ -386,6 +386,26 @@
246 }
247
248 /**
249+ * Control output format. When CSV mode is enabled, the output is organized
250+ * as comma-separated-variable text that can be imported into a spreadsheet.
251+ *
252+ * @param csvMode
253+ */
254+ public void setCsvMode(final boolean csvMode) {
255+ _csv = csvMode;
256+ }
257+
258+ /**
259+ * Indicate whether CSV mode is enabled. If so the output is organized as
260+ * comma-separated-variable text that can be imported into a spreadsheet.
261+ *
262+ * @return <code>true<c/code> if CSV mode is enabled.
263+ */
264+ public boolean isCsvMode() {
265+ return _csv;
266+ }
267+
268+ /**
269 * Indicate whether missing index pages should be added when an index "hole"
270 * is discovered.
271 *
272
273=== modified file 'src/main/java/com/persistit/JournalManager.java'
274--- src/main/java/com/persistit/JournalManager.java 2012-09-11 14:14:55 +0000
275+++ src/main/java/com/persistit/JournalManager.java 2012-09-12 22:04:20 +0000
276@@ -29,15 +29,15 @@
277 import java.util.Collections;
278 import java.util.Comparator;
279 import java.util.HashMap;
280+import java.util.HashSet;
281 import java.util.Iterator;
282 import java.util.List;
283 import java.util.Map;
284+import java.util.Set;
285 import java.util.SortedMap;
286 import java.util.TreeMap;
287-import java.util.concurrent.TimeUnit;
288 import java.util.concurrent.atomic.AtomicBoolean;
289 import java.util.concurrent.atomic.AtomicLong;
290-import java.util.concurrent.locks.ReentrantReadWriteLock;
291 import java.util.regex.Matcher;
292 import java.util.regex.Pattern;
293
294@@ -80,9 +80,9 @@
295 final static int GENTLE_COMMIT_DELAY_MILLIS = 12;
296 private final static long NS_PER_MS = 1000000L;
297 private final static int IO_MEASUREMENT_CYCLES = 8;
298-
299- private final static int TOO_MANY_WARN_THRESHOLD = 15;
300- private final static int TOO_MANY_ERROR_THRESHOLD = 20;
301+ private final static int TOO_MANY_WARN_THRESHOLD = 5;
302+ private final static int TOO_MANY_ERROR_THRESHOLD = 10;
303+ private final static long KILO = 1024;
304
305 /**
306 * REGEX expression that recognizes the name of a journal file.
307@@ -104,7 +104,6 @@
308 private final Map<TreeDescriptor, Integer> _treeToHandleMap = new HashMap<TreeDescriptor, Integer>();
309
310 private final Map<Integer, TreeDescriptor> _handleToTreeMap = new HashMap<Integer, TreeDescriptor>();
311-
312 private final Map<Long, TransactionMapItem> _liveTransactionMap = new HashMap<Long, TransactionMapItem>();
313
314 private final Persistit _persistit;
315@@ -184,9 +183,9 @@
316
317 private final AtomicLong _totalFlushIoTime = new AtomicLong();
318
319- private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL;
320+ private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL_MS;
321
322- private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD;
323+ private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS;
324
325 private final TransactionPlayer _player = new TransactionPlayer(new JournalTransactionPlayerSupport());
326
327@@ -201,7 +200,7 @@
328 * performs I/O. Hopefully we can set good defaults and not expose these as
329 * knobs.
330 */
331- private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL;
332+ private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL_MS;
333
334 private volatile int _copiesPerCycle = DEFAULT_COPIES_PER_CYCLE;
335
336@@ -213,7 +212,9 @@
337
338 private boolean _allowHandlesForTempVolumesAndTrees;
339
340- private final AtomicLong _waitLoopsWithNoDelay = new AtomicLong();
341+ private volatile int _urgentFileCountThreshold = DEFAULT_URGENT_FILE_COUNT_THRESHOLD;
342+
343+ private volatile long _throttleSleepInterval;
344
345 /**
346 * <p>
347@@ -537,10 +538,22 @@
348
349 @Override
350 public void setSlowIoAlertThreshold(final long slowIoAlertThreshold) {
351- Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD, MAXIMUM_SLOW_ALERT_THRESHOLD);
352+ Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD_MS, MAXIMUM_SLOW_ALERT_THRESHOLD_MS);
353 _slowIoAlertThreshold = slowIoAlertThreshold;
354 }
355
356+ @Override
357+ public int getUrgentFileCountThreshold() {
358+ return _urgentFileCountThreshold;
359+ }
360+
361+ @Override
362+ public void setUrgentFileCountThreshold(final int threshold) {
363+ Util.rangeCheck(threshold, MINIMUM_URGENT_FILE_COUNT_THRESHOLD, MAXIMUM_URGENT_FILE_COUNT_THRESHOLD);
364+ _urgentFileCountThreshold = threshold;
365+
366+ }
367+
368 /**
369 * Compute an "urgency" factor that determines how vigorously the
370 * JOURNAL_COPIER thread should perform I/O. This number is computed on a
371@@ -554,8 +567,8 @@
372 if (_copyFast.get()) {
373 return URGENT;
374 }
375- final int journalFileCount = getJournalFileCount();
376- return Math.min(URGENT, journalFileCount);
377+ final int remainingFiles = _urgentFileCountThreshold - getJournalFileCount();
378+ return Math.max(0, Math.min(URGENT - remainingFiles, URGENT));
379 }
380
381 /**
382@@ -567,13 +580,9 @@
383 * @throws PersistitInterruptedException
384 */
385 public void throttle() throws PersistitInterruptedException {
386- final int urgency = urgency();
387- if (!_appendOnly.get()) {
388- if (urgency == URGENT) {
389- Util.sleep(URGENT_COMMIT_DELAY_MILLIS);
390- } else if (urgency >= ALMOST_URGENT) {
391- Util.sleep(GENTLE_COMMIT_DELAY_MILLIS);
392- }
393+ final long interval = _throttleSleepInterval;
394+ if (interval > 0) {
395+ Util.sleep(interval);
396 }
397 }
398
399@@ -1007,14 +1016,6 @@
400 //
401 force();
402 //
403- // Make sure all copied pages have been flushed to disk.
404- //
405- for (final Volume vol : _volumeToHandleMap.keySet()) {
406- if (vol.isOpened()) {
407- vol.getStorage().force();
408- }
409- }
410- //
411 // Prepare room for CP.OVERHEAD bytes in the journal. If doing so
412 // started a new journal file then there's no need to write another
413 // CP record.
414@@ -1227,7 +1228,6 @@
415 static long fileToGeneration(final File file) {
416 final Matcher matcher = PATH_PATTERN.matcher(file.getName());
417 if (matcher.matches()) {
418- // TODO - validate range
419 return Long.parseLong(matcher.group(2));
420 } else {
421 return -1;
422@@ -1237,7 +1237,6 @@
423 static String fileToPath(final File file) {
424 final Matcher matcher = PATH_PATTERN.matcher(file.getPath());
425 if (matcher.matches()) {
426- // TODO - validate range
427 return matcher.group(1);
428 } else {
429 return null;
430@@ -1677,11 +1676,6 @@
431 }
432 }
433 //
434- // Remove the page list entries too.
435- //
436- _droppedPageCount += cleanupPageList();
437-
438- //
439 // Remove any PageNode from the branchMap having a timestamp less
440 // than the checkpoint. Generally all such entries are removed after
441 // the first checkpoint that has been established after recovery.
442@@ -2168,6 +2162,20 @@
443 } finally {
444 _copying.set(false);
445 }
446+
447+ long throttleInterval = 0;
448+ if (!_appendOnly.get()) {
449+ final int urgency = urgency();
450+ if (urgency == URGENT) {
451+ throttleInterval = URGENT_COMMIT_DELAY_MILLIS;
452+ } else if (urgency > ALMOST_URGENT) {
453+ throttleInterval = GENTLE_COMMIT_DELAY_MILLIS;
454+ }
455+ }
456+ if (throttleInterval != _throttleSleepInterval) {
457+ _throttleSleepInterval = throttleInterval;
458+ }
459+
460 }
461
462 @Override
463@@ -2175,14 +2183,14 @@
464 return _closed.get() || _shouldStop;
465 }
466
467- @Override
468 /**
469- * Return a nice interval, in milliseconds, to wait between
470- * copierCycle invocations. The interval decreases as interval
471- * goes up, and becomes zero when the urgency is 10. The interval
472- * is also zero if there has be no recent I/O activity invoked
473- * by other activities.
474+ * Return a nice interval, in milliseconds, to wait between copierCycle
475+ * invocations. The interval decreases as interval goes up, and becomes
476+ * zero when the urgency is greater than or equal to 8. The interval is
477+ * also zero if there has be no recent I/O activity invoked by other
478+ * activities.
479 */
480+ @Override
481 public long getPollInterval() {
482 final IOMeter iom = _persistit.getIOMeter();
483 final long pollInterval = super.getPollInterval();
484@@ -2198,7 +2206,7 @@
485
486 int divisor = 1;
487
488- if (iom.recentCharge() < iom.getQuiescentIOthreshold()) {
489+ if (iom.recentCharge() < iom.getQuiescentIOthreshold() * KILO) {
490 divisor = HALF_URGENT;
491 } else if (urgency > HALF_URGENT) {
492 divisor = urgency - HALF_URGENT;
493@@ -2214,8 +2222,6 @@
494
495 private class JournalFlusher extends IOTaskRunnable {
496
497- final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
498-
499 volatile long _lastExceptionTimestamp = 0;
500 volatile Exception _lastException = null;
501
502@@ -2248,6 +2254,7 @@
503 * posted an _endTimestamp larger than flushedTimestamp.
504 */
505 final long now = System.nanoTime();
506+ long remainingStallTime = stallTime;
507
508 while (true) {
509 /*
510@@ -2269,10 +2276,10 @@
511 endTimestamp = _endTimestamp;
512 startTime = _startTime;
513 endTime = _endTime;
514- if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) {
515- estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0);
516- }
517 if (startTimestamp == _startTimestamp && endTimestamp == _endTimestamp) {
518+ if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) {
519+ estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0);
520+ }
521 break;
522 }
523 Util.spinSleep();
524@@ -2280,22 +2287,23 @@
525
526 if (endTimestamp > flushedTimestamp && startTimestamp > flushedTimestamp) {
527 /*
528- * Done - commit is fully durable
529+ * Done - commit is durable
530 */
531 break;
532 }
533
534 long remainingSleepNanos;
535- boolean didWait = false;
536 if (estimatedRemainingIoNanos == -1) {
537 remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime));
538 } else {
539 remainingSleepNanos = _flushInterval;
540 }
541
542- long estimatedNanosToFinish = Math.max(estimatedRemainingIoNanos, 0);
543+ long estimatedNanosToFinish;
544 if (startTimestamp < flushedTimestamp) {
545- estimatedNanosToFinish += remainingSleepNanos + _expectedIoTime;
546+ estimatedNanosToFinish = remainingSleepNanos + _expectedIoTime;
547+ } else {
548+ estimatedNanosToFinish = estimatedRemainingIoNanos;
549 }
550
551 if (leadTime > 0 && leadTime * NS_PER_MS >= estimatedNanosToFinish) {
552@@ -2311,41 +2319,20 @@
553 * possible (determined by stallTime) before kicking the
554 * JOURNAL_FLUSHER to write the caller's transaction.
555 */
556- final long delay = stallTime * NS_PER_MS - estimatedNanosToFinish;
557- if (delay > 0) {
558- Util.sleep(delay / NS_PER_MS);
559- didWait = true;
560- }
561- kick();
562- if (delay <= 0) {
563- didWait = true;
564- try {
565- if (_lock.readLock().tryLock(NS_PER_MS, TimeUnit.NANOSECONDS)) {
566- _lock.readLock().unlock();
567- }
568- } catch (final InterruptedException e) {
569- throw new PersistitInterruptedException(e);
570- }
571+ if (remainingStallTime > 0) {
572+ Util.sleep(remainingStallTime);
573+ remainingStallTime = 0;
574+ } else {
575+ kick();
576+ Util.spinSleep();
577 }
578 } else {
579 /*
580- * Otherwise, wait until the I/O is about half done and then
581- * retry.
582+ * Otherwise wait for concurrent I/O operation to finish. Do
583+ * this by polling because our experiments with using locks
584+ * here showed significant excess CPU consumption.
585 */
586- final long delay = ((estimatedNanosToFinish - leadTime * NS_PER_MS) / 2) + NS_PER_MS;
587- try {
588- if (delay > 0) {
589- didWait = true;
590- if (_lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) {
591- _lock.readLock().unlock();
592- }
593- }
594- } catch (final InterruptedException e) {
595- throw new PersistitInterruptedException(e);
596- }
597- }
598- if (!didWait) {
599- _waitLoopsWithNoDelay.incrementAndGet();
600+ Util.spinSleep();
601 }
602 }
603 if (_lastExceptionTimestamp > flushedTimestamp) {
604@@ -2370,7 +2357,6 @@
605 * waitForDurability to know when the I/O operation has
606 * finished.
607 */
608- _lock.writeLock().lock();
609 try {
610 _startTimestamp = _persistit.getTimestampAllocator().updateTimestamp();
611 _startTime = System.nanoTime();
612@@ -2382,22 +2368,24 @@
613 } finally {
614 _endTime = System.nanoTime();
615 _endTimestamp = _persistit.getTimestampAllocator().updateTimestamp();
616- _lock.writeLock().unlock();
617 }
618
619 final long elapsed = _endTime - _startTime;
620 _totalFlushCycles.incrementAndGet();
621 _totalFlushIoTime.addAndGet(elapsed);
622 _ioTimes[_ioCycle] = elapsed;
623- _ioCycle = (_ioCycle + 1) % _ioTimes.length;
624+ _ioCycle = (_ioCycle + 1) % IO_MEASUREMENT_CYCLES;
625
626- long max = 0;
627- for (int index = 0; index < _ioTimes.length; index++) {
628- max = Math.max(max, _ioTimes[index]);
629+ long avg = 0;
630+ for (int index = 0; index < IO_MEASUREMENT_CYCLES; index++) {
631+ avg += _ioTimes[index];
632 }
633- _expectedIoTime = max;
634+ avg /= IO_MEASUREMENT_CYCLES;
635+
636+ _expectedIoTime = avg;
637 if (elapsed > _slowIoAlertThreshold * NS_PER_MS) {
638- _persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS);
639+ _persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS, IO_MEASUREMENT_CYCLES, avg
640+ / NS_PER_MS);
641 }
642
643 } catch (final Exception e) {
644@@ -2416,6 +2404,7 @@
645 } finally {
646 _flushing.set(false);
647 }
648+
649 }
650
651 @Override
652@@ -2426,14 +2415,12 @@
653
654 synchronized void selectForCopy(final List<PageNode> list) {
655 list.clear();
656- _droppedPageCount += cleanupPageList();
657 if (!_appendOnly.get()) {
658 final long timeStampUpperBound = Math.min(getLastValidCheckpointTimestamp(), _copierTimestampLimit);
659 for (final Iterator<PageNode> iterator = _pageList.iterator(); iterator.hasNext();) {
660 final PageNode pageNode = iterator.next();
661- for (PageNode pn = pageNode; pn != null; pn = pn.getPrevious()) {
662+ for (PageNode pn = pageNode; pn != null && !pn.isInvalid(); pn = pn.getPrevious()) {
663 if (pn.getTimestamp() < timeStampUpperBound) {
664- assert !pn.isInvalid();
665 list.add(pn);
666 break;
667 }
668@@ -2517,6 +2504,7 @@
669 Volume volumeRef = null;
670 Volume volume = null;
671 int handle = -1;
672+ final Set<Volume> volumes = new HashSet<Volume>();
673
674 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
675 if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
676@@ -2567,6 +2555,7 @@
677
678 try {
679 volume.getStorage().writePage(bb, pageAddress);
680+ volumes.add(volume);
681 } catch (final PersistitException ioe) {
682 _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(),
683 pageNode.getJournalAddress());
684@@ -2578,6 +2567,10 @@
685 pageNode.getJournalAddress(), urgency());
686 }
687
688+ for (final Volume vol : volumes) {
689+ vol.getStorage().force();
690+ }
691+
692 }
693
694 private void cleanupForCopy(final List<PageNode> list) throws PersistitException {
695@@ -2715,26 +2708,21 @@
696 * @return Count of removed PageNode instances.
697 */
698 int cleanupPageList() {
699- int to = -1;
700- int count = 0;
701- for (int index = _pageList.size(); --index >= 0;) {
702- if (_pageList.get(index).isInvalid()) {
703- if (to == -1) {
704- to = index;
705- }
706- } else {
707- if (to != -1) {
708- _pageList.removeRange(index + 1, to + 1);
709- count += to - index;
710- to = -1;
711- }
712+ final int size = _pageList.size();
713+ int from;
714+ for (from = 0; from < size && !_pageList.get(from).isInvalid(); from++)
715+ ;
716+ int to = from;
717+ for (from = from + 1; from < size; from++) {
718+ final PageNode pn = _pageList.get(from);
719+ if (!pn.isInvalid()) {
720+ _pageList.set(to++, pn);
721 }
722 }
723- if (to != -1) {
724- _pageList.removeRange(0, to + 1);
725- count += to + 1;
726+ if (size > to) {
727+ _pageList.removeRange(to, size);
728 }
729- return count;
730+ return size - to;
731 }
732
733 private void reportJournalFileCount() {
734@@ -2744,11 +2732,11 @@
735 */
736 final int journalFileCount = getJournalFileCount();
737 if (journalFileCount != _lastReportedJournalFileCount) {
738- if (journalFileCount > TOO_MANY_ERROR_THRESHOLD) {
739+ if (journalFileCount > TOO_MANY_ERROR_THRESHOLD + _urgentFileCountThreshold) {
740 _persistit.getAlertMonitor()
741 .post(new Event(AlertLevel.ERROR, _persistit.getLogBase().tooManyJournalFilesError,
742 journalFileCount), AlertMonitor.MANY_JOURNAL_FILES);
743- } else if (journalFileCount > TOO_MANY_WARN_THRESHOLD) {
744+ } else if (journalFileCount > TOO_MANY_WARN_THRESHOLD + _urgentFileCountThreshold) {
745 _persistit.getAlertMonitor()
746 .post(new Event(AlertLevel.WARN, _persistit.getLogBase().tooManyJournalFilesWarning,
747 journalFileCount), AlertMonitor.MANY_JOURNAL_FILES);
748@@ -2923,11 +2911,11 @@
749 _liveTransactionMap.clear();
750 }
751
752- synchronized long getCurrentJournalSize() {
753+ long getCurrentJournalSize() {
754 return _currentAddress % _blockSize;
755 }
756
757- synchronized int getJournalFileCount() {
758+ int getJournalFileCount() {
759 return (int) (_currentAddress / _blockSize - _baseAddress / _blockSize) + 1;
760 }
761
762@@ -2989,8 +2977,4 @@
763 public SortedMap<Integer, TreeDescriptor> queryTreeMap() {
764 return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap);
765 }
766-
767- long getWaitLoopsWithNoDelay() {
768- return _waitLoopsWithNoDelay.get();
769- }
770 }
771
772=== modified file 'src/main/java/com/persistit/Persistit.java'
773--- src/main/java/com/persistit/Persistit.java 2012-09-11 14:14:55 +0000
774+++ src/main/java/com/persistit/Persistit.java 2012-09-12 22:04:20 +0000
775@@ -327,11 +327,11 @@
776 private final static SplitPolicy DEFAULT_SPLIT_POLICY = SplitPolicy.PACK_BIAS;
777 private final static JoinPolicy DEFAULT_JOIN_POLICY = JoinPolicy.EVEN_BIAS;
778 private final static CommitPolicy DEFAULT_TRANSACTION_COMMIT_POLICY = CommitPolicy.SOFT;
779- private final static long DEFAULT_COMMIT_LEAD_TIME = 100;
780- private final static long DEFAULT_COMMIT_STALL_TIME = 1;
781- private final static long MAX_COMMIT_LEAD_TIME = 5000;
782- private final static long MAX_COMMIT_STALL_TIME = 5000;
783- private final static long FLUSH_DELAY_INTERVAL = 5000;
784+ private final static long DEFAULT_COMMIT_LEAD_TIME_MS = 100;
785+ private final static long DEFAULT_COMMIT_STALL_TIME_MS = 1;
786+ private final static long MAX_COMMIT_LEAD_TIME_MS = 5000;
787+ private final static long MAX_COMMIT_STALL_TIME_MS = 5000;
788+ private final static long LOG_FLUSH_DELAY_INTERVAL_MS = 5000;
789
790 private final static int MAX_FATAL_ERROR_MESSAGES = 10;
791
792@@ -365,7 +365,7 @@
793 public void run() {
794 while (!_stop) {
795 try {
796- Util.sleep(FLUSH_DELAY_INTERVAL);
797+ Util.sleep(LOG_FLUSH_DELAY_INTERVAL_MS);
798 } catch (final PersistitInterruptedException ie) {
799 break;
800 }
801@@ -458,9 +458,9 @@
802
803 private volatile CommitPolicy _defaultCommitPolicy = DEFAULT_TRANSACTION_COMMIT_POLICY;
804
805- private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME;
806+ private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME_MS;
807
808- private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME;
809+ private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME_MS;
810
811 private final ThreadLocal<SoftReference<int[]>> _intArrayThreadLocal = new ThreadLocal<SoftReference<int[]>>();
812
813@@ -1447,6 +1447,8 @@
814 */
815 for (int i = 0; i < 5; ++i) {
816 if (!_closed.get() && _initialized.get()) {
817+ _transactionIndex.updateActiveTransactionCache();
818+ _journalManager.pruneObsoleteTransactions();
819 _checkpointManager.checkpoint();
820 _journalManager.copyBack();
821 final int fileCount = _journalManager.getJournalFileCount();
822@@ -2051,7 +2053,7 @@
823 }
824
825 void setTransactionCommitleadTime(final long time) {
826- _commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME);
827+ _commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME_MS);
828 }
829
830 long getTransactionCommitStallTime() {
831@@ -2059,7 +2061,7 @@
832 }
833
834 void setTransactionCommitStallTime(final long time) {
835- _commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME);
836+ _commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME_MS);
837 }
838
839 /**
840
841=== modified file 'src/main/java/com/persistit/TransactionPlayer.java'
842--- src/main/java/com/persistit/TransactionPlayer.java 2012-08-24 13:57:19 +0000
843+++ src/main/java/com/persistit/TransactionPlayer.java 2012-09-12 22:04:20 +0000
844@@ -300,7 +300,9 @@
845 if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) {
846 return volume.getStructure().directoryExchange();
847 } else {
848- return _support.getPersistit().getExchange(volume, td.getTreeName(), true);
849+ final Exchange exchange = _support.getPersistit().getExchange(volume, td.getTreeName(), true);
850+ exchange.ignoreTransactions();
851+ return exchange;
852 }
853 }
854
855
856=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
857--- src/main/java/com/persistit/logging/LogBase.java 2012-08-24 14:00:17 +0000
858+++ src/main/java/com/persistit/logging/LogBase.java 2012-09-12 22:04:20 +0000
859@@ -232,7 +232,7 @@
860 @Message("WARNING|Crash retried %,d times on %s")
861 public final LogItem crashRetry = PersistitLogMessage.empty();
862
863- @Message("WARNING|Journal flush operation took %,dms")
864+ @Message("WARNING|Journal flush operation took %,dms last %,d cycles average is %,dms")
865 public final LogItem longJournalIO = PersistitLogMessage.empty();
866
867 @Message("INFO|Normal journal file count %,d")
868@@ -247,7 +247,7 @@
869 @Message("INFO|Preloading buffer pool inventory recorded at %tc")
870 public final LogItem bufferInventoryLoad = PersistitLogMessage.empty();
871
872- @Message("INFO|Preloaded %,d of %,d buffers")
873+ @Message("INFO|Preloaded %,d of %,d buffers in %,d seconds")
874 public final LogItem bufferInventoryProgress = PersistitLogMessage.empty();
875
876 @Message("WARNING|Exception while writing buffer pool inventory %s")
877
878=== modified file 'src/main/java/com/persistit/mxbeans/IOMeterMXBean.java'
879--- src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-08-24 13:57:19 +0000
880+++ src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-09-12 22:04:20 +0000
881@@ -69,7 +69,7 @@
882 /**
883 * @return the quiescentIOthreshold
884 */
885- @Description("Disk I/O scheduling parameter in bytes per second specifying threshold "
886+ @Description("Disk I/O scheduling parameter in KBytes per second specifying threshold "
887 + "between \"quiescent\" and \"busy\" states")
888 public long getQuiescentIOthreshold();
889
890@@ -77,7 +77,7 @@
891 * @param quiescentIO
892 * the quiescentIOthreshold to set
893 */
894- @Description("Disk I/O scheduling parameter in bytes per second specifying threshold "
895+ @Description("Disk I/O scheduling parameter in KBytes per second specifying threshold "
896 + "between \"quiescent\" and \"busy\" states")
897 public void setQuiescentIOthreshold(long quiescentIO);
898
899
900=== modified file 'src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java'
901--- src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-08-24 13:57:19 +0000
902+++ src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-09-12 22:04:20 +0000
903@@ -76,14 +76,20 @@
904 * Default time interval (in milliseconds) between calls to the
905 * FileChannel.force() method.
906 */
907- final static long DEFAULT_FLUSH_INTERVAL = 100;
908+ final static long DEFAULT_FLUSH_INTERVAL_MS = 100;
909
910 /**
911 * Default time interval (in milliseconds) between calls to the journal
912 * copier method.
913 */
914- final static long DEFAULT_COPIER_INTERVAL = 10000;
915-
916+ final static long DEFAULT_COPIER_INTERVAL_MS = 10000;
917+ /**
918+ * Default journal file count at which transactions are throttled to allow
919+ * copier to catch up.
920+ */
921+ final static int DEFAULT_URGENT_FILE_COUNT_THRESHOLD = 15;
922+ final static int MINIMUM_URGENT_FILE_COUNT_THRESHOLD = 5;
923+ final static int MAXIMUM_URGENT_FILE_COUNT_THRESHOLD = 100;
924 /**
925 * Default value for maximum pages to be copied per cycle.
926 */
927@@ -94,17 +100,17 @@
928 * exceptions on attempts to write to the journal. Prevents excessively
929 * verbose log on repeated failures.
930 */
931- final static long DEFAULT_LOG_REPEAT_INTERVAL = 60000L;
932- final static long MINIMUM_LOG_REPEAT_INTERVAL = 1000L;
933- final static long MAXIMUM_LOG_REPEAT_INTERVAL = Long.MAX_VALUE;
934+ final static long DEFAULT_LOG_REPEAT_INTERVAL_MS = 60000L;
935+ final static long MINIMUM_LOG_REPEAT_INTERVAL_MS = 1000L;
936+ final static long MAXIMUM_LOG_REPEAT_INTERVAL_MS = Long.MAX_VALUE;
937 /**
938 * Default threshold time in milliseconds for JournalManager flush
939 * operations. If a flush operation takes longer than this time, a WARNING
940 * message is written to the log.
941 */
942- final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD = 2000L;
943- final static long MINIMUM_SLOW_ALERT_THRESHOLD = 100L;
944- final static long MAXIMUM_SLOW_ALERT_THRESHOLD = Long.MAX_VALUE;
945+ final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS = 2000L;
946+ final static long MINIMUM_SLOW_ALERT_THRESHOLD_MS = 100L;
947+ final static long MAXIMUM_SLOW_ALERT_THRESHOLD_MS = Long.MAX_VALUE;
948
949 /**
950 * File name appended when journal path specifies only a directory
951@@ -115,12 +121,6 @@
952 */
953 final static String PATH_FORMAT = "%s.%012d";
954
955- /**
956- * Default setting for number of pages in the page map before the urgency of
957- * copying starts to increase.
958- */
959- final static int DEFAULT_PAGE_MAP_SIZE_BASE = 250000;
960-
961 final static int MAXIMUM_CONCURRENT_TRANSACTIONS = 10000;
962
963 @Description("Number of transaction map items in the live map")
964@@ -243,4 +243,9 @@
965 @Description("Threshold in milliseconds for warnings of long duration flush cycles")
966 void setSlowIoAlertThreshold(long slowIoAlertThreshold);
967
968+ @Description("Journal file count threshold for throttling transactions")
969+ int getUrgentFileCountThreshold();
970+
971+ @Description("Journal file count threshold for throttling transactions")
972+ void setUrgentFileCountThreshold(int threshold);
973 }
974
975=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
976--- src/test/java/com/persistit/JournalManagerTest.java 2012-09-04 21:01:23 +0000
977+++ src/test/java/com/persistit/JournalManagerTest.java 2012-09-12 22:04:20 +0000
978@@ -33,6 +33,7 @@
979 import java.util.HashMap;
980 import java.util.HashSet;
981 import java.util.Iterator;
982+import java.util.LinkedList;
983 import java.util.List;
984 import java.util.Map;
985 import java.util.Properties;
986@@ -43,7 +44,6 @@
987
988 import com.persistit.CheckpointManager.Checkpoint;
989 import com.persistit.JournalManager.PageNode;
990-import com.persistit.Transaction.CommitPolicy;
991 import com.persistit.TransactionPlayer.TransactionPlayerListener;
992 import com.persistit.exception.PersistitException;
993 import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
994@@ -440,7 +440,7 @@
995 * Randomly invalidated PageNodes
996 */
997 {
998- final int SIZE = 5000;
999+ final int SIZE = 1000000;
1000 final Random random = new Random(1);
1001 final List<PageNode> source = testCleanupPageListSource(SIZE);
1002 int next = -1;
1003@@ -448,8 +448,8 @@
1004 if (index < next) {
1005 source.get(index).invalidate();
1006 } else {
1007- index += random.nextInt(50);
1008- next = random.nextInt(50) + index;
1009+ index += random.nextInt(5);
1010+ next = random.nextInt(5) + index;
1011 }
1012 }
1013 testCleanupPageListHelper(source);
1014@@ -538,34 +538,16 @@
1015 .getIgnoredUpdates() > 0);
1016 }
1017
1018- @Test
1019- public void waitForDurabilitySoaksCPU() throws Exception {
1020- _persistit.setDefaultTransactionCommitPolicy(CommitPolicy.HARD);
1021- final JournalManager jman = _persistit.getJournalManager();
1022- long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay();
1023- final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true);
1024- final Transaction txn = exchange.getTransaction();
1025- for (int count = 0; count < 1000; count++) {
1026- txn.begin();
1027- exchange.getValue().put(RED_FOX + count);
1028- exchange.to(count).store();
1029- txn.commit();
1030- txn.end();
1031- }
1032- waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay() - waitLoopsWithoutDelay;
1033- assertEquals("Wait loops without delay", 0, waitLoopsWithoutDelay);
1034- }
1035-
1036 private List<PageNode> testCleanupPageListSource(final int size) {
1037 final List<PageNode> source = new ArrayList<PageNode>(size);
1038- for (int index = 0; index < 1000000; index++) {
1039+ for (int index = 0; index < size; index++) {
1040 source.add(new PageNode(0, index, index * 10, index));
1041 }
1042 return source;
1043 }
1044
1045 private void testCleanupPageListHelper(final List<PageNode> source) throws Exception {
1046- final List<PageNode> cleaned = new ArrayList<PageNode>(source);
1047+ final List<PageNode> cleaned = new LinkedList<PageNode>(source);
1048 for (final Iterator<PageNode> iterator = cleaned.iterator(); iterator.hasNext();) {
1049 if (iterator.next().isInvalid()) {
1050 iterator.remove();
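
Note on the cleanupPageList() change in JournalManager.java above: the old loop called removeRange once per run of invalid entries, and each call shifts the tail of the list, so heavy fragmentation costs far more than a single pass. The new loop copies each valid PageNode down over the gaps and truncates the list once at the end. Below is a minimal standalone sketch of that compaction idea, not the Persistit code itself. The class name CompactionSketch, the method compact, and the use of a Predicate in place of PageNode.isInvalid() are hypothetical names chosen for illustration, and subList(to, size).clear() stands in for the removeRange call in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of single-pass, in-place list compaction.
final class CompactionSketch {

    // Removes every element for which isInvalid returns true, preserving the
    // order of the remaining elements. Returns the number of removed elements.
    static <T> int compact(final List<T> list, final Predicate<? super T> isInvalid) {
        final int size = list.size();
        int to = 0; // next slot to receive a valid element
        for (int from = 0; from < size; from++) {
            final T item = list.get(from);
            if (!isInvalid.test(item)) {
                if (to != from) {
                    list.set(to, item); // slide the valid element down over the gap
                }
                to++;
            }
        }
        if (to < size) {
            list.subList(to, size).clear(); // single truncation of the leftover tail
        }
        return size - to;
    }

    public static void main(final String[] args) {
        // Negative numbers play the role of invalidated entries.
        final List<Integer> values = new ArrayList<>(Arrays.asList(1, -2, 3, -4, -5, 6));
        final int removed = compact(values, v -> v < 0);
        System.out.println(removed + " removed, remaining " + values);
    }
}
```

The sketch relies on random-access get/set being cheap, as it is for an ArrayList. On a LinkedList the indexed calls would each walk the list, which is why the test helper above uses iterator.remove() for its reference result instead.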
