Merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 354
Merged at revision: 353
Proposed branch: lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop
Merge into: lp:akiban-persistit
Diff against target: 194 lines (+58/-20)
2 files modified
src/main/java/com/persistit/JournalManager.java (+39/-20)
src/test/java/com/persistit/JournalManagerTest.java (+19/-0)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+119552@code.launchpad.net

Description of the change

Fixes bug 1036422 - CommitPolicy HARD can soak CPU (the commit path busy-waits and consumes excessive CPU).

Modifies the logic in JournalManager.JournalFlusher#waitForDurability so that no iteration of the wait loop executes without at least a short (1 ms) sleep interval. Adds a counter that records any loop iteration that fails to wait, plus a unit test that demonstrates the problem and verifies that it is fixed.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

The vast majority of this diff is formatting changes. Haven't commented on that in a while, so it seemed appropriate :)

Otherwise, looks good -- simple and tested.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/JournalManager.java'
2--- src/main/java/com/persistit/JournalManager.java 2012-08-02 14:19:26 +0000
3+++ src/main/java/com/persistit/JournalManager.java 2012-08-14 14:22:24 +0000
4@@ -212,6 +212,8 @@
5
6 private boolean _allowHandlesForTempVolumesAndTrees;
7
8+ private AtomicLong _waitLoopsWithNoDelay = new AtomicLong();
9+
10 /**
11 * <p>
12 * Initialize the new journal. This method takes its information from the
13@@ -648,8 +650,8 @@
14 synchronized (this) {
15 if (address >= _writeBufferAddress && address + length <= _currentAddress) {
16 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(
17- "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress, _writeBuffer
18- .position(), _currentAddress);
19+ "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress,
20+ _writeBuffer.position(), _currentAddress);
21 final int wbPosition = _writeBuffer.position();
22 final int wbLimit = _writeBuffer.limit();
23 _writeBuffer.position((int) (address - _writeBufferAddress));
24@@ -948,8 +950,8 @@
25 advance(TM.OVERHEAD);
26 int offset = 0;
27 for (final TransactionMapItem ts : _liveTransactionMap.values()) {
28- TM.putEntry(_writeBuffer, offset / TM.ENTRY_SIZE, ts.getStartTimestamp(), ts.getCommitTimestamp(), ts
29- .getStartAddress(), ts.getLastRecordAddress());
30+ TM.putEntry(_writeBuffer, offset / TM.ENTRY_SIZE, ts.getStartTimestamp(), ts.getCommitTimestamp(),
31+ ts.getStartAddress(), ts.getLastRecordAddress());
32 offset += TM.ENTRY_SIZE;
33 count--;
34 if (count == 0 || offset + TM.ENTRY_SIZE >= _writeBuffer.remaining()) {
35@@ -1303,8 +1305,8 @@
36 if (address != Long.MAX_VALUE && _writeBuffer != null) {
37
38 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(
39- "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress, _writeBuffer
40- .position(), _currentAddress);
41+ "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress,
42+ _writeBuffer.position(), _currentAddress);
43
44 try {
45 if (_writeBuffer.position() > 0) {
46@@ -1360,8 +1362,8 @@
47 }
48
49 assert _writeBufferAddress + _writeBuffer.position() == _currentAddress : String.format(
50- "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress, _writeBuffer
51- .position(), _currentAddress);
52+ "writeBufferAddress=%,d position=%,d currentAddress=%,d", _writeBufferAddress,
53+ _writeBuffer.position(), _currentAddress);
54
55 _persistit.getIOMeter().chargeFlushJournal(written, address);
56 return _writeBufferAddress;
57@@ -2220,6 +2222,7 @@
58 }
59
60 long remainingSleepNanos;
61+ boolean didWait = false;
62 if (estimatedRemainingIoNanos == -1) {
63 remainingSleepNanos = Math.max(0, _flushInterval - (now - endTime));
64 } else {
65@@ -2247,22 +2250,33 @@
66 long delay = stallTime * NS_PER_MS - estimatedNanosToFinish;
67 if (delay > 0) {
68 Util.sleep(delay / NS_PER_MS);
69+ didWait = true;
70 }
71 kick();
72+ if (delay <= 0) {
73+ didWait = true;
74+ Util.spinSleep();
75+ }
76 } else {
77 /*
78 * Otherwise, wait until the I/O is about half done and then
79 * retry.
80 */
81- long delay = (estimatedNanosToFinish - leadTime * NS_PER_MS) / 2;
82+ long delay = ((estimatedNanosToFinish - leadTime * NS_PER_MS) / 2) + NS_PER_MS;
83 try {
84- if (delay > 0 && _lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) {
85- _lock.readLock().unlock();
86+ if (delay > 0) {
87+ didWait = true;
88+ if (_lock.readLock().tryLock(delay, TimeUnit.NANOSECONDS)) {
89+ _lock.readLock().unlock();
90+ }
91 }
92 } catch (InterruptedException e) {
93 throw new PersistitInterruptedException(e);
94 }
95 }
96+ if (!didWait) {
97+ _waitLoopsWithNoDelay.incrementAndGet();
98+ }
99 }
100 if (_lastExceptionTimestamp > timestamp) {
101 final Exception e = _lastException;
102@@ -2406,9 +2420,10 @@
103 }
104 pageAddress = readPageBufferFromJournal(stablePageNode, bb);
105 } catch (PersistitException ioe) {
106- _persistit.getAlertMonitor().post(
107- new Event(AlertLevel.ERROR, _persistit.getLogBase().copyException, ioe, volume, pageNode
108- .getPageAddress(), pageNode.getJournalAddress()), AlertMonitor.JOURNAL_CATEGORY);
109+ _persistit
110+ .getAlertMonitor()
111+ .post(new Event(AlertLevel.ERROR, _persistit.getLogBase().copyException, ioe, volume,
112+ pageNode.getPageAddress(), pageNode.getJournalAddress()), AlertMonitor.JOURNAL_CATEGORY);
113 throw ioe;
114 }
115
116@@ -2454,8 +2469,8 @@
117 }
118 if (volume == null) {
119 _persistit.getAlertMonitor().post(
120- new Event(AlertLevel.WARN, _persistit.getLogBase().missingVolume, volumeRef, pageNode
121- .getJournalAddress()), AlertMonitor.MISSING_VOLUME_CATEGORY);
122+ new Event(AlertLevel.WARN, _persistit.getLogBase().missingVolume, volumeRef,
123+ pageNode.getJournalAddress()), AlertMonitor.MISSING_VOLUME_CATEGORY);
124 if (_ignoreMissingVolume.get()) {
125 _persistit.getLogBase().lostPageFromMissingVolume.log(pageNode.getPageAddress(), volumeRef,
126 pageNode.getJournalAddress());
127@@ -2483,8 +2498,8 @@
128 try {
129 volume.getStorage().writePage(bb, pageAddress);
130 } catch (PersistitException ioe) {
131- _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(), pageNode
132- .getJournalAddress());
133+ _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(),
134+ pageNode.getJournalAddress());
135 throw ioe;
136 }
137
138@@ -2771,8 +2786,8 @@
139 */
140 if (ts != null) {
141 if (ts.getMvvCount() > 0 && _persistit.isInitialized()) {
142- _persistit.getLogBase().pruningIncomplete.log(ts, TransactionPlayer.addressToString(address,
143- timestamp));
144+ _persistit.getLogBase().pruningIncomplete.log(ts,
145+ TransactionPlayer.addressToString(address, timestamp));
146 }
147 }
148 }
149@@ -2905,4 +2920,8 @@
150 public SortedMap<Integer, TreeDescriptor> queryTreeMap() {
151 return new TreeMap<Integer, TreeDescriptor>(_handleToTreeMap);
152 }
153+
154+ long getWaitLoopsWithNoDelay() {
155+ return _waitLoopsWithNoDelay.get();
156+ }
157 }
158
159=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
160--- src/test/java/com/persistit/JournalManagerTest.java 2012-08-02 14:19:26 +0000
161+++ src/test/java/com/persistit/JournalManagerTest.java 2012-08-14 14:22:24 +0000
162@@ -43,6 +43,7 @@
163
164 import com.persistit.CheckpointManager.Checkpoint;
165 import com.persistit.JournalManager.PageNode;
166+import com.persistit.Transaction.CommitPolicy;
167 import com.persistit.TransactionPlayer.TransactionPlayerListener;
168 import com.persistit.exception.PersistitException;
169 import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
170@@ -531,6 +532,24 @@
171 assertTrue("Should have failed updates during recovery", _persistit.getRecoveryManager().getPlayer()
172 .getIgnoredUpdates() > 0);
173 }
174+
175+ @Test
176+ public void waitForDurabilitySoaksCPU() throws Exception {
177+ _persistit.setDefaultTransactionCommitPolicy(CommitPolicy.HARD);
178+ final JournalManager jman = _persistit.getJournalManager();
179+ long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay();
180+ final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true);
181+ final Transaction txn = exchange.getTransaction();
182+ for (int count = 0; count < 100; count++) {
183+ txn.begin();
184+ exchange.getValue().put(RED_FOX + count);
185+ exchange.to(count).store();
186+ txn.commit();
187+ txn.end();
188+ }
189+ waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay() - waitLoopsWithoutDelay;
190+ assertEquals("Wait loops without delay", 0, waitLoopsWithoutDelay);
191+ }
192
193 private List<PageNode> testCleanupPageListSource(final int size) {
194 final List<PageNode> source = new ArrayList<PageNode>(size);

Subscribers

People subscribed via source and target branches