Merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability into lp:akiban-persistit
Status: | Merged
---|---
Approved by: | Peter Beaman
Approved revision: | 381
Merged at revision: | 368
Proposed branch: | lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Merge into: | lp:akiban-persistit
Diff against target: | 1050 lines (+237/-196), 12 files modified
To merge this branch: | bzr merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Related bugs: |

Files modified:
- src/main/java/com/persistit/BufferPool.java (+13/-5)
- src/main/java/com/persistit/CleanupManager.java (+32/-11)
- src/main/java/com/persistit/Exchange.java (+11/-4)
- src/main/java/com/persistit/IOMeter.java (+14/-4)
- src/main/java/com/persistit/IntegrityCheck.java (+20/-0)
- src/main/java/com/persistit/JournalManager.java (+102/-118)
- src/main/java/com/persistit/Persistit.java (+12/-10)
- src/main/java/com/persistit/TransactionPlayer.java (+3/-1)
- src/main/java/com/persistit/logging/LogBase.java (+2/-2)
- src/main/java/com/persistit/mxbeans/IOMeterMXBean.java (+2/-2)
- src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java (+20/-15)
- src/test/java/com/persistit/JournalManagerTest.java (+6/-24)

Reviewer | Review Type | Date Requested | Status
---|---|---|---
Akiban Build User | | | Needs Fixing
Nathan Williams | | | Approve

Review via email: mp+123174@code.launchpad.net
Description of the change
Modifies the JournalManager#
* Modify BufferPool preload warmup logging to show elapsed time
* CleanupManager no longer calls pruneObsoleteTr
* JournalManager: added UrgentFileCount
* JournalManager: Move the calls to force Volume updates back into the copier thread because waiting until Checkpoints causes massive I/O
* JournalManager: Remove ReentrantReadWr
* JournalManager: Corrected time interval calculations in waitForDurability
* JournalManager: Changed calculation of expected IO time to use average rather than max. Actual behavior is very erratic, and our best effort to delay commit() until completion of a force cycle is very inaccurate; we may want to remove the behavior and certainly should modify or remove the documentation.
* JournalManager: Relaxed the leadTime and flusher poll times from 100ms to 500ms. (This change may no longer be necessary; will test and revise as appropriate.)
* Persistit: Corrected a problem in copyBackPages() in which the relaxed pruneObsoleteTr
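The UrgentFileCount and throttling items above can be sketched roughly as follows. This is modeled on the urgency(), throttle() and copier-cycle hunks in the JournalManager diff on this page. The copier computes a sleep interval at the end of each pass and publishes it through a volatile field, so the write path only reads that field instead of recomputing urgency on every store. ThrottleSketch and updateThrottle are hypothetical names. The diff does not show values for URGENT, ALMOST_URGENT or URGENT_COMMIT_DELAY_MILLIS, so the values below are assumptions. GENTLE_COMMIT_DELAY_MILLIS = 12 is taken from the diff.

```java
/**
 * Sketch of the urgency/throttle split. Values marked "assumed" are
 * illustrative only; GENTLE_COMMIT_DELAY_MILLIS = 12 appears in the diff.
 */
final class ThrottleSketch {
    static final int URGENT = 10;                       // assumed value
    static final int ALMOST_URGENT = 8;                 // assumed value
    static final long URGENT_COMMIT_DELAY_MILLIS = 50;  // assumed value
    static final long GENTLE_COMMIT_DELAY_MILLIS = 12;  // value shown in the diff

    private final int urgentFileCountThreshold;
    // Written by the copier thread, read by writer threads.
    private volatile long throttleSleepInterval;

    ThrottleSketch(final int urgentFileCountThreshold) {
        this.urgentFileCountThreshold = urgentFileCountThreshold;
    }

    /** Urgency climbs from 0 to URGENT as the journal file count nears the threshold. */
    int urgency(final int journalFileCount) {
        final int remainingFiles = urgentFileCountThreshold - journalFileCount;
        return Math.max(0, Math.min(URGENT - remainingFiles, URGENT));
    }

    /** Run once per copier pass: precompute the delay that throttle() applies. */
    void updateThrottle(final int journalFileCount, final boolean appendOnly) {
        long interval = 0;
        if (!appendOnly) {
            final int urgency = urgency(journalFileCount);
            if (urgency == URGENT) {
                interval = URGENT_COMMIT_DELAY_MILLIS;
            } else if (urgency > ALMOST_URGENT) {
                interval = GENTLE_COMMIT_DELAY_MILLIS;
            }
        }
        throttleSleepInterval = interval;
    }

    long throttleSleepInterval() {
        return throttleSleepInterval;
    }

    /** Write path: one volatile read and, at most, one sleep. */
    void throttle() throws InterruptedException {
        final long interval = throttleSleepInterval;
        if (interval > 0) {
            Thread.sleep(interval);
        }
    }
}
```

With an assumed threshold of 15 files, 13 journal files give urgency 8 (no delay), 14 give urgency 9 (the 12 ms gentle delay), and 15 or more give URGENT.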
Peter Beaman (pbeaman) wrote:
Nathan Williams (nwilliams) wrote:
This looks pretty good.
Just a few questions:
IOMeter#
JournalManager#
Minor:
Probably just my preference, but constants with units in the name make one less thing to check (new intervals in CleanupManager).
Peter Beaman (pbeaman) wrote:
Thanks, good questions.
DEFAULT_
I/O is currently being done and when that happens the copier interval
goes down. I'm not wedded to that heuristic, but it has been there
since the beginning. The change was to normalize the units in IOMeter.
The IORate field attempts to show KBytes per second, whereas the
threshold used to be in Bytes per second. Now the threshold is
expressed in KBytes per second too. Clearly there's a neglected
documentation task - I will fix that.
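The unit fix described above can be sketched as follows, assuming a KByte is 1024 bytes. The diff adds KILO = 1024 to JournalManager, but the IOMeter hunks do not show how IORate itself is computed. The three threshold constants use the values added to IOMeter. rangeCheck is a hypothetical stand-in for Util.rangeCheck and is assumed to reject out-of-range values. isQuiescent is a hypothetical name for the comparison the copier makes.

```java
/**
 * Sketch: keep the measured I/O rate and the quiescent threshold in the same
 * unit (KBytes per second). Names other than the threshold constants are
 * hypothetical.
 */
final class QuiescentIoSketch {
    static final long KILO = 1024; // assumption: 1 KByte = 1024 bytes
    static final long DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 100;
    static final long MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 0;
    static final long MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 1000000;

    private long quiescentIoThreshold = DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC;

    /** Hypothetical stand-in for Util.rangeCheck: returns the value or throws. */
    static long rangeCheck(final long value, final long min, final long max) {
        if (value < min || value > max) {
            throw new IllegalArgumentException(value + " is not in [" + min + ", " + max + "]");
        }
        return value;
    }

    /** Convert a raw byte rate into the KBytes-per-second unit used by the threshold. */
    static long toKBytesPerSec(final long bytesPerSec) {
        return bytesPerSec / KILO;
    }

    synchronized long getQuiescentIoThreshold() {
        return quiescentIoThreshold;
    }

    synchronized void setQuiescentIoThreshold(final long kbytesPerSec) {
        quiescentIoThreshold = rangeCheck(kbytesPerSec, MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC,
                MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC);
    }

    /** True when other I/O is light enough for the copier to speed up. */
    synchronized boolean isQuiescent(final long ioRateKBytesPerSec) {
        return ioRateKBytesPerSec < quiescentIoThreshold;
    }
}
```

Under the old mismatch, a KBytes-per-second rate compared against a threshold of 100000 would only stop counting as quiescent above roughly 100000 KBytes per second. With both sides in KBytes per second, 50 KB/s is quiescent and 200 KB/s is not.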
selectNodesForCopy used to call cleanupPageList before looping, and
the assert made sense. However, because the cleanup operation can be
expensive (even with the new algorithm), I reduced the number
of times it is called, and therefore the selectNodesForCopy method can
now see and skip invalid PageNodes.
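Here is a rough sketch of the trade-off described above. The expensive cleanup runs only occasionally, so selection tolerates invalid nodes by skipping them instead of asserting that they are absent. PageNode here is a hypothetical stand-in with an explicit invalid flag, because the real PageNode's validity test is not shown in the diff. cleanupPageList returns the number of entries dropped. This mirrors how the diff accumulates its result into _droppedPageCount.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: occasional cleanup plus tolerant selection. PageNode is a
 * hypothetical stand-in, not Persistit's class.
 */
final class CopySelectionSketch {
    static final class PageNode {
        final long pageAddress;
        private boolean invalid;

        PageNode(final long pageAddress) {
            this.pageAddress = pageAddress;
        }

        void invalidate() {
            invalid = true;
        }

        boolean isInvalid() {
            return invalid;
        }
    }

    /**
     * Select up to max valid nodes. Cleanup no longer runs before every call,
     * so invalid nodes may still be present; skip them rather than assert.
     */
    static List<PageNode> selectNodesForCopy(final List<PageNode> pageList, final int max) {
        final List<PageNode> selected = new ArrayList<>();
        for (final PageNode node : pageList) {
            if (selected.size() >= max) {
                break;
            }
            if (node.isInvalid()) {
                continue;
            }
            selected.add(node);
        }
        return selected;
    }

    /** The expensive pass, run less often: drop invalid nodes and report how many. */
    static int cleanupPageList(final List<PageNode> pageList) {
        final int before = pageList.size();
        pageList.removeIf(PageNode::isInvalid);
        return before - pageList.size();
    }
}
```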
I agree about units in the constant names and will change.
On Mon, Sep 10, 2012 at 10:29 AM, Nathan Williams <email address hidden> wrote:
> Review: Needs Information
Peter Beaman (pbeaman) wrote:
Revised as requested. The CleanupManager time constants are now named XXX_MS, as are a number of other constants.
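The unit-suffixed constants mentioned above follow the interval pattern used in the revised CleanupManager.poll(), which can be sketched as follows. Elapsed time is measured as a difference of System.nanoTime() readings and compared against _NS constants. As a result, the maintenance work (1 s) and pruneObsoleteTransactions() (50 s) no longer run on every 1000 ms poll. The constant values are taken from the diff. The class name, the counters, and passing "now" in as a parameter (to make the logic testable) are illustrative assumptions.

```java
/**
 * Sketch of elapsed-time gating with unit-suffixed constants, in the style of
 * the revised CleanupManager.poll(). Class name, counters and the "now"
 * parameter are illustrative; the constant values match the diff.
 */
final class IntervalGateSketch {
    static final long MINIMUM_MAINTENANCE_INTERVAL_NS = 1_000_000_000L; // 1 second
    static final long MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS = 50_000_000_000L; // 50 seconds

    private long lastMaintenance;
    private long lastPrune;
    int maintenanceRuns; // stands in for IOMeter.poll() + Persistit.cleanup()
    int pruneRuns;       // stands in for pruneObsoleteTransactions()

    IntervalGateSketch(final long startNanos) {
        lastMaintenance = startNanos;
        lastPrune = startNanos;
    }

    /**
     * Called on every poll; "now" would normally be System.nanoTime().
     * Only the difference now - last is compared: nanoTime() has an arbitrary
     * origin and may be negative, and the subtraction stays correct even if
     * the raw values overflow.
     */
    void poll(final long now) {
        if (now - lastMaintenance > MINIMUM_MAINTENANCE_INTERVAL_NS) {
            maintenanceRuns++;
            lastMaintenance = now;
        }
        if (now - lastPrune > MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS) {
            pruneRuns++;
            lastPrune = now;
        }
    }
}
```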
Relative to the original proposal, I reverted the changes to the JOURNAL_FLUSHER poll time and the SOFT commit lead time, restoring their former values. I believe these changes didn't help and are not needed for good performance, and I want to benchmark again with the original values.
Nathan Williams (nwilliams) wrote:
Thanks for the tweaks. Looks good to me.
Akiban Build User (build-akiban) wrote:
There was one failure during build/test:
* unknown exception (check log)
Peter Beaman (pbeaman) wrote:
Jenkins glitch. Reapproving.
Preview Diff
=== modified file 'src/main/java/com/persistit/BufferPool.java'
--- src/main/java/com/persistit/BufferPool.java 2012-08-24 14:00:17 +0000
+++ src/main/java/com/persistit/BufferPool.java 2012-09-12 22:04:20 +0000
@@ -97,10 +97,11 @@
      */
     private final static int INVENTORY_VERSIONS = 3;
 
+    private final static long NS_PER_SEC = 1000000000;
     /**
-     * Preload log multiple
+     * Preload log message interval, in seconds
      */
-    private final static int INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE = 10000;
+    private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC;
 
     /**
      * The Persistit instance that references this BufferPool.
@@ -1451,6 +1452,9 @@
     void preloadBufferInventory() {
         int count = 0;
         int total = 0;
+        final long startTime = System.nanoTime();
+        long reportTime = startTime;
+
         try {
             final JournalManager jman = _persistit.getJournalManager();
             final Exchange exchange = getBufferInventoryExchange();
@@ -1496,8 +1500,11 @@
                 final Buffer buff = get(vol, pn.getPageAddress(), false, true);
                 buff.release();
                 count++;
-                if ((count % INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE) == 0) {
-                    _persistit.getLogBase().bufferInventoryProgress.log(count, total);
+                final long now = System.nanoTime();
+                if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) {
+                    _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime)
+                            / NS_PER_SEC);
+                    reportTime = now;
                 }
                 if (count >= _bufferCount) {
                     //
@@ -1513,7 +1520,8 @@
         } catch (final PersistitException e) {
             _persistit.getLogBase().bufferInventoryException.log(e);
         } finally {
-            _persistit.getLogBase().bufferInventoryProgress.log(count, total);
+            final long now = System.nanoTime();
+            _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC);
         }
     }
 

=== modified file 'src/main/java/com/persistit/CleanupManager.java'
--- src/main/java/com/persistit/CleanupManager.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/CleanupManager.java 2012-09-12 22:04:20 +0000
@@ -36,15 +36,19 @@
         void performAction(Persistit persistit) throws PersistitException;
     }
 
-    final static long DEFAULT_CLEANUP_INTERVAL = 1000;
+    final static long DEFAULT_CLEANUP_INTERVAL_MS = 1000;
 
     final static int DEFAULT_QUEUE_SIZE = 50000;
 
-    final static int WORKLIST_LENGTH = 500;
-
-    private final static long DEFAULT_MINIMUM_PRUNING_DELAY = 1000;
-
-    final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE);
+    private final static int WORKLIST_LENGTH = 500;
+
+    private final static long MINIMUM_MAINTENANCE_INTERVAL_NS = 1000000000L;
+
+    private final static long MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS = 50000000000L;
+
+    private final static long DEFAULT_MINIMUM_PRUNING_DELAY_NS = 1000;
+
+    private final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE);
 
     private final AtomicBoolean _closed = new AtomicBoolean();
 
@@ -56,7 +60,11 @@
 
     private final AtomicLong _errors = new AtomicLong();
 
-    private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY);
+    private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY_NS);
+
+    private long _lastMaintenance;
+
+    private long _lastPruneObsoleteTransactions;
 
     CleanupManager(final Persistit persistit) {
         super(persistit);
@@ -64,7 +72,10 @@
 
     public void start() {
         _closed.set(false);
-        start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL);
+        final long now = System.nanoTime();
+        _lastMaintenance = now;
+        _lastPruneObsoleteTransactions = now;
+        start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL_MS);
     }
 
     public void close(final boolean flush) throws PersistitException {
@@ -138,9 +149,19 @@
 
     @Override
     public void poll() throws Exception {
-        _persistit.getIOMeter().poll();
-        _persistit.cleanup();
-        _persistit.getJournalManager().pruneObsoleteTransactions();
+
+        final long now = System.nanoTime();
+        if (now - _lastMaintenance > MINIMUM_MAINTENANCE_INTERVAL_NS) {
+            _persistit.getIOMeter().poll();
+            _persistit.cleanup();
+            _lastMaintenance = now;
+        }
+
+        if (now - _lastPruneObsoleteTransactions > MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS) {
+            _persistit.getJournalManager().pruneObsoleteTransactions();
+            _lastPruneObsoleteTransactions = now;
+        }
+
         final List<CleanupAction> workList = new ArrayList<CleanupAction>(WORKLIST_LENGTH);
         synchronized (this) {
             while (workList.size() < WORKLIST_LENGTH) {

=== modified file 'src/main/java/com/persistit/Exchange.java'
--- src/main/java/com/persistit/Exchange.java 2012-08-28 14:29:50 +0000
+++ src/main/java/com/persistit/Exchange.java 2012-09-12 22:04:20 +0000
@@ -1290,6 +1290,9 @@
         if (!isDirectoryExchange()) {
             _persistit.checkSuspended();
         }
+        if (!_ignoreTransactions && !_transaction.isActive()) {
+            _persistit.getJournalManager().throttle();
+        }
         // TODO: directoryExchange, and lots of tests, don't use transactions.
         // Skip MVCC for now.
         int options = StoreOptions.WAIT;
@@ -2990,13 +2993,9 @@
     }
 
     private boolean removeInternal(final Direction selection, final boolean fetchFirst) throws PersistitException {
-        assertCorrectThread(true);
-        _persistit.checkClosed();
-
         if (selection != EQ && selection != GTEQ && selection != GT) {
             throw new IllegalArgumentException("Invalid mode " + selection);
         }
-
         final int keySize = _key.getEncodedSize();
 
         _key.copyTo(_spareKey3);
@@ -3005,6 +3004,7 @@
         // Special case for empty key
         if (keySize == 0) {
             if (selection == EQ) {
+                assertCorrectThread(true);
                 return false;
             }
             _spareKey3.append(BEFORE);
@@ -3094,6 +3094,13 @@
         assertCorrectThread(true);
         _persistit.checkClosed();
 
+        if (!isDirectoryExchange()) {
+            _persistit.checkSuspended();
+        }
+        if (!_ignoreTransactions && !_transaction.isActive()) {
+            _persistit.getJournalManager().throttle();
+        }
+
         if (_ignoreTransactions || !_transaction.isActive()) {
             return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false);
         }

=== modified file 'src/main/java/com/persistit/IOMeter.java'
--- src/main/java/com/persistit/IOMeter.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/IOMeter.java 2012-09-12 22:04:20 +0000
@@ -32,6 +32,7 @@
 
 import com.persistit.mxbeans.IOMeterMXBean;
 import com.persistit.util.ArgParser;
+import com.persistit.util.Util;
 
 /**
  *
@@ -57,7 +58,9 @@
     private final static String DUMP_FORMAT = "time=%,12d op=%2s vol=%4s page=%,16d addr=%,16d size=%,8d index=%,7d";
     private final static int DUMP_RECORD_LENGTH = 37;
 
-    private final static int DEFAULT_QUIESCENT_IO_THRESHOLD = 100000;
+    private final static int DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 100;
+    private final static int MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 0;
+    private final static int MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 1000000;
 
     private final static int READ_PAGE_FROM_VOLUME = 1;
     private final static int READ_PAGE_FROM_JOURNAL = 2;
@@ -72,7 +75,7 @@
 
     private final static int ITEM_COUNT = 11;
 
-    private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD;
+    private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC;
 
     private final AtomicReference<DataOutputStream> _logStream = new AtomicReference<DataOutputStream>();
 
@@ -149,6 +152,7 @@
 
     /**
      * @return the quiescentIOthreshold
+     * @see #setQuiescentIOthreshold(long)
      */
     @Override
     public synchronized long getQuiescentIOthreshold() {
@@ -156,12 +160,18 @@
     }
 
     /**
+     * Persistit monitors the rate at which new I/O operations are created. When
+     * the IORate falls below the quiescentIOthreshold, expressed in KBytes per
+     * second, the JOURNAL_COPIER accelerates its work to try to clean up older
+     * journal files.
+     *
      * @param quiescentIOthreshold
      *            the quiescentIOthreshold to set
      */
     @Override
-    public synchronized void setQuiescentIOthreshold(final long quiescentIO) {
-        _quiescentIOthreshold = quiescentIO;
+    public synchronized void setQuiescentIOthreshold(final long quiescentIOthreshold) {
+        _quiescentIOthreshold = Util.rangeCheck(quiescentIOthreshold, MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC,
+                MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC);
     }
 
     /**

=== modified file 'src/main/java/com/persistit/IntegrityCheck.java'
--- src/main/java/com/persistit/IntegrityCheck.java 2012-09-11 14:14:55 +0000
+++ src/main/java/com/persistit/IntegrityCheck.java 2012-09-12 22:04:20 +0000
@@ -386,6 +386,26 @@
     }
 
     /**
+     * Control output format. When CSV mode is enabled, the output is organized
+     * as comma-separated-variable text that can be imported into a spreadsheet.
+     *
+     * @param csvMode
+     */
+    public void setCsvMode(final boolean csvMode) {
+        _csv = csvMode;
+    }
+
+    /**
+     * Indicate whether CSV mode is enabled. If so the output is organized as
+     * comma-separated-variable text that can be imported into a spreadsheet.
+     *
+     * @return <code>true<c/code> if CSV mode is enabled.
+     */
+    public boolean isCsvMode() {
+        return _csv;
+    }
+
+    /**
      * Indicate whether missing index pages should be added when an index "hole"
      * is discovered.
      *

=== modified file 'src/main/java/com/persistit/JournalManager.java'
--- src/main/java/com/persistit/JournalManager.java 2012-09-11 14:14:55 +0000
+++ src/main/java/com/persistit/JournalManager.java 2012-09-12 22:04:20 +0000
@@ -29,15 +29,15 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -80,9 +80,9 @@
     final static int GENTLE_COMMIT_DELAY_MILLIS = 12;
     private final static long NS_PER_MS = 1000000L;
     private final static int IO_MEASUREMENT_CYCLES = 8;
-
-    private final static int TOO_MANY_WARN_THRESHOLD = 15;
-    private final static int TOO_MANY_ERROR_THRESHOLD = 20;
+    private final static int TOO_MANY_WARN_THRESHOLD = 5;
+    private final static int TOO_MANY_ERROR_THRESHOLD = 10;
+    private final static long KILO = 1024;
 
     /**
      * REGEX expression that recognizes the name of a journal file.
@@ -104,7 +104,6 @@
     private final Map<TreeDescriptor, Integer> _treeToHandleMap = new HashMap<TreeDescriptor, Integer>();
 
     private final Map<Integer, TreeDescriptor> _handleToTreeMap = new HashMap<Integer, TreeDescriptor>();
-
     private final Map<Long, TransactionMapItem> _liveTransactionMap = new HashMap<Long, TransactionMapItem>();
 
     private final Persistit _persistit;
@@ -184,9 +183,9 @@
 
     private final AtomicLong _totalFlushIoTime = new AtomicLong();
 
-    private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL;
+    private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL_MS;
 
-    private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD;
+    private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS;
 
     private final TransactionPlayer _player = new TransactionPlayer(new JournalTransactionPlayerSupport());
 
@@ -201,7 +200,7 @@
      * performs I/O. Hopefully we can set good defaults and not expose these as
      * knobs.
      */
-    private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL;
+    private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL_MS;
 
     private volatile int _copiesPerCycle = DEFAULT_COPIES_PER_CYCLE;
 
@@ -213,7 +212,9 @@
 
     private boolean _allowHandlesForTempVolumesAndTrees;
 
-    private final AtomicLong _waitLoopsWithNoDelay = new AtomicLong();
+    private volatile int _urgentFileCountThreshold = DEFAULT_URGENT_FILE_COUNT_THRESHOLD;
+
+    private volatile long _throttleSleepInterval;
 
     /**
      * <p>
@@ -537,10 +538,22 @@
 
     @Override
     public void setSlowIoAlertThreshold(final long slowIoAlertThreshold) {
-        Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD, MAXIMUM_SLOW_ALERT_THRESHOLD);
+        Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD_MS, MAXIMUM_SLOW_ALERT_THRESHOLD_MS);
         _slowIoAlertThreshold = slowIoAlertThreshold;
     }
 
+    @Override
+    public int getUrgentFileCountThreshold() {
+        return _urgentFileCountThreshold;
+    }
+
+    @Override
+    public void setUrgentFileCountThreshold(final int threshold) {
+        Util.rangeCheck(threshold, MINIMUM_URGENT_FILE_COUNT_THRESHOLD, MAXIMUM_URGENT_FILE_COUNT_THRESHOLD);
+        _urgentFileCountThreshold = threshold;
+
+    }
+
     /**
      * Compute an "urgency" factor that determines how vigorously the
      * JOURNAL_COPIER thread should perform I/O. This number is computed on a
@@ -554,8 +567,8 @@
         if (_copyFast.get()) {
             return URGENT;
         }
-        final int journalFileCount = getJournalFileCount();
-        return Math.min(URGENT, journalFileCount);
+        final int remainingFiles = _urgentFileCountThreshold - getJournalFileCount();
+        return Math.max(0, Math.min(URGENT - remainingFiles, URGENT));
     }
 
     /**
@@ -567,13 +580,9 @@
      * @throws PersistitInterruptedException
      */
     public void throttle() throws PersistitInterruptedException {
-        final int urgency = urgency();
-        if (!_appendOnly.get()) {
-            if (urgency == URGENT) {
-                Util.sleep(URGENT_COMMIT_DELAY_MILLIS);
-            } else if (urgency >= ALMOST_URGENT) {
-                Util.sleep(GENTLE_COMMIT_DELAY_MILLIS);
-            }
+        final long interval = _throttleSleepInterval;
+        if (interval > 0) {
+            Util.sleep(interval);
         }
     }
 
342 | 216 | |||
343 | 217 | private volatile long _throttleSleepInterval; | ||
344 | 217 | 218 | ||
345 | 218 | /** | 219 | /** |
346 | 219 | * <p> | 220 | * <p> |
347 | @@ -537,10 +538,22 @@ | |||
348 | 537 | 538 | ||
349 | 538 | @Override | 539 | @Override |
350 | 539 | public void setSlowIoAlertThreshold(final long slowIoAlertThreshold) { | 540 | public void setSlowIoAlertThreshold(final long slowIoAlertThreshold) { |
352 | 540 | Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD, MAXIMUM_SLOW_ALERT_THRESHOLD); | 541 | Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD_MS, MAXIMUM_SLOW_ALERT_THRESHOLD_MS); |
353 | 541 | _slowIoAlertThreshold = slowIoAlertThreshold; | 542 | _slowIoAlertThreshold = slowIoAlertThreshold; |
354 | 542 | } | 543 | } |
355 | 543 | 544 | ||
356 | 545 | @Override | ||
357 | 546 | public int getUrgentFileCountThreshold() { | ||
358 | 547 | return _urgentFileCountThreshold; | ||
359 | 548 | } | ||
360 | 549 | |||
361 | 550 | @Override | ||
362 | 551 | public void setUrgentFileCountThreshold(final int threshold) { | ||
363 | 552 | Util.rangeCheck(threshold, MINIMUM_URGENT_FILE_COUNT_THRESHOLD, MAXIMUM_URGENT_FILE_COUNT_THRESHOLD); | ||
364 | 553 | _urgentFileCountThreshold = threshold; | ||
365 | 554 | |||
366 | 555 | } | ||
367 | 556 | |||
368 | 544 | /** | 557 | /** |
369 | 545 | * Compute an "urgency" factor that determines how vigorously the | 558 | * Compute an "urgency" factor that determines how vigorously the |
370 | 546 | * JOURNAL_COPIER thread should perform I/O. This number is computed on a | 559 | * JOURNAL_COPIER thread should perform I/O. This number is computed on a |
371 | @@ -554,8 +567,8 @@ | |||
372 | 554 | if (_copyFast.get()) { | 567 | if (_copyFast.get()) { |
373 | 555 | return URGENT; | 568 | return URGENT; |
374 | 556 | } | 569 | } |
377 | 557 | final int journalFileCount = getJournalFileCount(); | 570 | final int remainingFiles = _urgentFileCountThreshold - getJournalFileCount(); |
378 | 558 | return Math.min(URGENT, journalFileCount); | 571 | return Math.max(0, Math.min(URGENT - remainingFiles, URGENT)); |
379 | 559 | } | 572 | } |
380 | 560 | 573 | ||
381 | 561 | /** | 574 | /** |
382 | @@ -567,13 +580,9 @@ | |||
383 | 567 | * @throws PersistitInterruptedException | 580 | * @throws PersistitInterruptedException |
384 | 568 | */ | 581 | */ |
385 | 569 | public void throttle() throws PersistitInterruptedException { | 582 | public void throttle() throws PersistitInterruptedException { |
393 | 570 | final int urgency = urgency(); | 583 | final long interval = _throttleSleepInterval; |
394 | 571 | if (!_appendOnly.get()) { | 584 | if (interval > 0) { |
395 | 572 | if (urgency == URGENT) { | 585 | Util.sleep(interval); |
389 | 573 | Util.sleep(URGENT_COMMIT_DELAY_MILLIS); | ||
390 | 574 | } else if (urgency >= ALMOST_URGENT) { | ||
391 | 575 | Util.sleep(GENTLE_COMMIT_DELAY_MILLIS); | ||
392 | 576 | } | ||
396 | 577 | } | 586 | } |
397 | 578 | } | 587 | } |
398 | 579 | 588 | ||
399 | @@ -1007,14 +1016,6 @@ | |||
400 | 1007 | // | 1016 | // |
401 | 1008 | force(); | 1017 | force(); |
402 | 1009 | // | 1018 | // |
403 | 1010 | // Make sure all copied pages have been flushed to disk. | ||
404 | 1011 | // | ||
405 | 1012 | for (final Volume vol : _volumeToHandleMap.keySet()) { | ||
406 | 1013 | if (vol.isOpened()) { | ||
407 | 1014 | vol.getStorage().force(); | ||
408 | 1015 | } | ||
409 | 1016 | } | ||
410 | 1017 | // | ||
411 | 1018 | // Prepare room for CP.OVERHEAD bytes in the journal. If doing so | 1019 | // Prepare room for CP.OVERHEAD bytes in the journal. If doing so |
412 | 1019 | // started a new journal file then there's no need to write another | 1020 | // started a new journal file then there's no need to write another |
413 | 1020 | // CP record. | 1021 | // CP record. |
414 | @@ -1227,7 +1228,6 @@ | |||
415 | 1227 | static long fileToGeneration(final File file) { | 1228 | static long fileToGeneration(final File file) { |
416 | 1228 | final Matcher matcher = PATH_PATTERN.matcher(file.getName()); | 1229 | final Matcher matcher = PATH_PATTERN.matcher(file.getName()); |
417 | 1229 | if (matcher.matches()) { | 1230 | if (matcher.matches()) { |
418 | 1230 | // TODO - validate range | ||
419 | 1231 | return Long.parseLong(matcher.group(2)); | 1231 | return Long.parseLong(matcher.group(2)); |
420 | 1232 | } else { | 1232 | } else { |
421 | 1233 | return -1; | 1233 | return -1; |
422 | @@ -1237,7 +1237,6 @@ | |||
423 | 1237 | static String fileToPath(final File file) { | 1237 | static String fileToPath(final File file) { |
424 | 1238 | final Matcher matcher = PATH_PATTERN.matcher(file.getPath()); | 1238 | final Matcher matcher = PATH_PATTERN.matcher(file.getPath()); |
425 | 1239 | if (matcher.matches()) { | 1239 | if (matcher.matches()) { |
426 | 1240 | // TODO - validate range | ||
427 | 1241 | return matcher.group(1); | 1240 | return matcher.group(1); |
428 | 1242 | } else { | 1241 | } else { |
429 | 1243 | return null; | 1242 | return null; |
430 | @@ -1677,11 +1676,6 @@ | |||
431 | 1677 | } | 1676 | } |
432 | 1678 | } | 1677 | } |
433 | 1679 | // | 1678 | // |
434 | 1680 | // Remove the page list entries too. | ||
435 | 1681 | // | ||
436 | 1682 | _droppedPageCount += cleanupPageList(); | ||
437 | 1683 | |||
438 | 1684 | // | ||
439 | 1685 | // Remove any PageNode from the branchMap having a timestamp less | 1679 | // Remove any PageNode from the branchMap having a timestamp less |
440 | 1686 | // than the checkpoint. Generally all such entries are removed after | 1680 | // than the checkpoint. Generally all such entries are removed after |
441 | 1687 | // the first checkpoint that has been established after recovery. | 1681 | // the first checkpoint that has been established after recovery. |
442 | @@ -2168,6 +2162,20 @@ | |||
443 | 2168 | } finally { | 2162 | } finally { |
444 | 2169 | _copying.set(false); | 2163 | _copying.set(false); |
445 | 2170 | } | 2164 | } |
446 | 2165 | |||
447 | 2166 | long throttleInterval = 0; | ||
448 | 2167 | if (!_appendOnly.get()) { | ||
449 | 2168 | final int urgency = urgency(); | ||
450 | 2169 | if (urgency == URGENT) { | ||
451 | 2170 | throttleInterval = URGENT_COMMIT_DELAY_MILLIS; | ||
452 | 2171 | } else if (urgency > ALMOST_URGENT) { | ||
453 | 2172 | throttleInterval = GENTLE_COMMIT_DELAY_MILLIS; | ||
454 | 2173 | } | ||
455 | 2174 | } | ||
456 | 2175 | if (throttleInterval != _throttleSleepInterval) { | ||
457 | 2176 | _throttleSleepInterval = throttleInterval; | ||
458 | 2177 | } | ||
459 | 2178 | |||
460 | 2171 | } | 2179 | } |
461 | 2172 | 2180 | ||
462 | 2173 | @Override | 2181 | @Override |
463 | @@ -2175,14 +2183,14 @@ | |||
464 | 2175 | return _closed.get() || _shouldStop; | 2183 | return _closed.get() || _shouldStop; |
465 | 2176 | } | 2184 | } |
466 | 2177 | 2185 | ||
467 | 2178 | @Override | ||
468 | 2179 | /** | 2186 | /** |
474 | 2180 | * Return a nice interval, in milliseconds, to wait between | 2187 | * Return a nice interval, in milliseconds, to wait between copierCycle |
475 | 2181 | * copierCycle invocations. The interval decreases as interval | 2188 | * invocations. The interval decreases as interval goes up, and becomes |
476 | 2182 | * goes up, and becomes zero when the urgency is 10. The interval | 2189 | * zero when the urgency is greater than or equal to 8. The interval is |
477 | 2183 | * is also zero if there has be no recent I/O activity invoked | 2190 | * also zero if there has be no recent I/O activity invoked by other |
478 | 2184 | * by other activities. | 2191 | * activities. |
479 | 2185 | */ | 2192 | */ |
480 | 2193 | @Override | ||
481 | 2186 | public long getPollInterval() { | 2194 | public long getPollInterval() { |
482 | 2187 | final IOMeter iom = _persistit.getIOMeter(); | 2195 | final IOMeter iom = _persistit.getIOMeter(); |
483 | 2188 | final long pollInterval = super.getPollInterval(); | 2196 | final long pollInterval = super.getPollInterval(); |
484 | @@ -2198,7 +2206,7 @@ | |||
485 | 2198 | 2206 | ||
486 | 2199 | int divisor = 1; | 2207 | int divisor = 1; |
487 | 2200 | 2208 | ||
489 | 2201 | if (iom.recentCharge() < iom.getQuiescentIOthreshold()) { | 2209 | if (iom.recentCharge() < iom.getQuiescentIOthreshold() * KILO) { |
490 | 2202 | divisor = HALF_URGENT; | 2210 | divisor = HALF_URGENT; |
491 | 2203 | } else if (urgency > HALF_URGENT) { | 2211 | } else if (urgency > HALF_URGENT) { |
492 | 2204 | divisor = urgency - HALF_URGENT; | 2212 | divisor = urgency - HALF_URGENT; |
493 | @@ -2214,8 +2222,6 @@ | |||
494 | 2214 | 2222 | ||
495 | 2215 | private class JournalFlusher extends IOTaskRunnable { | 2223 | private class JournalFlusher extends IOTaskRunnable { |
496 | 2216 | 2224 | ||
497 | 2217 | final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); | ||
498 | 2218 | |||
499 | 2219 | volatile long _lastExceptionTimestamp = 0; | 2225 | volatile long _lastExceptionTimestamp = 0; |
500 | 2220 | volatile Exception _lastException = null; | 2226 | volatile Exception _lastException = null; |
501 | 2221 | 2227 | ||
502 | @@ -2248,6 +2254,7 @@ | |||
503 | 2248 | * posted an _endTimestamp larger than flushedTimestamp. | 2254 | * posted an _endTimestamp larger than flushedTimestamp. |
504 | 2249 | */ | 2255 | */ |
505 | 2250 | final long now = System.nanoTime(); | 2256 | final long now = System.nanoTime(); |
506 | 2257 | long remainingStallTime = stallTime; | ||
507 | 2251 | 2258 | ||
508 | 2252 | while (true) { | 2259 | while (true) { |
509 | 2253 | /* | 2260 | /* |
510 | @@ -2269,10 +2276,10 @@ | |||
511 | 2269 | endTimestamp = _endTimestamp; | 2276 | endTimestamp = _endTimestamp; |
512 | 2270 | startTime = _startTime; | 2277 | startTime = _startTime; |
513 | 2271 | endTime = _endTime; | 2278 | endTime = _endTime; |
514 | 2272 | if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) { | ||
515 | 2273 | estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0); | ||
516 | 2274 | } | ||
517 | 2275 | if (startTimestamp == _startTimestamp && endTimestamp == _endTimestamp) { | 2279 | if (startTimestamp == _startTimestamp && endTimestamp == _endTimestamp) { |
518 | 2280 | if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) { | ||
519 | 2281 | estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0); | ||
520 | 2282 | } | ||
521 | 2276 | break; | 2283 | break; |
522 | 2277 | } | 2284 | } |
523 | 2278 | Util.spinSleep(); | 2285 | Util.spinSleep(); |
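The reordered check above computes `estimatedRemainingIoNanos` only after confirming that `_startTimestamp` and `_endTimestamp` did not change while the other volatile fields were being copied. Below is a minimal sketch of that optimistic read-and-validate pattern. The class and its field names are hypothetical stand-ins for the `JournalFlusher` fields, and `Thread.yield()` substitutes for `Util.spinSleep()`.

```java
/**
 * Hypothetical sketch: a single writer publishes flush-cycle bounds through
 * volatile fields, and readers copy them optimistically, retrying if either
 * timestamp changed during the copy.
 */
final class FlushCycleSnapshot {
    private volatile long startTimestamp;
    private volatile long endTimestamp;
    private volatile long startTime;
    private volatile long endTime;

    /** Writer: marks the start of a flush cycle. */
    void beginCycle(final long timestamp, final long nanos) {
        startTime = nanos;
        startTimestamp = timestamp;
    }

    /** Writer: marks the end of a flush cycle. */
    void endCycle(final long timestamp, final long nanos) {
        endTime = nanos;
        endTimestamp = timestamp;
    }

    /**
     * Reader: returns {startTimestamp, endTimestamp, startTime, endTime},
     * accepted only when both timestamps were unchanged across the copy.
     */
    long[] read() {
        while (true) {
            final long st = startTimestamp;
            final long et = endTimestamp;
            final long sTime = startTime;
            final long eTime = endTime;
            if (st == startTimestamp && et == endTimestamp) {
                // Estimates derived from sTime/eTime belong here, after validation.
                return new long[] { st, et, sTime, eTime };
            }
            Thread.yield(); // stand-in for Util.spinSleep()
        }
    }
}
```

Like the diff, this validates only the two timestamps. It therefore narrows the window for a torn read of the time fields but does not fully close it. A strict seqlock would increment a version counter before and after each update.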
524 | @@ -2280,22 +2287,23 @@ | |||
525 | 2280 | 2287 | ||
526 | 2281 | if (endTimestamp > flushedTimestamp && startTimestamp > flushedTimestamp) { | 2288 | if (endTimestamp > flushedTimestamp && startTimestamp > flushedTimestamp) { |
527 | 2282 | /* | 2289 | /* |
529 | 2283 | * Done - commit is fully durable | 2290 | * Done - commit is durable |
530 | 2284 | */ | 2291 | */ |
531 | 2285 | break; | 2292 | break; |
532 | 2286 | } | 2293 | } |
533 | 2287 | 2294 | ||
534 | 2288 | long remainingSleepNanos; | 2295 | long remainingSleepNanos; |
535 | 2289 | boolean didWait = false; | ||
536 | 2290 | if (estimatedRemainingIoNanos == -1) { | 2296 | if (estimatedRemainingIoNanos == -1) { |
537 | 2291 | remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime)); | 2297 | remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime)); |
538 | 2292 | } else { | 2298 | } else { |
539 | 2293 | remainingSleepNanos = _flushInterval; | 2299 | remainingSleepNanos = _flushInterval; |
540 | 2294 | } | 2300 | } |
541 | 2295 | 2301 | ||
543 | 2296 | long estimatedNanosToFinish = Math.max(estimatedRemainingIoNanos, 0); | 2302 | long estimatedNanosToFinish; |
544 | 2297 | if (startTimestamp < flushedTimestamp) { | 2303 | if (startTimestamp < flushedTimestamp) { |
546 | 2298 | estimatedNanosToFinish += remainingSleepNanos + _expectedIoTime; | 2304 | estimatedNanosToFinish = remainingSleepNanos + _expectedIoTime; |
547 | 2305 | } else { | ||
548 | 2306 | estimatedNanosToFinish = estimatedRemainingIoNanos; | ||
549 | 2299 | } | 2307 | } |
550 | 2300 | 2308 | ||
551 | 2301 | if (leadTime > 0 && leadTime * NS_PER_MS >= estimatedNanosToFinish) { | 2309 | if (leadTime > 0 && leadTime * NS_PER_MS >= estimatedNanosToFinish) { |
552 | @@ -2311,41 +2319,20 @@ | |||
553 | 2311 | * possible (determined by stallTime) before kicking the | 2319 | * possible (determined by stallTime) before kicking the |
554 | 2312 | * JOURNAL_FLUSHER to write the caller's transaction. | 2320 | * JOURNAL_FLUSHER to write the caller's transaction. |
555 | 2313 | */ | 2321 | */ |
571 | 2314 | final long delay = stallTime * NS_PER_MS - estimatedNanosToFinish; | 2322 | if (remainingStallTime > 0) { |
572 | 2315 | if (delay > 0) { | 2323 | Util.sleep(remainingStallTime); |
573 | 2316 | Util.sleep(delay / NS_PER_MS); | 2324 | remainingStallTime = 0; |
574 | 2317 | didWait = true; | 2325 | } else { |
575 | 2318 | } | 2326 | kick(); |
576 | 2319 | kick(); | 2327 | Util.spinSleep(); |
562 | 2320 | if (delay <= 0) { | ||
563 | 2321 | didWait = true; | ||
564 | 2322 | try { | ||
565 | 2323 | if (_lock.readLock().tryLock(NS_PER_MS, TimeUnit.NANOSECONDS)) { | ||
566 | 2324 | _lock.readLock().unlock(); | ||
567 | 2325 | } | ||
568 | 2326 | } catch (final InterruptedException e) { | ||
569 | 2327 | throw new PersistitInterruptedException(e); | ||
570 | 2328 | } | ||
577 | 2329 | } | 2328 | } |
578 | 2330 | } else { | 2329 | } else { |
579 | 2331 | /* | 2330 | /* |
582 | 2332 | * Otherwise, wait until the I/O is about half done and then | 2331 | * Otherwise wait for concurrent I/O operation to finish. Do |
583 | 2333 | * retry. | 2332 | * this by polling because our experiments with using locks |
584 | 2333 | * here showed significant excess CPU consumption. | ||
585 | 2334 | */ | 2334 | */ |
600 | 2335 | final long delay = ((estimatedNanosToFinish - leadTime * NS_PER_MS) / 2) + NS_PER_MS; | 2335 | Util.spinSleep(); |
587 | 2336 | try { | ||
588 | 2337 | if (delay > 0) { | ||
589 | 2338 | didWait = true; | ||
590 | 2339 | if (_lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) { | ||
591 | 2340 | _lock.readLock().unlock(); | ||
592 | 2341 | } | ||
593 | 2342 | } | ||
594 | 2343 | } catch (final InterruptedException e) { | ||
595 | 2344 | throw new PersistitInterruptedException(e); | ||
596 | 2345 | } | ||
597 | 2346 | } | ||
598 | 2347 | if (!didWait) { | ||
599 | 2348 | _waitLoopsWithNoDelay.incrementAndGet(); | ||
601 | 2349 | } | 2336 | } |
602 | 2350 | } | 2337 | } |
603 | 2351 | if (_lastExceptionTimestamp > flushedTimestamp) { | 2338 | if (_lastExceptionTimestamp > flushedTimestamp) { |
604 | @@ -2370,7 +2357,6 @@ | |||
605 | 2370 | * waitForDurability to know when the I/O operation has | 2357 | * waitForDurability to know when the I/O operation has |
606 | 2371 | * finished. | 2358 | * finished. |
607 | 2372 | */ | 2359 | */ |
608 | 2373 | _lock.writeLock().lock(); | ||
609 | 2374 | try { | 2360 | try { |
610 | 2375 | _startTimestamp = _persistit.getTimestampAllocator().updateTimestamp(); | 2361 | _startTimestamp = _persistit.getTimestampAllocator().updateTimestamp(); |
611 | 2376 | _startTime = System.nanoTime(); | 2362 | _startTime = System.nanoTime(); |
612 | @@ -2382,22 +2368,24 @@ | |||
613 | 2382 | } finally { | 2368 | } finally { |
614 | 2383 | _endTime = System.nanoTime(); | 2369 | _endTime = System.nanoTime(); |
615 | 2384 | _endTimestamp = _persistit.getTimestampAllocator().updateTimestamp(); | 2370 | _endTimestamp = _persistit.getTimestampAllocator().updateTimestamp(); |
616 | 2385 | _lock.writeLock().unlock(); | ||
617 | 2386 | } | 2371 | } |
618 | 2387 | 2372 | ||
619 | 2388 | final long elapsed = _endTime - _startTime; | 2373 | final long elapsed = _endTime - _startTime; |
620 | 2389 | _totalFlushCycles.incrementAndGet(); | 2374 | _totalFlushCycles.incrementAndGet(); |
621 | 2390 | _totalFlushIoTime.addAndGet(elapsed); | 2375 | _totalFlushIoTime.addAndGet(elapsed); |
622 | 2391 | _ioTimes[_ioCycle] = elapsed; | 2376 | _ioTimes[_ioCycle] = elapsed; |
624 | 2392 | _ioCycle = (_ioCycle + 1) % _ioTimes.length; | 2377 | _ioCycle = (_ioCycle + 1) % IO_MEASUREMENT_CYCLES; |
625 | 2393 | 2378 | ||
629 | 2394 | long max = 0; | 2379 | long avg = 0; |
630 | 2395 | for (int index = 0; index < _ioTimes.length; index++) { | 2380 | for (int index = 0; index < IO_MEASUREMENT_CYCLES; index++) { |
631 | 2396 | max = Math.max(max, _ioTimes[index]); | 2381 | avg += _ioTimes[index]; |
632 | 2397 | } | 2382 | } |
634 | 2398 | _expectedIoTime = max; | 2383 | avg /= IO_MEASUREMENT_CYCLES; |
635 | 2384 | |||
636 | 2385 | _expectedIoTime = avg; | ||
637 | 2399 | if (elapsed > _slowIoAlertThreshold * NS_PER_MS) { | 2386 | if (elapsed > _slowIoAlertThreshold * NS_PER_MS) { |
639 | 2400 | _persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS); | 2387 | _persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS, IO_MEASUREMENT_CYCLES, avg |
640 | 2388 | / NS_PER_MS); | ||
641 | 2401 | } | 2389 | } |
642 | 2402 | 2390 | ||
643 | 2403 | } catch (final Exception e) { | 2391 | } catch (final Exception e) { |
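This hunk changes `_expectedIoTime` from the maximum to the mean of the last `IO_MEASUREMENT_CYCLES` flush durations stored in the `_ioTimes` ring buffer. The sketch below shows a rolling mean over a fixed-size ring buffer. The class name and buffer size are hypothetical. One deliberate difference: the sketch divides by the number of samples recorded so far. The diff always divides by `IO_MEASUREMENT_CYCLES`, so its average reads low until the buffer has filled, because unfilled slots are zero.

```java
/** Hypothetical rolling mean of the most recent N flush durations. */
final class IoTimeMeter {
    private final long[] ioTimes;
    private int cycle;    // next slot to overwrite
    private long samples; // total samples recorded so far

    IoTimeMeter(final int cycles) {
        if (cycles <= 0) {
            throw new IllegalArgumentException("cycles must be positive");
        }
        ioTimes = new long[cycles];
    }

    /** Records one flush duration and returns the updated expected I/O time. */
    long record(final long elapsedNanos) {
        ioTimes[cycle] = elapsedNanos;
        cycle = (cycle + 1) % ioTimes.length;
        samples++;
        // Slots 0..filled-1 hold real samples; before wrap-around the rest are unused.
        final int filled = (int) Math.min(samples, ioTimes.length);
        long sum = 0;
        for (int i = 0; i < filled; i++) {
            sum += ioTimes[i];
        }
        return sum / filled;
    }
}
```

With a mean, a single slow `force()` raises the estimate by only 1/N of its excess. With the max, the estimate stayed pinned at the outlier for the next N cycles.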
644 | @@ -2416,6 +2404,7 @@ | |||
645 | 2416 | } finally { | 2404 | } finally { |
646 | 2417 | _flushing.set(false); | 2405 | _flushing.set(false); |
647 | 2418 | } | 2406 | } |
648 | 2407 | |||
649 | 2419 | } | 2408 | } |
650 | 2420 | 2409 | ||
651 | 2421 | @Override | 2410 | @Override |
652 | @@ -2426,14 +2415,12 @@ | |||
653 | 2426 | 2415 | ||
654 | 2427 | synchronized void selectForCopy(final List<PageNode> list) { | 2416 | synchronized void selectForCopy(final List<PageNode> list) { |
655 | 2428 | list.clear(); | 2417 | list.clear(); |
656 | 2429 | _droppedPageCount += cleanupPageList(); | ||
657 | 2430 | if (!_appendOnly.get()) { | 2418 | if (!_appendOnly.get()) { |
658 | 2431 | final long timeStampUpperBound = Math.min(getLastValidCheckpointTimestamp(), _copierTimestampLimit); | 2419 | final long timeStampUpperBound = Math.min(getLastValidCheckpointTimestamp(), _copierTimestampLimit); |
659 | 2432 | for (final Iterator<PageNode> iterator = _pageList.iterator(); iterator.hasNext();) { | 2420 | for (final Iterator<PageNode> iterator = _pageList.iterator(); iterator.hasNext();) { |
660 | 2433 | final PageNode pageNode = iterator.next(); | 2421 | final PageNode pageNode = iterator.next(); |
662 | 2434 | for (PageNode pn = pageNode; pn != null; pn = pn.getPrevious()) { | 2422 | for (PageNode pn = pageNode; pn != null && !pn.isInvalid(); pn = pn.getPrevious()) { |
663 | 2435 | if (pn.getTimestamp() < timeStampUpperBound) { | 2423 | if (pn.getTimestamp() < timeStampUpperBound) { |
664 | 2436 | assert !pn.isInvalid(); | ||
665 | 2437 | list.add(pn); | 2424 | list.add(pn); |
666 | 2438 | break; | 2425 | break; |
667 | 2439 | } | 2426 | } |
668 | @@ -2517,6 +2504,7 @@ | |||
669 | 2517 | Volume volumeRef = null; | 2504 | Volume volumeRef = null; |
670 | 2518 | Volume volume = null; | 2505 | Volume volume = null; |
671 | 2519 | int handle = -1; | 2506 | int handle = -1; |
672 | 2507 | final Set<Volume> volumes = new HashSet<Volume>(); | ||
673 | 2520 | 2508 | ||
674 | 2521 | for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) { | 2509 | for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) { |
675 | 2522 | if (_closed.get() && !_copyFast.get() || _appendOnly.get()) { | 2510 | if (_closed.get() && !_copyFast.get() || _appendOnly.get()) { |
676 | @@ -2567,6 +2555,7 @@ | |||
677 | 2567 | 2555 | ||
678 | 2568 | try { | 2556 | try { |
679 | 2569 | volume.getStorage().writePage(bb, pageAddress); | 2557 | volume.getStorage().writePage(bb, pageAddress); |
680 | 2558 | volumes.add(volume); | ||
681 | 2570 | } catch (final PersistitException ioe) { | 2559 | } catch (final PersistitException ioe) { |
682 | 2571 | _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(), | 2560 | _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(), |
683 | 2572 | pageNode.getJournalAddress()); | 2561 | pageNode.getJournalAddress()); |
684 | @@ -2578,6 +2567,10 @@ | |||
685 | 2578 | pageNode.getJournalAddress(), urgency()); | 2567 | pageNode.getJournalAddress(), urgency()); |
686 | 2579 | } | 2568 | } |
687 | 2580 | 2569 | ||
688 | 2570 | for (final Volume vol : volumes) { | ||
689 | 2571 | vol.getStorage().force(); | ||
690 | 2572 | } | ||
691 | 2573 | |||
692 | 2581 | } | 2574 | } |
693 | 2582 | 2575 | ||
694 | 2583 | private void cleanupForCopy(final List<PageNode> list) throws PersistitException { | 2576 | private void cleanupForCopy(final List<PageNode> list) throws PersistitException { |
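The copier now records each `Volume` it successfully writes in a `HashSet` and calls `getStorage().force()` once per volume after the loop, instead of deferring the force to checkpoint time. The sketch below isolates that write-then-force-once pattern. `Storage` and `Page` are hypothetical stand-ins, not Persistit types. A `LinkedHashSet` is used only so that forces happen in first-write order.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class CopyBatch {
    /** Hypothetical stand-in for a Volume's storage. */
    interface Storage {
        void writePage(long pageAddress);

        void force();
    }

    /** A page to copy: the storage it belongs to and its address. */
    static final class Page {
        final Storage storage;
        final long address;

        Page(final Storage storage, final long address) {
            this.storage = storage;
            this.address = address;
        }
    }

    /** Writes every page, then forces each distinct storage exactly once. */
    static void copy(final List<Page> pages) {
        final Set<Storage> written = new LinkedHashSet<>();
        for (final Page page : pages) {
            page.storage.writePage(page.address);
            written.add(page.storage); // recorded only after the write returns
        }
        for (final Storage storage : written) {
            storage.force();
        }
    }
}
```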
695 | @@ -2715,26 +2708,21 @@ | |||
696 | 2715 | * @return Count of removed PageNode instances. | 2708 | * @return Count of removed PageNode instances. |
697 | 2716 | */ | 2709 | */ |
698 | 2717 | int cleanupPageList() { | 2710 | int cleanupPageList() { |
712 | 2718 | int to = -1; | 2711 | final int size = _pageList.size(); |
713 | 2719 | int count = 0; | 2712 | int from; |
714 | 2720 | for (int index = _pageList.size(); --index >= 0;) { | 2713 | for (from = 0; from < size && !_pageList.get(from).isInvalid(); from++) |
715 | 2721 | if (_pageList.get(index).isInvalid()) { | 2714 | ; |
716 | 2722 | if (to == -1) { | 2715 | int to = from; |
717 | 2723 | to = index; | 2716 | for (from = from + 1; from < size; from++) { |
718 | 2724 | } | 2717 | final PageNode pn = _pageList.get(from); |
719 | 2725 | } else { | 2718 | if (!pn.isInvalid()) { |
720 | 2726 | if (to != -1) { | 2719 | _pageList.set(to++, pn); |
708 | 2727 | _pageList.removeRange(index + 1, to + 1); | ||
709 | 2728 | count += to - index; | ||
710 | 2729 | to = -1; | ||
711 | 2730 | } | ||
721 | 2731 | } | 2720 | } |
722 | 2732 | } | 2721 | } |
726 | 2733 | if (to != -1) { | 2722 | if (size > to) { |
727 | 2734 | _pageList.removeRange(0, to + 1); | 2723 | _pageList.removeRange(to, size); |
725 | 2735 | count += to + 1; | ||
728 | 2736 | } | 2724 | } |
730 | 2737 | return count; | 2725 | return size - to; |
731 | 2738 | } | 2726 | } |
732 | 2739 | 2727 | ||
733 | 2740 | private void reportJournalFileCount() { | 2728 | private void reportJournalFileCount() { |
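The rewritten `cleanupPageList` drops the repeated `removeRange` calls, each of which shifts the remaining tail. In their place is one pass that slides surviving entries toward the front, followed by a single truncation, so the cost is linear in the list size. Below is a generic sketch of the same in-place compaction. It uses `subList(to, size).clear()` because `ArrayList.removeRange` is `protected`; the diff's `_pageList` presumably exposes it through a subclass. The predicate stands in for `PageNode.isInvalid()`.

```java
import java.util.List;
import java.util.function.Predicate;

final class PageListCompaction {
    /**
     * Removes every element matching {@code invalid} in one pass, preserving
     * the order of the survivors, and returns the number removed.
     */
    static <T> int compact(final List<T> list, final Predicate<? super T> invalid) {
        final int size = list.size();
        int to = 0;
        for (int from = 0; from < size; from++) {
            final T item = list.get(from);
            if (!invalid.test(item)) {
                if (to != from) {
                    list.set(to, item); // slide survivor into the first free slot
                }
                to++;
            }
        }
        // Drop the now-stale tail in one bulk operation.
        list.subList(to, size).clear();
        return size - to;
    }
}
```

On Java 8 and later, `ArrayList.removeIf` performs an equivalent bulk removal and could replace a hand-written loop like this one.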
734 | @@ -2744,11 +2732,11 @@ | |||
735 | 2744 | */ | 2732 | */ |
736 | 2745 | final int journalFileCount = getJournalFileCount(); | 2733 | final int journalFileCount = getJournalFileCount(); |
737 | 2746 | if (journalFileCount != _lastReportedJournalFileCount) { | 2734 | if (journalFileCount != _lastReportedJournalFileCount) { |
739 | 2747 | if (journalFileCount > TOO_MANY_ERROR_THRESHOLD) { | 2735 | if (journalFileCount > TOO_MANY_ERROR_THRESHOLD + _urgentFileCountThreshold) { |
740 | 2748 | _persistit.getAlertMonitor() | 2736 | _persistit.getAlertMonitor() |
741 | 2749 | .post(new Event(AlertLevel.ERROR, _persistit.getLogBase().tooManyJournalFilesError, | 2737 | .post(new Event(AlertLevel.ERROR, _persistit.getLogBase().tooManyJournalFilesError, |
742 | 2750 | journalFileCount), AlertMonitor.MANY_JOURNAL_FILES); | 2738 | journalFileCount), AlertMonitor.MANY_JOURNAL_FILES); |
744 | 2751 | } else if (journalFileCount > TOO_MANY_WARN_THRESHOLD) { | 2739 | } else if (journalFileCount > TOO_MANY_WARN_THRESHOLD + _urgentFileCountThreshold) { |
745 | 2752 | _persistit.getAlertMonitor() | 2740 | _persistit.getAlertMonitor() |
746 | 2753 | .post(new Event(AlertLevel.WARN, _persistit.getLogBase().tooManyJournalFilesWarning, | 2741 | .post(new Event(AlertLevel.WARN, _persistit.getLogBase().tooManyJournalFilesWarning, |
747 | 2754 | journalFileCount), AlertMonitor.MANY_JOURNAL_FILES); | 2742 | journalFileCount), AlertMonitor.MANY_JOURNAL_FILES); |
748 | @@ -2923,11 +2911,11 @@ | |||
749 | 2923 | _liveTransactionMap.clear(); | 2911 | _liveTransactionMap.clear(); |
750 | 2924 | } | 2912 | } |
751 | 2925 | 2913 | ||
753 | 2926 | synchronized long getCurrentJournalSize() { | 2914 | long getCurrentJournalSize() { |
754 | 2927 | return _currentAddress % _blockSize; | 2915 | return _currentAddress % _blockSize; |
755 | 2928 | } | 2916 | } |
756 | 2929 | 2917 | ||
758 | 2930 | synchronized int getJournalFileCount() { | 2918 | int getJournalFileCount() { |
759 | 2931 | return (int) (_currentAddress / _blockSize - _baseAddress / _blockSize) + 1; | 2919 | return (int) (_currentAddress / _blockSize - _baseAddress / _blockSize) + 1; |
760 | 2932 | } | 2920 | } |
761 | 2933 | 2921 | ||
762 | @@ -2989,8 +2977,4 @@ | |||
763 | 2989 | public SortedMap<Integer, TreeDescriptor> queryTreeMap() { | 2977 | public SortedMap<Integer, TreeDescriptor> queryTreeMap() { |
764 | 2990 | return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap); | 2978 | return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap); |
765 | 2991 | } | 2979 | } |
766 | 2992 | |||
767 | 2993 | long getWaitLoopsWithNoDelay() { | ||
768 | 2994 | return _waitLoopsWithNoDelay.get(); | ||
769 | 2995 | } | ||
770 | 2996 | } | 2980 | } |
771 | 2997 | 2981 | ||
772 | === modified file 'src/main/java/com/persistit/Persistit.java' | |||
773 | --- src/main/java/com/persistit/Persistit.java 2012-09-11 14:14:55 +0000 | |||
774 | +++ src/main/java/com/persistit/Persistit.java 2012-09-12 22:04:20 +0000 | |||
775 | @@ -327,11 +327,11 @@ | |||
776 | 327 | private final static SplitPolicy DEFAULT_SPLIT_POLICY = SplitPolicy.PACK_BIAS; | 327 | private final static SplitPolicy DEFAULT_SPLIT_POLICY = SplitPolicy.PACK_BIAS; |
777 | 328 | private final static JoinPolicy DEFAULT_JOIN_POLICY = JoinPolicy.EVEN_BIAS; | 328 | private final static JoinPolicy DEFAULT_JOIN_POLICY = JoinPolicy.EVEN_BIAS; |
778 | 329 | private final static CommitPolicy DEFAULT_TRANSACTION_COMMIT_POLICY = CommitPolicy.SOFT; | 329 | private final static CommitPolicy DEFAULT_TRANSACTION_COMMIT_POLICY = CommitPolicy.SOFT; |
784 | 330 | private final static long DEFAULT_COMMIT_LEAD_TIME = 100; | 330 | private final static long DEFAULT_COMMIT_LEAD_TIME_MS = 100; |
785 | 331 | private final static long DEFAULT_COMMIT_STALL_TIME = 1; | 331 | private final static long DEFAULT_COMMIT_STALL_TIME_MS = 1; |
786 | 332 | private final static long MAX_COMMIT_LEAD_TIME = 5000; | 332 | private final static long MAX_COMMIT_LEAD_TIME_MS = 5000; |
787 | 333 | private final static long MAX_COMMIT_STALL_TIME = 5000; | 333 | private final static long MAX_COMMIT_STALL_TIME_MS = 5000; |
788 | 334 | private final static long FLUSH_DELAY_INTERVAL = 5000; | 334 | private final static long LOG_FLUSH_DELAY_INTERVAL_MS = 5000; |
789 | 335 | 335 | ||
790 | 336 | private final static int MAX_FATAL_ERROR_MESSAGES = 10; | 336 | private final static int MAX_FATAL_ERROR_MESSAGES = 10; |
791 | 337 | 337 | ||
792 | @@ -365,7 +365,7 @@ | |||
793 | 365 | public void run() { | 365 | public void run() { |
794 | 366 | while (!_stop) { | 366 | while (!_stop) { |
795 | 367 | try { | 367 | try { |
797 | 368 | Util.sleep(FLUSH_DELAY_INTERVAL); | 368 | Util.sleep(LOG_FLUSH_DELAY_INTERVAL_MS); |
798 | 369 | } catch (final PersistitInterruptedException ie) { | 369 | } catch (final PersistitInterruptedException ie) { |
799 | 370 | break; | 370 | break; |
800 | 371 | } | 371 | } |
801 | @@ -458,9 +458,9 @@ | |||
802 | 458 | 458 | ||
803 | 459 | private volatile CommitPolicy _defaultCommitPolicy = DEFAULT_TRANSACTION_COMMIT_POLICY; | 459 | private volatile CommitPolicy _defaultCommitPolicy = DEFAULT_TRANSACTION_COMMIT_POLICY; |
804 | 460 | 460 | ||
806 | 461 | private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME; | 461 | private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME_MS; |
807 | 462 | 462 | ||
809 | 463 | private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME; | 463 | private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME_MS; |
810 | 464 | 464 | ||
811 | 465 | private final ThreadLocal<SoftReference<int[]>> _intArrayThreadLocal = new ThreadLocal<SoftReference<int[]>>(); | 465 | private final ThreadLocal<SoftReference<int[]>> _intArrayThreadLocal = new ThreadLocal<SoftReference<int[]>>(); |
812 | 466 | 466 | ||
813 | @@ -1447,6 +1447,8 @@ | |||
814 | 1447 | */ | 1447 | */ |
815 | 1448 | for (int i = 0; i < 5; ++i) { | 1448 | for (int i = 0; i < 5; ++i) { |
816 | 1449 | if (!_closed.get() && _initialized.get()) { | 1449 | if (!_closed.get() && _initialized.get()) { |
817 | 1450 | _transactionIndex.updateActiveTransactionCache(); | ||
818 | 1451 | _journalManager.pruneObsoleteTransactions(); | ||
819 | 1450 | _checkpointManager.checkpoint(); | 1452 | _checkpointManager.checkpoint(); |
820 | 1451 | _journalManager.copyBack(); | 1453 | _journalManager.copyBack(); |
821 | 1452 | final int fileCount = _journalManager.getJournalFileCount(); | 1454 | final int fileCount = _journalManager.getJournalFileCount(); |
822 | @@ -2051,7 +2053,7 @@ | |||
823 | 2051 | } | 2053 | } |
824 | 2052 | 2054 | ||
825 | 2053 | void setTransactionCommitleadTime(final long time) { | 2055 | void setTransactionCommitleadTime(final long time) { |
827 | 2054 | _commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME); | 2056 | _commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME_MS); |
828 | 2055 | } | 2057 | } |
829 | 2056 | 2058 | ||
830 | 2057 | long getTransactionCommitStallTime() { | 2059 | long getTransactionCommitStallTime() { |
831 | @@ -2059,7 +2061,7 @@ | |||
832 | 2059 | } | 2061 | } |
833 | 2060 | 2062 | ||
834 | 2061 | void setTransactionCommitStallTime(final long time) { | 2063 | void setTransactionCommitStallTime(final long time) { |
836 | 2062 | _commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME); | 2064 | _commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME_MS); |
837 | 2063 | } | 2065 | } |
838 | 2064 | 2066 | ||
839 | 2065 | /** | 2067 | /** |
840 | 2066 | 2068 | ||
841 | === modified file 'src/main/java/com/persistit/TransactionPlayer.java' | |||
842 | --- src/main/java/com/persistit/TransactionPlayer.java 2012-08-24 13:57:19 +0000 | |||
843 | +++ src/main/java/com/persistit/TransactionPlayer.java 2012-09-12 22:04:20 +0000 | |||
844 | @@ -300,7 +300,9 @@ | |||
845 | 300 | if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) { | 300 | if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) { |
846 | 301 | return volume.getStructure().directoryExchange(); | 301 | return volume.getStructure().directoryExchange(); |
847 | 302 | } else { | 302 | } else { |
849 | 303 | return _support.getPersistit().getExchange(volume, td.getTreeName(), true); | 303 | final Exchange exchange = _support.getPersistit().getExchange(volume, td.getTreeName(), true); |
850 | 304 | exchange.ignoreTransactions(); | ||
851 | 305 | return exchange; | ||
852 | 304 | } | 306 | } |
853 | 305 | } | 307 | } |
854 | 306 | 308 | ||
855 | 307 | 309 | ||
856 | === modified file 'src/main/java/com/persistit/logging/LogBase.java' | |||
857 | --- src/main/java/com/persistit/logging/LogBase.java 2012-08-24 14:00:17 +0000 | |||
858 | +++ src/main/java/com/persistit/logging/LogBase.java 2012-09-12 22:04:20 +0000 | |||
859 | @@ -232,7 +232,7 @@ | |||
860 | 232 | @Message("WARNING|Crash retried %,d times on %s") | 232 | @Message("WARNING|Crash retried %,d times on %s") |
861 | 233 | public final LogItem crashRetry = PersistitLogMessage.empty(); | 233 | public final LogItem crashRetry = PersistitLogMessage.empty(); |
862 | 234 | 234 | ||
864 | 235 | @Message("WARNING|Journal flush operation took %,dms") | 235 | @Message("WARNING|Journal flush operation took %,dms last %,d cycles average is %,dms") |
865 | 236 | public final LogItem longJournalIO = PersistitLogMessage.empty(); | 236 | public final LogItem longJournalIO = PersistitLogMessage.empty(); |
866 | 237 | 237 | ||
867 | 238 | @Message("INFO|Normal journal file count %,d") | 238 | @Message("INFO|Normal journal file count %,d") |
868 | @@ -247,7 +247,7 @@ | |||
869 | 247 | @Message("INFO|Preloading buffer pool inventory recorded at %tc") | 247 | @Message("INFO|Preloading buffer pool inventory recorded at %tc") |
870 | 248 | public final LogItem bufferInventoryLoad = PersistitLogMessage.empty(); | 248 | public final LogItem bufferInventoryLoad = PersistitLogMessage.empty(); |
871 | 249 | 249 | ||
873 | 250 | @Message("INFO|Preloaded %,d of %,d buffers") | 250 | @Message("INFO|Preloaded %,d of %,d buffers in %,d seconds") |
874 | 251 | public final LogItem bufferInventoryProgress = PersistitLogMessage.empty(); | 251 | public final LogItem bufferInventoryProgress = PersistitLogMessage.empty(); |
875 | 252 | 252 | ||
876 | 253 | @Message("WARNING|Exception while writing buffer pool inventory %s") | 253 | @Message("WARNING|Exception while writing buffer pool inventory %s") |
877 | 254 | 254 | ||
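Both changed `LogBase` messages gain extra `%,d` placeholders. The sketch below shows what the new text looks like, assuming the messages are ultimately rendered by `java.util.Formatter` and that the `WARNING|`/`INFO|` prefix is the log level rather than part of the format. The argument values are made up, and `Locale.US` is pinned so the grouping separator is a comma.

```java
import java.util.Locale;

final class JournalIoMessages {
    // Format bodies copied from the new LogBase messages, level prefix removed.
    static final String LONG_JOURNAL_IO =
            "Journal flush operation took %,dms last %,d cycles average is %,dms";
    static final String PRELOAD_PROGRESS = "Preloaded %,d of %,d buffers in %,d seconds";

    static String longJournalIo(final long elapsedMs, final int cycles, final long averageMs) {
        return String.format(Locale.US, LONG_JOURNAL_IO, elapsedMs, cycles, averageMs);
    }

    static String preloadProgress(final long loaded, final long total, final long seconds) {
        return String.format(Locale.US, PRELOAD_PROGRESS, loaded, total, seconds);
    }
}
```

For example, `longJournalIo(2500, 32, 1200)` yields `Journal flush operation took 2,500ms last 32 cycles average is 1,200ms`.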
878 | === modified file 'src/main/java/com/persistit/mxbeans/IOMeterMXBean.java' | |||
879 | --- src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-08-24 13:57:19 +0000 | |||
880 | +++ src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-09-12 22:04:20 +0000 | |||
881 | @@ -69,7 +69,7 @@ | |||
882 | 69 | /** | 69 | /** |
883 | 70 | * @return the quiescentIOthreshold | 70 | * @return the quiescentIOthreshold |
884 | 71 | */ | 71 | */ |
886 | 72 | @Description("Disk I/O scheduling parameter in bytes per second specifying threshold " | 72 | @Description("Disk I/O scheduling parameter in KBytes per second specifying threshold " |
887 | 73 | + "between \"quiescent\" and \"busy\" states") | 73 | + "between \"quiescent\" and \"busy\" states") |
888 | 74 | public long getQuiescentIOthreshold(); | 74 | public long getQuiescentIOthreshold(); |
889 | 75 | 75 | ||
890 | @@ -77,7 +77,7 @@ | |||
891 | 77 | * @param quiescentIO | 77 | * @param quiescentIO |
892 | 78 | * the quiescentIOthreshold to set | 78 | * the quiescentIOthreshold to set |
893 | 79 | */ | 79 | */ |
895 | 80 | @Description("Disk I/O scheduling parameter in bytes per second specifying threshold " | 80 | @Description("Disk I/O scheduling parameter in KBytes per second specifying threshold " |
896 | 81 | + "between \"quiescent\" and \"busy\" states") | 81 | + "between \"quiescent\" and \"busy\" states") |
897 | 82 | public void setQuiescentIOthreshold(long quiescentIO); | 82 | public void setQuiescentIOthreshold(long quiescentIO); |
898 | 83 | 83 | ||
899 | 84 | 84 | ||
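The MXBean descriptions now give the quiescent I/O threshold in KBytes per second, and `JournalManager.getPollInterval` compares `iom.recentCharge()` against `getQuiescentIOthreshold() * KILO`. The sketch below shows that unit-consistent comparison. It assumes `recentCharge()` reports bytes and that `KILO` is 1024. Neither the value of `KILO` nor the units of `recentCharge()` appear in this diff, so both are assumptions.

```java
final class QuiescenceCheck {
    // Assumption: the real KILO constant is not shown in this diff; 1024 is a guess.
    static final long KILO = 1024;

    /** True when recent I/O (bytes/s) is below a threshold expressed in KBytes/s. */
    static boolean isQuiescent(final long recentChargeBytesPerSec, final long thresholdKBytesPerSec) {
        // Convert the threshold to bytes so that both sides use the same unit.
        return recentChargeBytesPerSec < thresholdKBytesPerSec * KILO;
    }
}
```

Without the conversion, a threshold of 100 would be compared directly against a byte count, and only near-zero I/O would ever be classed as quiescent.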
900 | === modified file 'src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java' | |||
901 | --- src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-08-24 13:57:19 +0000 | |||
902 | +++ src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-09-12 22:04:20 +0000 | |||
903 | @@ -76,14 +76,20 @@ | |||
904 | 76 | * Default time interval (in milliseconds) between calls to the | 76 | * Default time interval (in milliseconds) between calls to the |
905 | 77 | * FileChannel.force() method. | 77 | * FileChannel.force() method. |
906 | 78 | */ | 78 | */ |
908 | 79 | final static long DEFAULT_FLUSH_INTERVAL = 100; | 79 | final static long DEFAULT_FLUSH_INTERVAL_MS = 100; |
909 | 80 | 80 | ||
910 | 81 | /** | 81 | /** |
911 | 82 | * Default time interval (in milliseconds) between calls to the journal | 82 | * Default time interval (in milliseconds) between calls to the journal |
912 | 83 | * copier method. | 83 | * copier method. |
913 | 84 | */ | 84 | */ |
916 | 85 | final static long DEFAULT_COPIER_INTERVAL = 10000; | 85 | final static long DEFAULT_COPIER_INTERVAL_MS = 10000; |
917 | 86 | 86 | /** | |
918 | 87 | * Default journal file count at which transactions are throttled to allow | ||
919 | 88 | * copier to catch up. | ||
920 | 89 | */ | ||
921 | 90 | final static int DEFAULT_URGENT_FILE_COUNT_THRESHOLD = 15; | ||
922 | 91 | final static int MINIMUM_URGENT_FILE_COUNT_THRESHOLD = 5; | ||
923 | 92 | final static int MAXIMUM_URGENT_FILE_COUNT_THRESHOLD = 100; | ||
924 | 87 | /** | 93 | /** |
925 | 88 | * Default value for maximum pages to be copied per cycle. | 94 | * Default value for maximum pages to be copied per cycle. |
926 | 89 | */ | 95 | */ |
927 | @@ -94,17 +100,17 @@ | |||
928 | 94 | * exceptions on attempts to write to the journal. Prevents excessively | 100 | * exceptions on attempts to write to the journal. Prevents excessively |
929 | 95 | * verbose log on repeated failures. | 101 | * verbose log on repeated failures. |
930 | 96 | */ | 102 | */ |
934 | 97 | final static long DEFAULT_LOG_REPEAT_INTERVAL = 60000L; | 103 | final static long DEFAULT_LOG_REPEAT_INTERVAL_MS = 60000L; |
935 | 98 | final static long MINIMUM_LOG_REPEAT_INTERVAL = 1000L; | 104 | final static long MINIMUM_LOG_REPEAT_INTERVAL_MS = 1000L; |
936 | 99 | final static long MAXIMUM_LOG_REPEAT_INTERVAL = Long.MAX_VALUE; | 105 | final static long MAXIMUM_LOG_REPEAT_INTERVAL_MS = Long.MAX_VALUE; |
937 | 100 | /** | 106 | /** |
938 | 101 | * Default threshold time in milliseconds for JournalManager flush | 107 | * Default threshold time in milliseconds for JournalManager flush |
939 | 102 | * operations. If a flush operation takes longer than this time, a WARNING | 108 | * operations. If a flush operation takes longer than this time, a WARNING |
940 | 103 | * message is written to the log. | 109 | * message is written to the log. |
941 | 104 | */ | 110 | */ |
945 | 105 | final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD = 2000L; | 111 | final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS = 2000L; |
946 | 106 | final static long MINIMUM_SLOW_ALERT_THRESHOLD = 100L; | 112 | final static long MINIMUM_SLOW_ALERT_THRESHOLD_MS = 100L; |
947 | 107 | final static long MAXIMUM_SLOW_ALERT_THRESHOLD = Long.MAX_VALUE; | 113 | final static long MAXIMUM_SLOW_ALERT_THRESHOLD_MS = Long.MAX_VALUE; |
948 | 108 | 114 | ||
949 | 109 | /** | 115 | /** |
950 | 110 | * File name appended when journal path specifies only a directory | 116 | * File name appended when journal path specifies only a directory |
951 | @@ -115,12 +121,6 @@ | |||
952 | 115 | */ | 121 | */ |
953 | 116 | final static String PATH_FORMAT = "%s.%012d"; | 122 | final static String PATH_FORMAT = "%s.%012d"; |
954 | 117 | 123 | ||
955 | 118 | /** | ||
956 | 119 | * Default setting for number of pages in the page map before the urgency of | ||
957 | 120 | * copying starts to increase. | ||
958 | 121 | */ | ||
959 | 122 | final static int DEFAULT_PAGE_MAP_SIZE_BASE = 250000; | ||
960 | 123 | |||
961 | 124 | final static int MAXIMUM_CONCURRENT_TRANSACTIONS = 10000; | 124 | final static int MAXIMUM_CONCURRENT_TRANSACTIONS = 10000; |
962 | 125 | 125 | ||
963 | 126 | @Description("Number of transaction map items in the live map") | 126 | @Description("Number of transaction map items in the live map") |
964 | @@ -243,4 +243,9 @@ | |||
965 | 243 | @Description("Threshold in milliseconds for warnings of long duration flush cycles") | 243 | @Description("Threshold in milliseconds for warnings of long duration flush cycles") |
966 | 244 | void setSlowIoAlertThreshold(long slowIoAlertThreshold); | 244 | void setSlowIoAlertThreshold(long slowIoAlertThreshold); |
967 | 245 | 245 | ||
968 | 246 | @Description("Journal file count threshold for throttling transactions") | ||
969 | 247 | int getUrgentFileCountThreshold(); | ||
970 | 248 | |||
971 | 249 | @Description("Journal file count threshold for throttling transactions") | ||
972 | 250 | void setUrgentFileCountThreshold(int threshold); | ||
973 | 246 | } | 251 | } |
974 | 247 | 252 | ||
975 | === modified file 'src/test/java/com/persistit/JournalManagerTest.java' | |||
976 | --- src/test/java/com/persistit/JournalManagerTest.java 2012-09-04 21:01:23 +0000 | |||
977 | +++ src/test/java/com/persistit/JournalManagerTest.java 2012-09-12 22:04:20 +0000 | |||
978 | @@ -33,6 +33,7 @@ | |||
979 | 33 | import java.util.HashMap; | 33 | import java.util.HashMap; |
980 | 34 | import java.util.HashSet; | 34 | import java.util.HashSet; |
981 | 35 | import java.util.Iterator; | 35 | import java.util.Iterator; |
982 | 36 | import java.util.LinkedList; | ||
983 | 36 | import java.util.List; | 37 | import java.util.List; |
984 | 37 | import java.util.Map; | 38 | import java.util.Map; |
985 | 38 | import java.util.Properties; | 39 | import java.util.Properties; |
986 | @@ -43,7 +44,6 @@ | |||
987 | 43 | 44 | ||
988 | 44 | import com.persistit.CheckpointManager.Checkpoint; | 45 | import com.persistit.CheckpointManager.Checkpoint; |
989 | 45 | import com.persistit.JournalManager.PageNode; | 46 | import com.persistit.JournalManager.PageNode; |
990 | 46 | import com.persistit.Transaction.CommitPolicy; | ||
991 | 47 | import com.persistit.TransactionPlayer.TransactionPlayerListener; | 47 | import com.persistit.TransactionPlayer.TransactionPlayerListener; |
992 | 48 | import com.persistit.exception.PersistitException; | 48 | import com.persistit.exception.PersistitException; |
993 | 49 | import com.persistit.unit.ConcurrentUtil.ThrowingRunnable; | 49 | import com.persistit.unit.ConcurrentUtil.ThrowingRunnable; |
994 | @@ -440,7 +440,7 @@ | |||
995 | 440 | * Randomly invalidated PageNodes | 440 | * Randomly invalidated PageNodes |
996 | 441 | */ | 441 | */ |
997 | 442 | { | 442 | { |
999 | 443 | final int SIZE = 5000; | 443 | final int SIZE = 1000000; |
1000 | 444 | final Random random = new Random(1); | 444 | final Random random = new Random(1); |
1001 | 445 | final List<PageNode> source = testCleanupPageListSource(SIZE); | 445 | final List<PageNode> source = testCleanupPageListSource(SIZE); |
1002 | 446 | int next = -1; | 446 | int next = -1; |
1003 | @@ -448,8 +448,8 @@ | |||
1004 | 448 | if (index < next) { | 448 | if (index < next) { |
1005 | 449 | source.get(index).invalidate(); | 449 | source.get(index).invalidate(); |
1006 | 450 | } else { | 450 | } else { |
1009 | 451 | index += random.nextInt(50); | 451 | index += random.nextInt(5); |
1010 | 452 | next = random.nextInt(50) + index; | 452 | next = random.nextInt(5) + index; |
1011 | 453 | } | 453 | } |
1012 | 454 | } | 454 | } |
1013 | 455 | testCleanupPageListHelper(source); | 455 | testCleanupPageListHelper(source); |
1014 | @@ -538,34 +538,16 @@ | |||
1015 | 538 | .getIgnoredUpdates() > 0); | 538 | .getIgnoredUpdates() > 0); |
1016 | 539 | } | 539 | } |
1017 | 540 | 540 | ||
1018 | 541 | @Test | ||
1019 | 542 | public void waitForDurabilitySoaksCPU() throws Exception { | ||
1020 | 543 | _persistit.setDefaultTransactionCommitPolicy(CommitPolicy.HARD); | ||
1021 | 544 | final JournalManager jman = _persistit.getJournalManager(); | ||
1022 | 545 | long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay(); | ||
1023 | 546 | final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true); | ||
1024 | 547 | final Transaction txn = exchange.getTransaction(); | ||
1025 | 548 | for (int count = 0; count < 1000; count++) { | ||
1026 | 549 | txn.begin(); | ||
1027 | 550 | exchange.getValue().put(RED_FOX + count); | ||
1028 | 551 | exchange.to(count).store(); | ||
1029 | 552 | txn.commit(); | ||
1030 | 553 | txn.end(); | ||
1031 | 554 | } | ||
1032 | 555 | waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay() - waitLoopsWithoutDelay; | ||
1033 | 556 | assertEquals("Wait loops without delay", 0, waitLoopsWithoutDelay); | ||
1034 | 557 | } | ||
1035 | 558 | |||
1036 | 559 | private List<PageNode> testCleanupPageListSource(final int size) { | 541 | private List<PageNode> testCleanupPageListSource(final int size) { |
1037 | 560 | final List<PageNode> source = new ArrayList<PageNode>(size); | 542 | final List<PageNode> source = new ArrayList<PageNode>(size); |
1039 | 561 | for (int index = 0; index < 1000000; index++) { | 543 | for (int index = 0; index < size; index++) { |
1040 | 562 | source.add(new PageNode(0, index, index * 10, index)); | 544 | source.add(new PageNode(0, index, index * 10, index)); |
1041 | 563 | } | 545 | } |
1042 | 564 | return source; | 546 | return source; |
1043 | 565 | } | 547 | } |
1044 | 566 | 548 | ||
1045 | 567 | private void testCleanupPageListHelper(final List<PageNode> source) throws Exception { | 549 | private void testCleanupPageListHelper(final List<PageNode> source) throws Exception { |
1047 | 568 | final List<PageNode> cleaned = new ArrayList<PageNode>(source); | 550 | final List<PageNode> cleaned = new LinkedList<PageNode>(source); |
1048 | 569 | for (final Iterator<PageNode> iterator = cleaned.iterator(); iterator.hasNext();) { | 551 | for (final Iterator<PageNode> iterator = cleaned.iterator(); iterator.hasNext();) { |
1049 | 570 | if (iterator.next().isInvalid()) { | 552 | if (iterator.next().isInvalid()) { |
1050 | 571 | iterator.remove(); | 553 | iterator.remove(); |
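The test helper now uses a LinkedList instead of an ArrayList. Each `Iterator.remove()` on an ArrayList shifts every later element, so removing many invalid PageNodes costs O(N*N). A LinkedList iterator removes in O(1), which makes the whole loop O(N). The merge does not show how `cleanupPageList` itself was rewritten. The sketch below shows one common single-pass way to get O(N) behavior on a random-access list: compact the survivors to the front, then truncate the tail. The class and method names (`CompactListSketch`, `compact`) are hypothetical and are not Persistit APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class CompactListSketch {

    /**
     * Hypothetical helper: removes every element matching {@code invalid} in a
     * single pass. Each survivor is copied left at most once, then the tail is
     * cleared, so total work is O(N) on a random-access list such as ArrayList
     * (on a LinkedList, get/set are O(N) each, so this variant is not suitable).
     */
    static <T> void compact(final List<T> list, final Predicate<T> invalid) {
        int to = 0;
        for (int from = 0; from < list.size(); from++) {
            final T item = list.get(from);
            if (!invalid.test(item)) {
                if (to != from) {
                    list.set(to, item); // move survivor into the next free slot
                }
                to++;
            }
        }
        // On ArrayList, subList(...).clear() removes the tail in one range operation.
        list.subList(to, list.size()).clear();
    }

    public static void main(final String[] args) {
        final List<Integer> values = new ArrayList<Integer>();
        for (int i = 0; i < 10; i++) {
            values.add(i);
        }
        compact(values, v -> v % 3 == 0);
        System.out.println(values); // [1, 2, 4, 5, 7, 8]
    }
}
```

Java 8's `ArrayList.removeIf` uses a similar single-pass strategy, so it is a reasonable alternative where that API is available.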
Pushed a couple of additional changes prompted by observing test runs:
- cleanupPageList is now O(N) rather than O(N*N)
- non-transactional store/remove operations now throttle
- the throttle() method itself is now faster: the throttle interval is computed every few seconds by the JOURNAL_COPIER thread
- a couple of minor code cleanups
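Moving the throttle calculation into the JOURNAL_COPIER thread keeps the hot path cheap. Writers only read a precomputed value and sleep if it is nonzero. The sketch below illustrates that split. It assumes a simple policy that maps journal file count in excess of an urgent threshold (loosely modeled on the new UrgentFileCountThreshold attribute) to a delay of 1 ms per excess file, capped at 100 ms. The class, its methods, and that formula are all hypothetical; the real JournalManager computation is not shown in this merge.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: a background thread periodically recomputes a throttle
 * delay, and every writer calls the cheap throttle() method before a store/remove.
 */
public class ThrottleSketch {

    // Assumed tuning knob, similar in spirit to UrgentFileCountThreshold.
    private final int urgentFileCountThreshold;

    // Written only by the background thread; volatile so writer threads see updates.
    private volatile long throttleIntervalNanos = 0L;

    public ThrottleSketch(final int urgentFileCountThreshold) {
        this.urgentFileCountThreshold = urgentFileCountThreshold;
    }

    /** Called every few seconds by the background (copier-like) thread. */
    void recomputeThrottle(final int journalFileCount) {
        final int excess = journalFileCount - urgentFileCountThreshold;
        // Assumed policy: no delay at or below the threshold, then 1 ms per
        // excess journal file, capped at 100 ms.
        final long millis = excess <= 0 ? 0L : Math.min(100L, (long) excess);
        throttleIntervalNanos = TimeUnit.MILLISECONDS.toNanos(millis);
    }

    long getThrottleIntervalNanos() {
        return throttleIntervalNanos;
    }

    /** Hot path: one volatile read, plus a sleep only when throttling is active. */
    void throttle() throws InterruptedException {
        final long nanos = throttleIntervalNanos;
        if (nanos > 0) {
            TimeUnit.NANOSECONDS.sleep(nanos);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final ThrottleSketch sketch = new ThrottleSketch(15);
        sketch.recomputeThrottle(10);
        System.out.println(sketch.getThrottleIntervalNanos()); // 0
        sketch.recomputeThrottle(20);
        System.out.println(sketch.getThrottleIntervalNanos()); // 5000000
        sketch.throttle(); // sleeps roughly 5 ms
    }
}
```

Because only one thread writes `throttleIntervalNanos`, a plain volatile field is enough. No lock is needed on the writer path, which is the point of moving the calculation out of `throttle()`.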