Merge lp:~mmcm/akiban-server/pg-periodically-commit into lp:~akiban-technologies/akiban-server/trunk

Proposed by Mike McMahon
Status: Merged
Approved by: Nathan Williams
Approved revision: 2738
Merged at revision: 2737
Proposed branch: lp:~mmcm/akiban-server/pg-periodically-commit
Merge into: lp:~akiban-technologies/akiban-server/trunk
Prerequisite: lp:~mmcm/akiban-server/pg-server-set-variable
Diff against target: 214 lines (+51/-11)
7 files modified
src/main/java/com/akiban/server/service/transaction/TransactionService.java (+3/-0)
src/main/java/com/akiban/server/store/PersistitTransactionService.java (+5/-0)
src/main/java/com/akiban/sql/embedded/JDBCConnection.java (+2/-2)
src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java (+1/-1)
src/main/java/com/akiban/sql/server/ServerSession.java (+3/-0)
src/main/java/com/akiban/sql/server/ServerSessionBase.java (+19/-6)
src/main/java/com/akiban/sql/server/ServerTransaction.java (+18/-2)
To merge this branch: bzr merge lp:~mmcm/akiban-server/pg-periodically-commit
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+179829@code.launchpad.net

Description of the change

Add a mode where an explicit transaction commits from time to time.

This is useful for initial data loads from a program that believes that it is doing all the loading with auto-commit off but is failing due to the 5-second FDB limitation.

It is controlled by a connection property. It might be cleaner to have it be an extension to SET SESSION CHARACTERISTICS AS TRANSACTION. However, a property has the advantage that it can be set by another process, thereby avoiding the need to alter the application.

The check is done at the end of each statement, where auto-commit would be done. This means that it really only works for a series of isolated statements, like a bunch of INSERTs. It does not work, in particular, for a large DELETE or UPDATE. These are problematic because there is a scan, too, so the underlying transaction cannot be committed invisibly. I am open to suggestions for how to extend / replace this to handle those cases.

