Merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop-2 into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 355
Merged at revision: 354
Proposed branch: lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop-2
Merge into: lp:akiban-persistit
Diff against target: 137 lines (+25/-15)
4 files modified
src/main/java/com/persistit/JournalManager.java (+7/-1)
src/main/java/com/persistit/Transaction.java (+14/-10)
src/test/java/com/persistit/JournalManagerTest.java (+1/-1)
src/test/java/com/persistit/stress/unit/CommitBench.java (+3/-3)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-hard-commit-infinite-loop-2
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+119647@code.launchpad.net

Description of the change

This is a better fix for bug 1036422, "CommitPolicy HARD can soak CPU". The previous version delayed transaction commit unnecessarily and resulted in slower update and delete atomics. This code results in a significant improvement. A separate report will detail the benchmark differences.

The key elements in this proposal are (a) not calling waitForDurability() at all in a read-only transaction, and (b) using the Lock in waitForDurability() rather than a fixed-length sleep, so that the waiting thread wakes up immediately after an I/O that completes in less than 1ms.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Makes sense.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/JournalManager.java'
2--- src/main/java/com/persistit/JournalManager.java 2012-08-14 14:16:00 +0000
3+++ src/main/java/com/persistit/JournalManager.java 2012-08-15 00:37:23 +0000
4@@ -2255,7 +2255,13 @@
5 kick();
6 if (delay <= 0) {
7 didWait = true;
8- Util.spinSleep();
9+ try {
10+ if (_lock.readLock().tryLock(NS_PER_MS, TimeUnit.NANOSECONDS)) {
11+ _lock.readLock().unlock();
12+ }
13+ } catch (InterruptedException e) {
14+ throw new PersistitInterruptedException(e);
15+ }
16 }
17 } else {
18 /*
19
20=== modified file 'src/main/java/com/persistit/Transaction.java'
21--- src/main/java/com/persistit/Transaction.java 2012-08-04 19:19:44 +0000
22+++ src/main/java/com/persistit/Transaction.java 2012-08-15 00:37:23 +0000
23@@ -460,7 +460,7 @@
24 * higher than with the HARD policy.
25 */
26 GROUP;
27-
28+
29 static CommitPolicy forName(final String policyName) {
30 for (CommitPolicy policy : values()) {
31 if (policy.name().equalsIgnoreCase(policyName)) {
32@@ -594,7 +594,7 @@
33 try {
34 _transactionStatus = _persistit.getTransactionIndex().registerTransaction();
35 } catch (InterruptedException e) {
36- _rollbackCompleted = true;
37+ _rollbackCompleted = true;
38 throw new PersistitInterruptedException(e);
39 }
40 _rollbackPending = false;
41@@ -621,7 +621,7 @@
42 try {
43 _transactionStatus = _persistit.getTransactionIndex().registerCheckpointTransaction();
44 } catch (InterruptedException e) {
45- _rollbackCompleted = true;
46+ _rollbackCompleted = true;
47 throw new PersistitInterruptedException(e);
48 }
49 _rollbackPending = false;
50@@ -856,13 +856,14 @@
51 sequence(COMMIT_FLUSH_C);
52 boolean committed = false;
53 try {
54- flushTransactionBuffer(false);
55- _persistit.getJournalManager().waitForDurability(
56- policy == CommitPolicy.SOFT ? _persistit.getTransactionCommitLeadTime() : 0,
57- policy == CommitPolicy.GROUP ? _persistit.getTransactionCommitStallTime() : 0);
58+ if (flushTransactionBuffer(false)) {
59+ _persistit.getJournalManager().waitForDurability(
60+ policy == CommitPolicy.SOFT ? _persistit.getTransactionCommitLeadTime() : 0,
61+ policy == CommitPolicy.GROUP ? _persistit.getTransactionCommitStallTime() : 0);
62+ }
63 committed = true;
64 } finally {
65-
66+
67 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,
68 committed ? _commitTimestamp : TransactionStatus.ABORTED);
69 _commitCompleted = committed;
70@@ -1143,17 +1144,20 @@
71 }
72 }
73
74- synchronized void flushTransactionBuffer(final boolean chain) throws PersistitException {
75+ synchronized boolean flushTransactionBuffer(final boolean chain) throws PersistitException {
76+ boolean didWrite = false;
77 if (_buffer.position() > 0 || _previousJournalAddress != 0) {
78 long previousJournalAddress = _persistit.getJournalManager().writeTransactionToJournal(_buffer,
79 _startTimestamp, _commitTimestamp, _previousJournalAddress);
80 _buffer.clear();
81+ didWrite = true;
82 if (chain) {
83 _previousJournalAddress = previousJournalAddress;
84 } else {
85 _previousJournalAddress = 0;
86 }
87 }
88+ return didWrite;
89 }
90
91 synchronized void flushOnCheckpoint(final long timestamp) throws PersistitException {
92@@ -1293,7 +1297,7 @@
93 + MAXIMUM_STEP);
94 }
95 }
96-
97+
98 private int treeHandle(final Tree tree) {
99 final int treeHandle = tree.getHandle();
100 assert treeHandle != 0 : "Undefined tree handle in " + tree;
101
102=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
103--- src/test/java/com/persistit/JournalManagerTest.java 2012-08-13 22:24:48 +0000
104+++ src/test/java/com/persistit/JournalManagerTest.java 2012-08-15 00:37:23 +0000
105@@ -540,7 +540,7 @@
106 long waitLoopsWithoutDelay = jman.getWaitLoopsWithNoDelay();
107 final Exchange exchange = _persistit.getExchange("persistit", "JournalManagerTest", true);
108 final Transaction txn = exchange.getTransaction();
109- for (int count = 0; count < 100; count++) {
110+ for (int count = 0; count < 1000; count++) {
111 txn.begin();
112 exchange.getValue().put(RED_FOX + count);
113 exchange.to(count).store();
114
115=== modified file 'src/test/java/com/persistit/stress/unit/CommitBench.java'
116--- src/test/java/com/persistit/stress/unit/CommitBench.java 2012-08-02 04:45:28 +0000
117+++ src/test/java/com/persistit/stress/unit/CommitBench.java 2012-08-15 00:37:23 +0000
118@@ -45,8 +45,8 @@
119
120 private final int RECORDS = 200000;
121 private final int RECORDS_PER_TXN = 10;
122- private final String[] ARG_TEMPLATE = new String[] { "threads|int:1:1000|Number of threads",
123- "duration|int:10:86400|Duration of test in seconds",
124+ private final String[] ARG_TEMPLATE = new String[] { "threads|int:1:1:1000|Number of threads",
125+ "duration|int:10:10:86400|Duration of test in seconds",
126 "policy|String:HARD|Commit policy: SOFT, HARD or GROUP", };
127
128 volatile long stopTime;
129@@ -59,7 +59,7 @@
130 }
131
132 public void bench(final String[] args) throws Exception {
133- final ArgParser ap = new ArgParser("CommitBench", args, ARG_TEMPLATE);
134+ final ArgParser ap = new ArgParser("CommitBench", args, ARG_TEMPLATE).strict();
135 int threadCount = ap.getIntValue("threads");
136 int duration = ap.getIntValue("duration");
137 String policy = ap.getStringValue("policy");

Subscribers

People subscribed via source and target branches