Merge lp:~skinny.moey/drizzle/multi_level_slave into lp:~drizzle-trunk/drizzle/development

Proposed by Joe Daly
Status: Work in progress
Proposed branch: lp:~skinny.moey/drizzle/multi_level_slave
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 655 lines (+191/-30)
16 files modified
drizzled/execute.cc (+2/-0)
drizzled/session.cc (+3/-0)
drizzled/session.h (+30/-1)
plugin/innobase/dict/create_replication.cc (+43/-4)
plugin/innobase/handler/ha_innodb.cc (+8/-1)
plugin/innobase/handler/replication_dictionary.cc (+7/-1)
plugin/innobase/include/create_replication.h (+6/-3)
plugin/innobase/include/read_replication.h (+2/-0)
plugin/innobase/tests/r/innodb-system-table-view.result (+6/-4)
plugin/innobase/tests/r/innodb_replication_log.result (+2/-0)
plugin/schema_dictionary/tests/r/data_dictionary.result (+17/-1)
plugin/slave/queue_consumer.cc (+33/-7)
plugin/slave/queue_consumer.h (+9/-2)
plugin/slave/queue_producer.cc (+11/-3)
plugin/slave/queue_producer.h (+2/-0)
plugin/slave/replication_schema.cc (+10/-3)
To merge this branch: bzr merge lp:~skinny.moey/drizzle/multi_level_slave
Reviewer Review Type Date Requested Status
Drizzle Merge Team Pending
Review via email: mp+55990@code.launchpad.net

Description of the change

This change adds the column ORIGINATING_SERVER_ID and ORIGINATING_COMMIT_ID to the DATA_DICTIONARY.SYS_REPLICATION_LOG and the SYS_REPLICATION_LOG.APPLIER_STATE table. These fields are added to allow a configuration where a 2nd level slave exists, to allow for recovery should a master fail, such as A->B->C (A replicates to B, B replicates to C). With these new columns C can be reconfigured to point to A with no guesswork as to the correct position to start at.

To post a comment you must log in.
Revision history for this message
Joe Daly (skinny.moey) wrote :

on hold at the moment

Revision history for this message
Patrick Crews (patrick-crews) wrote :

Seeing a significant number of random failures on the randgen slave plugin tests with this code. Not tied to Joe's work, but it appears to be tied to the multi-master code itself.

Still testing current trunk vs. pre-merge trunk to make sure of this

Revision history for this message
Patrick Crews (patrick-crews) wrote :

It would appear that we need to back out the multi-master code from trunk - it is breaking the randgen slave_plugin tests (for single master/slave) with some regularity. Many runs pass with flying colors, but about 1/6 - 1/10 will have a failing test case with differences between master and slave (often the crash test though it varies).

Will recommend working with Shrews in his beta-multi-master branch until this is ironed out.
To repeat:
./dbqp --mode=randgen --randgen-path=/path/to/randgen --suite=slave_plugin

Unmerged revisions

2263. By Joe Daly

add originating_server_id and originating_commit_id for applier_state table, this is needed for a slave that does not have the replication log turned on

2262. By Joe Daly

merge trunk

2261. By Joe Daly

add ORIGINATING_SERVER_ID and +INNODB_REPLICATION_LOG DATA_DICTIONARY to INNODB_REPLICATION_LOG table

2260. By Joe Daly

merge trunk

2259. By Joe Daly

fix tests, and fix up if block setting originating_server_id and commit_id

2258. By Joe Daly