The implementation for the default store (Persistit) in this branch is empty. An FDB branch will actually do something.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/main/java/com/akiban/server/service/transaction/TransactionService.java'
--- src/main/java/com/akiban/server/service/transaction/TransactionService.java 2013-03-22 20:05:57 +0000
+++ src/main/java/com/akiban/server/service/transaction/TransactionService.java 2013-08-12 23:49:27 +0000
@@ -84,6 +84,9 @@
84 */84 */
85 int incrementTransactionStep(Session session);85 int incrementTransactionStep(Session session);
8686
87 /** Commit the transaction if this is a good time. */
88 void periodicallyCommit(Session session);
89
87 /** Add a callback to transaction. */90 /** Add a callback to transaction. */
88 void addCallback(Session session, CallbackType type, Callback callback);91 void addCallback(Session session, CallbackType type, Callback callback);
8992
9093
=== modified file 'src/main/java/com/akiban/server/store/PersistitTransactionService.java'
--- src/main/java/com/akiban/server/store/PersistitTransactionService.java 2013-03-22 20:05:57 +0000
+++ src/main/java/com/akiban/server/store/PersistitTransactionService.java 2013-08-12 23:49:27 +0000
@@ -159,6 +159,11 @@
159 }159 }
160160
161 @Override161 @Override
162 public void periodicallyCommit(Session session) {
163 // Persistit can mostly manage with a long-running transaction.
164 }
165
166 @Override
162 public void addCallback(Session session, CallbackType type, Callback callback) {167 public void addCallback(Session session, CallbackType type, Callback callback) {
163 session.push(getCallbackKey(type), callback);168 session.push(getCallbackKey(type), callback);
164 }169 }
165170
=== modified file 'src/main/java/com/akiban/sql/embedded/JDBCConnection.java'
--- src/main/java/com/akiban/sql/embedded/JDBCConnection.java 2013-06-11 16:30:17 +0000
+++ src/main/java/com/akiban/sql/embedded/JDBCConnection.java 2013-08-12 23:49:27 +0000
@@ -143,7 +143,7 @@
143 }143 }
144 sessionMonitor.enterStage(MonitorStage.OPTIMIZE);144 sessionMonitor.enterStage(MonitorStage.OPTIMIZE);
145 if (transaction == null)145 if (transaction == null)
146 localTransaction = new ServerTransaction(this, true);146 localTransaction = new ServerTransaction(this, true, false);
147 if ((sqlStmt instanceof DMLStatementNode) && 147 if ((sqlStmt instanceof DMLStatementNode) &&
148 !(sqlStmt instanceof CallStatementNode))148 !(sqlStmt instanceof CallStatementNode))
149 return compiler.compileExecutableStatement((DMLStatementNode)sqlStmt, parser.getParameterList(), getParameterNames, autoGeneratedKeys, context);149 return compiler.compileExecutableStatement((DMLStatementNode)sqlStmt, parser.getParameterList(), getParameterNames, autoGeneratedKeys, context);
@@ -182,7 +182,7 @@
182 }182 }
183 sessionMonitor.enterStage(MonitorStage.OPTIMIZE);183 sessionMonitor.enterStage(MonitorStage.OPTIMIZE);
184 if (transaction == null)184 if (transaction == null)
185 localTransaction = new ServerTransaction(this, true);185 localTransaction = new ServerTransaction(this, true, false);
186 ExplainPlanContext context = new ExplainPlanContext(compiler, reqs.serviceManager(), session);186 ExplainPlanContext context = new ExplainPlanContext(compiler, reqs.serviceManager(), session);
187 Explainable explainable;187 Explainable explainable;
188 if ((sqlStmt instanceof DMLStatementNode) && 188 if ((sqlStmt instanceof DMLStatementNode) &&
189189
=== modified file 'src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java'
--- src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java 2013-07-06 17:46:02 +0000
+++ src/main/java/com/akiban/sql/pg/PostgresSessionStatement.java 2013-08-12 23:49:27 +0000
@@ -53,7 +53,7 @@
53 "client_encoding", "DateStyle", "geqo", "ksqo",53 "client_encoding", "DateStyle", "geqo", "ksqo",
54 "queryTimeoutSec", "zeroDateTimeBehavior", "maxNotificationLevel", "OutputFormat",54 "queryTimeoutSec", "zeroDateTimeBehavior", "maxNotificationLevel", "OutputFormat",
55 "parserInfixBit", "parserInfixLogical", "parserDoubleQuoted",55 "parserInfixBit", "parserInfixLogical", "parserDoubleQuoted",
56 "newtypes"56 "newtypes", "transactionPeriodicallyCommit"
57 };57 };
5858
59 private Operation operation;59 private Operation operation;
6060
=== modified file 'src/main/java/com/akiban/sql/server/ServerSession.java'
--- src/main/java/com/akiban/sql/server/ServerSession.java 2013-07-13 20:04:24 +0000
+++ src/main/java/com/akiban/sql/server/ServerSession.java 2013-08-12 23:49:27 +0000
@@ -131,6 +131,9 @@
131 /** Set following transaction to read-only / read-write. */131 /** Set following transaction to read-only / read-write. */
132 public void setTransactionDefaultReadOnly(boolean readOnly);132 public void setTransactionDefaultReadOnly(boolean readOnly);
133133
134 /** Set following transaction to commit as determined by store. */
135 public void setTransactionPeriodicallyCommit(boolean periodicallyCommit);
136
134 /** Get the functions registry. */137 /** Get the functions registry. */
135 public FunctionsRegistry functionsRegistry();138 public FunctionsRegistry functionsRegistry();
136139
137140
=== modified file 'src/main/java/com/akiban/sql/server/ServerSessionBase.java'
--- src/main/java/com/akiban/sql/server/ServerSessionBase.java 2013-07-13 20:04:24 +0000
+++ src/main/java/com/akiban/sql/server/ServerSessionBase.java 2013-08-12 23:49:27 +0000
@@ -60,6 +60,7 @@
60 new HashMap<>();60 new HashMap<>();
61 protected ServerTransaction transaction;61 protected ServerTransaction transaction;
62 protected boolean transactionDefaultReadOnly = false;62 protected boolean transactionDefaultReadOnly = false;
63 protected boolean transactionPeriodicallyCommit = false;
63 protected ServerSessionMonitor sessionMonitor;64 protected ServerSessionMonitor sessionMonitor;
6465
65 protected Long queryTimeoutMilli = null;66 protected Long queryTimeoutMilli = null;
@@ -124,6 +125,12 @@
124 queryTimeoutMilli = (long)(Double.parseDouble(value) * 1000);125 queryTimeoutMilli = (long)(Double.parseDouble(value) * 1000);
125 return true;126 return true;
126 }127 }
128 if ("transactionPeriodicallyCommit".equals(key)) {
129 boolean periodicallyCommit = (value != null) && Boolean.parseBoolean(value);
130 transactionPeriodicallyCommit = periodicallyCommit;
131 if (transaction != null)
132 transaction.setPeriodicallyCommit(periodicallyCommit);
133 }
127 return false;134 return false;
128 }135 }
129136
@@ -210,7 +217,7 @@
210 public void beginTransaction() {217 public void beginTransaction() {
211 if (transaction != null)218 if (transaction != null)
212 throw new TransactionInProgressException();219 throw new TransactionInProgressException();
213 transaction = new ServerTransaction(this, transactionDefaultReadOnly);220 transaction = new ServerTransaction(this, transactionDefaultReadOnly, transactionPeriodicallyCommit);
214 }221 }
215222
216 @Override223 @Override
@@ -250,6 +257,11 @@
250 }257 }
251258
252 @Override259 @Override
260 public void setTransactionPeriodicallyCommit(boolean periodicallyCommit) {
261 this.transactionPeriodicallyCommit = periodicallyCommit;
262 }
263
264 @Override
253 public FunctionsRegistry functionsRegistry() {265 public FunctionsRegistry functionsRegistry() {
254 return reqs.functionsRegistry();266 return reqs.functionsRegistry();
255 }267 }
@@ -331,14 +343,14 @@
331 throw new NoTransactionInProgressException();343 throw new NoTransactionInProgressException();
332 case READ:344 case READ:
333 case NEW:345 case NEW:
334 localTransaction = new ServerTransaction(this, true);346 localTransaction = new ServerTransaction(this, true, false);
335 break;347 break;
336 case WRITE:348 case WRITE:
337 case NEW_WRITE:349 case NEW_WRITE:
338 case WRITE_STEP_ISOLATED:350 case WRITE_STEP_ISOLATED:
339 if (transactionDefaultReadOnly)351 if (transactionDefaultReadOnly)
340 throw new TransactionReadOnlyException();352 throw new TransactionReadOnlyException();
341 localTransaction = new ServerTransaction(this, false);353 localTransaction = new ServerTransaction(this, false, false);
342 localTransaction.beforeUpdate(true);354 localTransaction.beforeUpdate(true);
343 break;355 break;
344 }356 }
@@ -369,17 +381,18 @@
369 else381 else
370 localTransaction.abort();382 localTransaction.abort();
371 }383 }
372 else {384 else if (transaction != null) {
373 // Make changes visible in open global transaction.385 // Make changes visible in open global transaction.
374 ServerStatement.TransactionMode transactionMode = stmt.getTransactionMode();386 ServerStatement.TransactionMode transactionMode = stmt.getTransactionMode();
375 switch (transactionMode) {387 switch (transactionMode) {
376 case REQUIRED_WRITE:388 case REQUIRED_WRITE:
377 case WRITE:389 case WRITE:
378 case WRITE_STEP_ISOLATED:390 case WRITE_STEP_ISOLATED:
379 if (transaction != null)391 transaction.afterUpdate(transactionMode == ServerStatement.TransactionMode.WRITE_STEP_ISOLATED);
380 transaction.afterUpdate(transactionMode == ServerStatement.TransactionMode.WRITE_STEP_ISOLATED);
381 break;392 break;
382 }393 }
394 // Give periodic commit a chance if enabled.
395 transaction.checkPeriodicallyCommit();
383 }396 }
384 }397 }
385398
386399
=== modified file 'src/main/java/com/akiban/sql/server/ServerTransaction.java'
--- src/main/java/com/akiban/sql/server/ServerTransaction.java 2013-03-22 20:05:57 +0000
+++ src/main/java/com/akiban/sql/server/ServerTransaction.java 2013-08-12 23:49:27 +0000
@@ -29,14 +29,16 @@
29{29{
30 private final Session session;30 private final Session session;
31 private final TransactionService txnService;31 private final TransactionService txnService;
32 private boolean readOnly;32 private boolean readOnly, periodicallyCommit;
33 private Date transactionTime;33 private Date transactionTime;
34 34
35 /** Begin a new transaction or signal an exception. */35 /** Begin a new transaction or signal an exception. */
36 public ServerTransaction(ServerSession server, boolean readOnly) {36 public ServerTransaction(ServerSession server,
37 boolean readOnly, boolean periodicallyCommit) {
37 this.session = server.getSession();38 this.session = server.getSession();
38 this.txnService = server.getTransactionService();39 this.txnService = server.getTransactionService();
39 this.readOnly = readOnly;40 this.readOnly = readOnly;
41 this.periodicallyCommit = periodicallyCommit;
40 txnService.beginTransaction(session);42 txnService.beginTransaction(session);
41 }43 }
4244
@@ -48,6 +50,14 @@
48 this.readOnly = readOnly;50 this.readOnly = readOnly;
49 }51 }
5052
53 public boolean isPeriodicallyCommit() {
54 return periodicallyCommit;
55 }
56
57 public void setPeriodicallyCommit(boolean periodicallyCommit) {
58 this.periodicallyCommit = periodicallyCommit;
59 }
60
51 public void checkTransactionMode(ServerStatement.TransactionMode transactionMode) {61 public void checkTransactionMode(ServerStatement.TransactionMode transactionMode) {
52 switch (transactionMode) {62 switch (transactionMode) {
53 case NONE:63 case NONE:
@@ -95,6 +105,12 @@
95 return txnService.isRollbackPending(session);105 return txnService.isRollbackPending(session);
96 }106 }
97107
108 public void checkPeriodicallyCommit() {
109 if (periodicallyCommit) {
110 txnService.periodicallyCommit(session);
111 }
112 }
113
98 /** Return the transaction's time, which is fixed the first time114 /** Return the transaction's time, which is fixed the first time
99 * something asks for it. */115 * something asks for it. */
100 public Date getTime(ServerSession server) {116 public Date getTime(ServerSession server) {

Subscribers

People subscribed via source and target branches