Merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 354
Merged at revision: 353
Proposed branch: lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop
Merge into: lp:akiban-persistit
Diff against target: 194 lines (+58/-20)
2 files modified
src/main/java/com/persistit/JournalManager.java (+39/-20)
src/test/java/com/persistit/JournalManagerTest.java (+19/-0)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+119552@code.launchpad.net

Description of the change

Fixes bug 1036422 - CommitPolicy HARD can soak CPU.

Modify the logic in JournalManager.JournalFlusher#waitForDurability so that every iteration of its wait loop includes at least a short (1 ms) delay. Add a counter, exposed through JournalManager#getWaitLoopsWithNoDelay, that records any iteration that completes without waiting, and a unit test that demonstrates the problem and verifies that it is fixed.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

The vast majority of this diff is formatting changes. Haven't commented on that in a while, so it seemed appropriate :)

Otherwise, looks good -- simple and tested.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/main/java/com/persistit/JournalManager.java'
--- src/main/java/com/persistit/JournalManager.java 2012-08-02 14:19:26 +0000
+++ src/main/java/com/persistit/JournalManager.java 2012-08-14 14:22:24 +0000
@@ -212,6 +212,8 @@
212212
213 private boolean _allowHandlesForTempVolumesAndTrees;213 private boolean _allowHandlesForTempVolumesAndTrees;
214214
215 private AtomicLong _waitLoopsWithNoDelay = new AtomicLong();
216
215 /**217 /**
216 * <p>218 * <p>
217 * Initialize the new journal. This method takes its information from the219 * Initialize the new journal. This method takes its information from the
@@ -648,8 +650,8 @@
648 synchronized (this) {650 synchronized (this) {
649 if (address >= _writeBufferAddress && address + length <= _currentAddress) {651 if (address >= _writeBufferAddress && address + length <= _currentAddress) {
650 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(652 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(
651 "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress, _writeBuffer653 "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress,
652 .position(), _currentAddress);654 _writeBuffer.position(), _currentAddress);
653 final int wbPosition = _writeBuffer.position();655 final int wbPosition = _writeBuffer.position();
654 final int wbLimit = _writeBuffer.limit();656 final int wbLimit = _writeBuffer.limit();
655 _writeBuffer.position((int) (address - _writeBufferAddress));657 _writeBuffer.position((int) (address - _writeBufferAddress));
@@ -948,8 +950,8 @@
948 advance(TM.OVERHEAD);950 advance(TM.OVERHEAD);
949 int offset = 0;951 int offset = 0;
950 for (final TransactionMapItem ts : _liveTransactionMap.values()) {952 for (final TransactionMapItem ts : _liveTransactionMap.values()) {
951 TM.putEntry(_writeBuffer, offset / TM.ENTRY_SIZE, ts.getStartTimestamp(), ts.getCommitTimestamp(), ts953 TM.putEntry(_writeBuffer, offset / TM.ENTRY_SIZE, ts.getStartTimestamp(), ts.getCommitTimestamp(),
952 .getStartAddress(), ts.getLastRecordAddress());954 ts.getStartAddress(), ts.getLastRecordAddress());
953 offset += TM.ENTRY_SIZE;955 offset += TM.ENTRY_SIZE;
954 count--;956 count--;
955 if (count == 0 || offset + TM.ENTRY_SIZE >= _writeBuffer.remaining()) {957 if (count == 0 || offset + TM.ENTRY_SIZE >= _writeBuffer.remaining()) {
@@ -1303,8 +1305,8 @@
1303 if (address != Long.MAX_VALUE && _writeBuffer != null) {1305 if (address != Long.MAX_VALUE && _writeBuffer != null) {
13041306
1305 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(1307 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(
1306 "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress, _writeBuffer1308 "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress,
1307 .position(), _currentAddress);1309 _writeBuffer.position(), _currentAddress);
13081310
1309 try {1311 try {
1310 if (_writeBuffer.position() > 0) {1312 if (_writeBuffer.position() > 0) {
@@ -1360,8 +1362,8 @@
1360 }1362 }
13611363
1362 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(1364 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(
1363 "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress, _writeBuffer1365 "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress,
1364 .position(), _currentAddress);1366 _writeBuffer.position(), _currentAddress);
13651367
1366 _persistit.getIOMeter().chargeFlushJournal(written, address);1368 _persistit.getIOMeter().chargeFlushJournal(written, address);
1367 return _writeBufferAddress;1369 return _writeBufferAddress;
@@ -2220,6 +2222,7 @@
2220 }2222 }
22212223
2222 long remainingSleepNanos;2224 long remainingSleepNanos;
2225 boolean didWait = false;
2223 if (estimatedRemainingIoNanos == -1) {2226 if (estimatedRemainingIoNanos == -1) {
2224 remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime));2227 remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime));
2225 } else {2228 } else {
@@ -2247,22 +2250,33 @@
2247 long delay = stallTime * NS_PER_MS - estimatedNanosToFinish;2250 long delay = stallTime * NS_PER_MS - estimatedNanosToFinish;
2248 if (delay > 0) {2251 if (delay > 0) {
2249 Util.sleep(delay / NS_PER_MS);2252 Util.sleep(delay / NS_PER_MS);
2253 didWait = true;
2250 }2254 }
2251 kick();2255 kick();
2256 if (delay <= 0) {
2257 didWait = true;
2258 Util.spinSleep();
2259 }
2252 } else {2260 } else {
2253 /*2261 /*
2254 * Otherwise, wait until the I/O is about half done and then2262 * Otherwise, wait until the I/O is about half done and then
2255 * retry.2263 * retry.
2256 */2264 */
2257 long delay = (estimatedNanosToFinish - leadTime * NS_PER_MS) / 2;2265 long delay = ((estimatedNanosToFinish - leadTime * NS_PER_MS) / 2) + NS_PER_MS;
2258 try {2266 try {
2259 if (delay > 0 && _lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) {2267 if (delay > 0) {
2260 _lock.readLock().unlock();2268 didWait = true;
2269 if (_lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) {
2270 _lock.readLock().unlock();
2271 }
2261 }2272 }
2262 } catch (InterruptedException e) {2273 } catch (InterruptedException e) {
2263 throw new PersistitInterruptedException(e);2274 throw new PersistitInterruptedException(e);
2264 }2275 }
2265 }2276 }
2277 if (!didWait) {
2278 _waitLoopsWithNoDelay.incrementAndGet();
2279 }
2266 }2280 }
2267 if (_lastExceptionTimestamp > timestamp) {2281 if (_lastExceptionTimestamp > timestamp) {
2268 final Exception e = _lastException;2282 final Exception e = _lastException;
@@ -2406,9 +2420,10 @@
2406 }2420 }
2407 pageAddress = readPageBufferFromJournal(stablePageNode, bb);2421 pageAddress = readPageBufferFromJournal(stablePageNode, bb);
2408 } catch (PersistitException ioe) {2422 } catch (PersistitException ioe) {
2409 _persistit.getAlertMonitor().post(2423 _persistit
2410 new Event(AlertLevel.ERROR, _persistit.getLogBase().copyException, ioe, volume, pageNode2424 .getAlertMonitor()
2411 .getPageAddress(), pageNode.getJournalAddress()), AlertMonitor.JOURNAL_CATEGORY);2425 .post(new Event(AlertLevel.ERROR, _persistit.getLogBase().copyException, ioe, volume,
2426 pageNode.getPageAddress(), pageNode.getJournalAddress()), AlertMonitor.JOURNAL_CATEGORY);
2412 throw ioe;2427 throw ioe;
2413 }2428 }
24142429
@@ -2454,8 +2469,8 @@
2454 }2469 }
2455 if (volume == null) {2470 if (volume == null) {
2456 _persistit.getAlertMonitor().post(2471 _persistit.getAlertMonitor().post(
2457 new Event(AlertLevel.WARN, _persistit.getLogBase().missingVolume, volumeRef, pageNode2472 new Event(AlertLevel.WARN, _persistit.getLogBase().missingVolume, volumeRef,
2458 .getJournalAddress()), AlertMonitor.MISSING_VOLUME_CATEGORY);2473 pageNode.getJournalAddress()), AlertMonitor.MISSING_VOLUME_CATEGORY);
2459 if (_ignoreMissingVolume.get()) {2474 if (_ignoreMissingVolume.get()) {
2460 _persistit.getLogBase().lostPageFromMissingVolume.log(pageNode.getPageAddress(), volumeRef,2475 _persistit.getLogBase().lostPageFromMissingVolume.log(pageNode.getPageAddress(), volumeRef,
2461 pageNode.getJournalAddress());2476 pageNode.getJournalAddress());
@@ -2483,8 +2498,8 @@
2483 try {2498 try {
2484 volume.getStorage().writePage(bb, pageAddress);2499 volume.getStorage().writePage(bb, pageAddress);
2485 } catch (PersistitException ioe) {2500 } catch (PersistitException ioe) {
2486 _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(), pageNode2501 _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(),
2487 .getJournalAddress());2502 pageNode.getJournalAddress());
2488 throw ioe;2503 throw ioe;
2489 }2504 }
24902505
@@ -2771,8 +2786,8 @@
2771 */2786 */
2772 if (ts != null) {2787 if (ts != null) {
2773 if (ts.getMvvCount() > 0 && _persistit.isInitialized()) {2788 if (ts.getMvvCount() > 0 && _persistit.isInitialized()) {
2774 _persistit.getLogBase().pruningIncomplete.log(ts, TransactionPlayer.addressToString(address,2789 _persistit.getLogBase().pruningIncomplete.log(ts,
2775 timestamp));2790 TransactionPlayer.addressToString(address, timestamp));
2776 }2791 }
2777 }2792 }
2778 }2793 }
@@ -2905,4 +2920,8 @@
2905 public SortedMap<Integer, TreeDescriptor> queryTreeMap() {2920 public SortedMap<Integer, TreeDescriptor> queryTreeMap() {
2906 return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap);2921 return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap);
2907 }2922 }
2923
2924 long getWaitLoopsWithNoDelay() {
2925 return _waitLoopsWithNoDelay.get();
2926 }
2908}2927}
29092928
=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
--- src/test/java/com/persistit/JournalManagerTest.java 2012-08-02 14:19:26 +0000
+++ src/test/java/com/persistit/JournalManagerTest.java 2012-08-14 14:22:24 +0000
@@ -43,6 +43,7 @@
4343
44import com.persistit.CheckpointManager.Checkpoint;44import com.persistit.CheckpointManager.Checkpoint;
45import com.persistit.JournalManager.PageNode;45import com.persistit.JournalManager.PageNode;
46import com.persistit.Transaction.CommitPolicy;
46import com.persistit.TransactionPlayer.TransactionPlayerListener;47import com.persistit.TransactionPlayer.TransactionPlayerListener;
47import com.persistit.exception.PersistitException;48import com.persistit.exception.PersistitException;
48import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;49import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
@@ -531,6 +532,24 @@
531 assertTrue("Should have failed updates during recovery", _persistit.getRecoveryManager().getPlayer()532 assertTrue("Should have failed updates during recovery", _persistit.getRecoveryManager().getPlayer()
532 .getIgnoredUpdates() > 0);533 .getIgnoredUpdates() > 0);
533 }534 }
535
536 @Test
537 public void waitForDurabilitySoaksCPU() throws Exception {
538 _persistit.setDefaultTransactionCommitPolicy(CommitPolicy.HARD);
539 final JournalManager jman = _persistit.getJournalManager();
540 long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay();
541 final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true);
542 final Transaction txn = exchange.getTransaction();
543 for (int count = 0; count < 100; count++) {
544 txn.begin();
545 exchange.getValue().put(RED_FOX + count);
546 exchange.to(count).store();
547 txn.commit();
548 txn.end();
549 }
550 waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay() - waitLoopsWithoutDelay;
551 assertEquals("Wait loops without delay", 0, waitLoopsWithoutDelay);
552 }
534553
535 private List<PageNode> testCleanupPageListSource(final int size) {554 private List<PageNode> testCleanupPageListSource(final int size) {
536 final List<PageNode> source = new ArrayList<PageNode>(size);555 final List<PageNode> source = new ArrayList<PageNode>(size);

Subscribers

People subscribed via source and target branches