Merge lp:~mmcm/akiban-server/pg-periodically-commit into lp:~akiban-technologies/akiban-server/trunk

Proposed by Mike McMahon
Status: Merged
Approved by: Nathan Williams
Approved revision: 2738
Merged at revision: 2737
Proposed branch: lp:~mmcm/akiban-server/pg-periodically-commit
Merge into: lp:~akiban-technologies/akiban-server/trunk
Prerequisite: lp:~mmcm/akiban-server/pg-server-set-variable
Diff against target: 214 lines (+51/-11)
7 files modified
src/main/java/com/akiban/server/service/transaction/TransactionService.java (+3/-0)
src/main/java/com/akiban/server/store/PersistitTransactionService.java (+5/-0)
src/main/java/com/akiban/sql/embedded/JDBCConnection.java (+2/-2)
src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java (+1/-1)
src/main/java/com/akiban/sql/server/ServerSession.java (+3/-0)
src/main/java/com/akiban/sql/server/ServerSessionBase.java (+19/-6)
src/main/java/com/akiban/sql/server/ServerTransaction.java (+18/-2)
To merge this branch: bzr merge lp:~mmcm/akiban-server/pg-periodically-commit
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+179829@code.launchpad.net

Description of the change

Add a mode where an explicit transaction commits from time to time.

This is useful for initial data loads from a program that believes that it is doing all the loading with auto-commit off but is failing due to the 5-second FDB limitation.

It is controlled by a connection property. It might be cleaner to have it be an extension to SET SESSION CHARACTERISTICS AS TRANSACTION. However, a property has the advantage that it can be set by another process, thereby avoiding the need to alter the application.

The check is done at the end of each statement, where auto-commit would be done. This means that it really only works for a series of isolated statements, like a bunch of INSERTs. It does not work, in particular, for a large DELETE or UPDATE. These are problematic because there is a scan, too, so the underlying transaction cannot be committed invisibly. I am open to suggestions for how to extend / replace this to handle those cases.

