Merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability into lp:akiban-persistit
Status: Merged
Approved by: Peter Beaman
Approved revision: 381
Merged at revision: 368
Proposed branch: lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Merge into: lp:akiban-persistit
Diff against target: 1050 lines (+237/-196), 12 files modified
  src/main/java/com/persistit/BufferPool.java (+13/-5)
  src/main/java/com/persistit/CleanupManager.java (+32/-11)
  src/main/java/com/persistit/Exchange.java (+11/-4)
  src/main/java/com/persistit/IOMeter.java (+14/-4)
  src/main/java/com/persistit/IntegrityCheck.java (+20/-0)
  src/main/java/com/persistit/JournalManager.java (+102/-118)
  src/main/java/com/persistit/Persistit.java (+12/-10)
  src/main/java/com/persistit/TransactionPlayer.java (+3/-1)
  src/main/java/com/persistit/logging/LogBase.java (+2/-2)
  src/main/java/com/persistit/mxbeans/IOMeterMXBean.java (+2/-2)
  src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java (+20/-15)
  src/test/java/com/persistit/JournalManagerTest.java (+6/-24)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-wait-for-durability
Related bugs:
Reviewer | Review Type | Date Requested | Status
---|---|---|---
Akiban Build User | | | Needs Fixing
Nathan Williams | | | Approve
Review via email: mp+123174@code.launchpad.net
Commit message
Description of the change
Modifies the JournalManager#
* Modify BufferPool preload warmup logging to show elapsed time
* CleanupManager no longer calls pruneObsoleteTr
* JournalManager: added UrgentFileCount
* JournalManager: Move the calls to force Volume updates back into the copier thread because waiting until Checkpoints causes massive I/O
* JournalManager: Remove ReentrantReadWr
* JournalManager: Corrected time interval calculations in waitForDurability
* JournalManager: Changed calculation of expected IO time to use average rather than max. Actual behavior is very erratic, and our best effort to delay commit() until completion of a force cycle is very inaccurate; we may want to remove the behavior and certainly should modify or remove the documentation.
* JournalManager: Relaxed the leadTime and flusher poll times from 100ms to 500ms. (This change may no longer be necessary; will test and revise as appropriate.)
* Persistit: Corrected a problem in copyBackPages() in which the relaxed pruneObsoleteTr
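The switch from max to average for the expected I/O time can be illustrated with a small rolling window. This is only a sketch: the class `IoTimeWindow` and its method names are hypothetical and are not the JournalManager fields. It assumes, as the diff below suggests, that the sum is always divided by the full window length, so unfilled slots count as zero.

```java
// Hypothetical sketch; names are illustrative, not the JournalManager fields.
final class IoTimeWindow {
    private final long[] ioTimesNs;
    private int next;

    IoTimeWindow(final int cycles) {
        if (cycles <= 0) {
            throw new IllegalArgumentException("cycles must be positive");
        }
        ioTimesNs = new long[cycles];
    }

    /** Record one flush cycle's elapsed time, overwriting the oldest sample. */
    void record(final long elapsedNs) {
        ioTimesNs[next] = elapsedNs;
        next = (next + 1) % ioTimesNs.length;
    }

    /** Mean over all slots; slots not yet written count as zero. */
    long average() {
        long sum = 0;
        for (final long t : ioTimesNs) {
            sum += t;
        }
        return sum / ioTimesNs.length;
    }

    /** Largest sample in the window, the estimate used before this change. */
    long max() {
        long max = 0;
        for (final long t : ioTimesNs) {
            max = Math.max(max, t);
        }
        return max;
    }

    public static void main(final String[] args) {
        final IoTimeWindow w = new IoTimeWindow(8);
        for (int i = 0; i < 7; i++) {
            w.record(2_000_000L); // seven typical 2 ms cycles
        }
        w.record(90_000_000L); // one erratic 90 ms outlier
        // prints avg=13000000 max=90000000
        System.out.println("avg=" + w.average() + " max=" + w.max());
    }
}
```

With one outlier in eight samples, the max-based estimate stays at 90 ms until the outlier leaves the window, while the average rises only to 13 ms. Neither is an accurate predictor when behavior is as erratic as the description says.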
Peter Beaman (pbeaman) wrote:
Nathan Williams (nwilliams) wrote:
This looks pretty good.
Just a few questions:
IOMeter#
JournalManager#
Minor:
Probably just my preference, but constants with units in the name make one less thing to check (new intervals in CleanupManager).
Peter Beaman (pbeaman) wrote:
Thanks, good questions.
DEFAULT_
I/O is currently being done and when that happens the copier interval
goes down. I'm not wedded to that heuristic, but it has been there
since the beginning. The change was to normalize the units in IOMeter.
The IORate field attempts to show KBytes per second, whereas the
threshold used to be in Bytes per second. Now the threshold is
expressed in KBytes per second too. Clearly there's a neglected
documentation task - I will fix that.
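The unit normalization described above can be sketched as follows. The class and method names are hypothetical. The sketch assumes the measured rate is in bytes per second and the threshold in KBytes per second, which is what the `* KILO` comparison in the JournalManager diff below suggests.

```java
// Hypothetical sketch of comparing a bytes/sec rate to a KBytes/sec threshold.
final class QuiescentCheck {
    private static final long KILO = 1024;

    /**
     * Returns true when recent I/O is below the threshold. The threshold is
     * scaled to bytes so that both sides of the comparison use the same unit.
     */
    static boolean isQuiescent(final long recentBytesPerSec, final long thresholdKBytesPerSec) {
        return recentBytesPerSec < thresholdKBytesPerSec * KILO;
    }

    public static void main(final String[] args) {
        // A threshold of 100 KBytes/sec is 102,400 bytes/sec.
        System.out.println(isQuiescent(50_000L, 100L));  // prints true
        System.out.println(isQuiescent(150_000L, 100L)); // prints false
    }
}
```

Putting the unit in the parameter names makes a mismatch visible at the call site, which is the same point the reviewer raises about constant names.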
selectNodesForCopy used to call cleanupPageList before looping, and
the assert made sense. However, because the cleanup operation can be
expensive (even with the new algorithm) I reduced the number of
times it is called, and therefore the selectNodesForCopy method can
now see and skip invalid PageNodes.
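A minimal sketch of the two halves of that trade-off: a single-pass compaction that is run less often, and a selection loop that tolerates invalid entries in between. The `Node` class is a hypothetical stand-in for PageNode, and this is not the actual JournalManager code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Node stands in for PageNode and carries only what is needed here.
final class PageListCompaction {
    static final class Node {
        final long timestamp;
        final boolean invalid;

        Node(final long timestamp, final boolean invalid) {
            this.timestamp = timestamp;
            this.invalid = invalid;
        }
    }

    /**
     * Single pass: slide valid entries toward the front, then truncate the
     * tail. Returns the number of entries removed.
     */
    static int compact(final List<Node> nodes) {
        final int size = nodes.size();
        int to = 0;
        for (int from = 0; from < size; from++) {
            final Node n = nodes.get(from);
            if (!n.invalid) {
                if (to != from) {
                    nodes.set(to, n);
                }
                to++;
            }
        }
        nodes.subList(to, size).clear();
        return size - to;
    }

    /** Reader side: skip invalid entries rather than asserting they are absent. */
    static List<Node> selectOlderThan(final List<Node> nodes, final long upperBound) {
        final List<Node> selected = new ArrayList<>();
        for (final Node n : nodes) {
            if (!n.invalid && n.timestamp < upperBound) {
                selected.add(n);
            }
        }
        return selected;
    }

    public static void main(final String[] args) {
        final List<Node> nodes = new ArrayList<>();
        nodes.add(new Node(1, false));
        nodes.add(new Node(2, true));
        nodes.add(new Node(3, false));
        System.out.println(selectOlderThan(nodes, 10).size()); // prints 2
        System.out.println(compact(nodes));                    // prints 1
    }
}
```

Because compaction may lag, any reader of the list has to treat an invalid entry as normal input, which is why the assert no longer holds.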
I agree about units in the constant names and will change.
On Mon, Sep 10, 2012 at 10:29 AM, Nathan Williams <email address hidden> wrote:
> Review: Needs Information
>
> This looks pretty good.
>
> Just a few questions:
> IOMeter#
>
> JournalManager#
>
> Minor:
> Probably just my preference, but constants with units in the name make one less thing to check (new intervals in CleanupManager).
Peter Beaman (pbeaman) wrote:
Revised as requested. The CleanupManager time constants are now named XXX_MS, as are a number of other constants.
From the original proposal, I reverted the changes in JOURNAL_FLUSHER poll time and SOFT commit lead times to their former values. I believe these changes didn't help and are not needed for good performance, and I want to benchmark again with the original values.
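As an illustration of the naming convention discussed in this thread, the sketch below gates a periodic task on elapsed nanoseconds, with the unit carried in each constant's name. The class `IntervalGate` and its constants are hypothetical and only loosely modeled on the CleanupManager change in the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the CleanupManager implementation.
final class IntervalGate {
    static final long MAINTENANCE_INTERVAL_MS = 1000;
    // Derive the nanosecond value from the millisecond one so the two cannot drift apart.
    static final long MAINTENANCE_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(MAINTENANCE_INTERVAL_MS);

    private long lastRunNs;

    IntervalGate(final long startNs) {
        lastRunNs = startNs;
    }

    /** Returns true, and resets the gate, once more than the interval has elapsed. */
    boolean shouldRun(final long nowNs) {
        if (nowNs - lastRunNs > MAINTENANCE_INTERVAL_NS) {
            lastRunNs = nowNs;
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final IntervalGate gate = new IntervalGate(System.nanoTime());
        // Immediately after construction the interval has not elapsed.
        System.out.println(gate.shouldRun(System.nanoTime())); // prints false
    }
}
```

Comparing `nowNs - lastRunNs` against the interval, rather than comparing absolute values, keeps the check correct even if `System.nanoTime()` wraps around.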
Nathan Williams (nwilliams) wrote:
Thanks for the tweaks. Looks good to me.
Akiban Build User (build-akiban) wrote:
There was one failure during build/test:
* unknown exception (check log)
Peter Beaman (pbeaman) wrote:
Jenkins glitch. Reapproving.
Preview Diff
1 | === modified file 'src/main/java/com/persistit/BufferPool.java' |
2 | --- src/main/java/com/persistit/BufferPool.java 2012-08-24 14:00:17 +0000 |
3 | +++ src/main/java/com/persistit/BufferPool.java 2012-09-12 22:04:20 +0000 |
4 | @@ -97,10 +97,11 @@ |
5 | */ |
6 | private final static int INVENTORY_VERSIONS = 3; |
7 | |
8 | + private final static long NS_PER_SEC = 1000000000; |
9 | /** |
10 | - * Preload log multiple |
11 | + * Preload log message interval, in seconds |
12 | */ |
13 | - private final static int INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE = 10000; |
14 | + private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC; |
15 | |
16 | /** |
17 | * The Persistit instance that references this BufferPool. |
18 | @@ -1451,6 +1452,9 @@ |
19 | void preloadBufferInventory() { |
20 | int count = 0; |
21 | int total = 0; |
22 | + final long startTime = System.nanoTime(); |
23 | + long reportTime = startTime; |
24 | + |
25 | try { |
26 | final JournalManager jman = _persistit.getJournalManager(); |
27 | final Exchange exchange = getBufferInventoryExchange(); |
28 | @@ -1496,8 +1500,11 @@ |
29 | final Buffer buff = get(vol, pn.getPageAddress(), false, true); |
30 | buff.release(); |
31 | count++; |
32 | - if ((count % INVENTORY_PRELOAD_LOG_MESSAGE_MULTIPLE) == 0) { |
33 | - _persistit.getLogBase().bufferInventoryProgress.log(count, total); |
34 | + final long now = System.nanoTime(); |
35 | + if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) { |
36 | + _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) |
37 | + / NS_PER_SEC); |
38 | + reportTime = now; |
39 | } |
40 | if (count >= _bufferCount) { |
41 | // |
42 | @@ -1513,7 +1520,8 @@ |
43 | } catch (final PersistitException e) { |
44 | _persistit.getLogBase().bufferInventoryException.log(e); |
45 | } finally { |
46 | - _persistit.getLogBase().bufferInventoryProgress.log(count, total); |
47 | + final long now = System.nanoTime(); |
48 | + _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC); |
49 | } |
50 | } |
51 | |
52 | |
53 | === modified file 'src/main/java/com/persistit/CleanupManager.java' |
54 | --- src/main/java/com/persistit/CleanupManager.java 2012-08-24 13:57:19 +0000 |
55 | +++ src/main/java/com/persistit/CleanupManager.java 2012-09-12 22:04:20 +0000 |
56 | @@ -36,15 +36,19 @@ |
57 | void performAction(Persistit persistit) throws PersistitException; |
58 | } |
59 | |
60 | - final static long DEFAULT_CLEANUP_INTERVAL = 1000; |
61 | + final static long DEFAULT_CLEANUP_INTERVAL_MS = 1000; |
62 | |
63 | final static int DEFAULT_QUEUE_SIZE = 50000; |
64 | |
65 | - final static int WORKLIST_LENGTH = 500; |
66 | - |
67 | - private final static long DEFAULT_MINIMUM_PRUNING_DELAY = 1000; |
68 | - |
69 | - final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE); |
70 | + private final static int WORKLIST_LENGTH = 500; |
71 | + |
72 | + private final static long MINIMUM_MAINTENANCE_INTERVAL_NS = 1000000000L; |
73 | + |
74 | + private final static long MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS = 50000000000L; |
75 | + |
76 | + private final static long DEFAULT_MINIMUM_PRUNING_DELAY_NS = 1000; |
77 | + |
78 | + private final Queue<CleanupAction> _cleanupActionQueue = new ArrayBlockingQueue<CleanupAction>(DEFAULT_QUEUE_SIZE); |
79 | |
80 | private final AtomicBoolean _closed = new AtomicBoolean(); |
81 | |
82 | @@ -56,7 +60,11 @@ |
83 | |
84 | private final AtomicLong _errors = new AtomicLong(); |
85 | |
86 | - private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY); |
87 | + private final AtomicLong _minimumPruningDelay = new AtomicLong(DEFAULT_MINIMUM_PRUNING_DELAY_NS); |
88 | + |
89 | + private long _lastMaintenance; |
90 | + |
91 | + private long _lastPruneObsoleteTransactions; |
92 | |
93 | CleanupManager(final Persistit persistit) { |
94 | super(persistit); |
95 | @@ -64,7 +72,10 @@ |
96 | |
97 | public void start() { |
98 | _closed.set(false); |
99 | - start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL); |
100 | + final long now = System.nanoTime(); |
101 | + _lastMaintenance = now; |
102 | + _lastPruneObsoleteTransactions = now; |
103 | + start("CLEANUP_MANAGER", DEFAULT_CLEANUP_INTERVAL_MS); |
104 | } |
105 | |
106 | public void close(final boolean flush) throws PersistitException { |
107 | @@ -138,9 +149,19 @@ |
108 | |
109 | @Override |
110 | public void poll() throws Exception { |
111 | - _persistit.getIOMeter().poll(); |
112 | - _persistit.cleanup(); |
113 | - _persistit.getJournalManager().pruneObsoleteTransactions(); |
114 | + |
115 | + final long now = System.nanoTime(); |
116 | + if (now - _lastMaintenance > MINIMUM_MAINTENANCE_INTERVAL_NS) { |
117 | + _persistit.getIOMeter().poll(); |
118 | + _persistit.cleanup(); |
119 | + _lastMaintenance = now; |
120 | + } |
121 | + |
122 | + if (now - _lastPruneObsoleteTransactions > MINIMUM_PRUNE_OBSOLETE_TRANSACTIONS_INTERVAL_NS) { |
123 | + _persistit.getJournalManager().pruneObsoleteTransactions(); |
124 | + _lastPruneObsoleteTransactions = now; |
125 | + } |
126 | + |
127 | final List<CleanupAction> workList = new ArrayList<CleanupAction>(WORKLIST_LENGTH); |
128 | synchronized (this) { |
129 | while (workList.size() < WORKLIST_LENGTH) { |
130 | |
131 | === modified file 'src/main/java/com/persistit/Exchange.java' |
132 | --- src/main/java/com/persistit/Exchange.java 2012-08-28 14:29:50 +0000 |
133 | +++ src/main/java/com/persistit/Exchange.java 2012-09-12 22:04:20 +0000 |
134 | @@ -1290,6 +1290,9 @@ |
135 | if (!isDirectoryExchange()) { |
136 | _persistit.checkSuspended(); |
137 | } |
138 | + if (!_ignoreTransactions && !_transaction.isActive()) { |
139 | + _persistit.getJournalManager().throttle(); |
140 | + } |
141 | // TODO: directoryExchange, and lots of tests, don't use transactions. |
142 | // Skip MVCC for now. |
143 | int options = StoreOptions.WAIT; |
144 | @@ -2990,13 +2993,9 @@ |
145 | } |
146 | |
147 | private boolean removeInternal(final Direction selection, final boolean fetchFirst) throws PersistitException { |
148 | - assertCorrectThread(true); |
149 | - _persistit.checkClosed(); |
150 | - |
151 | if (selection != EQ && selection != GTEQ && selection != GT) { |
152 | throw new IllegalArgumentException("Invalid mode " + selection); |
153 | } |
154 | - |
155 | final int keySize = _key.getEncodedSize(); |
156 | |
157 | _key.copyTo(_spareKey3); |
158 | @@ -3005,6 +3004,7 @@ |
159 | // Special case for empty key |
160 | if (keySize == 0) { |
161 | if (selection == EQ) { |
162 | + assertCorrectThread(true); |
163 | return false; |
164 | } |
165 | _spareKey3.append(BEFORE); |
166 | @@ -3094,6 +3094,13 @@ |
167 | assertCorrectThread(true); |
168 | _persistit.checkClosed(); |
169 | |
170 | + if (!isDirectoryExchange()) { |
171 | + _persistit.checkSuspended(); |
172 | + } |
173 | + if (!_ignoreTransactions && !_transaction.isActive()) { |
174 | + _persistit.getJournalManager().throttle(); |
175 | + } |
176 | + |
177 | if (_ignoreTransactions || !_transaction.isActive()) { |
178 | return raw_removeKeyRangeInternal(key1, key2, fetchFirst, false); |
179 | } |
180 | |
181 | === modified file 'src/main/java/com/persistit/IOMeter.java' |
182 | --- src/main/java/com/persistit/IOMeter.java 2012-08-24 13:57:19 +0000 |
183 | +++ src/main/java/com/persistit/IOMeter.java 2012-09-12 22:04:20 +0000 |
184 | @@ -32,6 +32,7 @@ |
185 | |
186 | import com.persistit.mxbeans.IOMeterMXBean; |
187 | import com.persistit.util.ArgParser; |
188 | +import com.persistit.util.Util; |
189 | |
190 | /** |
191 | * |
192 | @@ -57,7 +58,9 @@ |
193 | private final static String DUMP_FORMAT = "time=%,12d op=%2s vol=%4s page=%,16d addr=%,16d size=%,8d index=%,7d"; |
194 | private final static int DUMP_RECORD_LENGTH = 37; |
195 | |
196 | - private final static int DEFAULT_QUIESCENT_IO_THRESHOLD = 100000; |
197 | + private final static int DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 100; |
198 | + private final static int MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 0; |
199 | + private final static int MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC = 1000000; |
200 | |
201 | private final static int READ_PAGE_FROM_VOLUME = 1; |
202 | private final static int READ_PAGE_FROM_JOURNAL = 2; |
203 | @@ -72,7 +75,7 @@ |
204 | |
205 | private final static int ITEM_COUNT = 11; |
206 | |
207 | - private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD; |
208 | + private long _quiescentIOthreshold = DEFAULT_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC; |
209 | |
210 | private final AtomicReference<DataOutputStream> _logStream = new AtomicReference<DataOutputStream>(); |
211 | |
212 | @@ -149,6 +152,7 @@ |
213 | |
214 | /** |
215 | * @return the quiescentIOthreshold |
216 | + * @see #setQuiescentIOthreshold(long) |
217 | */ |
218 | @Override |
219 | public synchronized long getQuiescentIOthreshold() { |
220 | @@ -156,12 +160,18 @@ |
221 | } |
222 | |
223 | /** |
224 | + * Persistit monitors the rate at which new I/O operations are created. When |
225 | + * the IORate falls below the quiescentIOthreshold, expressed in KBytes per |
226 | + * second, the JOURNAL_COPIER accelerates its work to try to clean up older |
227 | + * journal files. |
228 | + * |
229 | * @param quiescentIOthreshold |
230 | * the quiescentIOthreshold to set |
231 | */ |
232 | @Override |
233 | - public synchronized void setQuiescentIOthreshold(final long quiescentIO) { |
234 | - _quiescentIOthreshold = quiescentIO; |
235 | + public synchronized void setQuiescentIOthreshold(final long quiescentIOthreshold) { |
236 | + _quiescentIOthreshold = Util.rangeCheck(quiescentIOthreshold, MINIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC, |
237 | + MAXIMUM_QUIESCENT_IO_THRESHOLD_KBYTES_PER_SEC); |
238 | } |
239 | |
240 | /** |
241 | |
242 | === modified file 'src/main/java/com/persistit/IntegrityCheck.java' |
243 | --- src/main/java/com/persistit/IntegrityCheck.java 2012-09-11 14:14:55 +0000 |
244 | +++ src/main/java/com/persistit/IntegrityCheck.java 2012-09-12 22:04:20 +0000 |
245 | @@ -386,6 +386,26 @@ |
246 | } |
247 | |
248 | /** |
249 | + * Control output format. When CSV mode is enabled, the output is organized |
250 | + * as comma-separated-variable text that can be imported into a spreadsheet. |
251 | + * |
252 | + * @param csvMode |
253 | + */ |
254 | + public void setCsvMode(final boolean csvMode) { |
255 | + _csv = csvMode; |
256 | + } |
257 | + |
258 | + /** |
259 | + * Indicate whether CSV mode is enabled. If so the output is organized as |
260 | + * comma-separated-variable text that can be imported into a spreadsheet. |
261 | + * |
262 | + * @return <code>true<c/code> if CSV mode is enabled. |
263 | + */ |
264 | + public boolean isCsvMode() { |
265 | + return _csv; |
266 | + } |
267 | + |
268 | + /** |
269 | * Indicate whether missing index pages should be added when an index "hole" |
270 | * is discovered. |
271 | * |
272 | |
273 | === modified file 'src/main/java/com/persistit/JournalManager.java' |
274 | --- src/main/java/com/persistit/JournalManager.java 2012-09-11 14:14:55 +0000 |
275 | +++ src/main/java/com/persistit/JournalManager.java 2012-09-12 22:04:20 +0000 |
276 | @@ -29,15 +29,15 @@ |
277 | import java.util.Collections; |
278 | import java.util.Comparator; |
279 | import java.util.HashMap; |
280 | +import java.util.HashSet; |
281 | import java.util.Iterator; |
282 | import java.util.List; |
283 | import java.util.Map; |
284 | +import java.util.Set; |
285 | import java.util.SortedMap; |
286 | import java.util.TreeMap; |
287 | -import java.util.concurrent.TimeUnit; |
288 | import java.util.concurrent.atomic.AtomicBoolean; |
289 | import java.util.concurrent.atomic.AtomicLong; |
290 | -import java.util.concurrent.locks.ReentrantReadWriteLock; |
291 | import java.util.regex.Matcher; |
292 | import java.util.regex.Pattern; |
293 | |
294 | @@ -80,9 +80,9 @@ |
295 | final static int GENTLE_COMMIT_DELAY_MILLIS = 12; |
296 | private final static long NS_PER_MS = 1000000L; |
297 | private final static int IO_MEASUREMENT_CYCLES = 8; |
298 | - |
299 | - private final static int TOO_MANY_WARN_THRESHOLD = 15; |
300 | - private final static int TOO_MANY_ERROR_THRESHOLD = 20; |
301 | + private final static int TOO_MANY_WARN_THRESHOLD = 5; |
302 | + private final static int TOO_MANY_ERROR_THRESHOLD = 10; |
303 | + private final static long KILO = 1024; |
304 | |
305 | /** |
306 | * REGEX expression that recognizes the name of a journal file. |
307 | @@ -104,7 +104,6 @@ |
308 | private final Map<TreeDescriptor, Integer> _treeToHandleMap = new HashMap<TreeDescriptor, Integer>(); |
309 | |
310 | private final Map<Integer, TreeDescriptor> _handleToTreeMap = new HashMap<Integer, TreeDescriptor>(); |
311 | - |
312 | private final Map<Long, TransactionMapItem> _liveTransactionMap = new HashMap<Long, TransactionMapItem>(); |
313 | |
314 | private final Persistit _persistit; |
315 | @@ -184,9 +183,9 @@ |
316 | |
317 | private final AtomicLong _totalFlushIoTime = new AtomicLong(); |
318 | |
319 | - private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL; |
320 | + private volatile long _flushInterval = DEFAULT_FLUSH_INTERVAL_MS; |
321 | |
322 | - private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD; |
323 | + private volatile long _slowIoAlertThreshold = DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS; |
324 | |
325 | private final TransactionPlayer _player = new TransactionPlayer(new JournalTransactionPlayerSupport()); |
326 | |
327 | @@ -201,7 +200,7 @@ |
328 | * performs I/O. Hopefully we can set good defaults and not expose these as |
329 | * knobs. |
330 | */ |
331 | - private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL; |
332 | + private volatile long _copierInterval = DEFAULT_COPIER_INTERVAL_MS; |
333 | |
334 | private volatile int _copiesPerCycle = DEFAULT_COPIES_PER_CYCLE; |
335 | |
336 | @@ -213,7 +212,9 @@ |
337 | |
338 | private boolean _allowHandlesForTempVolumesAndTrees; |
339 | |
340 | - private final AtomicLong _waitLoopsWithNoDelay = new AtomicLong(); |
341 | + private volatile int _urgentFileCountThreshold = DEFAULT_URGENT_FILE_COUNT_THRESHOLD; |
342 | + |
343 | + private volatile long _throttleSleepInterval; |
344 | |
345 | /** |
346 | * <p> |
347 | @@ -537,10 +538,22 @@ |
348 | |
349 | @Override |
350 | public void setSlowIoAlertThreshold(final long slowIoAlertThreshold) { |
351 | - Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD, MAXIMUM_SLOW_ALERT_THRESHOLD); |
352 | + Util.rangeCheck(slowIoAlertThreshold, MINIMUM_SLOW_ALERT_THRESHOLD_MS, MAXIMUM_SLOW_ALERT_THRESHOLD_MS); |
353 | _slowIoAlertThreshold = slowIoAlertThreshold; |
354 | } |
355 | |
356 | + @Override |
357 | + public int getUrgentFileCountThreshold() { |
358 | + return _urgentFileCountThreshold; |
359 | + } |
360 | + |
361 | + @Override |
362 | + public void setUrgentFileCountThreshold(final int threshold) { |
363 | + Util.rangeCheck(threshold, MINIMUM_URGENT_FILE_COUNT_THRESHOLD, MAXIMUM_URGENT_FILE_COUNT_THRESHOLD); |
364 | + _urgentFileCountThreshold = threshold; |
365 | + |
366 | + } |
367 | + |
368 | /** |
369 | * Compute an "urgency" factor that determines how vigorously the |
370 | * JOURNAL_COPIER thread should perform I/O. This number is computed on a |
371 | @@ -554,8 +567,8 @@ |
372 | if (_copyFast.get()) { |
373 | return URGENT; |
374 | } |
375 | - final int journalFileCount = getJournalFileCount(); |
376 | - return Math.min(URGENT, journalFileCount); |
377 | + final int remainingFiles = _urgentFileCountThreshold - getJournalFileCount(); |
378 | + return Math.max(0, Math.min(URGENT - remainingFiles, URGENT)); |
379 | } |
380 | |
381 | /** |
382 | @@ -567,13 +580,9 @@ |
383 | * @throws PersistitInterruptedException |
384 | */ |
385 | public void throttle() throws PersistitInterruptedException { |
386 | - final int urgency = urgency(); |
387 | - if (!_appendOnly.get()) { |
388 | - if (urgency == URGENT) { |
389 | - Util.sleep(URGENT_COMMIT_DELAY_MILLIS); |
390 | - } else if (urgency >= ALMOST_URGENT) { |
391 | - Util.sleep(GENTLE_COMMIT_DELAY_MILLIS); |
392 | - } |
393 | + final long interval = _throttleSleepInterval; |
394 | + if (interval > 0) { |
395 | + Util.sleep(interval); |
396 | } |
397 | } |
398 | |
399 | @@ -1007,14 +1016,6 @@ |
400 | // |
401 | force(); |
402 | // |
403 | - // Make sure all copied pages have been flushed to disk. |
404 | - // |
405 | - for (final Volume vol : _volumeToHandleMap.keySet()) { |
406 | - if (vol.isOpened()) { |
407 | - vol.getStorage().force(); |
408 | - } |
409 | - } |
410 | - // |
411 | // Prepare room for CP.OVERHEAD bytes in the journal. If doing so |
412 | // started a new journal file then there's no need to write another |
413 | // CP record. |
414 | @@ -1227,7 +1228,6 @@ |
415 | static long fileToGeneration(final File file) { |
416 | final Matcher matcher = PATH_PATTERN.matcher(file.getName()); |
417 | if (matcher.matches()) { |
418 | - // TODO - validate range |
419 | return Long.parseLong(matcher.group(2)); |
420 | } else { |
421 | return -1; |
422 | @@ -1237,7 +1237,6 @@ |
423 | static String fileToPath(final File file) { |
424 | final Matcher matcher = PATH_PATTERN.matcher(file.getPath()); |
425 | if (matcher.matches()) { |
426 | - // TODO - validate range |
427 | return matcher.group(1); |
428 | } else { |
429 | return null; |
430 | @@ -1677,11 +1676,6 @@ |
431 | } |
432 | } |
433 | // |
434 | - // Remove the page list entries too. |
435 | - // |
436 | - _droppedPageCount += cleanupPageList(); |
437 | - |
438 | - // |
439 | // Remove any PageNode from the branchMap having a timestamp less |
440 | // than the checkpoint. Generally all such entries are removed after |
441 | // the first checkpoint that has been established after recovery. |
442 | @@ -2168,6 +2162,20 @@ |
443 | } finally { |
444 | _copying.set(false); |
445 | } |
446 | + |
447 | + long throttleInterval = 0; |
448 | + if (!_appendOnly.get()) { |
449 | + final int urgency = urgency(); |
450 | + if (urgency == URGENT) { |
451 | + throttleInterval = URGENT_COMMIT_DELAY_MILLIS; |
452 | + } else if (urgency > ALMOST_URGENT) { |
453 | + throttleInterval = GENTLE_COMMIT_DELAY_MILLIS; |
454 | + } |
455 | + } |
456 | + if (throttleInterval != _throttleSleepInterval) { |
457 | + _throttleSleepInterval = throttleInterval; |
458 | + } |
459 | + |
460 | } |
461 | |
462 | @Override |
463 | @@ -2175,14 +2183,14 @@ |
464 | return _closed.get() || _shouldStop; |
465 | } |
466 | |
467 | - @Override |
468 | /** |
469 | - * Return a nice interval, in milliseconds, to wait between |
470 | - * copierCycle invocations. The interval decreases as interval |
471 | - * goes up, and becomes zero when the urgency is 10. The interval |
472 | - * is also zero if there has be no recent I/O activity invoked |
473 | - * by other activities. |
474 | + * Return a nice interval, in milliseconds, to wait between copierCycle |
475 | + * invocations. The interval decreases as interval goes up, and becomes |
476 | + * zero when the urgency is greater than or equal to 8. The interval is |
477 | + * also zero if there has be no recent I/O activity invoked by other |
478 | + * activities. |
479 | */ |
480 | + @Override |
481 | public long getPollInterval() { |
482 | final IOMeter iom = _persistit.getIOMeter(); |
483 | final long pollInterval = super.getPollInterval(); |
484 | @@ -2198,7 +2206,7 @@ |
485 | |
486 | int divisor = 1; |
487 | |
488 | - if (iom.recentCharge() < iom.getQuiescentIOthreshold()) { |
489 | + if (iom.recentCharge() < iom.getQuiescentIOthreshold() * KILO) { |
490 | divisor = HALF_URGENT; |
491 | } else if (urgency > HALF_URGENT) { |
492 | divisor = urgency - HALF_URGENT; |
493 | @@ -2214,8 +2222,6 @@ |
494 | |
495 | private class JournalFlusher extends IOTaskRunnable { |
496 | |
497 | - final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); |
498 | - |
499 | volatile long _lastExceptionTimestamp = 0; |
500 | volatile Exception _lastException = null; |
501 | |
502 | @@ -2248,6 +2254,7 @@ |
503 | * posted an _endTimestamp larger than flushedTimestamp. |
504 | */ |
505 | final long now = System.nanoTime(); |
506 | + long remainingStallTime = stallTime; |
507 | |
508 | while (true) { |
509 | /* |
510 | @@ -2269,10 +2276,10 @@ |
511 | endTimestamp = _endTimestamp; |
512 | startTime = _startTime; |
513 | endTime = _endTime; |
514 | - if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) { |
515 | - estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0); |
516 | - } |
517 | if (startTimestamp == _startTimestamp && endTimestamp == _endTimestamp) { |
518 | + if (flushedTimestamp > startTimestamp && startTimestamp > endTimestamp) { |
519 | + estimatedRemainingIoNanos = Math.max(startTime + _expectedIoTime - now, 0); |
520 | + } |
521 | break; |
522 | } |
523 | Util.spinSleep(); |
524 | @@ -2280,22 +2287,23 @@ |
525 | |
526 | if (endTimestamp > flushedTimestamp && startTimestamp > flushedTimestamp) { |
527 | /* |
528 | - * Done - commit is fully durable |
529 | + * Done - commit is durable |
530 | */ |
531 | break; |
532 | } |
533 | |
534 | long remainingSleepNanos; |
535 | - boolean didWait = false; |
536 | if (estimatedRemainingIoNanos == -1) { |
537 | remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime)); |
538 | } else { |
539 | remainingSleepNanos = _flushInterval; |
540 | } |
541 | |
542 | - long estimatedNanosToFinish = Math.max(estimatedRemainingIoNanos, 0); |
543 | + long estimatedNanosToFinish; |
544 | if (startTimestamp < flushedTimestamp) { |
545 | - estimatedNanosToFinish += remainingSleepNanos + _expectedIoTime; |
546 | + estimatedNanosToFinish = remainingSleepNanos + _expectedIoTime; |
547 | + } else { |
548 | + estimatedNanosToFinish = estimatedRemainingIoNanos; |
549 | } |
550 | |
551 | if (leadTime > 0 && leadTime * NS_PER_MS >= estimatedNanosToFinish) { |
552 | @@ -2311,41 +2319,20 @@ |
553 | * possible (determined by stallTime) before kicking the |
554 | * JOURNAL_FLUSHER to write the caller's transaction. |
555 | */ |
556 | - final long delay = stallTime * NS_PER_MS - estimatedNanosToFinish; |
557 | - if (delay > 0) { |
558 | - Util.sleep(delay / NS_PER_MS); |
559 | - didWait = true; |
560 | - } |
561 | - kick(); |
562 | - if (delay <= 0) { |
563 | - didWait = true; |
564 | - try { |
565 | - if (_lock.readLock().tryLock(NS_PER_MS, TimeUnit.NANOSECONDS)) { |
566 | - _lock.readLock().unlock(); |
567 | - } |
568 | - } catch (final InterruptedException e) { |
569 | - throw new PersistitInterruptedException(e); |
570 | - } |
571 | + if (remainingStallTime > 0) { |
572 | + Util.sleep(remainingStallTime); |
573 | + remainingStallTime = 0; |
574 | + } else { |
575 | + kick(); |
576 | + Util.spinSleep(); |
577 | } |
578 | } else { |
579 | /* |
580 | - * Otherwise, wait until the I/O is about half done and then |
581 | - * retry. |
582 | + * Otherwise wait for concurrent I/O operation to finish. Do |
583 | + * this by polling because our experiments with using locks |
584 | + * here showed significant excess CPU consumption. |
585 | */ |
586 | - final long delay = ((estimatedNanosToFinish - leadTime * NS_PER_MS) / 2) + NS_PER_MS; |
587 | - try { |
588 | - if (delay > 0) { |
589 | - didWait = true; |
590 | - if (_lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) { |
591 | - _lock.readLock().unlock(); |
592 | - } |
593 | - } |
594 | - } catch (final InterruptedException e) { |
595 | - throw new PersistitInterruptedException(e); |
596 | - } |
597 | - } |
598 | - if (!didWait) { |
599 | - _waitLoopsWithNoDelay.incrementAndGet(); |
600 | + Util.spinSleep(); |
601 | } |
602 | } |
603 | if (_lastExceptionTimestamp > flushedTimestamp) { |
604 | @@ -2370,7 +2357,6 @@ |
605 | * waitForDurability to know when the I/O operation has |
606 | * finished. |
607 | */ |
608 | - _lock.writeLock().lock(); |
609 | try { |
610 | _startTimestamp = _persistit.getTimestampAllocator().updateTimestamp(); |
611 | _startTime = System.nanoTime(); |
612 | @@ -2382,22 +2368,24 @@ |
613 | } finally { |
614 | _endTime = System.nanoTime(); |
615 | _endTimestamp = _persistit.getTimestampAllocator().updateTimestamp(); |
616 | - _lock.writeLock().unlock(); |
617 | } |
618 | |
619 | final long elapsed = _endTime - _startTime; |
620 | _totalFlushCycles.incrementAndGet(); |
621 | _totalFlushIoTime.addAndGet(elapsed); |
622 | _ioTimes[_ioCycle] = elapsed; |
623 | - _ioCycle = (_ioCycle + 1) % _ioTimes.length; |
624 | + _ioCycle = (_ioCycle + 1) % IO_MEASUREMENT_CYCLES; |
625 | |
626 | - long max = 0; |
627 | - for (int index = 0; index < _ioTimes.length; index++) { |
628 | - max = Math.max(max, _ioTimes[index]); |
629 | + long avg = 0; |
630 | + for (int index = 0; index < IO_MEASUREMENT_CYCLES; index++) { |
631 | + avg += _ioTimes[index]; |
632 | } |
633 | - _expectedIoTime = max; |
634 | + avg /= IO_MEASUREMENT_CYCLES; |
635 | + |
636 | + _expectedIoTime = avg; |
637 | if (elapsed > _slowIoAlertThreshold * NS_PER_MS) { |
638 | - _persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS); |
639 | + _persistit.getLogBase().longJournalIO.log(elapsed / NS_PER_MS, IO_MEASUREMENT_CYCLES, avg |
640 | + / NS_PER_MS); |
641 | } |
642 | |
643 | } catch (final Exception e) { |
644 | @@ -2416,6 +2404,7 @@ |
645 | } finally { |
646 | _flushing.set(false); |
647 | } |
648 | + |
649 | } |
650 | |
651 | @Override |
652 | @@ -2426,14 +2415,12 @@ |
653 | |
654 | synchronized void selectForCopy(final List<PageNode> list) { |
655 | list.clear(); |
656 | - _droppedPageCount += cleanupPageList(); |
657 | if (!_appendOnly.get()) { |
658 | final long timeStampUpperBound = Math.min(getLastValidCheckpointTimestamp(), _copierTimestampLimit); |
659 | for (final Iterator<PageNode> iterator = _pageList.iterator(); iterator.hasNext();) { |
660 | final PageNode pageNode = iterator.next(); |
661 | - for (PageNode pn = pageNode; pn != null; pn = pn.getPrevious()) { |
662 | + for (PageNode pn = pageNode; pn != null && !pn.isInvalid(); pn = pn.getPrevious()) { |
663 | if (pn.getTimestamp() < timeStampUpperBound) { |
664 | - assert !pn.isInvalid(); |
665 | list.add(pn); |
666 | break; |
667 | } |
668 | @@ -2517,6 +2504,7 @@ |
669 | Volume volumeRef = null; |
670 | Volume volume = null; |
671 | int handle = -1; |
672 | + final Set<Volume> volumes = new HashSet<Volume>(); |
673 | |
674 | for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) { |
675 | if (_closed.get() && !_copyFast.get() || _appendOnly.get()) { |
676 | @@ -2567,6 +2555,7 @@ |
677 | |
678 | try { |
679 | volume.getStorage().writePage(bb, pageAddress); |
680 | + volumes.add(volume); |
681 | } catch (final PersistitException ioe) { |
682 | _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(), |
683 | pageNode.getJournalAddress()); |
684 | @@ -2578,6 +2567,10 @@ |
685 | pageNode.getJournalAddress(), urgency()); |
686 | } |
687 | |
688 | + for (final Volume vol : volumes) { |
689 | + vol.getStorage().force(); |
690 | + } |
691 | + |
692 | } |
693 | |
694 | private void cleanupForCopy(final List<PageNode> list) throws PersistitException { |
695 | @@ -2715,26 +2708,21 @@ |
696 | * @return Count of removed PageNode instances. |
697 | */ |
698 | int cleanupPageList() { |
699 | - int to = -1; |
700 | - int count = 0; |
701 | - for (int index = _pageList.size(); --index >= 0;) { |
702 | - if (_pageList.get(index).isInvalid()) { |
703 | - if (to == -1) { |
704 | - to = index; |
705 | - } |
706 | - } else { |
707 | - if (to != -1) { |
708 | - _pageList.removeRange(index + 1, to + 1); |
709 | - count += to - index; |
710 | - to = -1; |
711 | - } |
712 | + final int size = _pageList.size(); |
713 | + int from; |
714 | + for (from = 0; from < size && !_pageList.get(from).isInvalid(); from++) |
715 | + ; |
716 | + int to = from; |
717 | + for (from = from + 1; from < size; from++) { |
718 | + final PageNode pn = _pageList.get(from); |
719 | + if (!pn.isInvalid()) { |
720 | + _pageList.set(to++, pn); |
721 | } |
722 | } |
723 | - if (to != -1) { |
724 | - _pageList.removeRange(0, to + 1); |
725 | - count += to + 1; |
726 | + if (size > to) { |
727 | + _pageList.removeRange(to, size); |
728 | } |
729 | - return count; |
730 | + return size - to; |
731 | } |
732 | |
733 | private void reportJournalFileCount() { |
734 | @@ -2744,11 +2732,11 @@ |
735 | */ |
736 | final int journalFileCount = getJournalFileCount(); |
737 | if (journalFileCount != _lastReportedJournalFileCount) { |
738 | - if (journalFileCount > TOO_MANY_ERROR_THRESHOLD) { |
739 | + if (journalFileCount > TOO_MANY_ERROR_THRESHOLD + _urgentFileCountThreshold) { |
740 | _persistit.getAlertMonitor() |
741 | .post(new Event(AlertLevel.ERROR, _persistit.getLogBase().tooManyJournalFilesError, |
742 | journalFileCount), AlertMonitor.MANY_JOURNAL_FILES); |
743 | - } else if (journalFileCount > TOO_MANY_WARN_THRESHOLD) { |
744 | + } else if (journalFileCount > TOO_MANY_WARN_THRESHOLD + _urgentFileCountThreshold) { |
745 | _persistit.getAlertMonitor() |
746 | .post(new Event(AlertLevel.WARN, _persistit.getLogBase().tooManyJournalFilesWarning, |
747 | journalFileCount), AlertMonitor.MANY_JOURNAL_FILES); |
748 | @@ -2923,11 +2911,11 @@ |
749 | _liveTransactionMap.clear(); |
750 | } |
751 | |
752 | - synchronized long getCurrentJournalSize() { |
753 | + long getCurrentJournalSize() { |
754 | return _currentAddress % _blockSize; |
755 | } |
756 | |
757 | - synchronized int getJournalFileCount() { |
758 | + int getJournalFileCount() { |
759 | return (int) (_currentAddress / _blockSize - _baseAddress / _blockSize) + 1; |
760 | } |
761 | |
762 | @@ -2989,8 +2977,4 @@ |
763 | public SortedMap<Integer, TreeDescriptor> queryTreeMap() { |
764 | return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap); |
765 | } |
766 | - |
767 | - long getWaitLoopsWithNoDelay() { |
768 | - return _waitLoopsWithNoDelay.get(); |
769 | - } |
770 | } |
771 | |
772 | === modified file 'src/main/java/com/persistit/Persistit.java' |
773 | --- src/main/java/com/persistit/Persistit.java 2012-09-11 14:14:55 +0000 |
774 | +++ src/main/java/com/persistit/Persistit.java 2012-09-12 22:04:20 +0000 |
775 | @@ -327,11 +327,11 @@ |
776 | private final static SplitPolicy DEFAULT_SPLIT_POLICY = SplitPolicy.PACK_BIAS; |
777 | private final static JoinPolicy DEFAULT_JOIN_POLICY = JoinPolicy.EVEN_BIAS; |
778 | private final static CommitPolicy DEFAULT_TRANSACTION_COMMIT_POLICY = CommitPolicy.SOFT; |
779 | - private final static long DEFAULT_COMMIT_LEAD_TIME = 100; |
780 | - private final static long DEFAULT_COMMIT_STALL_TIME = 1; |
781 | - private final static long MAX_COMMIT_LEAD_TIME = 5000; |
782 | - private final static long MAX_COMMIT_STALL_TIME = 5000; |
783 | - private final static long FLUSH_DELAY_INTERVAL = 5000; |
784 | + private final static long DEFAULT_COMMIT_LEAD_TIME_MS = 100; |
785 | + private final static long DEFAULT_COMMIT_STALL_TIME_MS = 1; |
786 | + private final static long MAX_COMMIT_LEAD_TIME_MS = 5000; |
787 | + private final static long MAX_COMMIT_STALL_TIME_MS = 5000; |
788 | + private final static long LOG_FLUSH_DELAY_INTERVAL_MS = 5000; |
789 | |
790 | private final static int MAX_FATAL_ERROR_MESSAGES = 10; |
791 | |
792 | @@ -365,7 +365,7 @@ |
793 | public void run() { |
794 | while (!_stop) { |
795 | try { |
796 | - Util.sleep(FLUSH_DELAY_INTERVAL); |
797 | + Util.sleep(LOG_FLUSH_DELAY_INTERVAL_MS); |
798 | } catch (final PersistitInterruptedException ie) { |
799 | break; |
800 | } |
801 | @@ -458,9 +458,9 @@ |
802 | |
803 | private volatile CommitPolicy _defaultCommitPolicy = DEFAULT_TRANSACTION_COMMIT_POLICY; |
804 | |
805 | - private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME; |
806 | + private volatile long _commitLeadTime = DEFAULT_COMMIT_LEAD_TIME_MS; |
807 | |
808 | - private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME; |
809 | + private volatile long _commitStallTime = DEFAULT_COMMIT_STALL_TIME_MS; |
810 | |
811 | private final ThreadLocal<SoftReference<int[]>> _intArrayThreadLocal = new ThreadLocal<SoftReference<int[]>>(); |
812 | |
813 | @@ -1447,6 +1447,8 @@ |
814 | */ |
815 | for (int i = 0; i < 5; ++i) { |
816 | if (!_closed.get() && _initialized.get()) { |
817 | + _transactionIndex.updateActiveTransactionCache(); |
818 | + _journalManager.pruneObsoleteTransactions(); |
819 | _checkpointManager.checkpoint(); |
820 | _journalManager.copyBack(); |
821 | final int fileCount = _journalManager.getJournalFileCount(); |
822 | @@ -2051,7 +2053,7 @@ |
823 | } |
824 | |
825 | void setTransactionCommitleadTime(final long time) { |
826 | - _commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME); |
827 | + _commitLeadTime = Util.rangeCheck(time, 0, MAX_COMMIT_LEAD_TIME_MS); |
828 | } |
829 | |
830 | long getTransactionCommitStallTime() { |
831 | @@ -2059,7 +2061,7 @@ |
832 | } |
833 | |
834 | void setTransactionCommitStallTime(final long time) { |
835 | - _commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME); |
836 | + _commitStallTime = Util.rangeCheck(time, 0, MAX_COMMIT_STALL_TIME_MS); |
837 | } |
838 | |
839 | /** |
840 | |
841 | === modified file 'src/main/java/com/persistit/TransactionPlayer.java' |
842 | --- src/main/java/com/persistit/TransactionPlayer.java 2012-08-24 13:57:19 +0000 |
843 | +++ src/main/java/com/persistit/TransactionPlayer.java 2012-09-12 22:04:20 +0000 |
844 | @@ -300,7 +300,9 @@ |
845 | if (VolumeStructure.DIRECTORY_TREE_NAME.equals(td.getTreeName())) { |
846 | return volume.getStructure().directoryExchange(); |
847 | } else { |
848 | - return _support.getPersistit().getExchange(volume, td.getTreeName(), true); |
849 | + final Exchange exchange = _support.getPersistit().getExchange(volume, td.getTreeName(), true); |
850 | + exchange.ignoreTransactions(); |
851 | + return exchange; |
852 | } |
853 | } |
854 | |
855 | |
856 | === modified file 'src/main/java/com/persistit/logging/LogBase.java' |
857 | --- src/main/java/com/persistit/logging/LogBase.java 2012-08-24 14:00:17 +0000 |
858 | +++ src/main/java/com/persistit/logging/LogBase.java 2012-09-12 22:04:20 +0000 |
859 | @@ -232,7 +232,7 @@ |
860 | @Message("WARNING|Crash retried %,d times on %s") |
861 | public final LogItem crashRetry = PersistitLogMessage.empty(); |
862 | |
863 | - @Message("WARNING|Journal flush operation took %,dms") |
864 | + @Message("WARNING|Journal flush operation took %,dms last %,d cycles average is %,dms") |
865 | public final LogItem longJournalIO = PersistitLogMessage.empty(); |
866 | |
867 | @Message("INFO|Normal journal file count %,d") |
868 | @@ -247,7 +247,7 @@ |
869 | @Message("INFO|Preloading buffer pool inventory recorded at %tc") |
870 | public final LogItem bufferInventoryLoad = PersistitLogMessage.empty(); |
871 | |
872 | - @Message("INFO|Preloaded %,d of %,d buffers") |
873 | + @Message("INFO|Preloaded %,d of %,d buffers in %,d seconds") |
874 | public final LogItem bufferInventoryProgress = PersistitLogMessage.empty(); |
875 | |
876 | @Message("WARNING|Exception while writing buffer pool inventory %s") |
877 | |
878 | === modified file 'src/main/java/com/persistit/mxbeans/IOMeterMXBean.java' |
879 | --- src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-08-24 13:57:19 +0000 |
880 | +++ src/main/java/com/persistit/mxbeans/IOMeterMXBean.java 2012-09-12 22:04:20 +0000 |
881 | @@ -69,7 +69,7 @@ |
882 | /** |
883 | * @return the quiescentIOthreshold |
884 | */ |
885 | - @Description("Disk I/O scheduling parameter in bytes per second specifying threshold " |
886 | + @Description("Disk I/O scheduling parameter in KBytes per second specifying threshold " |
887 | + "between \"quiescent\" and \"busy\" states") |
888 | public long getQuiescentIOthreshold(); |
889 | |
890 | @@ -77,7 +77,7 @@ |
891 | * @param quiescentIO |
892 | * the quiescentIOthreshold to set |
893 | */ |
894 | - @Description("Disk I/O scheduling parameter in bytes per second specifying threshold " |
895 | + @Description("Disk I/O scheduling parameter in KBytes per second specifying threshold " |
896 | + "between \"quiescent\" and \"busy\" states") |
897 | public void setQuiescentIOthreshold(long quiescentIO); |
898 | |
899 | |
900 | === modified file 'src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java' |
901 | --- src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-08-24 13:57:19 +0000 |
902 | +++ src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-09-12 22:04:20 +0000 |
903 | @@ -76,14 +76,20 @@ |
904 | * Default time interval (in milliseconds) between calls to the |
905 | * FileChannel.force() method. |
906 | */ |
907 | - final static long DEFAULT_FLUSH_INTERVAL = 100; |
908 | + final static long DEFAULT_FLUSH_INTERVAL_MS = 100; |
909 | |
910 | /** |
911 | * Default time interval (in milliseconds) between calls to the journal |
912 | * copier method. |
913 | */ |
914 | - final static long DEFAULT_COPIER_INTERVAL = 10000; |
915 | - |
916 | + final static long DEFAULT_COPIER_INTERVAL_MS = 10000; |
917 | + /** |
918 | + * Default journal file count at which transactions are throttled to allow |
919 | + * copier to catch up. |
920 | + */ |
921 | + final static int DEFAULT_URGENT_FILE_COUNT_THRESHOLD = 15; |
922 | + final static int MINIMUM_URGENT_FILE_COUNT_THRESHOLD = 5; |
923 | + final static int MAXIMUM_URGENT_FILE_COUNT_THRESHOLD = 100; |
924 | /** |
925 | * Default value for maximum pages to be copied per cycle. |
926 | */ |
927 | @@ -94,17 +100,17 @@ |
928 | * exceptions on attempts to write to the journal. Prevents excessively |
929 | * verbose log on repeated failures. |
930 | */ |
931 | - final static long DEFAULT_LOG_REPEAT_INTERVAL = 60000L; |
932 | - final static long MINIMUM_LOG_REPEAT_INTERVAL = 1000L; |
933 | - final static long MAXIMUM_LOG_REPEAT_INTERVAL = Long.MAX_VALUE; |
934 | + final static long DEFAULT_LOG_REPEAT_INTERVAL_MS = 60000L; |
935 | + final static long MINIMUM_LOG_REPEAT_INTERVAL_MS = 1000L; |
936 | + final static long MAXIMUM_LOG_REPEAT_INTERVAL_MS = Long.MAX_VALUE; |
937 | /** |
938 | * Default threshold time in milliseconds for JournalManager flush |
939 | * operations. If a flush operation takes longer than this time, a WARNING |
940 | * message is written to the log. |
941 | */ |
942 | - final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD = 2000L; |
943 | - final static long MINIMUM_SLOW_ALERT_THRESHOLD = 100L; |
944 | - final static long MAXIMUM_SLOW_ALERT_THRESHOLD = Long.MAX_VALUE; |
945 | + final static long DEFAULT_SLOW_IO_ALERT_THRESHOLD_MS = 2000L; |
946 | + final static long MINIMUM_SLOW_ALERT_THRESHOLD_MS = 100L; |
947 | + final static long MAXIMUM_SLOW_ALERT_THRESHOLD_MS = Long.MAX_VALUE; |
948 | |
949 | /** |
950 | * File name appended when journal path specifies only a directory |
951 | @@ -115,12 +121,6 @@ |
952 | */ |
953 | final static String PATH_FORMAT = "%s.%012d"; |
954 | |
955 | - /** |
956 | - * Default setting for number of pages in the page map before the urgency of |
957 | - * copying starts to increase. |
958 | - */ |
959 | - final static int DEFAULT_PAGE_MAP_SIZE_BASE = 250000; |
960 | - |
961 | final static int MAXIMUM_CONCURRENT_TRANSACTIONS = 10000; |
962 | |
963 | @Description("Number of transaction map items in the live map") |
964 | @@ -243,4 +243,9 @@ |
965 | @Description("Threshold in milliseconds for warnings of long duration flush cycles") |
966 | void setSlowIoAlertThreshold(long slowIoAlertThreshold); |
967 | |
968 | + @Description("Journal file count threshold for throttling transactions") |
969 | + int getUrgentFileCountThreshold(); |
970 | + |
971 | + @Description("Journal file count threshold for throttling transactions") |
972 | + void setUrgentFileCountThreshold(int threshold); |
973 | } |
974 | |
975 | === modified file 'src/test/java/com/persistit/JournalManagerTest.java' |
976 | --- src/test/java/com/persistit/JournalManagerTest.java 2012-09-04 21:01:23 +0000 |
977 | +++ src/test/java/com/persistit/JournalManagerTest.java 2012-09-12 22:04:20 +0000 |
978 | @@ -33,6 +33,7 @@ |
979 | import java.util.HashMap; |
980 | import java.util.HashSet; |
981 | import java.util.Iterator; |
982 | +import java.util.LinkedList; |
983 | import java.util.List; |
984 | import java.util.Map; |
985 | import java.util.Properties; |
986 | @@ -43,7 +44,6 @@ |
987 | |
988 | import com.persistit.CheckpointManager.Checkpoint; |
989 | import com.persistit.JournalManager.PageNode; |
990 | -import com.persistit.Transaction.CommitPolicy; |
991 | import com.persistit.TransactionPlayer.TransactionPlayerListener; |
992 | import com.persistit.exception.PersistitException; |
993 | import com.persistit.unit.ConcurrentUtil.ThrowingRunnable; |
994 | @@ -440,7 +440,7 @@ |
995 | * Randomly invalidated PageNodes |
996 | */ |
997 | { |
998 | - final int SIZE = 5000; |
999 | + final int SIZE = 1000000; |
1000 | final Random random = new Random(1); |
1001 | final List<PageNode> source = testCleanupPageListSource(SIZE); |
1002 | int next = -1; |
1003 | @@ -448,8 +448,8 @@ |
1004 | if (index < next) { |
1005 | source.get(index).invalidate(); |
1006 | } else { |
1007 | - index += random.nextInt(50); |
1008 | - next = random.nextInt(50) + index; |
1009 | + index += random.nextInt(5); |
1010 | + next = random.nextInt(5) + index; |
1011 | } |
1012 | } |
1013 | testCleanupPageListHelper(source); |
1014 | @@ -538,34 +538,16 @@ |
1015 | .getIgnoredUpdates() > 0); |
1016 | } |
1017 | |
1018 | - @Test |
1019 | - public void waitForDurabilitySoaksCPU() throws Exception { |
1020 | - _persistit.setDefaultTransactionCommitPolicy(CommitPolicy.HARD); |
1021 | - final JournalManager jman = _persistit.getJournalManager(); |
1022 | - long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay(); |
1023 | - final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true); |
1024 | - final Transaction txn = exchange.getTransaction(); |
1025 | - for (int count = 0; count < 1000; count++) { |
1026 | - txn.begin(); |
1027 | - exchange.getValue().put(RED_FOX + count); |
1028 | - exchange.to(count).store(); |
1029 | - txn.commit(); |
1030 | - txn.end(); |
1031 | - } |
1032 | - waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay() - waitLoopsWithoutDelay; |
1033 | - assertEquals("Wait loops without delay", 0, waitLoopsWithoutDelay); |
1034 | - } |
1035 | - |
1036 | private List<PageNode> testCleanupPageListSource(final int size) { |
1037 | final List<PageNode> source = new ArrayList<PageNode>(size); |
1038 | - for (int index = 0; index < 1000000; index++) { |
1039 | + for (int index = 0; index < size; index++) { |
1040 | source.add(new PageNode(0, index, index * 10, index)); |
1041 | } |
1042 | return source; |
1043 | } |
1044 | |
1045 | private void testCleanupPageListHelper(final List<PageNode> source) throws Exception { |
1046 | - final List<PageNode> cleaned = new ArrayList<PageNode>(source); |
1047 | + final List<PageNode> cleaned = new LinkedList<PageNode>(source); |
1048 | for (final Iterator<PageNode> iterator = cleaned.iterator(); iterator.hasNext();) { |
1049 | if (iterator.next().isInvalid()) { |
1050 | iterator.remove(); |
Pushed a couple of additional changes inspired by observing tests:
- cleanupPageList is now O(N) rather than O(N*N)
- non-transactional store/remove operations now throttle
- the throttle() method itself is now faster: the throttle interval is computed every few seconds by the JOURNAL_COPIER thread
- a couple of minor code cleanups
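
For reviewers who want to see the O(N) idea in isolation, here is a minimal sketch of single-pass, in-place compaction. It is not the JournalManager code: the class PageListCompaction, the Node type, and the removeInvalid method are hypothetical stand-ins for illustration, and the sketch truncates the tail with subList(...).clear() because ArrayList.removeRange is protected. It assumes a random-access list such as ArrayList.

```java
import java.util.ArrayList;
import java.util.List;

public class PageListCompaction {

    /** Hypothetical stand-in for PageNode: only the invalid flag matters here. */
    static final class Node {
        final int id;
        private boolean invalid;

        Node(final int id) {
            this.id = id;
        }

        void invalidate() {
            invalid = true;
        }

        boolean isInvalid() {
            return invalid;
        }
    }

    /**
     * Removes every invalid node in a single pass, keeping the survivors in
     * their original order.
     *
     * @return the number of nodes removed
     */
    static int removeInvalid(final List<Node> list) {
        final int size = list.size();
        int to = 0; // next slot to receive a surviving node
        for (int from = 0; from < size; from++) {
            final Node node = list.get(from);
            if (!node.isInvalid()) {
                if (to != from) {
                    list.set(to, node); // slide the survivor down
                }
                to++;
            }
        }
        // Everything from 'to' onward is now stale; drop the tail in one call.
        list.subList(to, size).clear();
        return size - to;
    }

    public static void main(final String[] args) {
        final List<Node> nodes = new ArrayList<Node>();
        for (int i = 0; i < 10; i++) {
            nodes.add(new Node(i));
        }
        nodes.get(0).invalidate();
        nodes.get(3).invalidate();
        nodes.get(4).invalidate();
        nodes.get(9).invalidate();
        final int removed = removeInvalid(nodes);
        System.out.println("removed=" + removed + " remaining=" + nodes.size());
    }
}
```

Each element is read once and written at most once, and the tail is truncated once, so the cost is linear in the list size. Removing each run of invalid entries separately can shift the remaining elements once per run, which is where the quadratic behavior came from when invalid entries were scattered through a large list.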