Merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Peter Beaman
Approved revision: 381
Merged at revision: 368
Proposed branch: lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Merge into: lp:akiban-persistit
Diff against target: 1050 lines (+237/-196)
12 files modified
src/main/java/com/persistit/BufferPool.java (+13/-5)
src/main/java/com/persistit/CleanupManager.java (+32/-11)
src/main/java/com/persistit/Exchange.java (+11/-4)
src/main/java/com/persistit/IOMeter.java (+14/-4)
src/main/java/com/persistit/IntegrityCheck.java (+20/-0)
src/main/java/com/persistit/JournalManager.java (+102/-118)
src/main/java/com/persistit/Persistit.java (+12/-10)
src/main/java/com/persistit/TransactionPlayer.java (+3/-1)
src/main/java/com/persistit/logging/LogBase.java (+2/-2)
src/main/java/com/persistit/mxbeans/IOMeterMXBean.java (+2/-2)
src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java (+20/-15)
src/test/java/com/persistit/JournalManagerTest.java (+6/-24)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Reviewer            Review Type   Date Requested   Status
Akiban Build User                                  Needs Fixing
Nathan Williams                                    Approve
Review via email: mp+123174@code.launchpad.net

Description of the change

Modifies the JournalManager#waitForDurability loop to improve performance on the 4-hour TPCC test, and adds several smaller changes to fix problems seen while testing the 4-hour TPCC extensively on perf02.

* Modify BufferPool preload warmup logging to show elapsed time

* CleanupManager no longer calls pruneObsoleteTransactions and other periodic maintenance actions on every polling cycle, which wasted CPU

* JournalManager: added UrgentFileCountThreshold value settable from MXBean to control journal file count. Revised the urgency() method and friends to work more smoothly with the settable threshold.

* JournalManager: Move the calls that force Volume updates back into the copier thread, because deferring them until checkpoints causes massive I/O

* JournalManager: Remove ReentrantReadWriteLock from waitForDurability. It turned out to waste a great deal of CPU for little benefit. Modified the loop to use a purely polling protocol and ensured that every path through it sleeps for at least one ms. Removed the detection of loops without waits, and the corresponding tests, since by inspection there is a sleep on every conditional branch.

* JournalManager: Corrected time interval calculations in waitForDurability

* JournalManager: Changed the calculation of expected I/O time to use the average rather than the maximum. Actual behavior is very erratic, and our best effort to delay commit() until completion of a force cycle is very inaccurate; we may want to remove the behavior, and we certainly should modify or remove the documentation.

* JournalManager: Relaxed the leadTime and flusher poll times from 100ms to 500ms. (This change may no longer be necessary; will test and revise as appropriate.)

* Persistit: Corrected a problem in copyBackPages() in which the less frequent pruneObsoleteTransactions() cycle exposed a race condition that broke unit tests.
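The polling protocol described for waitForDurability can be illustrated with a much simplified model. This is a sketch under stated assumptions, not the Persistit code: `PollingDurabilityWaiter`, `durableTimestamp`, and `kick()` are hypothetical stand-ins for the JOURNAL_FLUSHER's state and wake-up call, and the lead-time and expected-I/O estimates are left out. What it shows is the property the description relies on: with no lock, every branch of the loop sleeps for at least 1 ms, so a waiting committer cannot spin.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, simplified model of a lock-free, purely polling
 * wait-for-durability loop. Not Persistit's implementation.
 */
class PollingDurabilityWaiter {
    /** Highest commit timestamp known to be on disk (advanced by a flusher). */
    final AtomicLong durableTimestamp = new AtomicLong();

    /** Number of times a waiter asked the flusher to write immediately. */
    final AtomicInteger kicks = new AtomicInteger();

    /** Stand-in for waking the flusher thread; here it only counts requests. */
    void kick() {
        kicks.incrementAndGet();
    }

    /** Sleep without propagating InterruptedException; preserves the interrupt flag. */
    private static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Block until commitTimestamp is durable. The caller may stall once for up
     * to stallMillis (so other commits can share the same flush); after that,
     * each pass kicks the flusher. Every branch sleeps for at least 1 ms, so
     * the loop never busy-spins and needs no lock. Returns the number of passes.
     */
    int waitForDurability(long commitTimestamp, long stallMillis) {
        long remainingStallMillis = stallMillis;
        int passes = 0;
        while (durableTimestamp.get() < commitTimestamp
                && !Thread.currentThread().isInterrupted()) {
            passes++;
            if (remainingStallMillis > 0) {
                sleepMillis(remainingStallMillis); // one-time stall to group commits
                remainingStallMillis = 0;
            } else {
                kick();          // request an immediate flush
                sleepMillis(1);  // minimum sleep on this branch as well
            }
        }
        return passes;
    }
}
```

Dropping the lock trades a little latency (up to one sleep quantum after the flush completes) for not having every committer contend on shared lock state.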

Peter Beaman (pbeaman) wrote :

Pushed a couple of additional changes inspired by observing tests:

- cleanupPageList is now O(N) rather than O(N*N)
- non-transactional store/remove operations now throttle
- the throttle() method itself is now faster: the throttle interval is computed every few seconds by the JOURNAL_COPIER thread
- a couple of minor code cleanups
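A minimal sketch of the throttling split described above, assuming a copier thread that periodically calls `recompute` and writer threads that call `throttle()` before each non-transactional store or remove. The urgency formula and the 12 ms gentle delay mirror the JournalManager diff; `URGENT = 10`, `ALMOST_URGENT = 8`, and the 100 ms urgent delay are assumed values, and `ThrottleSketch` is a hypothetical class, not Persistit's API.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: a background copier thread recomputes a sleep interval
 * every few seconds, and writer threads only read one volatile field.
 * Values marked "assumed" are illustrative, not Persistit's.
 */
class ThrottleSketch {
    static final int URGENT = 10;                        // assumed
    static final int ALMOST_URGENT = 8;                  // assumed
    static final long URGENT_COMMIT_DELAY_MILLIS = 100;  // assumed
    static final long GENTLE_COMMIT_DELAY_MILLIS = 12;   // value shown in the diff

    private volatile long throttleSleepMillis;

    /** Urgency climbs toward URGENT as the journal file count nears the threshold. */
    static int urgency(int journalFileCount, int urgentFileCountThreshold) {
        int remainingFiles = urgentFileCountThreshold - journalFileCount;
        return Math.max(0, Math.min(URGENT - remainingFiles, URGENT));
    }

    /** Called periodically by the copier thread, not by writers. */
    void recompute(int journalFileCount, int urgentFileCountThreshold) {
        int u = urgency(journalFileCount, urgentFileCountThreshold);
        long interval = 0;
        if (u == URGENT) {
            interval = URGENT_COMMIT_DELAY_MILLIS;
        } else if (u > ALMOST_URGENT) {
            interval = GENTLE_COMMIT_DELAY_MILLIS;
        }
        throttleSleepMillis = interval;
    }

    /** Writer fast path: one volatile read; sleeps only when throttling is active. */
    long throttle() {
        long interval = throttleSleepMillis;
        if (interval > 0) {
            try {
                TimeUnit.MILLISECONDS.sleep(interval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return interval;
    }
}
```

Writers pay only a volatile read on the fast path; counting journal files and computing urgency moves to one background thread, at the cost of the interval being up to one recompute period stale.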

Nathan Williams (nwilliams) wrote :

This looks pretty good.

Just a few questions:
IOMeter#DEFAULT_QUIESCENT_IO_THRESHOLD was changed by a factor of 1000, and its usage in JournalManager is now scaled by 1024. I'm not entirely sure what that is used for; can you elaborate?

JournalManager#selectForCopy() went from asserting that the pageNodes were valid to checking them. Can invalid nodes be in the list now?

Minor:
Probably just my preference, but constants with units in their names mean one less thing to check (e.g. the new intervals in CleanupManager).

review: Needs Information
Peter Beaman (pbeaman) wrote :

Thanks, good questions.

DEFAULT_QUIESCENT_IO_THRESHOLD is used to detect when little I/O is currently being done; when that happens, the copier interval goes down. I'm not wedded to that heuristic, but it has been there since the beginning. The change was to normalize the units in IOMeter. The IORate field attempts to show KBytes per second, whereas the threshold used to be in bytes per second. Now the threshold is expressed in KBytes per second too. Clearly there's a neglected documentation task; I will fix that.
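The unit normalization can be checked with a small sketch. `QuiescenceSketch` is hypothetical; it assumes the recent I/O charge is reported in bytes per second and that out-of-range settings are rejected with an exception (the actual behavior of Persistit's `Util.rangeCheck` is not shown in this proposal). The 100, 0, and 1,000,000 KBytes/sec limits and the 1024 scale factor are taken from the diff.

```java
/**
 * Hypothetical sketch of the unit-normalized quiescence test. The threshold is
 * configured in KBytes per second and scaled by 1024 at the comparison, where
 * the recent I/O charge is assumed to be in bytes per second.
 */
class QuiescenceSketch {
    static final long KILO = 1024;
    static final long DEFAULT_THRESHOLD_KBYTES_PER_SEC = 100;
    static final long MIN_THRESHOLD_KBYTES_PER_SEC = 0;
    static final long MAX_THRESHOLD_KBYTES_PER_SEC = 1_000_000;

    private long thresholdKBytesPerSec = DEFAULT_THRESHOLD_KBYTES_PER_SEC;

    /** Reject out-of-range settings (assumed behavior of the range check). */
    void setThresholdKBytesPerSec(long value) {
        if (value < MIN_THRESHOLD_KBYTES_PER_SEC || value > MAX_THRESHOLD_KBYTES_PER_SEC) {
            throw new IllegalArgumentException("threshold out of range: " + value);
        }
        thresholdKBytesPerSec = value;
    }

    /** True when recent I/O is low enough that the copier may speed up. */
    boolean isQuiescent(long recentChargeBytesPerSec) {
        return recentChargeBytesPerSec < thresholdKBytesPerSec * KILO;
    }
}
```

With the default of 100 KBytes/sec the cutoff is 100 * 1024 = 102,400, close to the old raw default of 100,000, so the change is mostly one of units rather than of behavior.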

selectNodesForCopy used to call cleanupPageList before looping, so the assert made sense. However, because the cleanup operation can be expensive (even with the new algorithm), I reduced the number of times it is called; therefore the selectNodesForCopy method can now see and skip invalid PageNodes.
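The reasoning above (skip invalid PageNodes during selection; run the cleanup less often and in linear time) can be sketched as follows. `PageNodeStub` and `PageListSketch` are hypothetical; the actual cleanupPageList algorithm is not shown here, so the compaction below is one common way to get O(N) on an `ArrayList`, not necessarily the one Persistit uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a journal page-list entry. */
class PageNodeStub {
    final long pageAddress;
    final boolean invalid; // e.g. superseded by a newer copy of the page

    PageNodeStub(long pageAddress, boolean invalid) {
        this.pageAddress = pageAddress;
        this.invalid = invalid;
    }
}

class PageListSketch {
    /** Pick up to max valid entries, skipping invalid ones instead of asserting. */
    static List<PageNodeStub> selectForCopy(List<PageNodeStub> pageList, int max) {
        List<PageNodeStub> selected = new ArrayList<>();
        for (PageNodeStub node : pageList) {
            if (selected.size() >= max) {
                break;
            }
            if (!node.invalid) {
                selected.add(node);
            }
        }
        return selected;
    }

    /**
     * Remove invalid entries in one O(N) pass over an ArrayList: shift the
     * survivors left, then truncate the tail once. Calling remove(i) inside
     * the loop instead would shift the tail every time, giving O(N*N).
     * Returns the number of entries removed.
     */
    static int cleanup(List<PageNodeStub> pageList) {
        int kept = 0;
        for (int i = 0; i < pageList.size(); i++) {
            PageNodeStub node = pageList.get(i);
            if (!node.invalid) {
                pageList.set(kept++, node);
            }
        }
        int removed = pageList.size() - kept;
        pageList.subList(kept, pageList.size()).clear();
        return removed;
    }
}
```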

I agree about units in the constant names and will change.

(In reply to Nathan Williams's review of Mon, Sep 10, 2012 at 10:29 AM.)

Peter Beaman (pbeaman) wrote :

Revised as requested. The CleanupManager time constants are now named with their units (XXX_MS, XXX_NS), as are a number of other constants.
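A minimal sketch of the interval gating that the renamed CleanupManager constants control. The 1 s and 50 s intervals come from the diff; `GatedMaintenance` and its injectable clock are hypothetical, added so the behavior can be tested without waiting, and the counters stand in for the IOMeter poll, cleanup, and pruneObsoleteTransactions calls.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of interval-gated maintenance in a polling thread.
 * The constant names carry their units; the clock is injectable for testing.
 */
class GatedMaintenance {
    static final long MINIMUM_MAINTENANCE_INTERVAL_NS = 1_000_000_000L;                  // 1 s
    static final long MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS = 50_000_000_000L; // 50 s

    private final LongSupplier nanoClock;
    private long lastMaintenanceNs;
    private long lastPruneNs;
    int maintenanceRuns;
    int pruneRuns;

    GatedMaintenance(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        long now = nanoClock.getAsLong();
        lastMaintenanceNs = now;
        lastPruneNs = now;
    }

    /** Called on every poll cycle; each expensive task runs only when its interval has elapsed. */
    void poll() {
        long now = nanoClock.getAsLong();
        // Compare differences, not absolute values, so nanoTime wraparound is harmless.
        if (now - lastMaintenanceNs > MINIMUM_MAINTENANCE_INTERVAL_NS) {
            maintenanceRuns++;   // stand-in for IOMeter poll + cleanup
            lastMaintenanceNs = now;
        }
        if (now - lastPruneNs > MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS) {
            pruneRuns++;         // stand-in for pruneObsoleteTransactions()
            lastPruneNs = now;
        }
    }
}
```

In production the clock would simply be `System::nanoTime`; comparing `now - last` against the interval keeps the check correct even if the raw `System.nanoTime()` values wrap around.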

Reverted the changes from the original proposal to the JOURNAL_FLUSHER poll time and the SOFT commit lead time, restoring their former values. I believe these changes didn't help and are not needed for good performance; I want to benchmark again with the original values.

Nathan Williams (nwilliams) wrote :

Thanks for the tweaks. Looks good to me.

review: Approve
Akiban Build User (build-akiban) wrote :

There was one failure during build/test:

* unknown exception (check log)

review: Needs Fixing
Peter Beaman (pbeaman) wrote :

Jenkins glitch. Reapproving.

Preview Diff

=== modified file 'src/main/java/com/persistit/BufferPool.java'
--- src/main/java/com/persistit/BufferPool.java 2012-08-24 14:00:17 +0000
+++ src/main/java/com/persistit/BufferPool.java 2012-09-12 22:04:20 +0000
@@ -97,10 +97,11 @@
      */
     private final static int INVENTORY_VERSIONS = 3;
 
+    private final static long NS_PER_SEC = 1000000000;
     /**
-     * Preload log multiple
+     * Preload log message interval, in seconds
      */
-    private final static int INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE = 10000;
+    private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC;
 
     /**
      * The Persistit instance that references this BufferPool.
@@ -1451,6 +1452,9 @@
     void preloadBufferInventory() {
         int count = 0;
         int total = 0;
+        final long startTime = System.nanoTime();
+        long reportTime = startTime;
+
         try {
             final JournalManager jman = _persistit.getJournalManager();
             final Exchange exchange = getBufferInventoryExchange();
@@ -1496,8 +1500,11 @@
                 final Buffer buff = get(vol, pn.getPageAddress(), false, true);
                 buff.release();
                 count++;
-                if ((count % INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE) == 0) {
-                    _persistit.getLogBase().bufferInventoryProgress.log(count, total);
+                final long now = System.nanoTime();
+                if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) {
+                    _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime)
+                            / NS_PER_SEC);
+                    reportTime = now;
                 }
                 if (count >= _bufferCount) {
                     //
@@ -1513,7 +1520,8 @@
         } catch (final PersistitException e) {
             _persistit.getLogBase().bufferInventoryException.log(e);
         } finally {
-            _persistit.getLogBase().bufferInventoryProgress.log(count, total);
+            final long now = System.nanoTime();
+            _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC);
         }
     }
 

=== modified file 'src/main/java/com/persistit/CleanupManager.java'
--- src/main/java/com/persistit/CleanupManager.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/CleanupManager.java 2012-09-12 22:04:20 +0000
@@ -36,15 +36,19 @@
         void performAction(Persistit persistit) throws PersistitException;
     }
 
-    final static long DEFAULT_CLEANUP_INTERVAL = 1000;
+    final static long DEFAULT_CLEANUP_INTERVAL_MS = 1000;
 
     final static int DEFAULT_QUEUE_SIZE = 50000;
 
-    final static int WORKLIST_LENGTH = 500;
+    private final static int WORKLIST_LENGTH = 500;
 
-    private final static long DEFAULT_MINIMUM_PRUNING_DELAY = 1000;
+    private final static long MINIMUM_MAINTENANCE_INTERVAL_NS = 1000000000L;
 
-    final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE);
+    private final static long MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS = 50000000000L;
+
+    private final static long DEFAULT_MINIMUM_PRUNING_DELAY_NS = 1000;
+
+    private final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE);
 
     private final AtomicBoolean _closed = new AtomicBoolean();
 
@@ -56,7 +60,11 @@
 
     private final AtomicLong _errors = new AtomicLong();
 
-    private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY);
+    private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY_NS);
+
+    private long _lastMaintenance;
+
+    private long _lastPruneObsoleteTransactions;
 
     CleanupManager(final Persistit persistit) {
         super(persistit);
@@ -64,7 +72,10 @@
 
     public void start() {
         _closed.set(false);
-        start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL);
+        final long now = System.nanoTime();
+        _lastMaintenance = now;
+        _lastPruneObsoleteTransactions = now;
+        start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL_MS);
     }
 
     public void close(final boolean flush) throws PersistitException {
@@ -138,9 +149,19 @@
 
     @Override
     public void poll() throws Exception {
-        _persistit.getIOMeter().poll();
-        _persistit.cleanup();
-        _persistit.getJournalManager().pruneObsoleteTransactions();
+
+        final long now = System.nanoTime();
+        if (now - _lastMaintenance > MINIMUM_MAINTENANCE_INTERVAL_NS) {
+            _persistit.getIOMeter().poll();
+            _persistit.cleanup();
+            _lastMaintenance = now;
+        }
+
+        if (now - _lastPruneObsoleteTransactions > MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS) {
+            _persistit.getJournalManager().pruneObsoleteTransactions();
+            _lastPruneObsoleteTransactions = now;
+        }
+
         final List<CleanupAction> workList = new ArrayList<CleanupAction>(WORKLIST_LENGTH);
         synchronized (this) {
             while (workList.size() < WORKLIST_LENGTH) {

=== modified file 'src/main/java/com/persistit/Exchange.java'
--- src/main/java/com/persistit/Exchange.java 2012-08-28 14:29:50 +0000
+++ src/main/java/com/persistit/Exchange.java 2012-09-12 22:04:20 +0000
@@ -1290,6 +1290,9 @@
         if (!isDirectoryExchange()) {
             _persistit.checkSuspended();
         }
+        if (!_ignoreTransactions && !_transaction.isActive()) {
+            _persistit.getJournalManager().throttle();
+        }
         // TODO: directoryExchange, and lots of tests, don't use transactions.
         // Skip MVCC for now.
         int options = StoreOptions.WAIT;
@@ -2990,13 +2993,9 @@
     }
 
     private boolean removeInternal(final Direction selection, final boolean fetchFirst) throws PersistitException {
-        assertCorrectThread(true);
-        _persistit.checkClosed();
-
         if (selection != EQ && selection != GTEQ && selection != GT) {
             throw new IllegalArgumentException("Invalid mode " + selection);
         }
-
         final int keySize = _key.getEncodedSize();
 
         _key.copyTo(_spareKey3);
@@ -3005,6 +3004,7 @@
         // Special case for empty key
         if (keySize == 0) {
             if (selection == EQ) {
+                assertCorrectThread(true);
                 return false;
             }
             _spareKey3.append(BEFORE);
@@ -3094,6 +3094,13 @@
         assertCorrectThread(true);
         _persistit.checkClosed();
 
+        if (!isDirectoryExchange()) {
+            _persistit.checkSuspended();
+        }
+        if (!_ignoreTransactions && !_transaction.isActive()) {
+            _persistit.getJournalManager().throttle();
+        }
+
         if (_ignoreTransactions || !_transaction.isActive()) {
             return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false);
         }

=== modified file 'src/main/java/com/persistit/IOMeter.java'
--- src/main/java/com/persistit/IOMeter.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/IOMeter.java 2012-09-12 22:04:20 +0000
@@ -32,6 +32,7 @@
 
 import com.persistit.mxbeans.IOMeterMXBean;
 import com.persistit.util.ArgParser;
+import com.persistit.util.Util;
 
 /**
  * 
@@ -57,7 +58,9 @@
     private final static String DUMP_FORMAT = "time=%,12d op=%2s vol=%4s page=%,16d addr=%,16d size=%,8d index=%,7d";
     private final static int DUMP_RECORD_LENGTH = 37;
 
-    private final static int DEFAULT_QUIESCENT_IO_THRESHOLD = 100000;
+    private final static int DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 100;
+    private final static int MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 0;
+    private final static int MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 1000000;
 
     private final static int READ_PAGE_FROM_VOLUME = 1;
     private final static int READ_PAGE_FROM_JOURNAL = 2;
@@ -72,7 +75,7 @@
 
     private final static int ITEM_COUNT = 11;
 
-    private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD;
+    private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC;
 
     private final AtomicReference<DataOutputStream> _logStream = new AtomicReference<DataOutputStream>();
 
@@ -149,6 +152,7 @@
 
     /**
      * @return the quiescentIOthreshold
+     * @see #setQuiescentIOthreshold(long)
      */
     @Override
     public synchronized long getQuiescentIOthreshold() {
@@ -156,12 +160,18 @@
     }
 
     /**
+     * Persistit monitors the rate at which new I/O operations are created. When
+     * the IORate falls below the quiescentIOthreshold, expressed in KBytes per
+     * second, the JOURNAL_COPIER accelerates its work to try to clean up older
+     * journal files.
+     *
      * @param quiescentIOthreshold
      *            the quiescentIOthreshold to set
      */
     @Override
-    public synchronized void setQuiescentIOthreshold(final long quiescentIO) {
-        _quiescentIOthreshold = quiescentIO;
+    public synchronized void setQuiescentIOthreshold(final long quiescentIOthreshold) {
+        _quiescentIOthreshold = Util.rangeCheck(quiescentIOthreshold, MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC,
+                MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC);
     }
 
     /**

=== modified file 'src/main/java/com/persistit/IntegrityCheck.java'
--- src/main/java/com/persistit/IntegrityCheck.java 2012-09-11 14:14:55 +0000
+++ src/main/java/com/persistit/IntegrityCheck.java 2012-09-12 22:04:20 +0000
@@ -386,6 +386,26 @@
     }
 
     /**
+     * Control output format. When CSV mode is enabled, the output is organized
+     * as comma-separated-variable text that can be imported into a spreadsheet.
+     *
+     * @param csvMode
+     */
+    public void setCsvMode(final boolean csvMode) {
+        _csv = csvMode;
+    }
+
+    /**
+     * Indicate whether CSV mode is enabled. If so the output is organized as
+     * comma-separated-variable text that can be imported into a spreadsheet.
+     *
+     * @return <code>true<c/code> if CSV mode is enabled.
+     */
+    public boolean isCsvMode() {
+        return _csv;
+    }
+
+    /**
      * Indicate whether missing index pages should be added when an index "hole"
      * is discovered.
      * 

=== modified file 'src/main/java/com/persistit/JournalManager.java'
--- src/main/java/com/persistit/JournalManager.java 2012-09-11 14:14:55 +0000
+++ src/main/java/com/persistit/JournalManager.java 2012-09-12 22:04:20 +0000
@@ -29,15 +29,15 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -80,9 +80,9 @@
     final static int GENTLE_COMMIT_DELAY_MILLIS = 12;
     private final static long NS_PER_MS = 1000000L;
     private final static int IO_MEASUREMENT_CYCLES = 8;
-
-    private final static int TOO_MANY_WARN_THRESHOLD = 15;
-    private final static int TOO_MANY_ERROR_THRESHOLD = 20;
+    private final static int TOO_MANY_WARN_THRESHOLD = 5;
+    private final static int TOO_MANY_ERROR_THRESHOLD = 10;
+    private final static long KILO = 1024;
 
     /**
      * REGEX expression that recognizes the name of a journal file.
@@ -104,7 +104,6 @@
     private final Map<TreeDescriptor, Integer> _treeToHandleMap = new HashMap<TreeDescriptor, Integer>();
 
     private final Map<Integer, TreeDescriptor> _handleToTreeMap = new HashMap<Integer, TreeDescriptor>();
-
     private final Map<Long, TransactionMapItem> _liveTransactionMap = new HashMap<Long, TransactionMapItem>();
 
     private final Persistit _persistit;
@@ -184,9 +183,9 @@
 
     private final AtomicLong _totalFlushIoTime = new AtomicLong();
 
-    private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL;
+    private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL_MS;
 
-    private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD;
+    private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS;
 
     private final TransactionPlayer _player = new TransactionPlayer(new JournalTransactionPlayerSupport());
 
@@ -201,7 +200,7 @@
      * performs I/O. Hopefully we can set good defaults and not expose these as
      * knobs.
      */
-    private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL;
+    private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL_MS;
 
     private volatile int _copiesPerCycle = DEFAULT_COPIES_PER_CYCLE;
 
@@ -213,7 +212,9 @@
 
     private boolean _allowHandlesForTempVolumesAndTrees;
 
-    private final AtomicLong _waitLoopsWithNoDelay = new AtomicLong();
+    private volatile int _urgentFileCountThreshold = DEFAULT_URGENT_FILE_COUNT_THRESHOLD;
+
+    private volatile long _throttleSleepInterval;
 
     /**
      * <p>
@@ -537,10 +538,22 @@
 
     @Override
     public void setSlowIoAlertThreshold(final long slowIoAlertThreshold) {
-        Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD, MAXIMUM_SLOW_ALERT_THRESHOLD);
+        Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD_MS, MAXIMUM_SLOW_ALERT_THRESHOLD_MS);
         _slowIoAlertThreshold = slowIoAlertThreshold;
     }
 
+    @Override
+    public int getUrgentFileCountThreshold() {
+        return _urgentFileCountThreshold;
+    }
+
+    @Override
+    public void setUrgentFileCountThreshold(final int threshold) {
+        Util.rangeCheck(threshold, MINIMUM_URGENT_FILE_COUNT_THRESHOLD, MAXIMUM_URGENT_FILE_COUNT_THRESHOLD);
+        _urgentFileCountThreshold = threshold;
+
+    }
+
     /**
      * Compute an "urgency" factor that determines how vigorously the
      * JOURNAL_COPIER thread should perform I/O. This number is computed on a
@@ -554,8 +567,8 @@
         if (_copyFast.get()) {
             return URGENT;
         }
-        final int journalFileCount = getJournalFileCount();
-        return Math.min(URGENT, journalFileCount);
+        final int remainingFiles = _urgentFileCountThreshold - getJournalFileCount();
+        return Math.max(0, Math.min(URGENT - remainingFiles, URGENT));
     }
 
     /**
@@ -567,13 +580,9 @@
      * @throws PersistitInterruptedException
      */
     public void throttle() throws PersistitInterruptedException {
-        final int urgency = urgency();
-        if (!_appendOnly.get()) {
-            if (urgency == URGENT) {
-                Util.sleep(URGENT_COMMIT_DELAY_MILLIS);
-            } else if (urgency >= ALMOST_URGENT) {
-                Util.sleep(GENTLE_COMMIT_DELAY_MILLIS);
-            }
+        final long interval = _throttleSleepInterval;
+        if (interval > 0) {
+            Util.sleep(interval);
         }
     }
 
@@ -1007,14 +1016,6 @@
             //
             force();
             //
-            // Make sure all copied pages have been flushed to disk.
-            //
-            for (final Volume vol : _volumeToHandleMap.keySet()) {
-                if (vol.isOpened()) {
-                    vol.getStorage().force();
-                }
-            }
-            //
             // Prepare room for CP.OVERHEAD bytes in the journal. If doing so
             // started a new journal file then there's no need to write another
             // CP record.
@@ -1227,7 +1228,6 @@
     static long fileToGeneration(final File file) {
         final Matcher matcher = PATH_PATTERN.matcher(file.getName());
         if (matcher.matches()) {
-            // TODO - validate range
             return Long.parseLong(matcher.group(2));
         } else {
             return -1;
@@ -1237,7 +1237,6 @@
     static String fileToPath(final File file) {
         final Matcher matcher = PATH_PATTERN.matcher(file.getPath());
         if (matcher.matches()) {
-            // TODO - validate range
             return matcher.group(1);
         } else {
             return null;
@@ -1677,11 +1676,6 @@
             }
         }
         //
-        // Remove the page list entries too.
-        //
-        _droppedPageCount += cleanupPageList();
-
-        //
         // Remove any PageNode from the branchMap having a timestamp less
         // than the checkpoint. Generally all such entries are removed after
         // the first checkpoint that has been established after recovery.
@@ -2168,6 +2162,20 @@
            } finally {
                _copying.set(false);
            }
+
+            long throttleInterval = 0;
+            if (!_appendOnly.get()) {
+                final int urgency = urgency();
+                if (urgency == URGENT) {
+                    throttleInterval = URGENT_COMMIT_DELAY_MILLIS;
+                } else if (urgency > ALMOST_URGENT) {
+                    throttleInterval = GENTLE_COMMIT_DELAY_MILLIS;
+                }
+            }
+            if (throttleInterval != _throttleSleepInterval) {
+                _throttleSleepInterval = throttleInterval;
+            }
+
         }
 
         @Override
@@ -2175,14 +2183,14 @@
             return _closed.get() || _shouldStop;
         }
 
-        @Override
         /**
-         * Return a nice interval, in milliseconds, to wait between
-         * copierCycle invocations. The interval decreases as interval 
-         * goes up, and becomes zero when the urgency is 10. The interval
-         * is also zero if there has be no recent I/O activity invoked
-         * by other activities.
+         * Return a nice interval, in milliseconds, to wait between copierCycle
+         * invocations. The interval decreases as interval goes up, and becomes
+         * zero when the urgency is greater than or equal to 8. The interval is
+         * also zero if there has be no recent I/O activity invoked by other
+         * activities.
          */
+        @Override
         public long getPollInterval() {
             final IOMeter iom = _persistit.getIOMeter();
             final long pollInterval = super.getPollInterval();
@@ -2198,7 +2206,7 @@
 
             int divisor = 1;
 
-            if (iom.recentCharge() < iom.getQuiescentIOthreshold()) {
+            if (iom.recentCharge() < iom.getQuiescentIOthreshold() * KILO) {
                 divisor = HALF_URGENT;
             } else if (urgency > HALF_URGENT) {
                 divisor = urgency - HALF_URGENT;
@@ -2214,8 +2222,6 @@
 
     private class JournalFlusher extends IOTaskRunnable {
 
-        final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
-
         volatile long _lastExceptionTimestamp = 0;
         volatile Exception _lastException = null;
 
@@ -2248,6 +2254,7 @@
2248 * posted an _endTimestamp larger than flushedTimestamp.2254 * posted an _endTimestamp larger than flushedTimestamp.
2249 */2255 */
2250 final long now = System.nanoTime();2256 final long now = System.nanoTime();
2257 long remainingStallTime = stallTime;
22512258
2252 while (true) {2259 while (true) {
2253 /*2260 /*
@@ -2269,10 +2276,10 @@
2269 endTimestamp = _endTimestamp;2276 endTimestamp = _endTimestamp;
2270 startTime = _startTime;2277 startTime = _startTime;
2271 endTime = _endTime;2278 endTime = _endTime;
2272 if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) {
2273 estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0);
2274 }
2275 if (startTimestamp == _startTimestamp && endTimestamp == _endTimestamp) {2279 if (startTimestamp == _startTimestamp && endTimestamp == _endTimestamp) {
2280 if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) {
2281 estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0);
2282 }
2276 break;2283 break;
2277 }2284 }
2278 Util.spinSleep();2285 Util.spinSleep();
@@ -2280,22 +2287,23 @@
 
 if (endTimestamp > flushedTimestamp && startTimestamp > flushedTimestamp) {
 /*
-* Done - commit is fully durable
+* Done - commit is durable
 */
 break;
 }
 
 long remainingSleepNanos;
-boolean didWait = false;
 if (estimatedRemainingIoNanos == -1) {
 remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime));
 } else {
 remainingSleepNanos = _flushInterval;
 }
 
-long estimatedNanosToFinish = Math.max(estimatedRemainingIoNanos, 0);
+long estimatedNanosToFinish;
 if (startTimestamp < flushedTimestamp) {
-estimatedNanosToFinish += remainingSleepNanos + _expectedIoTime;
+estimatedNanosToFinish = remainingSleepNanos + _expectedIoTime;
+} else {
+estimatedNanosToFinish = estimatedRemainingIoNanos;
 }
 
 if (leadTime > 0 && leadTime * NS_PER_MS >= estimatedNanosToFinish) {
@@ -2311,41 +2319,20 @@
 * possible (determined by stallTime) before kicking the
 * JOURNAL_FLUSHER to write the caller's transaction.
 */
-final long delay = stallTime * NS_PER_MS - estimatedNanosToFinish;
-if (delay > 0) {
-Util.sleep(delay / NS_PER_MS);
-didWait = true;
-}
-kick();
-if (delay <= 0) {
-didWait = true;
-try {
-if (_lock.readLock().tryLock(NS_PER_MS, TimeUnit.NANOSECONDS)) {
-_lock.readLock().unlock();
-}
-} catch (final InterruptedException e) {
-throw new PersistitInterruptedException(e);
-}
+if (remainingStallTime > 0) {
+Util.sleep(remainingStallTime);
+remainingStallTime = 0;
+} else {
+kick();
+Util.spinSleep();
 }
 } else {
 /*
-* Otherwise, wait until the I/O is about half done and then
-* retry.
+* Otherwise wait for concurrent I/O operation to finish. Do
+* this by polling because our experiments with using locks
+* here showed significant excess CPU consumption.
 */
-final long delay = ((estimatedNanosToFinish - leadTime * NS_PER_MS) / 2) + NS_PER_MS;
-try {
-if (delay > 0) {
-didWait = true;
-if (_lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) {
-_lock.readLock().unlock();
-}
-}
-} catch (final InterruptedException e) {
-throw new PersistitInterruptedException(e);
-}
-}
-if (!didWait) {
-_waitLoopsWithNoDelay.incrementAndGet();
+Util.spinSleep();
 }
 }
 if (_lastExceptionTimestamp > flushedTimestamp) {
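The revised waitForDurability loop in the hunk above drops the ReentrantReadWriteLock handshake with the flusher and polls instead. On the lead-time path the caller sleeps at most once for the remaining stall time, then alternates kick() with a spin-sleep until its commit is durable. Otherwise it only spin-sleeps while the concurrent flush finishes. The sketch below models only the stall-then-kick path and collapses the flusher's start/end timestamp bookkeeping into a single counter. DurabilityWaitSketch, flushedThrough and the kick Runnable are hypothetical names. Thread.yield() merely stands in for Util.spinSleep(), whose exact behavior is not part of this diff.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the polling durability wait; not Persistit code. */
final class DurabilityWaitSketch {
    private final AtomicLong flushedThrough; // highest durable timestamp (hypothetical)
    private final Runnable kick;             // asks the flusher to run now (stands in for kick())

    DurabilityWaitSketch(final AtomicLong flushedThrough, final Runnable kick) {
        this.flushedThrough = flushedThrough;
        this.kick = kick;
    }

    /**
     * Blocks until commitTimestamp is durable. Sleeps at most once for the
     * stall time so concurrent commits can share one flush, then alternates
     * kick() with a short pause. Returns the number of kicks issued.
     */
    int waitForDurability(final long commitTimestamp, final long stallTimeMs) {
        long remainingStallMs = stallTimeMs;
        int kicks = 0;
        while (flushedThrough.get() < commitTimestamp) {
            if (remainingStallMs > 0) {
                try {
                    TimeUnit.MILLISECONDS.sleep(remainingStallMs);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while stalling", e);
                }
                remainingStallMs = 0; // the stall is spent only once per wait
            } else {
                kick.run();
                kicks++;
                Thread.yield(); // placeholder for Util.spinSleep()
            }
        }
        return kicks;
    }
}
```

Sleeping only once bounds the extra latency a commit pays for batching to the stall time. Any further waiting is spent polling, which the new comment in the diff reports was cheaper in CPU than the lock-based wait.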
@@ -2370,7 +2357,6 @@
 * waitForDurability to know when the I/O operation has
 * finished.
 */
-_lock.writeLock().lock();
 try {
 _startTimestamp = _persistit.getTimestampAllocator().updateTimestamp();
 _startTime = System.nanoTime();
@@ -2382,22 +2368,24 @@
 } finally {
 _endTime = System.nanoTime();
 _endTimestamp = _persistit.getTimestampAllocator().updateTimestamp();
-_lock.writeLock().unlock();
 }
 
 final long elapsed = _endTime - _startTime;
 _totalFlushCycles.incrementAndGet();
 _totalFlushIoTime.addAndGet(elapsed);
 _ioTimes[_ioCycle] = elapsed;
-_ioCycle = (_ioCycle + 1) % _ioTimes.length;
+_ioCycle = (_ioCycle + 1) % IO_MEASUREMENT_CYCLES;
 
-long max = 0;
-for (int index = 0; index < _ioTimes.length; index++) {
-max = Math.max(max, _ioTimes[index]);
+long avg = 0;
+for (int index = 0; index < IO_MEASUREMENT_CYCLES; index++) {
+avg += _ioTimes[index];
 }
-_expectedIoTime = max;
+avg /= IO_MEASUREMENT_CYCLES;
+
+_expectedIoTime = avg;
 if (elapsed > _slowIoAlertThreshold * NS_PER_MS) {
-_persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS);
+_persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS, IO_MEASUREMENT_CYCLES, avg
+/ NS_PER_MS);
 }
 
 } catch (final Exception e) {
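The flusher now sets _expectedIoTime to the mean of the last IO_MEASUREMENT_CYCLES flush durations rather than their maximum. With the old maximum, a single slow flush dominated the estimate until it rotated out of the window. Below is a minimal sketch of that ring-buffer average, assuming a hypothetical IoTimeAverager class. The cycle count is a constructor argument because the diff does not show the value of IO_MEASUREMENT_CYCLES.

```java
/** Hypothetical ring buffer of recent flush durations; not Persistit code. */
final class IoTimeAverager {
    private final long[] ioTimes; // one slot per measured cycle, zero until written
    private int ioCycle;          // index of the slot to overwrite next

    IoTimeAverager(final int cycles) {
        if (cycles <= 0) {
            throw new IllegalArgumentException("cycles must be positive: " + cycles);
        }
        this.ioTimes = new long[cycles];
    }

    /** Records one flush duration and returns the mean over all slots. */
    long record(final long elapsedNanos) {
        ioTimes[ioCycle] = elapsedNanos;
        ioCycle = (ioCycle + 1) % ioTimes.length;
        long sum = 0;
        for (final long t : ioTimes) {
            sum += t;
        }
        return sum / ioTimes.length;
    }
}
```

As in the diff, slots that have not been written yet count as zero, so the first few averages after startup are biased low.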
@@ -2416,6 +2404,7 @@
 } finally {
 _flushing.set(false);
 }
+
 }
 
 @Override
@@ -2426,14 +2415,12 @@
 
 synchronized void selectForCopy(final List<PageNode> list) {
 list.clear();
-_droppedPageCount += cleanupPageList();
 if (!_appendOnly.get()) {
 final long timeStampUpperBound = Math.min(getLastValidCheckpointTimestamp(), _copierTimestampLimit);
 for (final Iterator<PageNode> iterator = _pageList.iterator(); iterator.hasNext();) {
 final PageNode pageNode = iterator.next();
-for (PageNode pn = pageNode; pn != null; pn = pn.getPrevious()) {
+for (PageNode pn = pageNode; pn != null && !pn.isInvalid(); pn = pn.getPrevious()) {
 if (pn.getTimestamp() < timeStampUpperBound) {
-assert !pn.isInvalid();
 list.add(pn);
 break;
 }
@@ -2517,6 +2504,7 @@
 Volume volumeRef = null;
 Volume volume = null;
 int handle = -1;
+final Set<Volume> volumes = new HashSet<Volume>();
 
 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
 if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
@@ -2567,6 +2555,7 @@
 
 try {
 volume.getStorage().writePage(bb, pageAddress);
+volumes.add(volume);
 } catch (final PersistitException ioe) {
 _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(),
 pageNode.getJournalAddress());
@@ -2578,6 +2567,10 @@
 pageNode.getJournalAddress(), urgency());
 }
 
+for (final Volume vol : volumes) {
+vol.getStorage().force();
+}
+
 }
 
 private void cleanupForCopy(final List<PageNode> list) throws PersistitException {
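The copier now records every Volume it successfully writes a page to and calls force() on each one once at the end of the copy pass, instead of leaving the force to checkpoints. The sketch below shows that collect-then-force pattern. PageStore and CopyPassSketch are hypothetical stand-ins, not the Persistit Volume or storage API.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a volume's backing storage. */
interface PageStore {
    void writePage(long pageAddress);

    void force();
}

/** Hypothetical copy pass: write pages, then force each touched store once. */
final class CopyPassSketch {
    static void copyPages(final List<PageStore> stores, final List<Long> pageAddresses) {
        if (stores.size() != pageAddresses.size()) {
            throw new IllegalArgumentException("one store per page address expected");
        }
        final Set<PageStore> touched = new LinkedHashSet<>();
        for (int i = 0; i < pageAddresses.size(); i++) {
            final PageStore store = stores.get(i);
            store.writePage(pageAddresses.get(i));
            touched.add(store); // reached only if writePage did not throw
        }
        for (final PageStore store : touched) {
            store.force(); // once per store per pass, however many pages it received
        }
    }
}
```

The Set guarantees a single force per store regardless of how many of its pages were copied. LinkedHashSet only makes the force order deterministic; the diff itself uses a HashSet.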
@@ -2715,26 +2708,21 @@
 * @return Count of removed PageNode instances.
 */
 int cleanupPageList() {
-int to = -1;
-int count = 0;
-for (int index = _pageList.size(); --index >= 0;) {
-if (_pageList.get(index).isInvalid()) {
-if (to == -1) {
-to = index;
-}
-} else {
-if (to != -1) {
-_pageList.removeRange(index + 1, to + 1);
-count += to - index;
-to = -1;
-}
+final int size = _pageList.size();
+int from;
+for (from = 0; from < size && !_pageList.get(from).isInvalid(); from++)
+;
+int to = from;
+for (from = from + 1; from < size; from++) {
+final PageNode pn = _pageList.get(from);
+if (!pn.isInvalid()) {
+_pageList.set(to++, pn);
 }
 }
-if (to != -1) {
-_pageList.removeRange(0, to + 1);
-count += to + 1;
+if (size > to) {
+_pageList.removeRange(to, size);
 }
-return count;
+return size - to;
 }
 
 private void reportJournalFileCount() {
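The old cleanupPageList scanned backward and called removeRange once per run of invalid nodes, and each call shifted the rest of the list. The new version makes a single forward compaction pass and then truncates the tail once. Here is a generic sketch of the same compaction, using a hypothetical CompactSketch class and an isInvalid predicate in place of PageNode.isInvalid().

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical generic version of the single-pass compaction. */
final class CompactSketch {
    /**
     * Removes every element matching isInvalid: skip the valid prefix, slide
     * later valid elements down over the gaps, then drop the tail once.
     * Returns the number of elements removed.
     */
    static <T> int compact(final List<T> list, final Predicate<? super T> isInvalid) {
        final int size = list.size();
        int from = 0;
        while (from < size && !isInvalid.test(list.get(from))) {
            from++; // leading valid elements stay where they are
        }
        int to = from; // next slot to fill with a surviving element
        for (from = from + 1; from < size; from++) {
            final T item = list.get(from);
            if (!isInvalid.test(item)) {
                list.set(to++, item);
            }
        }
        if (size > to) {
            list.subList(to, size).clear(); // public way to drop a contiguous range
        }
        return size - to;
    }
}
```

ArrayList.removeRange is protected, so the sketch drops the tail through subList(to, size).clear(). Each surviving element moves at most once, so the pass stays linear in the list size however many invalid runs there are.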
@@ -2744,11 +2732,11 @@
 */
 final int journalFileCount = getJournalFileCount();
 if (journalFileCount != _lastReportedJournalFileCount) {
-if (journalFileCount > TOO_MANY_ERROR_THRESHOLD) {
+if (journalFileCount > TOO_MANY_ERROR_THRESHOLD + _urgentFileCountThreshold) {
 _persistit.getAlertMonitor()
 .post(new Event(AlertLevel.ERROR, _persistit.getLogBase().tooManyJournalFilesError,
 journalFileCount), AlertMonitor.MANY_JOURNAL_FILES);
-} else if (journalFileCount > TOO_MANY_WARN_THRESHOLD) {
+} else if (journalFileCount > TOO_MANY_WARN_THRESHOLD + _urgentFileCountThreshold) {
 _persistit.getAlertMonitor()
 .post(new Event(AlertLevel.WARN, _persistit.getLogBase().tooManyJournalFilesWarning,
 journalFileCount), AlertMonitor.MANY_JOURNAL_FILES);
@@ -2923,11 +2911,11 @@
 _liveTransactionMap.clear();
 }
 
-synchronized long getCurrentJournalSize() {
+long getCurrentJournalSize() {
 return _currentAddress % _blockSize;
 }
 
-synchronized int getJournalFileCount() {
+int getJournalFileCount() {
 return (int) (_currentAddress / _blockSize - _baseAddress / _blockSize) + 1;
 }
 
@@ -2989,8 +2977,4 @@
 public SortedMap<Integer, TreeDescriptor> queryTreeMap() {
 return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap);
 }
-
-long getWaitLoopsWithNoDelay() {
-return _waitLoopsWithNoDelay.get();
-}
 }
=== modified file 'src/main/java/com/persistit/Persistit.java'
--- src/main/java/com/persistit/Persistit.java 2012-09-11 14:14:55 +0000
+++ src/main/java/com/persistit/Persistit.java 2012-09-12 22:04:20 +0000
@@ -327,11 +327,11 @@
 private final static SplitPolicy DEFAULT_SPLIT_POLICY = SplitPolicy.PACK_BIAS;
 private final static JoinPolicy DEFAULT_JOIN_POLICY = JoinPolicy.EVEN_BIAS;
 private final static CommitPolicy DEFAULT_TRANSACTION_COMMIT_POLICY = CommitPolicy.SOFT;
-private final static long DEFAULT_COMMIT_LEAD_TIME = 100;
-private final static long DEFAULT_COMMIT_STALL_TIME = 1;
-private final static long MAX_COMMIT_LEAD_TIME = 5000;
-private final static long MAX_COMMIT_STALL_TIME = 5000;
-private final static long FLUSH_DELAY_INTERVAL = 5000;
+private final static long DEFAULT_COMMIT_LEAD_TIME_MS = 100;
+private final static long DEFAULT_COMMIT_STALL_TIME_MS = 1;
+private final static long MAX_COMMIT_LEAD_TIME_MS = 5000;
+private final static long MAX_COMMIT_STALL_TIME_MS = 5000;
+private final static long LOG_FLUSH_DELAY_INTERVAL_MS = 5000;
 
 private final static int MAX_FATAL_ERROR_MESSAGES = 10;
 
@@ -365,7 +365,7 @@
 public void run() {
 while (!_stop) {
 try {
-Util.sleep(FLUSH_DELAY_INTERVAL);
+Util.sleep(LOG_FLUSH_DELAY_INTERVAL_MS);
 } catch (final PersistitInterruptedException ie) {
 break;
 }
@@ -458,9 +458,9 @@
 
 private volatile CommitPolicy _defaultCommitPolicy = DEFAULT_TRANSACTION_COMMIT_POLICY;
 
-private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME;
+private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME_MS;
 
-private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME;
+private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME_MS;
 
 private final ThreadLocal<SoftReference<int[]>> _intArrayThreadLocal = new ThreadLocal<SoftReference<int[]>>();
 
@@ -1447,6 +1447,8 @@
 */
 for (int i = 0; i < 5; ++i) {
 if (!_closed.get() && _initialized.get()) {
+_transactionIndex.updateActiveTransactionCache();
+_journalManager.pruneObsoleteTransactions();
 _checkpointManager.checkpoint();
 _journalManager.copyBack();
 final int fileCount = _journalManager.getJournalFileCount();
@@ -2051,7 +2053,7 @@
 }
 
 void setTransactionCommitleadTime(final long time) {
-_commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME);
+_commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME_MS);
 }
 
 long getTransactionCommitStallTime() {
@@ -2059,7 +2061,7 @@
 }
 
 void setTransactionCommitStallTime(final long time) {
-_commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME);
+_commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME_MS);
 }
 
 /**
=== modified file 'src/main/java/com/persistit/TransactionPlayer.java'
--- src/main/java/com/persistit/TransactionPlayer.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/TransactionPlayer.java 2012-09-12 22:04:20 +0000
@@ -300,7 +300,9 @@
 if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) {
 return volume.getStructure().directoryExchange();
 } else {
-return _support.getPersistit().getExchange(volume, td.getTreeName(), true);
+final Exchange exchange = _support.getPersistit().getExchange(volume, td.getTreeName(), true);
+exchange.ignoreTransactions();
+return exchange;
 }
 }
 
=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
--- src/main/java/com/persistit/logging/LogBase.java 2012-08-24 14:00:17 +0000
+++ src/main/java/com/persistit/logging/LogBase.java 2012-09-12 22:04:20 +0000
@@ -232,7 +232,7 @@
 @Message("WARNING|Crash retried %,d times on %s")
 public final LogItem crashRetry = PersistitLogMessage.empty();
 
-@Message("WARNING|Journal flush operation took %,dms")
+@Message("WARNING|Journal flush operation took %,dms last %,d cycles average is %,dms")
 public final LogItem longJournalIO = PersistitLogMessage.empty();
 
 @Message("INFO|Normal journal file count %,d")
@@ -247,7 +247,7 @@
 @Message("INFO|Preloading buffer pool inventory recorded at %tc")
 public final LogItem bufferInventoryLoad = PersistitLogMessage.empty();
 
-@Message("INFO|Preloaded %,d of %,d buffers")
+@Message("INFO|Preloaded %,d of %,d buffers in %,d seconds")
 public final LogItem bufferInventoryProgress = PersistitLogMessage.empty();
 
 @Message("WARNING|Exception while writing buffer pool inventory %s")
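The longJournalIO message gains two arguments. The JournalManager call site passes elapsed / NS_PER_MS, IO_MEASUREMENT_CYCLES and avg / NS_PER_MS in that order, which matches the three placeholders. The sketch below assumes the @Message templates are expanded with java.util.Formatter rules; that expansion code is not part of this diff. Under that assumption, this hypothetical LogFormatSketch shows how the text after the WARNING| or INFO| prefix renders. Locale.US is pinned so that the %, grouping separator is a comma.

```java
import java.util.Locale;

/** Hypothetical rendering of the two revised LogBase templates. */
final class LogFormatSketch {
    static final String LONG_JOURNAL_IO = "Journal flush operation took %,dms last %,d cycles average is %,dms";
    static final String BUFFER_INVENTORY_PROGRESS = "Preloaded %,d of %,d buffers in %,d seconds";

    /** Arguments in the order JournalManager passes them: elapsed ms, cycle count, average ms. */
    static String longJournalIo(final long elapsedMs, final int cycles, final long averageMs) {
        return String.format(Locale.US, LONG_JOURNAL_IO, elapsedMs, cycles, averageMs);
    }

    /** Buffers loaded, buffers in the inventory, elapsed seconds. */
    static String bufferInventoryProgress(final long loaded, final long total, final long seconds) {
        return String.format(Locale.US, BUFFER_INVENTORY_PROGRESS, loaded, total, seconds);
    }
}
```

For example, LogFormatSketch.longJournalIo(2500, 10, 1200) returns "Journal flush operation took 2,500ms last 10 cycles average is 1,200ms".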
=== modified file 'src/main/java/com/persistit/mxbeans/IOMeterMXBean.java'
--- src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-09-12 22:04:20 +0000
@@ -69,7 +69,7 @@
 /**
 * @return the quiescentIOthreshold
 */
-@Description("Disk I/O scheduling parameter in bytes per second specifying threshold "
+@Description("Disk I/O scheduling parameter in KBytes per second specifying threshold "
 + "between \"quiescent\" and \"busy\" states")
 public long getQuiescentIOthreshold();
 
@@ -77,7 +77,7 @@
 * @param quiescentIO
 * the quiescentIOthreshold to set
 */
-@Description("Disk I/O scheduling parameter in bytes per second specifying threshold "
+@Description("Disk I/O scheduling parameter in KBytes per second specifying threshold "
 + "between \"quiescent\" and \"busy\" states")
 public void setQuiescentIOthreshold(long quiescentIO);
 
=== modified file 'src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java'
--- src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-09-12 22:04:20 +0000
@@ -76,14 +76,20 @@
 * Default time interval (in milliseconds) between calls to the
 * FileChannel.force() method.
 */
-final static long DEFAULT_FLUSH_INTERVAL = 100;
+final static long DEFAULT_FLUSH_INTERVAL_MS = 100;
 
 /**
 * Default time interval (in milliseconds) between calls to the journal
 * copier method.
 */
-final static long DEFAULT_COPIER_INTERVAL = 10000;
-
+final static long DEFAULT_COPIER_INTERVAL_MS = 10000;
+/**
+* Default journal file count at which transactions are throttled to allow
+* copier to catch up.
+*/
+final static int DEFAULT_URGENT_FILE_COUNT_THRESHOLD = 15;
+final static int MINIMUM_URGENT_FILE_COUNT_THRESHOLD = 5;
+final static int MAXIMUM_URGENT_FILE_COUNT_THRESHOLD = 100;
 /**
 * Default value for maximum pages to be copied per cycle.
 */
@@ -94,17 +100,17 @@
 * exceptions on attempts to write to the journal. Prevents excessively
 * verbose log on repeated failures.
 */
-final static long DEFAULT_LOG_REPEAT_INTERVAL = 60000L;
-final static long MINIMUM_LOG_REPEAT_INTERVAL = 1000L;
-final static long MAXIMUM_LOG_REPEAT_INTERVAL = Long.MAX_VALUE;
+final static long DEFAULT_LOG_REPEAT_INTERVAL_MS = 60000L;
+final static long MINIMUM_LOG_REPEAT_INTERVAL_MS = 1000L;
+final static long MAXIMUM_LOG_REPEAT_INTERVAL_MS = Long.MAX_VALUE;
 /**
 * Default threshold time in milliseconds for JournalManager flush
 * operations. If a flush operation takes longer than this time, a WARNING
 * message is written to the log.
 */
-final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD = 2000L;
-final static long MINIMUM_SLOW_ALERT_THRESHOLD = 100L;
-final static long MAXIMUM_SLOW_ALERT_THRESHOLD = Long.MAX_VALUE;
+final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS = 2000L;
+final static long MINIMUM_SLOW_ALERT_THRESHOLD_MS = 100L;
+final static long MAXIMUM_SLOW_ALERT_THRESHOLD_MS = Long.MAX_VALUE;
 
 /**
 * File name appended when journal path specifies only a directory
@@ -115,12 +121,6 @@
 */
 final static String PATH_FORMAT = "%s.%012d";
 
-/**
-* Default setting for number of pages in the page map before the urgency of
-* copying starts to increase.
-*/
-final static int DEFAULT_PAGE_MAP_SIZE_BASE = 250000;
-
 final static int MAXIMUM_CONCURRENT_TRANSACTIONS = 10000;
 
 @Description("Number of transaction map items in the live map")
@@ -243,4 +243,9 @@
 @Description("Threshold in milliseconds for warnings of long duration flush cycles")
 void setSlowIoAlertThreshold(long slowIoAlertThreshold);
 
+@Description("Journal file count threshold for throttling transactions")
+int getUrgentFileCountThreshold();
+
+@Description("Journal file count threshold for throttling transactions")
+void setUrgentFileCountThreshold(int threshold);
 }
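JournalManagerMXBean adds a settable urgent file count threshold (default 15, bounds 5 and 100). reportJournalFileCount now raises its WARN and ERROR alerts only when the journal file count exceeds the base TOO_MANY_*_THRESHOLD plus that threshold. The sketch below combines both ideas. UrgentThresholdSketch is hypothetical. Rejecting out-of-range values with IllegalArgumentException is an assumption, because the JournalManager setter body is not in this excerpt. The warnBase and errorBase arguments stand in for TOO_MANY_WARN_THRESHOLD and TOO_MANY_ERROR_THRESHOLD, whose values are not shown.

```java
/** Hypothetical model of the settable urgent journal file count threshold. */
final class UrgentThresholdSketch {
    static final int DEFAULT_URGENT_FILE_COUNT_THRESHOLD = 15;
    static final int MINIMUM_URGENT_FILE_COUNT_THRESHOLD = 5;
    static final int MAXIMUM_URGENT_FILE_COUNT_THRESHOLD = 100;

    private volatile int urgentFileCountThreshold = DEFAULT_URGENT_FILE_COUNT_THRESHOLD;

    int getUrgentFileCountThreshold() {
        return urgentFileCountThreshold;
    }

    /** Assumed behavior: reject values outside [MINIMUM, MAXIMUM]. */
    void setUrgentFileCountThreshold(final int threshold) {
        if (threshold < MINIMUM_URGENT_FILE_COUNT_THRESHOLD || threshold > MAXIMUM_URGENT_FILE_COUNT_THRESHOLD) {
            throw new IllegalArgumentException("threshold out of range: " + threshold);
        }
        urgentFileCountThreshold = threshold;
    }

    /** Mirrors reportJournalFileCount: both alert limits move up with the threshold. */
    String alertLevel(final int journalFileCount, final int warnBase, final int errorBase) {
        final int threshold = urgentFileCountThreshold;
        if (journalFileCount > errorBase + threshold) {
            return "ERROR";
        } else if (journalFileCount > warnBase + threshold) {
            return "WARN";
        }
        return "NORMAL";
    }
}
```

With the default threshold of 15, a hypothetical warnBase of 10 puts the effective warning limit at 25 journal files, so raising the threshold through the MXBean also relaxes the alerts.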
=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
--- src/test/java/com/persistit/JournalManagerTest.java 2012-09-04 21:01:23 +0000
+++ src/test/java/com/persistit/JournalManagerTest.java 2012-09-12 22:04:20 +0000
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -43,7 +44,6 @@
 
 import com.persistit.CheckpointManager.Checkpoint;
 import com.persistit.JournalManager.PageNode;
-import com.persistit.Transaction.CommitPolicy;
 import com.persistit.TransactionPlayer.TransactionPlayerListener;
 import com.persistit.exception.PersistitException;
 import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
@@ -440,7 +440,7 @@
 * Randomly invalidated PageNodes
 */
 {
-final int SIZE = 5000;
+final int SIZE = 1000000;
 final Random random = new Random(1);
 final List<PageNode> source = testCleanupPageListSource(SIZE);
 int next = -1;
@@ -448,8 +448,8 @@
 if (index < next) {
 source.get(index).invalidate();
 } else {
-index += random.nextInt(50);
-next = random.nextInt(50) + index;
+index += random.nextInt(5);
+next = random.nextInt(5) + index;
 }
 }
 testCleanupPageListHelper(source);
@@ -538,34 +538,16 @@
 .getIgnoredUpdates() > 0);
 }
 
-@Test
-public void waitForDurabilitySoaksCPU() throws Exception {
-_persistit.setDefaultTransactionCommitPolicy(CommitPolicy.HARD);
-final JournalManager jman = _persistit.getJournalManager();
-long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay();
-final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true);
-final Transaction txn = exchange.getTransaction();
-for (int count = 0; count < 1000; count++) {
-txn.begin();
-exchange.getValue().put(RED_FOX + count);
-exchange.to(count).store();
-txn.commit();
-txn.end();
-}
-waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay() - waitLoopsWithoutDelay;
-assertEquals("Wait loops without delay", 0, waitLoopsWithoutDelay);
-}
-
 private List<PageNode> testCleanupPageListSource(final int size) {
 final List<PageNode> source = new ArrayList<PageNode>(size);
-for (int index = 0; index < 1000000; index++) {
+for (int index = 0; index < size; index++) {
 source.add(new PageNode(0, index, index * 10, index));
 }
 return source;
 }
 
 private void testCleanupPageListHelper(final List<PageNode> source) throws Exception {
-final List<PageNode> cleaned = new ArrayList<PageNode>(source);
+final List<PageNode> cleaned = new LinkedList<PageNode>(source);
 for (final Iterator<PageNode> iterator = cleaned.iterator(); iterator.hasNext();) {
 if (iterator.next().isInvalid()) {
 iterator.remove();
