Merge lp:~dshrews/drizzle/bug600795 into lp:~drizzle-trunk/drizzle/development

Proposed by David Shrewsbury
Status: Merged
Approved by: Monty Taylor
Approved revision: 1721
Merged at revision: 1735
Proposed branch: lp:~dshrews/drizzle/bug600795
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 743 lines (+306/-105)
4 files modified
drizzled/cursor.cc (+1/-0)
drizzled/message/transaction_reader.cc (+56/-2)
drizzled/transaction_services.cc (+171/-39)
drizzled/transaction_services.h (+78/-64)
To merge this branch: bzr merge lp:~dshrews/drizzle/bug600795
Reviewer Review Type Date Requested Status
Drizzle Merge Team Pending
Review via email: mp+33919@code.launchpad.net

Description of the change

This enables logging of the INSERTs produced by LOAD DATA.

As a consequence, I had to introduce a transaction message size threshold. Without it, a single LOAD DATA statement would produce one large Transaction object containing all of the INSERTs for that statement. That could use a lot of memory and create a large GPB message, which has a maximum size of 64M. (Right now, the transaction message size threshold is hard-coded to 1M. I'll make this configurable in the near future once our sys var changes are merged into trunk.)

This means that we may have multiple Transaction messages in the replication stream for a single logical transaction, but they will all have the same transaction id, and all but the last Transaction message will have the Statement end_segment attribute set to FALSE.

So a reader of the stream (or transaction log) will know to COMMIT when either:

a) The transaction id changes, or
b) All of the Statement(s) within the Transaction have end_segment = TRUE

To post a comment you must log in.
lp:~dshrews/drizzle/bug600795 updated
1722. By David Shrewsbury

