Merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop-2 into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 355
Merged at revision: 354
Proposed branch: lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop-2
Merge into: lp:akiban-persistit
Diff against target: 137 lines (+25/-15)
4 files modified
src/main/java/com/persistit/JournalManager.java (+7/-1)
src/main/java/com/persistit/Transaction.java (+14/-10)
src/test/java/com/persistit/JournalManagerTest.java (+1/-1)
src/test/java/com/persistit/stress/unit/CommitBench.java (+3/-3)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop-2
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+119647@code.launchpad.net

Description of the change

This is a better fix for bug 1036422 CommitPolicy HARD can soak CPU. The previous version delayed transaction commit unnecessarily and resulted in slower update and delete atomics. This code results in a significant improvement. A separate report will detail the benchmark differences.

The key elements in this proposal are (a) don't call waitForDurability() at all in a read-only transaction, and (b) use the Lock in waitDurability rather than a fixed-length sleep to wake up immediately after an I/O that completes in less than 1ms.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Makes sense.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/main/java/com/persistit/JournalManager.java'
--- src/main/java/com/persistit/JournalManager.java 2012-08-14 14:16:00 +0000
+++ src/main/java/com/persistit/JournalManager.java 2012-08-15 00:37:23 +0000
@@ -2255,7 +2255,13 @@
2255 kick();2255 kick();
2256 if (delay <= 0) {2256 if (delay <= 0) {
2257 didWait = true;2257 didWait = true;
2258 Util.spinSleep();2258 try {
2259 if (_lock.readLock().tryLock(NS_PER_MS, TimeUnit.NANOSECONDS)) {
2260 _lock.readLock().unlock();
2261 }
2262 } catch (InterruptedException e) {
2263 throw new PersistitInterruptedException(e);
2264 }
2259 }2265 }
2260 } else {2266 } else {
2261 /*2267 /*
22622268
=== modified file 'src/main/java/com/persistit/Transaction.java'
--- src/main/java/com/persistit/Transaction.java 2012-08-04 19:19:44 +0000
+++ src/main/java/com/persistit/Transaction.java 2012-08-15 00:37:23 +0000
@@ -460,7 +460,7 @@
460 * higher than with the HARD policy.460 * higher than with the HARD policy.
461 */461 */
462 GROUP;462 GROUP;
463 463
464 static CommitPolicy forName(final String policyName) {464 static CommitPolicy forName(final String policyName) {
465 for (CommitPolicy policy : values()) {465 for (CommitPolicy policy : values()) {
466 if (policy.name().equalsIgnoreCase(policyName)) {466 if (policy.name().equalsIgnoreCase(policyName)) {
@@ -594,7 +594,7 @@
594 try {594 try {
595 _transactionStatus = _persistit.getTransactionIndex().registerTransaction();595 _transactionStatus = _persistit.getTransactionIndex().registerTransaction();
596 } catch (InterruptedException e) {596 } catch (InterruptedException e) {
597 _rollbackCompleted = true;597 _rollbackCompleted = true;
598 throw new PersistitInterruptedException(e);598 throw new PersistitInterruptedException(e);
599 }599 }
600 _rollbackPending = false;600 _rollbackPending = false;
@@ -621,7 +621,7 @@
621 try {621 try {
622 _transactionStatus = _persistit.getTransactionIndex().registerCheckpointTransaction();622 _transactionStatus = _persistit.getTransactionIndex().registerCheckpointTransaction();
623 } catch (InterruptedException e) {623 } catch (InterruptedException e) {
624 _rollbackCompleted = true;624 _rollbackCompleted = true;
625 throw new PersistitInterruptedException(e);625 throw new PersistitInterruptedException(e);
626 }626 }
627 _rollbackPending = false;627 _rollbackPending = false;
@@ -856,13 +856,14 @@
856 sequence(COMMIT_FLUSH_C);856 sequence(COMMIT_FLUSH_C);
857 boolean committed = false;857 boolean committed = false;
858 try {858 try {
859 flushTransactionBuffer(false);859 if (flushTransactionBuffer(false)) {
860 _persistit.getJournalManager().waitForDurability(860 _persistit.getJournalManager().waitForDurability(
861 policy == CommitPolicy.SOFT ? _persistit.getTransactionCommitLeadTime() : 0,861 policy == CommitPolicy.SOFT ? _persistit.getTransactionCommitLeadTime() : 0,
862 policy == CommitPolicy.GROUP ? _persistit.getTransactionCommitStallTime() : 0);862 policy == CommitPolicy.GROUP ? _persistit.getTransactionCommitStallTime() : 0);
863 }
863 committed = true;864 committed = true;
864 } finally {865 } finally {
865 866
866 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,867 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,
867 committed ? _commitTimestamp : TransactionStatus.ABORTED);868 committed ? _commitTimestamp : TransactionStatus.ABORTED);
868 _commitCompleted = committed;869 _commitCompleted = committed;
@@ -1143,17 +1144,20 @@
1143 }1144 }
1144 }1145 }
11451146
1146 synchronized void flushTransactionBuffer(final boolean chain) throws PersistitException {1147 synchronized boolean flushTransactionBuffer(final boolean chain) throws PersistitException {
1148 boolean didWrite = false;
1147 if (_buffer.position() > 0 || _previousJournalAddress != 0) {1149 if (_buffer.position() > 0 || _previousJournalAddress != 0) {
1148 long previousJournalAddress = _persistit.getJournalManager().writeTransactionToJournal(_buffer,1150 long previousJournalAddress = _persistit.getJournalManager().writeTransactionToJournal(_buffer,
1149 _startTimestamp, _commitTimestamp, _previousJournalAddress);1151 _startTimestamp, _commitTimestamp, _previousJournalAddress);
1150 _buffer.clear();1152 _buffer.clear();
1153 didWrite = true;
1151 if (chain) {1154 if (chain) {
1152 _previousJournalAddress = previousJournalAddress;1155 _previousJournalAddress = previousJournalAddress;
1153 } else {1156 } else {
1154 _previousJournalAddress = 0;1157 _previousJournalAddress = 0;
1155 }1158 }
1156 }1159 }
1160 return didWrite;
1157 }1161 }
11581162
1159 synchronized void flushOnCheckpoint(final long timestamp) throws PersistitException {1163 synchronized void flushOnCheckpoint(final long timestamp) throws PersistitException {
@@ -1293,7 +1297,7 @@
1293 + MAXIMUM_STEP);1297 + MAXIMUM_STEP);
1294 }1298 }
1295 }1299 }
1296 1300
1297 private int treeHandle(final Tree tree) {1301 private int treeHandle(final Tree tree) {
1298 final int treeHandle = tree.getHandle();1302 final int treeHandle = tree.getHandle();
1299 assert treeHandle != 0 : "Undefined tree handle in " + tree;1303 assert treeHandle != 0 : "Undefined tree handle in " + tree;
13001304
=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
--- src/test/java/com/persistit/JournalManagerTest.java 2012-08-13 22:24:48 +0000
+++ src/test/java/com/persistit/JournalManagerTest.java 2012-08-15 00:37:23 +0000
@@ -540,7 +540,7 @@
540 long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay();540 long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay();
541 final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true);541 final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true);
542 final Transaction txn = exchange.getTransaction();542 final Transaction txn = exchange.getTransaction();
543 for (int count = 0; count < 100; count++) {543 for (int count = 0; count < 1000; count++) {
544 txn.begin();544 txn.begin();
545 exchange.getValue().put(RED_FOX + count);545 exchange.getValue().put(RED_FOX + count);
546 exchange.to(count).store();546 exchange.to(count).store();
547547
=== modified file 'src/test/java/com/persistit/stress/unit/CommitBench.java'
--- src/test/java/com/persistit/stress/unit/CommitBench.java 2012-08-02 04:45:28 +0000
+++ src/test/java/com/persistit/stress/unit/CommitBench.java 2012-08-15 00:37:23 +0000
@@ -45,8 +45,8 @@
4545
46 private final int RECORDS = 200000;46 private final int RECORDS = 200000;
47 private final int RECORDS_PER_TXN = 10;47 private final int RECORDS_PER_TXN = 10;
48 private final String[] ARG_TEMPLATE = new String[] { "threads|int:1:1000|Number of threads",48 private final String[] ARG_TEMPLATE = new String[] { "threads|int:1:1:1000|Number of threads",
49 "duration|int:10:86400|Duration of test in seconds",49 "duration|int:10:10:86400|Duration of test in seconds",
50 "policy|String:HARD|Commit policy: SOFT, HARD or GROUP", };50 "policy|String:HARD|Commit policy: SOFT, HARD or GROUP", };
5151
52 volatile long stopTime;52 volatile long stopTime;
@@ -59,7 +59,7 @@
59 }59 }
6060
61 public void bench(final String[] args) throws Exception {61 public void bench(final String[] args) throws Exception {
62 final ArgParser ap = new ArgParser("CommitBench", args, ARG_TEMPLATE);62 final ArgParser ap = new ArgParser("CommitBench", args, ARG_TEMPLATE).strict();
63 int threadCount = ap.getIntValue("threads");63 int threadCount = ap.getIntValue("threads");
64 int duration = ap.getIntValue("duration");64 int duration = ap.getIntValue("duration");
65 String policy = ap.getStringValue("policy");65 String policy = ap.getStringValue("policy");

Subscribers

People subscribed via source and target branches