Merge lp:~skinny.moey/drizzle/multi_level_slave into lp:~drizzle-trunk/drizzle/development

Proposed by Joe Daly
Status: Work in progress
Proposed branch: lp:~skinny.moey/drizzle/multi_level_slave
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 655 lines (+191/-30)
16 files modified
drizzled/execute.cc (+2/-0)
drizzled/session.cc (+3/-0)
drizzled/session.h (+30/-1)
plugin/innobase/dict/create_replication.cc (+43/-4)
plugin/innobase/handler/ha_innodb.cc (+8/-1)
plugin/innobase/handler/replication_dictionary.cc (+7/-1)
plugin/innobase/include/create_replication.h (+6/-3)
plugin/innobase/include/read_replication.h (+2/-0)
plugin/innobase/tests/r/innodb-system-table-view.result (+6/-4)
plugin/innobase/tests/r/innodb_replication_log.result (+2/-0)
plugin/schema_dictionary/tests/r/data_dictionary.result (+17/-1)
plugin/slave/queue_consumer.cc (+33/-7)
plugin/slave/queue_consumer.h (+9/-2)
plugin/slave/queue_producer.cc (+11/-3)
plugin/slave/queue_producer.h (+2/-0)
plugin/slave/replication_schema.cc (+10/-3)
To merge this branch: bzr merge lp:~skinny.moey/drizzle/multi_level_slave
Reviewer             Review Type   Date Requested   Status
Drizzle Merge Team                                  Pending
Review via email: mp+55990@code.launchpad.net

Description of the change

This change adds the columns ORIGINATING_SERVER_ID and ORIGINATING_COMMIT_ID to the DATA_DICTIONARY.SYS_REPLICATION_LOG and SYS_REPLICATION.APPLIER_STATE tables. The new fields support configurations with a second-level slave, such as A->B->C (A replicates to B, B replicates to C), and make recovery possible should a master fail. With these columns, C can be reconfigured to point to A with no guesswork as to the correct position to start at.
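
The idea can be illustrated with a small, self-contained sketch. It is not the code in this branch: `LogEntry`, `Server`, `lastAppliedFrom` and `restartPositionAfterFailover` are hypothetical names, and the in-memory vector only stands in for the replication log. The sketch assumes each server assigns its own local commit IDs and carries the originating pair through unchanged when applying a replicated transaction.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one row of a server's replication log.
struct LogEntry
{
  uint64_t commit_id;              // commit ID assigned by the local server
  uint32_t originating_server_id;  // server where the transaction first committed
  uint64_t originating_commit_id;  // commit ID on that originating server
};

// Hypothetical stand-in for a server in the replication chain.
struct Server
{
  uint32_t server_id;
  uint64_t next_commit_id;
  std::vector<LogEntry> log;

  // Transaction executed directly here: the originating pair is our own.
  void commitLocal()
  {
    uint64_t id= next_commit_id++;
    log.push_back({id, server_id, id});
  }

  // Transaction received from upstream: we get a new local commit ID,
  // but the originating pair is preserved as-is.
  void applyReplicated(const LogEntry &upstream)
  {
    uint64_t id= next_commit_id++;
    log.push_back({id, upstream.originating_server_id,
                   upstream.originating_commit_id});
  }
};

// Highest originating commit ID from `origin` that `server` has applied.
uint64_t lastAppliedFrom(const Server &server, uint32_t origin)
{
  uint64_t last= 0;
  for (const LogEntry &e : server.log)
  {
    if (e.originating_server_id == origin && e.originating_commit_id > last)
      last= e.originating_commit_id;
  }
  return last;
}

// A->B->C: B fails after C has applied only two of A's three transactions.
uint64_t restartPositionAfterFailover()
{
  Server a{1, 1, {}};
  Server b{2, 100, {}};
  Server c{3, 500, {}};

  for (int i= 0; i < 3; i++)
    a.commitLocal();
  for (const LogEntry &e : a.log)
    b.applyReplicated(e);

  c.applyReplicated(b.log[0]);
  c.applyReplicated(b.log[1]);

  // B only relayed A's work, so nothing on C names B as the origin.
  assert(lastAppliedFrom(c, b.server_id) == 0);

  // C's position expressed in A's commit ID numbering.
  return lastAppliedFrom(c, a.server_id);
}
```

In this model C's local commit IDs (500, 501) and B's (100, 101) mean nothing to A, whereas the preserved originating commit ID tells C which of A's transactions it applied last, and therefore where to resume once it is pointed at A.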

Joe Daly (skinny.moey) wrote :

on hold at the moment

Patrick Crews (patrick-crews) wrote :

Seeing a significant number of random failures on the randgen slave plugin tests with this code. Not tied to Joe's work, but it appears to be tied to the multi-master code itself.

Still testing current trunk vs. pre-merge trunk to confirm this.

Patrick Crews (patrick-crews) wrote :

It would appear that we need to back out the multi-master code from trunk - it is breaking the randgen slave_plugin tests (for single master/slave) with some regularity. Many runs pass with flying colors, but about 1/6 - 1/10 will have a failing test case with differences between master and slave (often the crash test though it varies).

Will recommend working with Shrews in his beta-multi-master branch until this is ironed out.
To repeat:
./dbqp --mode=randgen --randgen-path=/path/to/randgen --suite=slave_plugin

Unmerged revisions

2263. By Joe Daly

add originating_server_id and originating_commit_id for applier_state table, this is needed for a slave that does not have the replication log turned on

2262. By Joe Daly

merge trunk

2261. By Joe Daly

add ORIGINATING_SERVER_ID and ORIGINATING_COMMIT_ID to INNODB_REPLICATION_LOG table

2260. By Joe Daly

merge trunk

2259. By Joe Daly

fix tests, and fix up if block setting originating_server_id and commit_id

2258. By Joe Daly

add ORIGINATING_SERVER_ID and ORIGINATING_COMMIT_ID field

Preview Diff

1=== modified file 'drizzled/execute.cc'
2--- drizzled/execute.cc 2011-02-18 16:08:59 +0000
3+++ drizzled/execute.cc 2011-04-01 19:01:31 +0000
4@@ -67,6 +67,8 @@
5 // session. Eventually we will allow someone to change the effective
6 // user.
7 new_session->user()= _session.user();
8+ new_session->setOriginatingServerID(_session.getOriginatingServerID());
9+ new_session->setOriginatingCommitID(_session.getOriginatingCommitID());
10
11 if (Session::schedule(new_session))
12 {
13
14=== modified file 'drizzled/session.cc'
15--- drizzled/session.cc 2011-03-28 02:46:21 +0000
16+++ drizzled/session.cc 2011-04-01 19:01:31 +0000
17@@ -223,6 +223,7 @@
18 tablespace_op(false),
19 use_usage(false),
20 security_ctx(identifier::User::make_shared()),
21+ originating_server_id_set(false),
22 client(client_arg)
23 {
24 client->setSession(this);
25@@ -238,6 +239,8 @@
26 lex().current_select= 0;
27 memset(&variables, 0, sizeof(variables));
28 scoreboard_index= -1;
29+ originating_server_id= 0;
30+ originating_commit_id= 0;
31 cleanup_done= abort_on_warning= no_warnings_for_error= false;
32
33 /* query_cache init */
34
35=== modified file 'drizzled/session.h'
36--- drizzled/session.h 2011-03-28 14:49:14 +0000
37+++ drizzled/session.h 2011-04-01 19:01:31 +0000
38@@ -293,6 +293,32 @@
39 scoreboard_index= in_scoreboard_index;
40 }
41
42+ bool isOriginatingServerIDSet()
43+ {
44+ return originating_server_id_set;
45+ }
46+
47+ void setOriginatingServerID(uint32_t in_originating_server_id)
48+ {
49+ originating_server_id= in_originating_server_id;
50+ originating_server_id_set= true;
51+ }
52+
53+ uint32_t getOriginatingServerID()
54+ {
55+ return originating_server_id;
56+ }
57+
58+ void setOriginatingCommitID(uint64_t in_originating_commit_id)
59+ {
60+ originating_commit_id= in_originating_commit_id;
61+ }
62+
63+ uint64_t getOriginatingCommitID()
64+ {
65+ return originating_commit_id;
66+ }
67+
68 /**
69 * Is this session viewable by the current user?
70 */
71@@ -1370,7 +1396,10 @@
72 bool tablespace_op; /**< This is true in DISCARD/IMPORT TABLESPACE */
73 bool use_usage;
74 rusage usage;
75- identifier::user::mptr security_ctx;
76+ identifier::user::mptr security_ctx;
77+ bool originating_server_id_set;
78+ uint32_t originating_server_id;
79+ uint64_t originating_commit_id;
80 int32_t scoreboard_index;
81 plugin::Client *client;
82 util::string::shared_ptr _schema;
83
84=== modified file 'plugin/innobase/dict/create_replication.cc'
85--- plugin/innobase/dict/create_replication.cc 2011-03-22 23:52:29 +0000
86+++ plugin/innobase/dict/create_replication.cc 2011-04-01 19:01:31 +0000
87@@ -76,7 +76,7 @@
88 error = que_eval_sql(info,
89 "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
90 "BEGIN\n"
91- "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
92+ "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), ORIGINATING_SERVER_ID INT, ORIGINATING_COMMIT_ID INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
93 "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
94 "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
95 "END;\n"
96@@ -147,6 +147,14 @@
97 field->set_type(drizzled::message::Table::Field::BIGINT);
98
99 field= table_message->add_field();
100+ field->set_name("ORIGINATING_SERVER_ID");
101+ field->set_type(drizzled::message::Table::Field::INTEGER);
102+
103+ field= table_message->add_field();
104+ field->set_name("ORIGINATING_COMMIT_ID");
105+ field->set_type(drizzled::message::Table::Field::BIGINT);
106+
107+ field= table_message->add_field();
108 field->set_name("MESSAGE_LEN");
109 field->set_type(drizzled::message::Table::Field::INTEGER);
110
111@@ -191,7 +199,10 @@
112 ulint insert_replication_message(const char *message, size_t size,
113 trx_t *trx, uint64_t trx_id,
114 uint64_t end_timestamp, bool is_end_segment,
115- uint32_t seg_id)
116+ uint32_t seg_id, uint32_t server_id,
117+ bool use_originating_server_id,
118+ uint32_t originating_server_id,
119+ uint64_t originating_commit_id)
120 {
121 ulint error;
122 row_prebuilt_t* prebuilt; /* For reading rows */
123@@ -251,12 +262,30 @@
124 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
125 dfield_set_data(dfield, data, 8);
126
127+ if (not use_originating_server_id)
128+ {
129+ /* This transaction originated from this server, rather than being
130+ replicated to this server; reset the values to reflect that */
131+ originating_server_id= server_id;
132+ originating_commit_id= commit_id;
133+ }
134+
135 dfield = dtuple_get_nth_field(dtuple, 4);
136 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
137+ row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_server_id, 4, dict_table_is_comp(prebuilt->table));
138+ dfield_set_data(dfield, data, 4);
139+
140+ dfield = dtuple_get_nth_field(dtuple, 5);
141+ data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
142+ row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_commit_id, 8, dict_table_is_comp(prebuilt->table));
143+ dfield_set_data(dfield, data, 8);
144+
145+ dfield = dtuple_get_nth_field(dtuple, 6);
146+ data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
147 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
148 dfield_set_data(dfield, data, 4);
149
150- dfield = dtuple_get_nth_field(dtuple, 5);
151+ dfield = dtuple_get_nth_field(dtuple, 7);
152 dfield_set_data(dfield, message, size);
153
154 ins_node_t* node = prebuilt->ins_node;
155@@ -355,8 +384,18 @@
156 convert_to_mysql_format(timestampbyte, field, 8);
157 ret.end_timestamp= *(uint64_t *)timestampbyte;
158
159+ field = rec_get_nth_field_old(rec, 6, &len);
160+ byte serverbyte[4];
161+ convert_to_mysql_format(serverbyte, field, 4);
162+ ret.originating_server_id= *(uint32_t *)serverbyte;
163+
164+ field = rec_get_nth_field_old(rec, 7, &len);
165+ byte originatingcommitbyte[8];
166+ convert_to_mysql_format(originatingcommitbyte, field, 8);
167+ ret.originating_commit_id= *(uint64_t *)originatingcommitbyte;
168+
169 // Handler message
170- field = rec_get_nth_field_old(rec, 7, &len);
171+ field = rec_get_nth_field_old(rec, 9, &len);
172 ret.message= (char *)field;
173 ret.message_length= len;
174
175
176=== modified file 'plugin/innobase/handler/ha_innodb.cc'
177--- plugin/innobase/handler/ha_innodb.cc 2011-03-29 21:04:17 +0000
178+++ plugin/innobase/handler/ha_innodb.cc 2011-04-01 19:01:31 +0000
179@@ -966,8 +966,15 @@
180 uint64_t end_timestamp= message.transaction_context().end_timestamp();
181 bool is_end_segment= message.end_segment();
182 trx->log_commit_id= TRUE;
183+ uint32_t server_id= message.transaction_context().server_id();
184+ uint32_t originating_server_id= session.getOriginatingServerID();
185+ uint64_t originating_commit_id= session.getOriginatingCommitID();
186+ bool use_originating_server_id= session.isOriginatingServerIDSet();
187+
188 ulint error= insert_replication_message(data, message.ByteSize(), trx, trx_id,
189- end_timestamp, is_end_segment, seg_id);
190+ end_timestamp, is_end_segment, seg_id, server_id,
191+ use_originating_server_id, originating_server_id,
192+ originating_commit_id);
193 (void)error;
194
195 delete[] data;
196
197=== modified file 'plugin/innobase/handler/replication_dictionary.cc'
198--- plugin/innobase/handler/replication_dictionary.cc 2011-02-17 00:14:13 +0000
199+++ plugin/innobase/handler/replication_dictionary.cc 2011-04-01 19:01:31 +0000
200@@ -79,6 +79,8 @@
201 add_field("TRANSACTION_SEGMENT_ID", plugin::TableFunction::NUMBER, 0, false);
202 add_field("COMMIT_ID", plugin::TableFunction::NUMBER, 0, false);
203 add_field("END_TIMESTAMP", plugin::TableFunction::NUMBER, 0, false);
204+ add_field("ORIGINATING_SERVER_ID", plugin::TableFunction::NUMBER, 0, false);
205+ add_field("ORIGINATING_COMMIT_ID", plugin::TableFunction::NUMBER, 0, false);
206 add_field("TRANSACTION_MESSAGE_STRING", plugin::TableFunction::STRING, transaction_message_threshold, false);
207 add_field("TRANSACTION_LENGTH", plugin::TableFunction::NUMBER, 0, false);
208 }
209@@ -110,7 +112,11 @@
210 push(static_cast<uint64_t>(ret.commit_id));
211
212 push(static_cast<uint64_t>(ret.end_timestamp));
213-
214+
215+ push(static_cast<uint64_t>(ret.originating_server_id));
216+
217+ push(static_cast<uint64_t>(ret.originating_commit_id));
218+
219 /* Message in viewable format */
220 bool result= message.ParseFromArray(ret.message, ret.message_length);
221
222
223=== modified file 'plugin/innobase/include/create_replication.h'
224--- plugin/innobase/include/create_replication.h 2011-03-14 05:40:28 +0000
225+++ plugin/innobase/include/create_replication.h 2011-04-01 19:01:31 +0000
226@@ -45,10 +45,13 @@
227
228 UNIV_INTERN ulint dict_create_sys_replication_log(void);
229
230-UNIV_INTERN ulint insert_replication_message(const char *message, size_t size,
231- trx_t *trx, uint64_t trx_id,
232+UNIV_INTERN ulint insert_replication_message(const char *message, size_t size,
233+ trx_t *trx, uint64_t trx_id,
234 uint64_t end_timestamp, bool is_end_segment,
235- uint32_t seg_id);
236+ uint32_t seg_id, uint32_t server_id,
237+ bool use_originating_server_id,
238+ uint32_t originating_server_id,
239+ uint64_t originating_commit_id);
240
241 UNIV_INTERN struct read_replication_state_st *replication_read_init(void);
242 UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *);
243
244=== modified file 'plugin/innobase/include/read_replication.h'
245--- plugin/innobase/include/read_replication.h 2011-03-14 05:40:28 +0000
246+++ plugin/innobase/include/read_replication.h 2011-04-01 19:01:31 +0000
247@@ -27,6 +27,8 @@
248 unsigned long long commit_id;
249 unsigned long long end_timestamp;
250 unsigned long seg_id;
251+ unsigned long originating_server_id;
252+ unsigned long long originating_commit_id;
253 unsigned long long message_length;
254 const char *message;
255 };
256
257=== modified file 'plugin/innobase/tests/r/innodb-system-table-view.result'
258--- plugin/innobase/tests/r/innodb-system-table-view.result 2011-02-10 18:19:37 +0000
259+++ plugin/innobase/tests/r/innodb-system-table-view.result 2011-04-01 19:01:31 +0000
260@@ -2,7 +2,7 @@
261 TABLE_ID NAME FLAG N_COLS SPACE
262 11 SYS_FOREIGN 0 7 0
263 12 SYS_FOREIGN_COLS 0 7 0
264-13 SYS_REPLICATION_LOG 0 9 0
265+13 SYS_REPLICATION_LOG 0 11 0
266 SELECT * FROM DATA_DICTIONARY.INNODB_SYS_INDEXES;
267 INDEX_ID NAME TABLE_ID TYPE N_FIELDS PAGE_NO SPACE
268 11 ID_IND 11 3 1 302 0
269@@ -25,8 +25,10 @@
270 13 SEGID 1 6 0 4
271 13 COMMIT_ID 2 6 0 8
272 13 END_TIMESTAMP 3 6 0 8
273-13 MESSAGE_LEN 4 6 0 4
274-13 MESSAGE 5 5 4129792 0
275+13 ORIGINATING_SERVER_ID 4 6 0 4
276+13 ORIGINATING_COMMIT_ID 5 6 0 8
277+13 MESSAGE_LEN 6 6 0 4
278+13 MESSAGE 7 5 4129792 0
279 SELECT * FROM DATA_DICTIONARY.INNODB_SYS_FIELDS;
280 INDEX_ID NAME POS
281 11 ID 0
282@@ -70,7 +72,7 @@
283 NAME FLAG N_COLS SPACE
284 SYS_FOREIGN 0 7 0
285 SYS_FOREIGN_COLS 0 7 0
286-SYS_REPLICATION_LOG 0 9 0
287+SYS_REPLICATION_LOG 0 11 0
288 test/child 1 5 0
289 test/parent 1 4 0
290 SELECT name, n_fields
291
292=== modified file 'plugin/innobase/tests/r/innodb_replication_log.result'
293--- plugin/innobase/tests/r/innodb_replication_log.result 2011-02-21 17:37:00 +0000
294+++ plugin/innobase/tests/r/innodb_replication_log.result 2011-04-01 19:01:31 +0000
295@@ -10,6 +10,8 @@
296 `TRANSACTION_SEGMENT_ID` BIGINT NOT NULL,
297 `COMMIT_ID` BIGINT NOT NULL,
298 `END_TIMESTAMP` BIGINT NOT NULL,
299+ `ORIGINATING_SERVER_ID` BIGINT NOT NULL,
300+ `ORIGINATING_COMMIT_ID` BIGINT NOT NULL,
301 `TRANSACTION_MESSAGE_STRING` TEXT COLLATE utf8_general_ci NOT NULL,
302 `TRANSACTION_LENGTH` BIGINT NOT NULL
303 ) ENGINE=FunctionEngine COLLATE = utf8_general_ci REPLICATE = FALSE
304
305=== modified file 'plugin/schema_dictionary/tests/r/data_dictionary.result'
306--- plugin/schema_dictionary/tests/r/data_dictionary.result 2011-03-18 16:53:27 +0000
307+++ plugin/schema_dictionary/tests/r/data_dictionary.result 2011-04-01 19:01:31 +0000
308@@ -1,7 +1,7 @@
309 use data_dictionary;
310 SELECT count(*) FROM columns;
311 count(*)
312-580
313+584
314 SELECT count(*) FROM indexes;
315 count(*)
316 2
317@@ -356,6 +356,10 @@
318 ORDINAL_POSITION
319 ORDINAL_POSITION
320 ORDINAL_POSITION
321+ORIGINATING_COMMIT_ID
322+ORIGINATING_COMMIT_ID
323+ORIGINATING_SERVER_ID
324+ORIGINATING_SERVER_ID
325 OTHER_INDEX_SIZE
326 PAGES_FREE
327 PAGES_FREE
328@@ -869,6 +873,8 @@
329 DATA_DICTIONARY INNODB_LOCK_WAITS REQUESTING_TRX_ID
330 DATA_DICTIONARY INNODB_REPLICATION_LOG COMMIT_ID
331 DATA_DICTIONARY INNODB_REPLICATION_LOG END_TIMESTAMP
332+DATA_DICTIONARY INNODB_REPLICATION_LOG ORIGINATING_COMMIT_ID
333+DATA_DICTIONARY INNODB_REPLICATION_LOG ORIGINATING_SERVER_ID
334 DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_ID
335 DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_LENGTH
336 DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_MESSAGE_STRING
337@@ -995,6 +1001,8 @@
338 DATA_DICTIONARY SYS_REPLICATION_LOG ID
339 DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE
340 DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE_LEN
341+DATA_DICTIONARY SYS_REPLICATION_LOG ORIGINATING_COMMIT_ID
342+DATA_DICTIONARY SYS_REPLICATION_LOG ORIGINATING_SERVER_ID
343 DATA_DICTIONARY SYS_REPLICATION_LOG SEGID
344 DATA_DICTIONARY TABLES AUTO_INCREMENT
345 DATA_DICTIONARY TABLES ENGINE
346@@ -1532,6 +1540,8 @@
347 INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID
348 INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID
349 INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP
350+INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
351+INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
352 INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID
353 INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH
354 INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING
355@@ -1787,6 +1797,8 @@
356 SYS_REPLICATION_LOG DATA_DICTIONARY ID
357 SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE
358 SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN
359+SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
360+SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
361 SYS_REPLICATION_LOG DATA_DICTIONARY SEGID
362 TABLES DATA_DICTIONARY AUTO_INCREMENT
363 TABLES DATA_DICTIONARY ENGINE
364@@ -2115,6 +2127,8 @@
365 INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID
366 INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID
367 INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP
368+INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
369+INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
370 INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID
371 INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH
372 INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING
373@@ -2370,6 +2384,8 @@
374 SYS_REPLICATION_LOG DATA_DICTIONARY ID
375 SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE
376 SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN
377+SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID
378+SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID
379 SYS_REPLICATION_LOG DATA_DICTIONARY SEGID
380 TABLES DATA_DICTIONARY AUTO_INCREMENT
381 TABLES DATA_DICTIONARY ENGINE
382
383=== modified file 'plugin/slave/queue_consumer.cc'
384--- plugin/slave/queue_consumer.cc 2011-03-19 20:20:32 +0000
385+++ plugin/slave/queue_consumer.cc 2011-04-01 19:01:31 +0000
386@@ -75,6 +75,8 @@
387 for (size_t x= 0; x < completedTransactionIds.size(); x++)
388 {
389 string commit_id;
390+ uint32_t originating_server_id= 0;
391+ uint64_t originating_commit_id= 0;
392 uint64_t trx_id= completedTransactionIds[x];
393
394 vector<string> aggregate_sql; /* final SQL to execute */
395@@ -83,7 +85,8 @@
396 message::Transaction transaction;
397 uint32_t segment_id= 1;
398
399- while (getMessage(transaction, commit_id, master_id, trx_id, segment_id++))
400+ while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_id,
401+ originating_commit_id, segment_id++))
402 {
403 convertToSQL(transaction, aggregate_sql, segmented_sql);
404 transaction.Clear();
405@@ -132,7 +135,8 @@
406 }
407 }
408
409- if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id))
410+ if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id,
411+ originating_server_id, originating_commit_id))
412 {
413 if (_ignore_errors)
414 {
415@@ -143,6 +147,10 @@
416 string tmp("UPDATE `sys_replication`.`applier_state`"
417 " SET `last_applied_commit_id` = ");
418 tmp.append(commit_id);
419+ tmp.append(", `originating_server_id` = ");
420+ tmp.append(boost::lexical_cast<string>(originating_server_id));
421+ tmp.append(", `originating_commit_id` = ");
422+ tmp.append(boost::lexical_cast<string>(originating_commit_id));
423 tmp.append(" WHERE `master_id` = ");
424 tmp.append(master_id);
425 sql.push_back(tmp);
426@@ -168,9 +176,12 @@
427 string &commit_id,
428 const string &master_id,
429 uint64_t trx_id,
430+ uint32_t &originating_server_id,
431+ uint64_t &originating_commit_id,
432 uint32_t segment_id)
433 {
434- string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
435+ string sql("SELECT `msg`, `commit_order`, `originating_server_id`, "
436+ " `originating_commit_id` FROM `sys_replication`.`queue`"
437 " WHERE `trx_id` = ");
438 sql.append(boost::lexical_cast<string>(trx_id));
439 sql.append(" AND `seg_id` = ", 16);
440@@ -178,12 +189,12 @@
441 sql.append(" AND `master_id` = ", 19),
442 sql.append(master_id);
443
444- sql::ResultSet result_set(2);
445+ sql::ResultSet result_set(4);
446 Execute execute(*(_session.get()), true);
447
448 execute.run(sql, result_set);
449
450- assert(result_set.getMetaData().getColumnCount() == 2);
451+ assert(result_set.getMetaData().getColumnCount() == 4);
452
453 /* Really should only be 1 returned row */
454 uint32_t found_rows= 0;
455@@ -191,6 +202,8 @@
456 {
457 string msg= result_set.getString(0);
458 string com_id= result_set.getString(1);
459+ string orig_server_id= result_set.getString(2);
460+ string orig_commit_id= result_set.getString(3);
461
462 if ((msg == "") || (found_rows == 1))
463 break;
464@@ -198,10 +211,14 @@
465 /* Neither column should be NULL */
466 assert(result_set.isNull(0) == false);
467 assert(result_set.isNull(1) == false);
468+ assert(result_set.isNull(2) == false);
469+ assert(result_set.isNull(3) == false);
470
471 google::protobuf::TextFormat::ParseFromString(msg, &transaction);
472
473 commit_id= com_id;
474+ originating_server_id= boost::lexical_cast<uint32_t>(orig_server_id);
475+ originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
476 found_rows++;
477 }
478
479@@ -404,15 +421,24 @@
480
481 bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
482 const string &commit_id,
483- const string &master_id)
484+ const string &master_id,
485+ uint32_t originating_server_id,
486+ uint64_t originating_commit_id)
487 {
488 string tmp("UPDATE `sys_replication`.`applier_state`"
489 " SET `last_applied_commit_id` = ");
490 tmp.append(commit_id);
491+ tmp.append(", `originating_server_id` = ");
492+ tmp.append(boost::lexical_cast<string>(originating_server_id));
493+ tmp.append(", `originating_commit_id` = ");
494+ tmp.append(boost::lexical_cast<string>(originating_commit_id));
495 tmp.append(" WHERE `master_id` = ");
496 tmp.append(master_id);
497 sql.push_back(tmp);
498-
499+
500+ _session->setOriginatingServerID(originating_server_id);
501+ _session->setOriginatingCommitID(originating_commit_id);
502+
503 return executeSQL(sql);
504 }
505
506
507=== modified file 'plugin/slave/queue_consumer.h'
508--- plugin/slave/queue_consumer.h 2011-03-19 20:20:32 +0000
509+++ plugin/slave/queue_consumer.h 2011-04-01 19:01:31 +0000
510@@ -111,6 +111,8 @@
511 std::string &commit_id,
512 const std::string &master_id,
513 uint64_t trx_id,
514+ uint32_t &originating_server_id,
515+ uint64_t &originating_commit_id,
516 uint32_t segment_id);
517
518 /**
519@@ -133,14 +135,19 @@
520 * @param sql Batch of SQL statements to execute.
521 * @param commit_id Commit ID value to store in state table.
522 * @param master_id ID of the master this SQL came from.
523+ * @param originating_server_id Server ID of the master where
524+ * this SQL was originally applied.
525+ * @param originating_commit_id Commit ID of the master where
526+ * this SQL was originally applied.
527 *
528 * @retval true Success
529 * @retval false Failure
530 */
531 bool executeSQLWithCommitId(std::vector<std::string> &sql,
532 const std::string &commit_id,
533- const std::string &master_id);
534-
535+ const std::string &master_id,
536+ uint32_t originating_server_id,
537+ uint64_t originating_commit_id);
538 /**
539 * Remove messages for a given transaction from the queue.
540 *
541
542=== modified file 'plugin/slave/queue_producer.cc'
543--- plugin/slave/queue_producer.cc 2011-03-19 20:20:32 +0000
544+++ plugin/slave/queue_producer.cc 2011-04-01 19:01:31 +0000
545@@ -296,6 +296,8 @@
546 bool QueueProducer::queueInsert(const char *trx_id,
547 const char *seg_id,
548 const char *commit_id,
549+ const char *originating_server_id,
550+ const char *originating_commit_id,
551 const char *msg,
552 const char *msg_length)
553 {
554@@ -307,7 +309,8 @@
555 * The SQL to insert our results into the local queue.
556 */
557 string sql= "INSERT INTO `sys_replication`.`queue`"
558- " (`master_id`, `trx_id`, `seg_id`, `commit_order`, `msg`) VALUES (";
559+ " (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
560+ " `originating_server_id`, `originating_commit_id`, `msg`) VALUES (";
561 sql.append(boost::lexical_cast<string>(masterId()));
562 sql.append(", ", 2);
563 sql.append(trx_id);
564@@ -315,6 +318,10 @@
565 sql.append(seg_id);
566 sql.append(", ", 2);
567 sql.append(commit_id);
568+ sql.append(", ", 2);
569+ sql.append(originating_server_id);
570+ sql.append(", ", 2);
571+ sql.append(originating_commit_id);
572 sql.append(", '", 3);
573
574 /*
575@@ -395,7 +402,8 @@
576 /*
577 * The SQL to pull everything we need from the master.
578 */
579- string sql= "SELECT `id`, `segid`, `commit_id`, `message`, `message_len` "
580+ string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_id`,"
581+ " `originating_commit_id`, `message`, `message_len` "
582 " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
583
584 for (size_t x= 0; x < trx_id_list.size(); x++)
585@@ -439,7 +447,7 @@
586
587 while ((row= drizzle_row_next(&result)) != NULL)
588 {
589- if (not queueInsert(row[0], row[1], row[2], row[3], row[4]))
590+ if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
591 {
592 errmsg_printf(error::ERROR,
593 _("Replication slave: Unable to insert into queue."));
594
595=== modified file 'plugin/slave/queue_producer.h'
596--- plugin/slave/queue_producer.h 2011-03-19 20:20:32 +0000
597+++ plugin/slave/queue_producer.h 2011-04-01 19:01:31 +0000
598@@ -184,6 +184,8 @@
599 bool queueInsert(const char *trx_id,
600 const char *seg_id,
601 const char *commit_id,
602+ const char *originating_server_id,
603+ const char *originating_commit_id,
604 const char *msg,
605 const char *msg_length);
606
607
608=== modified file 'plugin/slave/replication_schema.cc'
609--- plugin/slave/replication_schema.cc 2011-03-19 20:20:32 +0000
610+++ plugin/slave/replication_schema.cc 2011-04-01 19:01:31 +0000
611@@ -73,6 +73,7 @@
612 * Table: applier_state
613 * Version 1.0: Initial definition
614 * Version 1.1: Added master_id and changed PK to master_id
615+ * Version 1.2: Added originating_server_id and originating_commit_id
616 */
617
618 sql.clear();
619@@ -80,10 +81,12 @@
620 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` ("
621 " `master_id` BIGINT NOT NULL,"
622 " `last_applied_commit_id` BIGINT NOT NULL,"
623+ " `originating_server_id` BIGINT NOT NULL,"
624+ " `originating_commit_id` BIGINT NOT NULL,"
625 " `status` VARCHAR(20) NOT NULL,"
626 " `error_msg` VARCHAR(250),"
627 " PRIMARY KEY(`master_id`))"
628- " COMMENT = 'VERSION 1.1'");
629+ " COMMENT = 'VERSION 1.2'");
630
631 if (not executeSQL(sql))
632 return false;
633@@ -101,6 +104,8 @@
634 " `trx_id` BIGINT NOT NULL,"
635 " `seg_id` INT NOT NULL,"
636 " `commit_order` BIGINT,"
637+ " `originating_server_id` INT NOT NULL,"
638+ " `originating_commit_id` BIGINT NOT NULL,"
639 " `msg` BLOB,"
640 " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"
641 " COMMENT = 'VERSION 1.1'");
642@@ -151,9 +156,11 @@
643 {
644 sql.clear();
645 sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
646- " (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
647+ " (`master_id`, `last_applied_commit_id`,"
648+ " `originating_server_id`, `originating_commit_id`,"
649+ " `status`) VALUES ("
650 + boost::lexical_cast<string>(master_id)
651- + ",0 , 'STOPPED')");
652+ + ",0 , 0, 0, 'STOPPED')");
653 if (not executeSQL(sql))
654 return false;
655 }