The implementation for the default store (Persistit) in this branch is empty. An FDB branch will actually do something.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/akiban/server/service/transaction/TransactionService.java'
2--- src/main/java/com/akiban/server/service/transaction/TransactionService.java 2013-03-22 20:05:57 +0000
3+++ src/main/java/com/akiban/server/service/transaction/TransactionService.java 2013-08-12 23:49:27 +0000
4@@ -84,6 +84,9 @@
5 */
6 int incrementTransactionStep(Session session);
7
8+ /** Commit the transaction if this is a good time. */
9+ void periodicallyCommit(Session session);
10+
11 /** Add a callback to transaction. */
12 void addCallback(Session session, CallbackType type, Callback callback);
13
14
15=== modified file 'src/main/java/com/akiban/server/store/PersistitTransactionService.java'
16--- src/main/java/com/akiban/server/store/PersistitTransactionService.java 2013-03-22 20:05:57 +0000
17+++ src/main/java/com/akiban/server/store/PersistitTransactionService.java 2013-08-12 23:49:27 +0000
18@@ -159,6 +159,11 @@
19 }
20
21 @Override
22+ public void periodicallyCommit(Session session) {
23+ // Persistit can mostly manage with a long-running transaction.
24+ }
25+
26+ @Override
27 public void addCallback(Session session, CallbackType type, Callback callback) {
28 session.push(getCallbackKey(type), callback);
29 }
30
31=== modified file 'src/main/java/com/akiban/sql/embedded/JDBCConnection.java'
32--- src/main/java/com/akiban/sql/embedded/JDBCConnection.java 2013-06-11 16:30:17 +0000
33+++ src/main/java/com/akiban/sql/embedded/JDBCConnection.java 2013-08-12 23:49:27 +0000
34@@ -143,7 +143,7 @@
35 }
36 sessionMonitor.enterStage(MonitorStage.OPTIMIZE);
37 if (transaction == null)
38- localTransaction = new ServerTransaction(this, true);
39+ localTransaction = new ServerTransaction(this, true, false);
40 if ((sqlStmt instanceof DMLStatementNode) &&
41 !(sqlStmt instanceof CallStatementNode))
42 return compiler.compileExecutableStatement((DMLStatementNode)sqlStmt, parser.getParameterList(), getParameterNames, autoGeneratedKeys, context);
43@@ -182,7 +182,7 @@
44 }
45 sessionMonitor.enterStage(MonitorStage.OPTIMIZE);
46 if (transaction == null)
47- localTransaction = new ServerTransaction(this, true);
48+ localTransaction = new ServerTransaction(this, true, false);
49 ExplainPlanContext context = new ExplainPlanContext(compiler, reqs.serviceManager(), session);
50 Explainable explainable;
51 if ((sqlStmt instanceof DMLStatementNode) &&
52
53=== modified file 'src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java'
54--- src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java 2013-07-06 17:46:02 +0000
55+++ src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java 2013-08-12 23:49:27 +0000
56@@ -53,7 +53,7 @@
57 "client_encoding", "DateStyle", "geqo", "ksqo",
58 "queryTimeoutSec", "zeroDateTimeBehavior", "maxNotificationLevel", "OutputFormat",
59 "parserInfixBit", "parserInfixLogical", "parserDoubleQuoted",
60- "newtypes"
61+ "newtypes", "transactionPeriodicallyCommit"
62 };
63
64 private Operation operation;
65
66=== modified file 'src/main/java/com/akiban/sql/server/ServerSession.java'
67--- src/main/java/com/akiban/sql/server/ServerSession.java 2013-07-13 20:04:24 +0000
68+++ src/main/java/com/akiban/sql/server/ServerSession.java 2013-08-12 23:49:27 +0000
69@@ -131,6 +131,9 @@
70 /** Set following transaction to read-only / read-write. */
71 public void setTransactionDefaultReadOnly(boolean readOnly);
72
73+ /** Set following transaction to commit as determined by store. */
74+ public void setTransactionPeriodicallyCommit(boolean periodicallyCommit);
75+
76 /** Get the functions registry. */
77 public FunctionsRegistry functionsRegistry();
78
79
80=== modified file 'src/main/java/com/akiban/sql/server/ServerSessionBase.java'
81--- src/main/java/com/akiban/sql/server/ServerSessionBase.java 2013-07-13 20:04:24 +0000
82+++ src/main/java/com/akiban/sql/server/ServerSessionBase.java 2013-08-12 23:49:27 +0000
83@@ -60,6 +60,7 @@
84 new HashMap<>();
85 protected ServerTransaction transaction;
86 protected boolean transactionDefaultReadOnly = false;
87+ protected boolean transactionPeriodicallyCommit = false;
88 protected ServerSessionMonitor sessionMonitor;
89
90 protected Long queryTimeoutMilli = null;
91@@ -124,6 +125,12 @@
92 queryTimeoutMilli = (long)(Double.parseDouble(value) * 1000);
93 return true;
94 }
95+ if ("transactionPeriodicallyCommit".equals(key)) {
96+ boolean periodicallyCommit = (value != null) && Boolean.parseBoolean(value);
97+ transactionPeriodicallyCommit = periodicallyCommit;
98+ if (transaction != null)
99+ transaction.setPeriodicallyCommit(periodicallyCommit);
100+ }
101 return false;
102 }
103
104@@ -210,7 +217,7 @@
105 public void beginTransaction() {
106 if (transaction != null)
107 throw new TransactionInProgressException();
108- transaction = new ServerTransaction(this, transactionDefaultReadOnly);
109+ transaction = new ServerTransaction(this, transactionDefaultReadOnly, transactionPeriodicallyCommit);
110 }
111
112 @Override
113@@ -250,6 +257,11 @@
114 }
115
116 @Override
117+ public void setTransactionPeriodicallyCommit(boolean periodicallyCommit) {
118+ this.transactionPeriodicallyCommit = periodicallyCommit;
119+ }
120+
121+ @Override
122 public FunctionsRegistry functionsRegistry() {
123 return reqs.functionsRegistry();
124 }
125@@ -331,14 +343,14 @@
126 throw new NoTransactionInProgressException();
127 case READ:
128 case NEW:
129- localTransaction = new ServerTransaction(this, true);
130+ localTransaction = new ServerTransaction(this, true, false);
131 break;
132 case WRITE:
133 case NEW_WRITE:
134 case WRITE_STEP_ISOLATED:
135 if (transactionDefaultReadOnly)
136 throw new TransactionReadOnlyException();
137- localTransaction = new ServerTransaction(this, false);
138+ localTransaction = new ServerTransaction(this, false, false);
139 localTransaction.beforeUpdate(true);
140 break;
141 }
142@@ -369,17 +381,18 @@
143 else
144 localTransaction.abort();
145 }
146- else {
147+ else if (transaction != null) {
148 // Make changes visible in open global transaction.
149 ServerStatement.TransactionMode transactionMode = stmt.getTransactionMode();
150 switch (transactionMode) {
151 case REQUIRED_WRITE:
152 case WRITE:
153 case WRITE_STEP_ISOLATED:
154- if (transaction != null)
155- transaction.afterUpdate(transactionMode == ServerStatement.TransactionMode.WRITE_STEP_ISOLATED);
156+ transaction.afterUpdate(transactionMode == ServerStatement.TransactionMode.WRITE_STEP_ISOLATED);
157 break;
158 }
159+ // Give periodic commit a chance if enabled.
160+ transaction.checkPeriodicallyCommit();
161 }
162 }
163
164
165=== modified file 'src/main/java/com/akiban/sql/server/ServerTransaction.java'
166--- src/main/java/com/akiban/sql/server/ServerTransaction.java 2013-03-22 20:05:57 +0000
167+++ src/main/java/com/akiban/sql/server/ServerTransaction.java 2013-08-12 23:49:27 +0000
168@@ -29,14 +29,16 @@
169 {
170 private final Session session;
171 private final TransactionService txnService;
172- private boolean readOnly;
173+ private boolean readOnly, periodicallyCommit;
174 private Date transactionTime;
175
176 /** Begin a new transaction or signal an exception. */
177- public ServerTransaction(ServerSession server, boolean readOnly) {
178+ public ServerTransaction(ServerSession server,
179+ boolean readOnly, boolean periodicallyCommit) {
180 this.session = server.getSession();
181 this.txnService = server.getTransactionService();
182 this.readOnly = readOnly;
183+ this.periodicallyCommit = periodicallyCommit;
184 txnService.beginTransaction(session);
185 }
186
187@@ -48,6 +50,14 @@
188 this.readOnly = readOnly;
189 }
190
191+ public boolean isPeriodicallyCommit() {
192+ return periodicallyCommit;
193+ }
194+
195+ public void setPeriodicallyCommit(boolean periodicallyCommit) {
196+ this.periodicallyCommit = periodicallyCommit;
197+ }
198+
199 public void checkTransactionMode(ServerStatement.TransactionMode transactionMode) {
200 switch (transactionMode) {
201 case NONE:
202@@ -95,6 +105,12 @@
203 return txnService.isRollbackPending(session);
204 }
205
206+ public void checkPeriodicallyCommit() {
207+ if (periodicallyCommit) {
208+ txnService.periodicallyCommit(session);
209+ }
210+ }
211+
212 /** Return the transaction's time, which is fixed the first time
213 * something asks for it. */
214 public Date getTime(ServerSession server) {

Subscribers

People subscribed via source and target branches