add ORIGINATING_SERVER_ID and ORIGINATING_COMMIT_ID field

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'drizzled/execute.cc'
--- drizzled/execute.cc 2011-02-18 16:08:59 +0000
+++ drizzled/execute.cc 2011-04-01 19:01:31 +0000
@@ -67,6 +67,8 @@
67 // session. Eventually we will allow someone to change the effective67 // session. Eventually we will allow someone to change the effective
68 // user.68 // user.
69 new_session->user()= _session.user();69 new_session->user()= _session.user();
70 new_session->setOriginatingServerID(_session.getOriginatingServerID());
71 new_session->setOriginatingCommitID(_session.getOriginatingCommitID());
70 72
71 if (Session::schedule(new_session))73 if (Session::schedule(new_session))
72 {74 {
7375
=== modified file 'drizzled/session.cc'
--- drizzled/session.cc 2011-03-28 02:46:21 +0000
+++ drizzled/session.cc 2011-04-01 19:01:31 +0000
@@ -223,6 +223,7 @@
223 tablespace_op(false),223 tablespace_op(false),
224 use_usage(false),224 use_usage(false),
225 security_ctx(identifier::User::make_shared()),225 security_ctx(identifier::User::make_shared()),
226 originating_server_id_set(false),
226 client(client_arg)227 client(client_arg)
227{228{
228 client->setSession(this);229 client->setSession(this);
@@ -238,6 +239,8 @@
238 lex().current_select= 0;239 lex().current_select= 0;
239 memset(&variables, 0, sizeof(variables));240 memset(&variables, 0, sizeof(variables));
240 scoreboard_index= -1;241 scoreboard_index= -1;
242 originating_server_id= 0;
243 originating_commit_id= 0;
241 cleanup_done= abort_on_warning= no_warnings_for_error= false;244 cleanup_done= abort_on_warning= no_warnings_for_error= false;
242245
243 /* query_cache init */246 /* query_cache init */
244247
=== modified file 'drizzled/session.h'
--- drizzled/session.h 2011-03-28 14:49:14 +0000
+++ drizzled/session.h 2011-04-01 19:01:31 +0000
@@ -293,6 +293,32 @@
293 scoreboard_index= in_scoreboard_index;293 scoreboard_index= in_scoreboard_index;
294 }294 }
295295
296 bool isOriginatingServerIDSet()
297 {
298 return originating_server_id_set;
299 }
300
301 void setOriginatingServerID(uint32_t in_originating_server_id)
302 {
303 originating_server_id= in_originating_server_id;
304 originating_server_id_set= true;
305 }
306
307 uint32_t getOriginatingServerID()
308 {
309 return originating_server_id;
310 }
311
312 void setOriginatingCommitID(uint64_t in_originating_commit_id)
313 {
314 originating_commit_id= in_originating_commit_id;
315 }
316
317 uint64_t getOriginatingCommitID()
318 {
319 return originating_commit_id;
320 }
321
296 /**322 /**
297 * Is this session viewable by the current user?323 * Is this session viewable by the current user?
298 */324 */
@@ -1370,7 +1396,10 @@
1370 bool tablespace_op; /**< This is true in DISCARD/IMPORT TABLESPACE */1396 bool tablespace_op; /**< This is true in DISCARD/IMPORT TABLESPACE */
1371 bool use_usage;1397 bool use_usage;
1372 rusage usage;1398 rusage usage;
1373 identifier::user::mptr security_ctx;1399 identifier::user::mptr security_ctx;
1400 bool originating_server_id_set;
1401 uint32_t originating_server_id;
1402 uint64_t originating_commit_id;
1374 int32_t scoreboard_index;1403 int32_t scoreboard_index;
1375 plugin::Client *client;1404 plugin::Client *client;
1376 util::string::shared_ptr _schema;1405 util::string::shared_ptr _schema;
13771406
=== modified file 'plugin/innobase/dict/create_replication.cc'
--- plugin/innobase/dict/create_replication.cc 2011-03-22 23:52:29 +0000
+++ plugin/innobase/dict/create_replication.cc 2011-04-01 19:01:31 +0000
@@ -76,7 +76,7 @@
76 error = que_eval_sql(info,76 error = que_eval_sql(info,
77 "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"77 "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
78 "BEGIN\n"78 "BEGIN\n"
79 "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" 79 "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), ORIGINATING_SERVER_ID INT, ORIGINATING_COMMIT_ID INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
80 "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"80 "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
81 "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"81 "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
82 "END;\n"82 "END;\n"
@@ -147,6 +147,14 @@
147 field->set_type(drizzled::message::Table::Field::BIGINT);147 field->set_type(drizzled::message::Table::Field::BIGINT);
148148
149 field= table_message->add_field();149 field= table_message->add_field();
150 field->set_name("ORIGINATING_SERVER_ID");
151 field->set_type(drizzled::message::Table::Field::INTEGER);
152
153 field= table_message->add_field();
154 field->set_name("ORIGINATING_COMMIT_ID");
155 field->set_type(drizzled::message::Table::Field::BIGINT);
156
157 field= table_message->add_field();
150 field->set_name("MESSAGE_LEN");158 field->set_name("MESSAGE_LEN");
151 field->set_type(drizzled::message::Table::Field::INTEGER);159 field->set_type(drizzled::message::Table::Field::INTEGER);
152160
@@ -191,7 +199,10 @@
191ulint insert_replication_message(const char *message, size_t size, 199ulint insert_replication_message(const char *message, size_t size,
192 trx_t *trx, uint64_t trx_id, 200 trx_t *trx, uint64_t trx_id,
193 uint64_t end_timestamp, bool is_end_segment, 201 uint64_t end_timestamp, bool is_end_segment,
194 uint32_t seg_id) 202 uint32_t seg_id, uint32_t server_id,
203 bool use_originating_server_id,
204 uint32_t originating_server_id,
205 uint64_t originating_commit_id)
195{206{
196 ulint error;207 ulint error;
197 row_prebuilt_t* prebuilt; /* For reading rows */208 row_prebuilt_t* prebuilt; /* For reading rows */
@@ -251,12 +262,30 @@
251 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));262 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
252 dfield_set_data(dfield, data, 8);263 dfield_set_data(dfield, data, 8);
253264
265 if (not use_originating_server_id)
266 {
267 /* This transaction originated from this server, rather then being
268 replicated to this server reset the values to reflect that */
269 originating_server_id= server_id;
270 originating_commit_id= commit_id;
271 }
272
254 dfield = dtuple_get_nth_field(dtuple, 4);273 dfield = dtuple_get_nth_field(dtuple, 4);
255 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));274 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
275 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_server_id, 4, dict_table_is_comp(prebuilt->table));
276 dfield_set_data(dfield, data, 4);
277
278 dfield = dtuple_get_nth_field(dtuple, 5);
279 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
280 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_commit_id, 8, dict_table_is_comp(prebuilt->table));
281 dfield_set_data(dfield, data, 8);
282
283 dfield = dtuple_get_nth_field(dtuple, 6);
284 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
256 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));285 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
257 dfield_set_data(dfield, data, 4);286 dfield_set_data(dfield, data, 4);
258287
259 dfield = dtuple_get_nth_field(dtuple, 5);288 dfield = dtuple_get_nth_field(dtuple, 7);
260 dfield_set_data(dfield, message, size);289 dfield_set_data(dfield, message, size);
261290
262 ins_node_t* node = prebuilt->ins_node;291 ins_node_t* node = prebuilt->ins_node;
@@ -355,8 +384,18 @@
355 convert_to_mysql_format(timestampbyte, field, 8);384 convert_to_mysql_format(timestampbyte, field, 8);
356 ret.end_timestamp= *(uint64_t *)timestampbyte;385 ret.end_timestamp= *(uint64_t *)timestampbyte;
357386
387 field = rec_get_nth_field_old(rec, 6, &len);
388 byte serverbyte[4];
389 convert_to_mysql_format(serverbyte, field, 4);
390 ret.originating_server_id= *(uint32_t *)serverbyte;
391
392 field = rec_get_nth_field_old(rec, 7, &len);
393 byte originatingcommitbyte[8];
394 convert_to_mysql_format(originatingcommitbyte, field, 8);
395 ret.originating_commit_id= *(uint64_t *)originatingcommitbyte;
396
358 // Handler message397 // Handler message
359 field = rec_get_nth_field_old(rec, 7, &len);398 field = rec_get_nth_field_old(rec, 9, &len);
360 ret.message= (char *)field;399 ret.message= (char *)field;
361 ret.message_length= len;400 ret.message_length= len;
362401
363402
=== modified file 'plugin/innobase/handler/ha_innodb.cc'
--- plugin/innobase/handler/ha_innodb.cc 2011-03-29 21:04:17 +0000
+++ plugin/innobase/handler/ha_innodb.cc 2011-04-01 19:01:31 +0000
@@ -966,8 +966,15 @@
966 uint64_t end_timestamp= message.transaction_context().end_timestamp();966 uint64_t end_timestamp= message.transaction_context().end_timestamp();
967 bool is_end_segment= message.end_segment();967 bool is_end_segment= message.end_segment();
968 trx->log_commit_id= TRUE;968 trx->log_commit_id= TRUE;
969 uint32_t server_id= message.transaction_context().server_id();
970 uint32_t originating_server_id= session.getOriginatingServerID();
971 uint64_t originating_commit_id= session.getOriginatingCommitID();
972 bool use_originating_server_id= session.isOriginatingServerIDSet();
973
969 ulint error= insert_replication_message(data, message.ByteSize(), trx, trx_id,974 ulint error= insert_replication_message(data, message.ByteSize(), trx, trx_id,
970 end_timestamp, is_end_segment, seg_id);975 end_timestamp, is_end_segment, seg_id, server_id,
976 use_originating_server_id, originating_server_id,
977 originating_commit_id);
971 (void)error;978 (void)error;
972979
973 delete[] data;980 delete[] data;
974981
=== modified file 'plugin/innobase/handler/replication_dictionary.cc'
--- plugin/innobase/handler/replication_dictionary.cc 2011-02-17 00:14:13 +0000
+++ plugin/innobase/handler/replication_dictionary.cc 2011-04-01 19:01:31 +0000
@@ -79,6 +79,8 @@
79 add_field("TRANSACTION_SEGMENT_ID", plugin::TableFunction::NUMBER, 0, false);79 add_field("TRANSACTION_SEGMENT_ID", plugin::TableFunction::NUMBER, 0, false);
80 add_field("COMMIT_ID", plugin::TableFunction::NUMBER, 0, false);80 add_field("COMMIT_ID", plugin::TableFunction::NUMBER, 0, false);
81 add_field("END_TIMESTAMP", plugin::TableFunction::NUMBER, 0, false);81 add_field("END_TIMESTAMP", plugin::TableFunction::NUMBER, 0, false);
82 add_field("ORIGINATING_SERVER_ID", plugin::TableFunction::NUMBER, 0, false);
83 add_field("ORIGINATING_COMMIT_ID", plugin::TableFunction::NUMBER, 0, false);
82 add_field("TRANSACTION_MESSAGE_STRING", plugin::TableFunction::STRING, transaction_message_threshold, false);84 add_field("TRANSACTION_MESSAGE_STRING", plugin::TableFunction::STRING, transaction_message_threshold, false);
83 add_field("TRANSACTION_LENGTH", plugin::TableFunction::NUMBER, 0, false);85 add_field("TRANSACTION_LENGTH", plugin::TableFunction::NUMBER, 0, false);
84}86}
@@ -110,7 +112,11 @@
110 push(static_cast<uint64_t>(ret.commit_id));112 push(static_cast<uint64_t>(ret.commit_id));
111113
112 push(static_cast<uint64_t>(ret.end_timestamp));114 push(static_cast<uint64_t>(ret.end_timestamp));
113 115
116 push(static_cast<uint64_t>(ret.originating_server_id));
117
118 push(static_cast<uint64_t>(ret.originating_commit_id));
119
114 /* Message in viewable format */120 /* Message in viewable format */
115 bool result= message.ParseFromArray(ret.message, ret.message_length);121 bool result= message.ParseFromArray(ret.message, ret.message_length);
116122
117123
=== modified file 'plugin/innobase/include/create_replication.h'
--- plugin/innobase/include/create_replication.h 2011-03-14 05:40:28 +0000
+++ plugin/innobase/include/create_replication.h 2011-04-01 19:01:31 +0000
@@ -45,10 +45,13 @@
4545
46UNIV_INTERN ulint dict_create_sys_replication_log(void);46UNIV_INTERN ulint dict_create_sys_replication_log(void);
4747
48UNIV_INTERN ulint insert_replication_message(const char *message, size_t size, 48UNIV_INTERN ulint insert_replication_message(const char *message, size_t size,
49 trx_t *trx, uint64_t trx_id, 49 trx_t *trx, uint64_t trx_id,
50 uint64_t end_timestamp, bool is_end_segment,50 uint64_t end_timestamp, bool is_end_segment,
51 uint32_t seg_id);51 uint32_t seg_id, uint32_t server_id,
52 bool use_originating_server_id,
53 uint32_t originating_server_id,
54 uint64_t originating_commit_id);
5255
53UNIV_INTERN struct read_replication_state_st *replication_read_init(void);56UNIV_INTERN struct read_replication_state_st *replication_read_init(void);
54UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *);57UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *);
5558
=== modified file 'plugin/innobase/include/read_replication.h'
--- plugin/innobase/include/read_replication.h 2011-03-14 05:40:28 +0000
+++ plugin/innobase/include/read_replication.h 2011-04-01 19:01:31 +0000
@@ -27,6 +27,8 @@
27 unsigned long long commit_id;27 unsigned long long commit_id;
28 unsigned long long end_timestamp;28 unsigned long long end_timestamp;
29 unsigned long seg_id;29 unsigned long seg_id;
30 unsigned long originating_server_id;
31 unsigned long long originating_commit_id;
30 unsigned long long message_length;32 unsigned long long message_length;
31 const char *message;33 const char *message;
32};34};
3335
=== modified file 'plugin/innobase/tests/r/innodb-system-table-view.result'
--- plugin/innobase/tests/r/innodb-system-table-view.result 2011-02-10 18:19:37 +0000
+++ plugin/innobase/tests/r/innodb-system-table-view.result 2011-04-01 19:01:31 +0000
@@ -2,7 +2,7 @@
2TABLE_ID NAME FLAG N_COLS SPACE2TABLE_ID NAME FLAG N_COLS SPACE
311 SYS_FOREIGN 0 7 0311 SYS_FOREIGN 0 7 0
412 SYS_FOREIGN_COLS 0 7 0412 SYS_FOREIGN_COLS 0 7 0
513 SYS_REPLICATION_LOG 0 9 0513 SYS_REPLICATION_LOG 0 11 0
6SELECT * FROM DATA_DICTIONARY.INNODB_SYS_INDEXES;6SELECT * FROM DATA_DICTIONARY.INNODB_SYS_INDEXES;
7INDEX_ID NAME TABLE_ID TYPE N_FIELDS PAGE_NO SPACE7INDEX_ID NAME TABLE_ID TYPE N_FIELDS PAGE_NO SPACE
811 ID_IND 11 3 1 302 0811 ID_IND 11 3 1 302 0
@@ -25,8 +25,10 @@
2513 SEGID 1 6 0 42513 SEGID 1 6 0 4
2613 COMMIT_ID 2 6 0 82613 COMMIT_ID 2 6 0 8
2713 END_TIMESTAMP 3 6 0 82713 END_TIMESTAMP 3 6 0 8
2813 MESSAGE_LEN 4 6 0 42813 ORIGINATING_SERVER_ID 4 6 0 4
2913 MESSAGE 5 5 4129792 02913 ORIGINATING_COMMIT_ID 5 6 0 8
3013 MESSAGE_LEN 6 6 0 4
3113 MESSAGE 7 5 4129792 0
30SELECT * FROM DATA_DICTIONARY.INNODB_SYS_FIELDS;32SELECT * FROM DATA_DICTIONARY.INNODB_SYS_FIELDS;
31INDEX_ID NAME POS33INDEX_ID NAME POS
3211 ID 03411 ID 0
@@ -70,7 +72,7 @@
70NAME FLAG N_COLS SPACE72NAME FLAG N_COLS SPACE
71SYS_FOREIGN 0 7 073SYS_FOREIGN 0 7 0
72SYS_FOREIGN_COLS 0 7 074SYS_FOREIGN_COLS 0 7 0
73SYS_REPLICATION_LOG 0 9 075SYS_REPLICATION_LOG 0 11 0
74test/child 1 5 076test/child 1 5 0
75test/parent 1 4 077test/parent 1 4 0
76SELECT name, n_fields78SELECT name, n_fields
7779
=== modified file 'plugin/innobase/tests/r/innodb_replication_log.result'
--- plugin/innobase/tests/r/innodb_replication_log.result 2011-02-21 17:37:00 +0000
+++ plugin/innobase/tests/r/innodb_replication_log.result 2011-04-01 19:01:31 +0000
@@ -10,6 +10,8 @@
10 `TRANSACTION_SEGMENT_ID` BIGINT NOT NULL,10 `TRANSACTION_SEGMENT_ID` BIGINT NOT NULL,
11 `COMMIT_ID` BIGINT NOT NULL,11 `COMMIT_ID` BIGINT NOT NULL,
12 `END_TIMESTAMP` BIGINT NOT NULL,12 `END_TIMESTAMP` BIGINT NOT NULL,
13 `ORIGINATING_SERVER_ID` BIGINT NOT NULL,
14 `ORIGINATING_COMMIT_ID` BIGINT NOT NULL,
13 `TRANSACTION_MESSAGE_STRING` TEXT COLLATE utf8_general_ci NOT NULL,15 `TRANSACTION_MESSAGE_STRING` TEXT COLLATE utf8_general_ci NOT NULL,
14 `TRANSACTION_LENGTH` BIGINT NOT NULL16 `TRANSACTION_LENGTH` BIGINT NOT NULL
15) ENGINE=FunctionEngine COLLATE = utf8_general_ci REPLICATE = FALSE17) ENGINE=FunctionEngine COLLATE = utf8_general_ci REPLICATE = FALSE
1618
=== modified file 'plugin/schema_dictionary/tests/r/data_dictionary.result'
--- plugin/schema_dictionary/tests/r/data_dictionary.result 2011-03-18 16:53:27 +0000
+++ plugin/schema_dictionary/tests/r/data_dictionary.result 2011-04-01 19:01:31 +0000
@@ -1,7 +1,7 @@
1use data_dictionary;1use data_dictionary;
2SELECT count(*) FROM columns;2SELECT count(*) FROM columns;
3count(*)3count(*)
45804584
5SELECT count(*) FROM indexes;5SELECT count(*) FROM indexes;
6count(*)6count(*)
7272
@@ -356,6 +356,10 @@
356ORDINAL_POSITION356ORDINAL_POSITION
357ORDINAL_POSITION357ORDINAL_POSITION
358ORDINAL_POSITION358ORDINAL_POSITION
359ORIGINATING_COMMIT_ID
360ORIGINATING_COMMIT_ID
361ORIGINATING_SERVER_ID
362ORIGINATING_SERVER_ID
359OTHER_INDEX_SIZE363OTHER_INDEX_SIZE
360PAGES_FREE364PAGES_FREE
361PAGES_FREE365PAGES_FREE
@@ -869,6 +873,8 @@
869DATA_DICTIONARY INNODB_LOCK_WAITS REQUESTING_TRX_ID873DATA_DICTIONARY INNODB_LOCK_WAITS REQUESTING_TRX_ID
870DATA_DICTIONARY INNODB_REPLICATION_LOG COMMIT_ID874DATA_DICTIONARY INNODB_REPLICATION_LOG COMMIT_ID
871DATA_DICTIONARY INNODB_REPLICATION_LOG END_TIMESTAMP875DATA_DICTIONARY INNODB_REPLICATION_LOG END_TIMESTAMP
876DATA_DICTIONARY INNODB_REPLICATION_LOG ORIGINATING_COMMIT_ID
877DATA_DICTIONARY INNODB_REPLICATION_LOG ORIGINATING_SERVER_ID
872DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_ID878DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_ID
873DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_LENGTH879DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_LENGTH
874DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_MESSAGE_STRING880DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_MESSAGE_STRING
@@ -995,6 +1001,8 @@
995DATA_DICTIONARY SYS_REPLICATION_LOG ID1001DATA_DICTIONARY SYS_REPLICATION_LOG ID
996DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE1002DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE
997DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE_LEN1003DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE_LEN
1004DATA_DICTIONARY SYS_REPLICATION_LOG ORIGINATING_COMMIT_ID
1005DATA_DICTIONARY SYS_REPLICATION_LOG ORIGINATING_SERVER_ID
998DATA_DICTIONARY SYS_REPLICATION_LOG SEGID1006DATA_DICTIONARY SYS_REPLICATION_LOG SEGID
999DATA_DICTIONARY TABLES AUTO_INCREMENT1007DATA_DICTIONARY TABLES AUTO_INCREMENT
1000DATA_DICTIONARY TABLES ENGINE1008DATA_DICTIONARY TABLES ENGINE
@@ -1532,6 +1540,8 @@
1532INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID1540INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID
1533INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID1541INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID
1534INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP1542INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP
1543INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
1544INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
1535INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID1545INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID
1536INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH1546INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH
1537INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING1547INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING
@@ -1787,6 +1797,8 @@
1787SYS_REPLICATION_LOG DATA_DICTIONARY ID1797SYS_REPLICATION_LOG DATA_DICTIONARY ID
1788SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE1798SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE
1789SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN1799SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN
1800SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
1801SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
1790SYS_REPLICATION_LOG DATA_DICTIONARY SEGID1802SYS_REPLICATION_LOG DATA_DICTIONARY SEGID
1791TABLES DATA_DICTIONARY AUTO_INCREMENT1803TABLES DATA_DICTIONARY AUTO_INCREMENT
1792TABLES DATA_DICTIONARY ENGINE1804TABLES DATA_DICTIONARY ENGINE
@@ -2115,6 +2127,8 @@
2115INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID2127INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID
2116INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID2128INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID
2117INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP2129INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP
2130INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
2131INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
2118INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID2132INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID
2119INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH2133INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH
2120INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING2134INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING
@@ -2370,6 +2384,8 @@
2370SYS_REPLICATION_LOG DATA_DICTIONARY ID2384SYS_REPLICATION_LOG DATA_DICTIONARY ID
2371SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE2385SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE
2372SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN2386SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN
2387SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
2388SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
2373SYS_REPLICATION_LOG DATA_DICTIONARY SEGID2389SYS_REPLICATION_LOG DATA_DICTIONARY SEGID
2374TABLES DATA_DICTIONARY AUTO_INCREMENT2390TABLES DATA_DICTIONARY AUTO_INCREMENT
2375TABLES DATA_DICTIONARY ENGINE2391TABLES DATA_DICTIONARY ENGINE
23762392
=== modified file 'plugin/slave/queue_consumer.cc'
--- plugin/slave/queue_consumer.cc 2011-03-19 20:20:32 +0000
+++ plugin/slave/queue_consumer.cc 2011-04-01 19:01:31 +0000
@@ -75,6 +75,8 @@
75 for (size_t x= 0; x < completedTransactionIds.size(); x++)75 for (size_t x= 0; x < completedTransactionIds.size(); x++)
76 {76 {
77 string commit_id;77 string commit_id;
78 uint32_t originating_server_id= 0;
79 uint64_t originating_commit_id= 0;
78 uint64_t trx_id= completedTransactionIds[x];80 uint64_t trx_id= completedTransactionIds[x];
7981
80 vector<string> aggregate_sql; /* final SQL to execute */82 vector<string> aggregate_sql; /* final SQL to execute */
@@ -83,7 +85,8 @@
83 message::Transaction transaction;85 message::Transaction transaction;
84 uint32_t segment_id= 1;86 uint32_t segment_id= 1;
8587
86 while (getMessage(transaction, commit_id, master_id, trx_id, segment_id++))88 while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_id,
89 originating_commit_id, segment_id++))
87 {90 {
88 convertToSQL(transaction, aggregate_sql, segmented_sql);91 convertToSQL(transaction, aggregate_sql, segmented_sql);
89 transaction.Clear();92 transaction.Clear();
@@ -132,7 +135,8 @@
132 }135 }
133 }136 }
134137
135 if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id))138 if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id,
139 originating_server_id, originating_commit_id))
136 {140 {
137 if (_ignore_errors)141 if (_ignore_errors)
138 {142 {
@@ -143,6 +147,10 @@
143 string tmp("UPDATE `sys_replication`.`applier_state`"147 string tmp("UPDATE `sys_replication`.`applier_state`"
144 " SET `last_applied_commit_id` = ");148 " SET `last_applied_commit_id` = ");
145 tmp.append(commit_id);149 tmp.append(commit_id);
150 tmp.append(", `originating_server_id` = ");
151 tmp.append(boost::lexical_cast<string>(originating_server_id));
152 tmp.append(", `originating_commit_id` = ");
153 tmp.append(boost::lexical_cast<string>(originating_commit_id));
146 tmp.append(" WHERE `master_id` = ");154 tmp.append(" WHERE `master_id` = ");
147 tmp.append(master_id);155 tmp.append(master_id);
148 sql.push_back(tmp);156 sql.push_back(tmp);
@@ -168,9 +176,12 @@
168 string &commit_id,176 string &commit_id,
169 const string &master_id,177 const string &master_id,
170 uint64_t trx_id,178 uint64_t trx_id,
179 uint32_t &originating_server_id,
180 uint64_t &originating_commit_id,
171 uint32_t segment_id)181 uint32_t segment_id)
172{182{
173 string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"183 string sql("SELECT `msg`, `commit_order`, `originating_server_id`, "
184 " `originating_commit_id` FROM `sys_replication`.`queue`"
174 " WHERE `trx_id` = ");185 " WHERE `trx_id` = ");
175 sql.append(boost::lexical_cast<string>(trx_id));186 sql.append(boost::lexical_cast<string>(trx_id));
176 sql.append(" AND `seg_id` = ", 16);187 sql.append(" AND `seg_id` = ", 16);
@@ -178,12 +189,12 @@
178 sql.append(" AND `master_id` = ", 19),189 sql.append(" AND `master_id` = ", 19),
179 sql.append(master_id);190 sql.append(master_id);
180191
181 sql::ResultSet result_set(2);192 sql::ResultSet result_set(4);
182 Execute execute(*(_session.get()), true);193 Execute execute(*(_session.get()), true);
183 194
184 execute.run(sql, result_set);195 execute.run(sql, result_set);
185 196
186 assert(result_set.getMetaData().getColumnCount() == 2);197 assert(result_set.getMetaData().getColumnCount() == 4);
187198
188 /* Really should only be 1 returned row */199 /* Really should only be 1 returned row */
189 uint32_t found_rows= 0;200 uint32_t found_rows= 0;
@@ -191,6 +202,8 @@
191 {202 {
192 string msg= result_set.getString(0);203 string msg= result_set.getString(0);
193 string com_id= result_set.getString(1);204 string com_id= result_set.getString(1);
205 string orig_server_id= result_set.getString(2);
206 string orig_commit_id= result_set.getString(3);
194207
195 if ((msg == "") || (found_rows == 1))208 if ((msg == "") || (found_rows == 1))
196 break;209 break;
@@ -198,10 +211,14 @@
198 /* Neither column should be NULL */211 /* Neither column should be NULL */
199 assert(result_set.isNull(0) == false);212 assert(result_set.isNull(0) == false);
200 assert(result_set.isNull(1) == false);213 assert(result_set.isNull(1) == false);
214 assert(result_set.isNull(2) == false);
215 assert(result_set.isNull(3) == false);
201216
202 google::protobuf::TextFormat::ParseFromString(msg, &transaction);217 google::protobuf::TextFormat::ParseFromString(msg, &transaction);
203218
204 commit_id= com_id;219 commit_id= com_id;
220 originating_server_id= boost::lexical_cast<uint32_t>(orig_server_id);
221 originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
205 found_rows++;222 found_rows++;
206 }223 }
207224
@@ -404,15 +421,24 @@
404421
405bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,422bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
406 const string &commit_id,423 const string &commit_id,
407 const string &master_id)424 const string &master_id,
425 uint32_t originating_server_id,
426 uint64_t originating_commit_id)
408{427{
409 string tmp("UPDATE `sys_replication`.`applier_state`"428 string tmp("UPDATE `sys_replication`.`applier_state`"
410 " SET `last_applied_commit_id` = ");429 " SET `last_applied_commit_id` = ");
411 tmp.append(commit_id);430 tmp.append(commit_id);
431 tmp.append(", `originating_server_id` = ");
432 tmp.append(boost::lexical_cast<string>(originating_server_id));
433 tmp.append(", `originating_commit_id` = ");
434 tmp.append(boost::lexical_cast<string>(originating_commit_id));
412 tmp.append(" WHERE `master_id` = ");435 tmp.append(" WHERE `master_id` = ");
413 tmp.append(master_id);436 tmp.append(master_id);
414 sql.push_back(tmp);437 sql.push_back(tmp);
415 438
439 _session->setOriginatingServerID(originating_server_id);
440 _session->setOriginatingCommitID(originating_commit_id);
441
416 return executeSQL(sql);442 return executeSQL(sql);
417}443}
418444
419445
=== modified file 'plugin/slave/queue_consumer.h'
--- plugin/slave/queue_consumer.h 2011-03-19 20:20:32 +0000
+++ plugin/slave/queue_consumer.h 2011-04-01 19:01:31 +0000
@@ -111,6 +111,8 @@
111 std::string &commit_id,111 std::string &commit_id,
112 const std::string &master_id,112 const std::string &master_id,
113 uint64_t trx_id,113 uint64_t trx_id,
114 uint32_t &originating_server_id,
115 uint64_t &originating_commit_id,
114 uint32_t segment_id);116 uint32_t segment_id);
115117
116 /**118 /**
@@ -133,14 +135,19 @@
133 * @param sql Batch of SQL statements to execute.135 * @param sql Batch of SQL statements to execute.
134 * @param commit_id Commit ID value to store in state table.136 * @param commit_id Commit ID value to store in state table.
135 * @param master_id ID of the master this SQL came from.137 * @param master_id ID of the master this SQL came from.
138 * @param originating_server_id Server ID of the master where
139 * this SQL was originally applied.
140 * @param originating_commit_id Commit ID of the master where
141 * this SQL was originally applied.
136 *142 *
137 * @retval true Success143 * @retval true Success
138 * @retval false Failure144 * @retval false Failure
139 */145 */
140 bool executeSQLWithCommitId(std::vector<std::string> &sql,146 bool executeSQLWithCommitId(std::vector<std::string> &sql,
141 const std::string &commit_id,147 const std::string &commit_id,
142 const std::string &master_id);148 const std::string &master_id,
143 149 uint32_t originating_server_id,
150 uint64_t originating_commit_id);
144 /**151 /**
145 * Remove messages for a given transaction from the queue.152 * Remove messages for a given transaction from the queue.
146 *153 *
147154
=== modified file 'plugin/slave/queue_producer.cc'
--- plugin/slave/queue_producer.cc 2011-03-19 20:20:32 +0000
+++ plugin/slave/queue_producer.cc 2011-04-01 19:01:31 +0000
@@ -296,6 +296,8 @@
296bool QueueProducer::queueInsert(const char *trx_id,296bool QueueProducer::queueInsert(const char *trx_id,
297 const char *seg_id,297 const char *seg_id,
298 const char *commit_id,298 const char *commit_id,
299 const char *originating_server_id,
300 const char *originating_commit_id,
299 const char *msg,301 const char *msg,
300 const char *msg_length)302 const char *msg_length)
301{303{
@@ -307,7 +309,8 @@
307 * The SQL to insert our results into the local queue.309 * The SQL to insert our results into the local queue.
308 */310 */
309 string sql= "INSERT INTO `sys_replication`.`queue`"311 string sql= "INSERT INTO `sys_replication`.`queue`"
310 " (`master_id`, `trx_id`, `seg_id`, `commit_order`, `msg`) VALUES (";312 " (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
313 " `originating_server_id`, `originating_commit_id`, `msg`) VALUES (";
311 sql.append(boost::lexical_cast<string>(masterId()));314 sql.append(boost::lexical_cast<string>(masterId()));
312 sql.append(", ", 2);315 sql.append(", ", 2);
313 sql.append(trx_id);316 sql.append(trx_id);
@@ -315,6 +318,10 @@
315 sql.append(seg_id);318 sql.append(seg_id);
316 sql.append(", ", 2);319 sql.append(", ", 2);
317 sql.append(commit_id);320 sql.append(commit_id);
321 sql.append(", ", 2);
322 sql.append(originating_server_id);
323 sql.append(", ", 2);
324 sql.append(originating_commit_id);
318 sql.append(", '", 3);325 sql.append(", '", 3);
319326
320 /*327 /*
@@ -395,7 +402,8 @@
395 /*402 /*
396 * The SQL to pull everything we need from the master.403 * The SQL to pull everything we need from the master.
397 */404 */
398 string sql= "SELECT `id`, `segid`, `commit_id`, `message`, `message_len` "405 string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_id`,"
406 " `originating_commit_id`, `message`, `message_len` "
399 " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";407 " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
400408
401 for (size_t x= 0; x < trx_id_list.size(); x++)409 for (size_t x= 0; x < trx_id_list.size(); x++)
@@ -439,7 +447,7 @@
439447
440 while ((row= drizzle_row_next(&result)) != NULL)448 while ((row= drizzle_row_next(&result)) != NULL)
441 {449 {
442 if (not queueInsert(row[0], row[1], row[2], row[3], row[4]))450 if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
443 {451 {
444 errmsg_printf(error::ERROR,452 errmsg_printf(error::ERROR,
445 _("Replication slave: Unable to insert into queue."));453 _("Replication slave: Unable to insert into queue."));
446454
=== modified file 'plugin/slave/queue_producer.h'
--- plugin/slave/queue_producer.h 2011-03-19 20:20:32 +0000
+++ plugin/slave/queue_producer.h 2011-04-01 19:01:31 +0000
@@ -184,6 +184,8 @@
184 bool queueInsert(const char *trx_id,184 bool queueInsert(const char *trx_id,
185 const char *seg_id,185 const char *seg_id,
186 const char *commit_id,186 const char *commit_id,
187 const char *originating_server_id,
188 const char *originating_commit_id,
187 const char *msg,189 const char *msg,
188 const char *msg_length);190 const char *msg_length);
189191
190192
=== modified file 'plugin/slave/replication_schema.cc'
--- plugin/slave/replication_schema.cc 2011-03-19 20:20:32 +0000
+++ plugin/slave/replication_schema.cc 2011-04-01 19:01:31 +0000
@@ -73,6 +73,7 @@
73 * Table: applier_state73 * Table: applier_state
74 * Version 1.0: Initial definition74 * Version 1.0: Initial definition
75 * Version 1.1: Added master_id and changed PK to master_id75 * Version 1.1: Added master_id and changed PK to master_id
76 * Version 1.2: Added originating_server_id and originating_commit_id
76 */77 */
7778
78 sql.clear();79 sql.clear();
@@ -80,10 +81,12 @@
80 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` ("81 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` ("
81 " `master_id` BIGINT NOT NULL,"82 " `master_id` BIGINT NOT NULL,"
82 " `last_applied_commit_id` BIGINT NOT NULL,"83 " `last_applied_commit_id` BIGINT NOT NULL,"
84 " `originating_server_id` BIGINT NOT NULL,"
85 " `originating_commit_id` BIGINT NOT NULL,"
83 " `status` VARCHAR(20) NOT NULL,"86 " `status` VARCHAR(20) NOT NULL,"
84 " `error_msg` VARCHAR(250),"87 " `error_msg` VARCHAR(250),"
85 " PRIMARY KEY(`master_id`))"88 " PRIMARY KEY(`master_id`))"
86 " COMMENT = 'VERSION 1.1'");89 " COMMENT = 'VERSION 1.2'");
8790
88 if (not executeSQL(sql))91 if (not executeSQL(sql))
89 return false;92 return false;
@@ -101,6 +104,8 @@
101 " `trx_id` BIGINT NOT NULL,"104 " `trx_id` BIGINT NOT NULL,"
102 " `seg_id` INT NOT NULL,"105 " `seg_id` INT NOT NULL,"
103 " `commit_order` BIGINT,"106 " `commit_order` BIGINT,"
107 " `originating_server_id` INT NOT NULL,"
108 " `originating_commit_id` BIGINT NOT NULL,"
104 " `msg` BLOB,"109 " `msg` BLOB,"
105 " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"110 " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"
106 " COMMENT = 'VERSION 1.1'");111 " COMMENT = 'VERSION 1.1'");
@@ -151,9 +156,11 @@
151 {156 {
152 sql.clear();157 sql.clear();
153 sql.push_back("INSERT INTO `sys_replication`.`applier_state`"158 sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
154 " (`master_id`, `last_applied_commit_id`, `status`) VALUES ("159 " (`master_id`, `last_applied_commit_id`,"
160 " `originating_server_id`, `originating_commit_id`,"
161 " `status`) VALUES ("
155 + boost::lexical_cast<string>(master_id)162 + boost::lexical_cast<string>(master_id)
156 + ",0 , 'STOPPED')");163 + ",0 , 0, 0, 'STOPPED')");
157 if (not executeSQL(sql))164 if (not executeSQL(sql))
158 return false;165 return false;
159 }166 }