Merge from trunk, resolve conflict with transaction_services.h

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'drizzled/cursor.cc'
--- drizzled/cursor.cc 2010-08-26 17:42:07 +0000
+++ drizzled/cursor.cc 2010-08-28 01:13:43 +0000
@@ -1363,6 +1363,7 @@
1363 break;1363 break;
1364 case SQLCOM_INSERT:1364 case SQLCOM_INSERT:
1365 case SQLCOM_INSERT_SELECT:1365 case SQLCOM_INSERT_SELECT:
1366 case SQLCOM_LOAD:
1366 /*1367 /*
1367 * The else block below represents an 1368 * The else block below represents an
1368 * INSERT ... ON DUPLICATE KEY UPDATE that1369 * INSERT ... ON DUPLICATE KEY UPDATE that
13691370
=== modified file 'drizzled/message/transaction_reader.cc'
--- drizzled/message/transaction_reader.cc 2010-07-17 06:28:29 +0000
+++ drizzled/message/transaction_reader.cc 2010-08-28 01:13:43 +0000
@@ -95,20 +95,74 @@
95 }95 }
96}96}
9797
98static bool isEndStatement(const message::Statement &statement)
99{
100 switch (statement.type())
101 {
102 case (message::Statement::INSERT):
103 {
104 const message::InsertData &data= statement.insert_data();
105 if (not data.end_segment())
106 return false;
107 break;
108 }
109 case (message::Statement::UPDATE):
110 {
111 const message::UpdateData &data= statement.update_data();
112 if (not data.end_segment())
113 return false;
114 break;
115 }
116 case (message::Statement::DELETE):
117 {
118 const message::DeleteData &data= statement.delete_data();
119 if (not data.end_segment())
120 return false;
121 break;
122 }
123 default:
124 return true;
125 }
126 return true;
127}
128
98static void printTransaction(const message::Transaction &transaction)129static void printTransaction(const message::Transaction &transaction)
99{130{
131 static uint64_t last_trx_id= 0;
132 bool should_commit= true;
100 const message::TransactionContext trx= transaction.transaction_context();133 const message::TransactionContext trx= transaction.transaction_context();
101134
102 size_t num_statements= transaction.statement_size();135 size_t num_statements= transaction.statement_size();
103 size_t x;136 size_t x;
104137
105 cout << "START TRANSACTION;" << endl;138 /*
139 * One way to determine when a new transaction begins is when the
140 * transaction id changes. We check that here.
141 */
142 if (trx.transaction_id() != last_trx_id)
143 cout << "START TRANSACTION;" << endl;
144
145 last_trx_id= trx.transaction_id();
146
106 for (x= 0; x < num_statements; ++x)147 for (x= 0; x < num_statements; ++x)
107 {148 {
108 const message::Statement &statement= transaction.statement(x);149 const message::Statement &statement= transaction.statement(x);
150
151 if (should_commit)
152 should_commit= isEndStatement(statement);
153
109 printStatement(statement);154 printStatement(statement);
110 }155 }
111 cout << "COMMIT;" << endl;156
157 /*
158 * If ALL Statements are end segments, we can commit this Transaction.
159 * We can also check to see if the transaction_id changed, but this
160 * wouldn't work for the last Transaction in the transaction log since
161 * we don't have another Transaction to compare to. Checking for all
162 * end segments (like we do above) covers this case.
163 */
164 if (should_commit)
165 cout << "COMMIT;" << endl;
112}166}
113167
114int main(int argc, char* argv[])168int main(int argc, char* argv[])
115169
=== modified file 'drizzled/transaction_services.cc'
--- drizzled/transaction_services.cc 2010-08-26 15:33:12 +0000
+++ drizzled/transaction_services.cc 2010-08-28 01:13:43 +0000
@@ -80,6 +80,9 @@
80namespace drizzled80namespace drizzled
81{81{
8282
83/** @TODO: Make this a system variable */
84static const size_t trx_msg_threshold= 1024 * 1024;
85
83/**86/**
84 * @defgroup Transactions87 * @defgroup Transactions
85 *88 *
@@ -895,7 +898,7 @@
895 return replication_services.isActive();898 return replication_services.isActive();
896}899}
897900
898message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)901message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
899{902{
900 message::Transaction *transaction= in_session->getTransactionMessage();903 message::Transaction *transaction= in_session->getTransactionMessage();
901904
@@ -907,7 +910,7 @@
907 * deleting transaction message when done with it.910 * deleting transaction message when done with it.
908 */911 */
909 transaction= new (nothrow) message::Transaction();912 transaction= new (nothrow) message::Transaction();
910 initTransactionMessage(*transaction, in_session);913 initTransactionMessage(*transaction, in_session, should_inc_trx_id);
911 in_session->setTransactionMessage(transaction);914 in_session->setTransactionMessage(transaction);
912 return transaction;915 return transaction;
913 }916 }
@@ -916,11 +919,17 @@
916}919}
917920
918void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,921void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
919 Session *in_session)922 Session *in_session,
923 bool should_inc_trx_id)
920{924{
921 message::TransactionContext *trx= in_transaction.mutable_transaction_context();925 message::TransactionContext *trx= in_transaction.mutable_transaction_context();
922 trx->set_server_id(in_session->getServerId());926 trx->set_server_id(in_session->getServerId());
923 trx->set_transaction_id(getNextTransactionId());927
928 if (should_inc_trx_id)
929 trx->set_transaction_id(getNextTransactionId());
930 else
931 trx->set_transaction_id(getCurrentTransactionId());
932
924 trx->set_start_timestamp(in_session->getCurrentTimestamp());933 trx->set_start_timestamp(in_session->getCurrentTimestamp());
925}934}
926935
@@ -1014,7 +1023,7 @@
1014 * attach it to the transaction, and push it to replicators.1023 * attach it to the transaction, and push it to replicators.
1015 */1024 */
1016 transaction->Clear();1025 transaction->Clear();
1017 initTransactionMessage(*transaction, in_session);1026 initTransactionMessage(*transaction, in_session, true);
10181027
1019 message::Statement *statement= transaction->add_statement();1028 message::Statement *statement= transaction->add_statement();
10201029
@@ -1029,9 +1038,11 @@
1029}1038}
10301039
1031message::Statement &TransactionServices::getInsertStatement(Session *in_session,1040message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1032 Table *in_table)1041 Table *in_table,
1042 uint32_t *next_segment_id)
1033{1043{
1034 message::Statement *statement= in_session->getStatementMessage();1044 message::Statement *statement= in_session->getStatementMessage();
1045 message::Transaction *transaction= NULL;
10351046
1036 /* 1047 /*
1037 * Check the type for the current Statement message, if it is anything1048 * Check the type for the current Statement message, if it is anything
@@ -1047,21 +1058,59 @@
1047 } 1058 }
1048 else if (statement != NULL)1059 else if (statement != NULL)
1049 {1060 {
1050 const message::InsertHeader &insert_header= statement->insert_header();1061 /*
1051 string old_table_name= insert_header.table_metadata().table_name();1062 * If we've passed our threshold for the statement size (possible for
1063 * a bulk insert), we'll finalize the Statement and Transaction (doing
1064 * the Transaction will keep it from getting huge).
1065 */
1066 if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1067 {
1068 message::InsertData *current_data= statement->mutable_insert_data();
1069
1070 /* Caller should use this value when adding a new record */
1071 *next_segment_id= current_data->segment_id() + 1;
1072
1073 current_data->set_end_segment(false);
1074
1075 /*
1076 * Send the trx message to replicators after finalizing the
1077 * statement and transaction. This will also set the Transaction
1078 * and Statement objects in Session to NULL.
1079 */
1080 commitTransactionMessage(in_session);
1081
1082 /*
1083 * Statement and Transaction should now be NULL, so new ones will get
1084 * created. We reuse the transaction id since we are segmenting
1085 * one transaction.
1086 */
1087 statement= in_session->getStatementMessage();
1088 transaction= getActiveTransactionMessage(in_session, false);
1089 }
1090 else
1091 {
1092 const message::InsertHeader &insert_header= statement->insert_header();
1093 string old_table_name= insert_header.table_metadata().table_name();
1052 1094
1053 string current_table_name;1095 string current_table_name;
1054 (void) in_table->getShare()->getTableName(current_table_name);1096 (void) in_table->getShare()->getTableName(current_table_name);
1055 if (current_table_name.compare(old_table_name))1097 if (current_table_name.compare(old_table_name))
1056 {1098 {
1057 finalizeStatementMessage(*statement, in_session);1099 finalizeStatementMessage(*statement, in_session);
1058 statement= in_session->getStatementMessage();1100 statement= in_session->getStatementMessage();
1101 }
1059 }1102 }
1060 } 1103 }
10611104
1062 if (statement == NULL)1105 if (statement == NULL)
1063 {1106 {
1064 message::Transaction *transaction= getActiveTransactionMessage(in_session);1107 /*
1108 * Transaction will be non-NULL only if we had to segment it due to
1109 * transaction size above.
1110 */
1111 if (transaction == NULL)
1112 transaction= getActiveTransactionMessage(in_session);
1113
1065 /* 1114 /*
1066 * Transaction message initialized and set, but no statement created1115 * Transaction message initialized and set, but no statement created
1067 * yet. We construct one and initialize it, here, then return the1116 * yet. We construct one and initialize it, here, then return the
@@ -1132,10 +1181,11 @@
1132 return true;1181 return true;
1133 }1182 }
11341183
1135 message::Statement &statement= getInsertStatement(in_session, in_table);1184 uint32_t next_segment_id= 1;
1185 message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
11361186
1137 message::InsertData *data= statement.mutable_insert_data();1187 message::InsertData *data= statement.mutable_insert_data();
1138 data->set_segment_id(1);1188 data->set_segment_id(next_segment_id);
1139 data->set_end_segment(true);1189 data->set_end_segment(true);
1140 message::InsertRecord *record= data->add_record();1190 message::InsertRecord *record= data->add_record();
11411191
@@ -1169,9 +1219,11 @@
1169message::Statement &TransactionServices::getUpdateStatement(Session *in_session,1219message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1170 Table *in_table,1220 Table *in_table,
1171 const unsigned char *old_record, 1221 const unsigned char *old_record,
1172 const unsigned char *new_record)1222 const unsigned char *new_record,
1223 uint32_t *next_segment_id)
1173{1224{
1174 message::Statement *statement= in_session->getStatementMessage();1225 message::Statement *statement= in_session->getStatementMessage();
1226 message::Transaction *transaction= NULL;
11751227
1176 /*1228 /*
1177 * Check the type for the current Statement message, if it is anything1229 * Check the type for the current Statement message, if it is anything
@@ -1187,21 +1239,59 @@
1187 }1239 }
1188 else if (statement != NULL)1240 else if (statement != NULL)
1189 {1241 {
1190 const message::UpdateHeader &update_header= statement->update_header();1242 /*
1191 string old_table_name= update_header.table_metadata().table_name();1243 * If we've passed our threshold for the statement size (possible for
11921244 * a bulk insert), we'll finalize the Statement and Transaction (doing
1193 string current_table_name;1245 * the Transaction will keep it from getting huge).
1194 (void) in_table->getShare()->getTableName(current_table_name);1246 */
1195 if (current_table_name.compare(old_table_name))1247 if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1196 {1248 {
1197 finalizeStatementMessage(*statement, in_session);1249 message::UpdateData *current_data= statement->mutable_update_data();
1250
1251 /* Caller should use this value when adding a new record */
1252 *next_segment_id= current_data->segment_id() + 1;
1253
1254 current_data->set_end_segment(false);
1255
1256 /*
1257 * Send the trx message to replicators after finalizing the
1258 * statement and transaction. This will also set the Transaction
1259 * and Statement objects in Session to NULL.
1260 */
1261 commitTransactionMessage(in_session);
1262
1263 /*
1264 * Statement and Transaction should now be NULL, so new ones will get
1265 * created. We reuse the transaction id since we are segmenting
1266 * one transaction.
1267 */
1198 statement= in_session->getStatementMessage();1268 statement= in_session->getStatementMessage();
1269 transaction= getActiveTransactionMessage(in_session, false);
1270 }
1271 else
1272 {
1273 const message::UpdateHeader &update_header= statement->update_header();
1274 string old_table_name= update_header.table_metadata().table_name();
1275
1276 string current_table_name;
1277 (void) in_table->getShare()->getTableName(current_table_name);
1278 if (current_table_name.compare(old_table_name))
1279 {
1280 finalizeStatementMessage(*statement, in_session);
1281 statement= in_session->getStatementMessage();
1282 }
1199 }1283 }
1200 }1284 }
12011285
1202 if (statement == NULL)1286 if (statement == NULL)
1203 {1287 {
1204 message::Transaction *transaction= getActiveTransactionMessage(in_session);1288 /*
1289 * Transaction will be non-NULL only if we had to segment it due to
1290 * transaction size above.
1291 */
1292 if (transaction == NULL)
1293 transaction= getActiveTransactionMessage(in_session);
1294
1205 /* 1295 /*
1206 * Transaction message initialized and set, but no statement created1296 * Transaction message initialized and set, but no statement created
1207 * yet. We construct one and initialize it, here, then return the1297 * yet. We construct one and initialize it, here, then return the
@@ -1288,10 +1378,11 @@
1288 if (! replication_services.isActive())1378 if (! replication_services.isActive())
1289 return;1379 return;
12901380
1291 message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);1381 uint32_t next_segment_id= 1;
1382 message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
12921383
1293 message::UpdateData *data= statement.mutable_update_data();1384 message::UpdateData *data= statement.mutable_update_data();
1294 data->set_segment_id(1);1385 data->set_segment_id(next_segment_id);
1295 data->set_end_segment(true);1386 data->set_end_segment(true);
1296 message::UpdateRecord *record= data->add_record();1387 message::UpdateRecord *record= data->add_record();
12971388
@@ -1375,9 +1466,11 @@
1375}1466}
13761467
1377message::Statement &TransactionServices::getDeleteStatement(Session *in_session,1468message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1378 Table *in_table)1469 Table *in_table,
1470 uint32_t *next_segment_id)
1379{1471{
1380 message::Statement *statement= in_session->getStatementMessage();1472 message::Statement *statement= in_session->getStatementMessage();
1473 message::Transaction *transaction= NULL;
13811474
1382 /*1475 /*
1383 * Check the type for the current Statement message, if it is anything1476 * Check the type for the current Statement message, if it is anything
@@ -1393,21 +1486,59 @@
1393 }1486 }
1394 else if (statement != NULL)1487 else if (statement != NULL)
1395 {1488 {
1396 const message::DeleteHeader &delete_header= statement->delete_header();1489 /*
1397 string old_table_name= delete_header.table_metadata().table_name();1490 * If we've passed our threshold for the statement size (possible for
13981491 * a bulk insert), we'll finalize the Statement and Transaction (doing
1399 string current_table_name;1492 * the Transaction will keep it from getting huge).
1400 (void) in_table->getShare()->getTableName(current_table_name);1493 */
1401 if (current_table_name.compare(old_table_name))1494 if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1402 {1495 {
1403 finalizeStatementMessage(*statement, in_session);1496 message::DeleteData *current_data= statement->mutable_delete_data();
1497
1498 /* Caller should use this value when adding a new record */
1499 *next_segment_id= current_data->segment_id() + 1;
1500
1501 current_data->set_end_segment(false);
1502
1503 /*
1504 * Send the trx message to replicators after finalizing the
1505 * statement and transaction. This will also set the Transaction
1506 * and Statement objects in Session to NULL.
1507 */
1508 commitTransactionMessage(in_session);
1509
1510 /*
1511 * Statement and Transaction should now be NULL, so new ones will get
1512 * created. We reuse the transaction id since we are segmenting
1513 * one transaction.
1514 */
1404 statement= in_session->getStatementMessage();1515 statement= in_session->getStatementMessage();
1516 transaction= getActiveTransactionMessage(in_session, false);
1517 }
1518 else
1519 {
1520 const message::DeleteHeader &delete_header= statement->delete_header();
1521 string old_table_name= delete_header.table_metadata().table_name();
1522
1523 string current_table_name;
1524 (void) in_table->getShare()->getTableName(current_table_name);
1525 if (current_table_name.compare(old_table_name))
1526 {
1527 finalizeStatementMessage(*statement, in_session);
1528 statement= in_session->getStatementMessage();
1529 }
1405 }1530 }
1406 }1531 }
14071532
1408 if (statement == NULL)1533 if (statement == NULL)
1409 {1534 {
1410 message::Transaction *transaction= getActiveTransactionMessage(in_session);1535 /*
1536 * Transaction will be non-NULL only if we had to segment it due to
1537 * transaction size above.
1538 */
1539 if (transaction == NULL)
1540 transaction= getActiveTransactionMessage(in_session);
1541
1411 /* 1542 /*
1412 * Transaction message initialized and set, but no statement created1543 * Transaction message initialized and set, but no statement created
1413 * yet. We construct one and initialize it, here, then return the1544 * yet. We construct one and initialize it, here, then return the
@@ -1469,10 +1600,11 @@
1469 if (! replication_services.isActive())1600 if (! replication_services.isActive())
1470 return;1601 return;
14711602
1472 message::Statement &statement= getDeleteStatement(in_session, in_table);1603 uint32_t next_segment_id;
1604 message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
14731605
1474 message::DeleteData *data= statement.mutable_delete_data();1606 message::DeleteData *data= statement.mutable_delete_data();
1475 data->set_segment_id(1);1607 data->set_segment_id(next_segment_id);
1476 data->set_end_segment(true);1608 data->set_end_segment(true);
1477 message::DeleteRecord *record= data->add_record();1609 message::DeleteRecord *record= data->add_record();
14781610
14791611
=== modified file 'drizzled/transaction_services.h'
--- drizzled/transaction_services.h 2010-08-26 15:33:12 +0000
+++ drizzled/transaction_services.h 2010-08-28 01:13:43 +0000
@@ -80,27 +80,35 @@
80 /**80 /**
81 * Method which returns the active Transaction message81 * Method which returns the active Transaction message
82 * for the supplied Session. If one is not found, a new Transaction82 * for the supplied Session. If one is not found, a new Transaction
83 * message is allocated, initialized, and returned.83 * message is allocated, initialized, and returned. It is possible that
84 * we may want to NOT increment the transaction id for a new Transaction
85 * object (e.g., splitting up Transactions into smaller chunks). The
86 * should_inc_trx_id flag controls if we do this.
84 *87 *
85 * @param The session processing the transaction88 * @param in_session The session processing the transaction
89 * @param should_inc_trx_id If true, increments the transaction id for a new trx
86 */90 */
87 message::Transaction *getActiveTransactionMessage(Session *in_session);91 message::Transaction *getActiveTransactionMessage(Session *in_session,
92 bool should_inc_trx_id= true);
88 /** 93 /**
89 * Method which attaches a transaction context94 * Method which attaches a transaction context
90 * the supplied transaction based on the supplied Session's95 * the supplied transaction based on the supplied Session's
91 * transaction information. This method also ensure the96 * transaction information. This method also ensure the
92 * transaction message is attached properly to the Session object97 * transaction message is attached properly to the Session object
93 *98 *
94 * @param The transaction message to initialize99 * @param in_transaction The transaction message to initialize
95 * @param The Session processing this transaction100 * @param in_session The Session processing this transaction
101 * @param should_inc_trx_id If true, increments the transaction id for a new trx
96 */102 */
97 void initTransactionMessage(message::Transaction &in_transaction, Session *in_session);103 void initTransactionMessage(message::Transaction &in_transaction,
104 Session *in_session,
105 bool should_inc_trx_id);
98 /** 106 /**
99 * Helper method which finalizes data members for the 107 * Helper method which finalizes data members for the
100 * supplied transaction's context.108 * supplied transaction's context.
101 *109 *
102 * @param The transaction message to finalize 110 * @param in_transaction The transaction message to finalize
103 * @param The Session processing this transaction111 * @param in_session The Session processing this transaction
104 */112 */
105 void finalizeTransactionMessage(message::Transaction &in_transaction, Session *in_session);113 void finalizeTransactionMessage(message::Transaction &in_transaction, Session *in_session);
106 /**114 /**
@@ -112,9 +120,9 @@
112 /**120 /**
113 * Helper method which initializes a Statement message121 * Helper method which initializes a Statement message
114 *122 *
115 * @param The statement to initialize123 * @param statement The statement to initialize
116 * @param The type of the statement124 * @param in_type The type of the statement
117 * @param The session processing this statement125 * @param in_session The session processing this statement
118 */126 */
119 void initStatementMessage(message::Statement &statement,127 void initStatementMessage(message::Statement &statement,
120 message::Statement::Type in_type,128 message::Statement::Type in_type,
@@ -123,27 +131,29 @@
123 * Finalizes a Statement message and sets the Session's statement131 * Finalizes a Statement message and sets the Session's statement
124 * message to NULL.132 * message to NULL.
125 *133 *
126 * @param The statement to initialize134 * @param statement The statement to initialize
127 * @param The session processing this statement135 * @param in_session The session processing this statement
128 */136 */
129 void finalizeStatementMessage(message::Statement &statement,137 void finalizeStatementMessage(message::Statement &statement,
130 Session *in_session);138 Session *in_session);
131 /** Helper method which returns an initialized Statement message for methods139 /** Helper method which returns an initialized Statement message for methods
132 * doing insertion of data.140 * doing insertion of data.
133 *141 *
134 * @param[in] Pointer to the Session doing the processing142 * @param[in] in_session Pointer to the Session doing the processing
135 * @param[in] Pointer to the Table object being inserted into143 * @param[in] in_table Pointer to the Table object being inserted into
144 * @param[out] next_segment_id The next Statement segment id to be used
136 */145 */
137 message::Statement &getInsertStatement(Session *in_session,146 message::Statement &getInsertStatement(Session *in_session,
138 Table *in_table);147 Table *in_table,
148 uint32_t *next_segment_id);
139149
140 /**150 /**
141 * Helper method which initializes the header message for151 * Helper method which initializes the header message for
142 * insert operations.152 * insert operations.
143 *153 *
144 * @param[inout] Statement message container to modify154 * @param[in,out] statement Statement message container to modify
145 * @param[in] Pointer to the Session doing the processing155 * @param[in] in_session Pointer to the Session doing the processing
146 * @param[in] Pointer to the Table being inserted into156 * @param[in] in_table Pointer to the Table being inserted into
147 */157 */
148 void setInsertHeader(message::Statement &statement,158 void setInsertHeader(message::Statement &statement,
149 Session *in_session,159 Session *in_session,
@@ -152,24 +162,26 @@
152 * Helper method which returns an initialized Statement162 * Helper method which returns an initialized Statement
153 * message for methods doing updates of data.163 * message for methods doing updates of data.
154 *164 *
155 * @param[in] Pointer to the Session doing the processing165 * @param[in] in_session Pointer to the Session doing the processing
156 * @param[in] Pointer to the Table object being updated166 * @param[in] in_table Pointer to the Table object being updated
157 * @param[in] Pointer to the old data in the record167 * @param[in] old_record Pointer to the old data in the record
158 * @param[in] Pointer to the new data in the record168 * @param[in] new_record Pointer to the new data in the record
169 * @param[out] next_segment_id The next Statement segment id to be used
159 */170 */
160 message::Statement &getUpdateStatement(Session *in_session,171 message::Statement &getUpdateStatement(Session *in_session,
161 Table *in_table,172 Table *in_table,
162 const unsigned char *old_record, 173 const unsigned char *old_record,
163 const unsigned char *new_record);174 const unsigned char *new_record,
175 uint32_t *next_segment_id);
164 /**176 /**
165 * Helper method which initializes the header message for177 * Helper method which initializes the header message for
166 * update operations.178 * update operations.
167 *179 *
168 * @param[inout] Statement message container to modify180 * @param[in,out] statement Statement message container to modify
169 * @param[in] Pointer to the Session doing the processing181 * @param[in] in_session Pointer to the Session doing the processing
170 * @param[in] Pointer to the Table being updated182 * @param[in] in_table Pointer to the Table being updated
171 * @param[in] Pointer to the old data in the record183 * @param[in] old_record Pointer to the old data in the record
172 * @param[in] Pointer to the new data in the record184 * @param[in] new_record Pointer to the new data in the record
173 */185 */
174 void setUpdateHeader(message::Statement &statement,186 void setUpdateHeader(message::Statement &statement,
175 Session *in_session,187 Session *in_session,
@@ -180,19 +192,21 @@
180 * Helper method which returns an initialized Statement192 * Helper method which returns an initialized Statement
181 * message for methods doing deletion of data.193 * message for methods doing deletion of data.
182 *194 *
183 * @param[in] Pointer to the Session doing the processing195 * @param[in] in_session Pointer to the Session doing the processing
184 * @param[in] Pointer to the Table object being deleted from196 * @param[in] in_table Pointer to the Table object being deleted from
197 * @param[out] next_segment_id The next Statement segment id to be used
185 */198 */
186 message::Statement &getDeleteStatement(Session *in_session,199 message::Statement &getDeleteStatement(Session *in_session,
187 Table *in_table);200 Table *in_table,
201 uint32_t *next_segment_id);
188202
189 /**203 /**
190 * Helper method which initializes the header message for204 * Helper method which initializes the header message for
191 * insert operations.205 * insert operations.
192 *206 *
193 * @param[inout] Statement message container to modify207 * @param[in,out] statement Statement message container to modify
194 * @param[in] Pointer to the Session doing the processing208 * @param[in] in_session Pointer to the Session doing the processing
195 * @param[in] Pointer to the Table being deleted from209 * @param[in] in_table Pointer to the Table being deleted from
196 */210 */
197 void setDeleteHeader(message::Statement &statement,211 void setDeleteHeader(message::Statement &statement,
198 Session *in_session,212 Session *in_session,
@@ -201,22 +215,22 @@
201 * Commits a normal transaction (see above) and pushes the transaction215 * Commits a normal transaction (see above) and pushes the transaction
202 * message out to the replicators.216 * message out to the replicators.
203 *217 *
204 * @param Pointer to the Session committing the transaction218 * @param in_session Pointer to the Session committing the transaction
205 */219 */
206 int commitTransactionMessage(Session *in_session);220 int commitTransactionMessage(Session *in_session);
207 /** 221 /**
208 * Marks the current active transaction message as being rolled back and222 * Marks the current active transaction message as being rolled back and
209 * pushes the transaction message out to replicators.223 * pushes the transaction message out to replicators.
210 *224 *
211 * @param Pointer to the Session committing the transaction225 * @param in_session Pointer to the Session committing the transaction
212 */226 */
213 void rollbackTransactionMessage(Session *in_session);227 void rollbackTransactionMessage(Session *in_session);
214 /**228 /**
215 * Creates a new InsertRecord GPB message and pushes it to229 * Creates a new InsertRecord GPB message and pushes it to
216 * replicators.230 * replicators.
217 *231 *
218 * @param Pointer to the Session which has inserted a record232 * @param in_session Pointer to the Session which has inserted a record
219 * @param Pointer to the Table containing insert information233 * @param in_table Pointer to the Table containing insert information
220 *234 *
221 * Grr, returning "true" here on error because of the cursor235 * Grr, returning "true" here on error because of the cursor
222 * reversed bool return crap...fix that.236 * reversed bool return crap...fix that.
@@ -226,10 +240,10 @@
226 * Creates a new UpdateRecord GPB message and pushes it to240 * Creates a new UpdateRecord GPB message and pushes it to
227 * replicators.241 * replicators.
228 *242 *
229 * @param Pointer to the Session which has updated a record243 * @param in_session Pointer to the Session which has updated a record
230 * @param Pointer to the Table containing update information244 * @param in_table Pointer to the Table containing update information
231 * @param Pointer to the raw bytes representing the old record/row245 * @param old_record Pointer to the raw bytes representing the old record/row
232 * @param Pointer to the raw bytes representing the new record/row 246 * @param new_record Pointer to the raw bytes representing the new record/row
233 */247 */
234 void updateRecord(Session *in_session, 248 void updateRecord(Session *in_session,
235 Table *in_table, 249 Table *in_table,
@@ -249,8 +263,8 @@
249 * to the Session's active Transaction GPB message for pushing263 * to the Session's active Transaction GPB message for pushing
250 * out to the replicator streams.264 * out to the replicator streams.
251 *265 *
252 * @param[in] Pointer to the Session which issued the statement266 * @param[in] in_session Pointer to the Session which issued the statement
253 * @param[in] message::Schema message describing new schema267 * @param[in] schema message::Schema message describing new schema
254 */268 */
255 void createSchema(Session *in_session, const message::Schema &schema);269 void createSchema(Session *in_session, const message::Schema &schema);
256 /**270 /**
@@ -258,8 +272,8 @@
258 * to the Session's active Transaction GPB message for pushing272 * to the Session's active Transaction GPB message for pushing
259 * out to the replicator streams.273 * out to the replicator streams.
260 *274 *
261 * @param[in] Pointer to the Session which issued the statement275 * @param[in] in_session Pointer to the Session which issued the statement
262 * @param[in] message::Schema message describing new schema276 * @param[in] schema_name message::Schema message describing new schema
263 */277 */
264 void dropSchema(Session *in_session, const std::string &schema_name);278 void dropSchema(Session *in_session, const std::string &schema_name);
265 /**279 /**
@@ -267,8 +281,8 @@
267 * to the Session's active Transaction GPB message for pushing281 * to the Session's active Transaction GPB message for pushing
268 * out to the replicator streams.282 * out to the replicator streams.
269 *283 *
270 * @param[in] Pointer to the Session which issued the statement284 * @param[in] in_session Pointer to the Session which issued the statement
271 * @param[in] message::Table message describing new schema285 * @param[in] table message::Table message describing new schema
272 */286 */
273 void createTable(Session *in_session, const message::Table &table);287 void createTable(Session *in_session, const message::Table &table);
274 /**288 /**
@@ -276,10 +290,10 @@
276 * to the Session's active Transaction GPB message for pushing290 * to the Session's active Transaction GPB message for pushing
277 * out to the replicator streams.291 * out to the replicator streams.
278 *292 *
279 * @param[in] Pointer to the Session which issued the statement293 * @param[in] in_session Pointer to the Session which issued the statement
280 * @param[in] The schema of the table being dropped294 * @param[in] schema_name The schema of the table being dropped
281 * @param[in] The table name of the table being dropped295 * @param[in] table_name The table name of the table being dropped
282 * @param[in] Did the user specify an IF EXISTS clause?296 * @param[in] if_exists Did the user specify an IF EXISTS clause?
283 */297 */
284 void dropTable(Session *in_session,298 void dropTable(Session *in_session,
285 const std::string &schema_name,299 const std::string &schema_name,
@@ -290,8 +304,8 @@
290 * to the Session's active Transaction GPB message for pushing304 * to the Session's active Transaction GPB message for pushing
291 * out to the replicator streams.305 * out to the replicator streams.
292 *306 *
293 * @param[in] Pointer to the Session which issued the statement307 * @param[in] in_session Pointer to the Session which issued the statement
294 * @param[in] The Table being truncated308 * @param[in] in_table The Table being truncated
295 */309 */
296 void truncateTable(Session *in_session, Table *in_table);310 void truncateTable(Session *in_session, Table *in_table);
297 /**311 /**
@@ -303,8 +317,8 @@
303 * on the I_S, etc. Not sure what to do with administrative317 * on the I_S, etc. Not sure what to do with administrative
304 * commands like CHECK TABLE, though..318 * commands like CHECK TABLE, though..
305 *319 *
306 * @param Pointer to the Session which issued the statement320 * @param in_session Pointer to the Session which issued the statement
307 * @param Query string321 * @param query Query string
308 */322 */
309 void rawStatement(Session *in_session, const std::string &query);323 void rawStatement(Session *in_session, const std::string &query);
310 /* transactions: interface to plugin::StorageEngine functions */324 /* transactions: interface to plugin::StorageEngine functions */
@@ -334,9 +348,9 @@
334 * per statement, and therefore should not need to be idempotent.348 * per statement, and therefore should not need to be idempotent.
335 * Put in assert()s to test this.349 * Put in assert()s to test this.
336 *350 *
337 * @param[in] Session pointer351 * @param[in] session Session pointer
338 * @param[in] Descriptor for the resource which will be participating352 * @param[in] monitored Descriptor for the resource which will be participating
339 * @param[in] Pointer to the TransactionalStorageEngine resource353 * @param[in] engine Pointer to the TransactionalStorageEngine resource
340 */354 */
341 void registerResourceForStatement(Session *session,355 void registerResourceForStatement(Session *session,
342 plugin::MonitoredInTransaction *monitored,356 plugin::MonitoredInTransaction *monitored,
@@ -356,10 +370,10 @@
356 * per statement, and therefore should not need to be idempotent.370 * per statement, and therefore should not need to be idempotent.
357 * Put in assert()s to test this.371 * Put in assert()s to test this.
358 *372 *
359 * @param[in] Session pointer373 * @param[in] session Session pointer
360 * @param[in] Descriptor for the resource which will be participating374 * @param[in] monitored Descriptor for the resource which will be participating
361 * @param[in] Pointer to the TransactionalStorageEngine resource375 * @param[in] engine Pointer to the TransactionalStorageEngine resource
362 * @param[in] Pointer to the XaResourceManager resource manager376 * @param[in] resource_manager Pointer to the XaResourceManager resource manager
363 */377 */
364 void registerResourceForStatement(Session *session,378 void registerResourceForStatement(Session *session,
365 plugin::MonitoredInTransaction *monitored,379 plugin::MonitoredInTransaction *monitored,