Merge lp:~skinny.moey/drizzle/multi_level_slave into lp:~drizzle-trunk/drizzle/development
- multi_level_slave
- Merge into development
Status: | Work in progress |
---|---|
Proposed branch: | lp:~skinny.moey/drizzle/multi_level_slave |
Merge into: | lp:~drizzle-trunk/drizzle/development |
Diff against target: |
655 lines (+191/-30) 16 files modified
drizzled/execute.cc (+2/-0) drizzled/session.cc (+3/-0) drizzled/session.h (+30/-1) plugin/innobase/dict/create_replication.cc (+43/-4) plugin/innobase/handler/ha_innodb.cc (+8/-1) plugin/innobase/handler/replication_dictionary.cc (+7/-1) plugin/innobase/include/create_replication.h (+6/-3) plugin/innobase/include/read_replication.h (+2/-0) plugin/innobase/tests/r/innodb-system-table-view.result (+6/-4) plugin/innobase/tests/r/innodb_replication_log.result (+2/-0) plugin/schema_dictionary/tests/r/data_dictionary.result (+17/-1) plugin/slave/queue_consumer.cc (+33/-7) plugin/slave/queue_consumer.h (+9/-2) plugin/slave/queue_producer.cc (+11/-3) plugin/slave/queue_producer.h (+2/-0) plugin/slave/replication_schema.cc (+10/-3) |
To merge this branch: | bzr merge lp:~skinny.moey/drizzle/multi_level_slave |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Drizzle Merge Team | Pending | ||
Review via email: mp+55990@code.launchpad.net |
Commit message
Description of the change
This change adds the column ORIGINATING_
Patrick Crews (patrick-crews) wrote : | # |
Seeing a significant number of random failures on the randgen slave plugin tests with this code. Not tied to Joe's work, but it appears to be tied to the multi-master code itself.
Still testing current trunk vs. pre-merge trunk to make sure of this
Patrick Crews (patrick-crews) wrote : | # |
It would appear that we need to back out the multi-master code from trunk - it is breaking the randgen slave_plugin tests (for single master/slave) with some regularity. Many runs pass with flying colors, but about 1/6 - 1/10 will have a failing test case with differences between master and slave (often the crash test though it varies).
Will recommend working with Shrews in his beta-multi-master branch until this is ironed out.
To repeat:
./dbqp --mode=randgen --randgen-
Unmerged revisions
- 2263. By Joe Daly
-
add originating_
server_ id and originating_ commit_ id for applier_state table, this is needed for a slave that does not have the replication log turned on - 2262. By Joe Daly
-
merge trunk
- 2261. By Joe Daly
-
add ORIGINATING_
SERVER_ ID and +INNODB_ REPLICATION_ LOG DATA_DICTIONARY to INNODB_ REPLICATION_ LOG table - 2260. By Joe Daly
-
merge trunk
- 2259. By Joe Daly
-
fix tests, and fix up if block setting originating_
server_ id and commit_id - 2258. By Joe Daly
-
add ORIGINATING_
SERVER_ ID and ORIGINATING_ COMMIT_ ID field
Preview Diff
1 | === modified file 'drizzled/execute.cc' | |||
2 | --- drizzled/execute.cc 2011-02-18 16:08:59 +0000 | |||
3 | +++ drizzled/execute.cc 2011-04-01 19:01:31 +0000 | |||
4 | @@ -67,6 +67,8 @@ | |||
5 | 67 | // session. Eventually we will allow someone to change the effective | 67 | // session. Eventually we will allow someone to change the effective |
6 | 68 | // user. | 68 | // user. |
7 | 69 | new_session->user()= _session.user(); | 69 | new_session->user()= _session.user(); |
8 | 70 | new_session->setOriginatingServerID(_session.getOriginatingServerID()); | ||
9 | 71 | new_session->setOriginatingCommitID(_session.getOriginatingCommitID()); | ||
10 | 70 | 72 | ||
11 | 71 | if (Session::schedule(new_session)) | 73 | if (Session::schedule(new_session)) |
12 | 72 | { | 74 | { |
13 | 73 | 75 | ||
14 | === modified file 'drizzled/session.cc' | |||
15 | --- drizzled/session.cc 2011-03-28 02:46:21 +0000 | |||
16 | +++ drizzled/session.cc 2011-04-01 19:01:31 +0000 | |||
17 | @@ -223,6 +223,7 @@ | |||
18 | 223 | tablespace_op(false), | 223 | tablespace_op(false), |
19 | 224 | use_usage(false), | 224 | use_usage(false), |
20 | 225 | security_ctx(identifier::User::make_shared()), | 225 | security_ctx(identifier::User::make_shared()), |
21 | 226 | originating_server_id_set(false), | ||
22 | 226 | client(client_arg) | 227 | client(client_arg) |
23 | 227 | { | 228 | { |
24 | 228 | client->setSession(this); | 229 | client->setSession(this); |
25 | @@ -238,6 +239,8 @@ | |||
26 | 238 | lex().current_select= 0; | 239 | lex().current_select= 0; |
27 | 239 | memset(&variables, 0, sizeof(variables)); | 240 | memset(&variables, 0, sizeof(variables)); |
28 | 240 | scoreboard_index= -1; | 241 | scoreboard_index= -1; |
29 | 242 | originating_server_id= 0; | ||
30 | 243 | originating_commit_id= 0; | ||
31 | 241 | cleanup_done= abort_on_warning= no_warnings_for_error= false; | 244 | cleanup_done= abort_on_warning= no_warnings_for_error= false; |
32 | 242 | 245 | ||
33 | 243 | /* query_cache init */ | 246 | /* query_cache init */ |
34 | 244 | 247 | ||
35 | === modified file 'drizzled/session.h' | |||
36 | --- drizzled/session.h 2011-03-28 14:49:14 +0000 | |||
37 | +++ drizzled/session.h 2011-04-01 19:01:31 +0000 | |||
38 | @@ -293,6 +293,32 @@ | |||
39 | 293 | scoreboard_index= in_scoreboard_index; | 293 | scoreboard_index= in_scoreboard_index; |
40 | 294 | } | 294 | } |
41 | 295 | 295 | ||
42 | 296 | bool isOriginatingServerIDSet() | ||
43 | 297 | { | ||
44 | 298 | return originating_server_id_set; | ||
45 | 299 | } | ||
46 | 300 | |||
47 | 301 | void setOriginatingServerID(uint32_t in_originating_server_id) | ||
48 | 302 | { | ||
49 | 303 | originating_server_id= in_originating_server_id; | ||
50 | 304 | originating_server_id_set= true; | ||
51 | 305 | } | ||
52 | 306 | |||
53 | 307 | uint32_t getOriginatingServerID() | ||
54 | 308 | { | ||
55 | 309 | return originating_server_id; | ||
56 | 310 | } | ||
57 | 311 | |||
58 | 312 | void setOriginatingCommitID(uint64_t in_originating_commit_id) | ||
59 | 313 | { | ||
60 | 314 | originating_commit_id= in_originating_commit_id; | ||
61 | 315 | } | ||
62 | 316 | |||
63 | 317 | uint64_t getOriginatingCommitID() | ||
64 | 318 | { | ||
65 | 319 | return originating_commit_id; | ||
66 | 320 | } | ||
67 | 321 | |||
68 | 296 | /** | 322 | /** |
69 | 297 | * Is this session viewable by the current user? | 323 | * Is this session viewable by the current user? |
70 | 298 | */ | 324 | */ |
71 | @@ -1370,7 +1396,10 @@ | |||
72 | 1370 | bool tablespace_op; /**< This is true in DISCARD/IMPORT TABLESPACE */ | 1396 | bool tablespace_op; /**< This is true in DISCARD/IMPORT TABLESPACE */ |
73 | 1371 | bool use_usage; | 1397 | bool use_usage; |
74 | 1372 | rusage usage; | 1398 | rusage usage; |
76 | 1373 | identifier::user::mptr security_ctx; | 1399 | identifier::user::mptr security_ctx; |
77 | 1400 | bool originating_server_id_set; | ||
78 | 1401 | uint32_t originating_server_id; | ||
79 | 1402 | uint64_t originating_commit_id; | ||
80 | 1374 | int32_t scoreboard_index; | 1403 | int32_t scoreboard_index; |
81 | 1375 | plugin::Client *client; | 1404 | plugin::Client *client; |
82 | 1376 | util::string::shared_ptr _schema; | 1405 | util::string::shared_ptr _schema; |
83 | 1377 | 1406 | ||
84 | === modified file 'plugin/innobase/dict/create_replication.cc' | |||
85 | --- plugin/innobase/dict/create_replication.cc 2011-03-22 23:52:29 +0000 | |||
86 | +++ plugin/innobase/dict/create_replication.cc 2011-04-01 19:01:31 +0000 | |||
87 | @@ -76,7 +76,7 @@ | |||
88 | 76 | error = que_eval_sql(info, | 76 | error = que_eval_sql(info, |
89 | 77 | "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n" | 77 | "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n" |
90 | 78 | "BEGIN\n" | 78 | "BEGIN\n" |
92 | 79 | "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" | 79 | "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), ORIGINATING_SERVER_ID INT, ORIGINATING_COMMIT_ID INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" |
93 | 80 | "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n" | 80 | "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n" |
94 | 81 | "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n" | 81 | "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n" |
95 | 82 | "END;\n" | 82 | "END;\n" |
96 | @@ -147,6 +147,14 @@ | |||
97 | 147 | field->set_type(drizzled::message::Table::Field::BIGINT); | 147 | field->set_type(drizzled::message::Table::Field::BIGINT); |
98 | 148 | 148 | ||
99 | 149 | field= table_message->add_field(); | 149 | field= table_message->add_field(); |
100 | 150 | field->set_name("ORIGINATING_SERVER_ID"); | ||
101 | 151 | field->set_type(drizzled::message::Table::Field::INTEGER); | ||
102 | 152 | |||
103 | 153 | field= table_message->add_field(); | ||
104 | 154 | field->set_name("ORIGINATING_COMMIT_ID"); | ||
105 | 155 | field->set_type(drizzled::message::Table::Field::BIGINT); | ||
106 | 156 | |||
107 | 157 | field= table_message->add_field(); | ||
108 | 150 | field->set_name("MESSAGE_LEN"); | 158 | field->set_name("MESSAGE_LEN"); |
109 | 151 | field->set_type(drizzled::message::Table::Field::INTEGER); | 159 | field->set_type(drizzled::message::Table::Field::INTEGER); |
110 | 152 | 160 | ||
111 | @@ -191,7 +199,10 @@ | |||
112 | 191 | ulint insert_replication_message(const char *message, size_t size, | 199 | ulint insert_replication_message(const char *message, size_t size, |
113 | 192 | trx_t *trx, uint64_t trx_id, | 200 | trx_t *trx, uint64_t trx_id, |
114 | 193 | uint64_t end_timestamp, bool is_end_segment, | 201 | uint64_t end_timestamp, bool is_end_segment, |
116 | 194 | uint32_t seg_id) | 202 | uint32_t seg_id, uint32_t server_id, |
117 | 203 | bool use_originating_server_id, | ||
118 | 204 | uint32_t originating_server_id, | ||
119 | 205 | uint64_t originating_commit_id) | ||
120 | 195 | { | 206 | { |
121 | 196 | ulint error; | 207 | ulint error; |
122 | 197 | row_prebuilt_t* prebuilt; /* For reading rows */ | 208 | row_prebuilt_t* prebuilt; /* For reading rows */ |
123 | @@ -251,12 +262,30 @@ | |||
124 | 251 | row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table)); | 262 | row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table)); |
125 | 252 | dfield_set_data(dfield, data, 8); | 263 | dfield_set_data(dfield, data, 8); |
126 | 253 | 264 | ||
127 | 265 | if (not use_originating_server_id) | ||
128 | 266 | { | ||
129 | 267 | /* This transaction originated from this server, rather then being | ||
130 | 268 | replicated to this server reset the values to reflect that */ | ||
131 | 269 | originating_server_id= server_id; | ||
132 | 270 | originating_commit_id= commit_id; | ||
133 | 271 | } | ||
134 | 272 | |||
135 | 254 | dfield = dtuple_get_nth_field(dtuple, 4); | 273 | dfield = dtuple_get_nth_field(dtuple, 4); |
136 | 255 | data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4)); | 274 | data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4)); |
137 | 275 | row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_server_id, 4, dict_table_is_comp(prebuilt->table)); | ||
138 | 276 | dfield_set_data(dfield, data, 4); | ||
139 | 277 | |||
140 | 278 | dfield = dtuple_get_nth_field(dtuple, 5); | ||
141 | 279 | data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8)); | ||
142 | 280 | row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_commit_id, 8, dict_table_is_comp(prebuilt->table)); | ||
143 | 281 | dfield_set_data(dfield, data, 8); | ||
144 | 282 | |||
145 | 283 | dfield = dtuple_get_nth_field(dtuple, 6); | ||
146 | 284 | data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4)); | ||
147 | 256 | row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table)); | 285 | row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table)); |
148 | 257 | dfield_set_data(dfield, data, 4); | 286 | dfield_set_data(dfield, data, 4); |
149 | 258 | 287 | ||
151 | 259 | dfield = dtuple_get_nth_field(dtuple, 5); | 288 | dfield = dtuple_get_nth_field(dtuple, 7); |
152 | 260 | dfield_set_data(dfield, message, size); | 289 | dfield_set_data(dfield, message, size); |
153 | 261 | 290 | ||
154 | 262 | ins_node_t* node = prebuilt->ins_node; | 291 | ins_node_t* node = prebuilt->ins_node; |
155 | @@ -355,8 +384,18 @@ | |||
156 | 355 | convert_to_mysql_format(timestampbyte, field, 8); | 384 | convert_to_mysql_format(timestampbyte, field, 8); |
157 | 356 | ret.end_timestamp= *(uint64_t *)timestampbyte; | 385 | ret.end_timestamp= *(uint64_t *)timestampbyte; |
158 | 357 | 386 | ||
159 | 387 | field = rec_get_nth_field_old(rec, 6, &len); | ||
160 | 388 | byte serverbyte[4]; | ||
161 | 389 | convert_to_mysql_format(serverbyte, field, 4); | ||
162 | 390 | ret.originating_server_id= *(uint32_t *)serverbyte; | ||
163 | 391 | |||
164 | 392 | field = rec_get_nth_field_old(rec, 7, &len); | ||
165 | 393 | byte originatingcommitbyte[8]; | ||
166 | 394 | convert_to_mysql_format(originatingcommitbyte, field, 8); | ||
167 | 395 | ret.originating_commit_id= *(uint64_t *)originatingcommitbyte; | ||
168 | 396 | |||
169 | 358 | // Handler message | 397 | // Handler message |
171 | 359 | field = rec_get_nth_field_old(rec, 7, &len); | 398 | field = rec_get_nth_field_old(rec, 9, &len); |
172 | 360 | ret.message= (char *)field; | 399 | ret.message= (char *)field; |
173 | 361 | ret.message_length= len; | 400 | ret.message_length= len; |
174 | 362 | 401 | ||
175 | 363 | 402 | ||
176 | === modified file 'plugin/innobase/handler/ha_innodb.cc' | |||
177 | --- plugin/innobase/handler/ha_innodb.cc 2011-03-29 21:04:17 +0000 | |||
178 | +++ plugin/innobase/handler/ha_innodb.cc 2011-04-01 19:01:31 +0000 | |||
179 | @@ -966,8 +966,15 @@ | |||
180 | 966 | uint64_t end_timestamp= message.transaction_context().end_timestamp(); | 966 | uint64_t end_timestamp= message.transaction_context().end_timestamp(); |
181 | 967 | bool is_end_segment= message.end_segment(); | 967 | bool is_end_segment= message.end_segment(); |
182 | 968 | trx->log_commit_id= TRUE; | 968 | trx->log_commit_id= TRUE; |
183 | 969 | uint32_t server_id= message.transaction_context().server_id(); | ||
184 | 970 | uint32_t originating_server_id= session.getOriginatingServerID(); | ||
185 | 971 | uint64_t originating_commit_id= session.getOriginatingCommitID(); | ||
186 | 972 | bool use_originating_server_id= session.isOriginatingServerIDSet(); | ||
187 | 973 | |||
188 | 969 | ulint error= insert_replication_message(data, message.ByteSize(), trx, trx_id, | 974 | ulint error= insert_replication_message(data, message.ByteSize(), trx, trx_id, |
190 | 970 | end_timestamp, is_end_segment, seg_id); | 975 | end_timestamp, is_end_segment, seg_id, server_id, |
191 | 976 | use_originating_server_id, originating_server_id, | ||
192 | 977 | originating_commit_id); | ||
193 | 971 | (void)error; | 978 | (void)error; |
194 | 972 | 979 | ||
195 | 973 | delete[] data; | 980 | delete[] data; |
196 | 974 | 981 | ||
197 | === modified file 'plugin/innobase/handler/replication_dictionary.cc' | |||
198 | --- plugin/innobase/handler/replication_dictionary.cc 2011-02-17 00:14:13 +0000 | |||
199 | +++ plugin/innobase/handler/replication_dictionary.cc 2011-04-01 19:01:31 +0000 | |||
200 | @@ -79,6 +79,8 @@ | |||
201 | 79 | add_field("TRANSACTION_SEGMENT_ID", plugin::TableFunction::NUMBER, 0, false); | 79 | add_field("TRANSACTION_SEGMENT_ID", plugin::TableFunction::NUMBER, 0, false); |
202 | 80 | add_field("COMMIT_ID", plugin::TableFunction::NUMBER, 0, false); | 80 | add_field("COMMIT_ID", plugin::TableFunction::NUMBER, 0, false); |
203 | 81 | add_field("END_TIMESTAMP", plugin::TableFunction::NUMBER, 0, false); | 81 | add_field("END_TIMESTAMP", plugin::TableFunction::NUMBER, 0, false); |
204 | 82 | add_field("ORIGINATING_SERVER_ID", plugin::TableFunction::NUMBER, 0, false); | ||
205 | 83 | add_field("ORIGINATING_COMMIT_ID", plugin::TableFunction::NUMBER, 0, false); | ||
206 | 82 | add_field("TRANSACTION_MESSAGE_STRING", plugin::TableFunction::STRING, transaction_message_threshold, false); | 84 | add_field("TRANSACTION_MESSAGE_STRING", plugin::TableFunction::STRING, transaction_message_threshold, false); |
207 | 83 | add_field("TRANSACTION_LENGTH", plugin::TableFunction::NUMBER, 0, false); | 85 | add_field("TRANSACTION_LENGTH", plugin::TableFunction::NUMBER, 0, false); |
208 | 84 | } | 86 | } |
209 | @@ -110,7 +112,11 @@ | |||
210 | 110 | push(static_cast<uint64_t>(ret.commit_id)); | 112 | push(static_cast<uint64_t>(ret.commit_id)); |
211 | 111 | 113 | ||
212 | 112 | push(static_cast<uint64_t>(ret.end_timestamp)); | 114 | push(static_cast<uint64_t>(ret.end_timestamp)); |
214 | 113 | 115 | ||
215 | 116 | push(static_cast<uint64_t>(ret.originating_server_id)); | ||
216 | 117 | |||
217 | 118 | push(static_cast<uint64_t>(ret.originating_commit_id)); | ||
218 | 119 | |||
219 | 114 | /* Message in viewable format */ | 120 | /* Message in viewable format */ |
220 | 115 | bool result= message.ParseFromArray(ret.message, ret.message_length); | 121 | bool result= message.ParseFromArray(ret.message, ret.message_length); |
221 | 116 | 122 | ||
222 | 117 | 123 | ||
223 | === modified file 'plugin/innobase/include/create_replication.h' | |||
224 | --- plugin/innobase/include/create_replication.h 2011-03-14 05:40:28 +0000 | |||
225 | +++ plugin/innobase/include/create_replication.h 2011-04-01 19:01:31 +0000 | |||
226 | @@ -45,10 +45,13 @@ | |||
227 | 45 | 45 | ||
228 | 46 | UNIV_INTERN ulint dict_create_sys_replication_log(void); | 46 | UNIV_INTERN ulint dict_create_sys_replication_log(void); |
229 | 47 | 47 | ||
232 | 48 | UNIV_INTERN ulint insert_replication_message(const char *message, size_t size, | 48 | UNIV_INTERN ulint insert_replication_message(const char *message, size_t size, |
233 | 49 | trx_t *trx, uint64_t trx_id, | 49 | trx_t *trx, uint64_t trx_id, |
234 | 50 | uint64_t end_timestamp, bool is_end_segment, | 50 | uint64_t end_timestamp, bool is_end_segment, |
236 | 51 | uint32_t seg_id); | 51 | uint32_t seg_id, uint32_t server_id, |
237 | 52 | bool use_originating_server_id, | ||
238 | 53 | uint32_t originating_server_id, | ||
239 | 54 | uint64_t originating_commit_id); | ||
240 | 52 | 55 | ||
241 | 53 | UNIV_INTERN struct read_replication_state_st *replication_read_init(void); | 56 | UNIV_INTERN struct read_replication_state_st *replication_read_init(void); |
242 | 54 | UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *); | 57 | UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *); |
243 | 55 | 58 | ||
244 | === modified file 'plugin/innobase/include/read_replication.h' | |||
245 | --- plugin/innobase/include/read_replication.h 2011-03-14 05:40:28 +0000 | |||
246 | +++ plugin/innobase/include/read_replication.h 2011-04-01 19:01:31 +0000 | |||
247 | @@ -27,6 +27,8 @@ | |||
248 | 27 | unsigned long long commit_id; | 27 | unsigned long long commit_id; |
249 | 28 | unsigned long long end_timestamp; | 28 | unsigned long long end_timestamp; |
250 | 29 | unsigned long seg_id; | 29 | unsigned long seg_id; |
251 | 30 | unsigned long originating_server_id; | ||
252 | 31 | unsigned long long originating_commit_id; | ||
253 | 30 | unsigned long long message_length; | 32 | unsigned long long message_length; |
254 | 31 | const char *message; | 33 | const char *message; |
255 | 32 | }; | 34 | }; |
256 | 33 | 35 | ||
257 | === modified file 'plugin/innobase/tests/r/innodb-system-table-view.result' | |||
258 | --- plugin/innobase/tests/r/innodb-system-table-view.result 2011-02-10 18:19:37 +0000 | |||
259 | +++ plugin/innobase/tests/r/innodb-system-table-view.result 2011-04-01 19:01:31 +0000 | |||
260 | @@ -2,7 +2,7 @@ | |||
261 | 2 | TABLE_ID NAME FLAG N_COLS SPACE | 2 | TABLE_ID NAME FLAG N_COLS SPACE |
262 | 3 | 11 SYS_FOREIGN 0 7 0 | 3 | 11 SYS_FOREIGN 0 7 0 |
263 | 4 | 12 SYS_FOREIGN_COLS 0 7 0 | 4 | 12 SYS_FOREIGN_COLS 0 7 0 |
265 | 5 | 13 SYS_REPLICATION_LOG 0 9 0 | 5 | 13 SYS_REPLICATION_LOG 0 11 0 |
266 | 6 | SELECT * FROM DATA_DICTIONARY.INNODB_SYS_INDEXES; | 6 | SELECT * FROM DATA_DICTIONARY.INNODB_SYS_INDEXES; |
267 | 7 | INDEX_ID NAME TABLE_ID TYPE N_FIELDS PAGE_NO SPACE | 7 | INDEX_ID NAME TABLE_ID TYPE N_FIELDS PAGE_NO SPACE |
268 | 8 | 11 ID_IND 11 3 1 302 0 | 8 | 11 ID_IND 11 3 1 302 0 |
269 | @@ -25,8 +25,10 @@ | |||
270 | 25 | 13 SEGID 1 6 0 4 | 25 | 13 SEGID 1 6 0 4 |
271 | 26 | 13 COMMIT_ID 2 6 0 8 | 26 | 13 COMMIT_ID 2 6 0 8 |
272 | 27 | 13 END_TIMESTAMP 3 6 0 8 | 27 | 13 END_TIMESTAMP 3 6 0 8 |
275 | 28 | 13 MESSAGE_LEN 4 6 0 4 | 28 | 13 ORIGINATING_SERVER_ID 4 6 0 4 |
276 | 29 | 13 MESSAGE 5 5 4129792 0 | 29 | 13 ORIGINATING_COMMIT_ID 5 6 0 8 |
277 | 30 | 13 MESSAGE_LEN 6 6 0 4 | ||
278 | 31 | 13 MESSAGE 7 5 4129792 0 | ||
279 | 30 | SELECT * FROM DATA_DICTIONARY.INNODB_SYS_FIELDS; | 32 | SELECT * FROM DATA_DICTIONARY.INNODB_SYS_FIELDS; |
280 | 31 | INDEX_ID NAME POS | 33 | INDEX_ID NAME POS |
281 | 32 | 11 ID 0 | 34 | 11 ID 0 |
282 | @@ -70,7 +72,7 @@ | |||
283 | 70 | NAME FLAG N_COLS SPACE | 72 | NAME FLAG N_COLS SPACE |
284 | 71 | SYS_FOREIGN 0 7 0 | 73 | SYS_FOREIGN 0 7 0 |
285 | 72 | SYS_FOREIGN_COLS 0 7 0 | 74 | SYS_FOREIGN_COLS 0 7 0 |
287 | 73 | SYS_REPLICATION_LOG 0 9 0 | 75 | SYS_REPLICATION_LOG 0 11 0 |
288 | 74 | test/child 1 5 0 | 76 | test/child 1 5 0 |
289 | 75 | test/parent 1 4 0 | 77 | test/parent 1 4 0 |
290 | 76 | SELECT name, n_fields | 78 | SELECT name, n_fields |
291 | 77 | 79 | ||
292 | === modified file 'plugin/innobase/tests/r/innodb_replication_log.result' | |||
293 | --- plugin/innobase/tests/r/innodb_replication_log.result 2011-02-21 17:37:00 +0000 | |||
294 | +++ plugin/innobase/tests/r/innodb_replication_log.result 2011-04-01 19:01:31 +0000 | |||
295 | @@ -10,6 +10,8 @@ | |||
296 | 10 | `TRANSACTION_SEGMENT_ID` BIGINT NOT NULL, | 10 | `TRANSACTION_SEGMENT_ID` BIGINT NOT NULL, |
297 | 11 | `COMMIT_ID` BIGINT NOT NULL, | 11 | `COMMIT_ID` BIGINT NOT NULL, |
298 | 12 | `END_TIMESTAMP` BIGINT NOT NULL, | 12 | `END_TIMESTAMP` BIGINT NOT NULL, |
299 | 13 | `ORIGINATING_SERVER_ID` BIGINT NOT NULL, | ||
300 | 14 | `ORIGINATING_COMMIT_ID` BIGINT NOT NULL, | ||
301 | 13 | `TRANSACTION_MESSAGE_STRING` TEXT COLLATE utf8_general_ci NOT NULL, | 15 | `TRANSACTION_MESSAGE_STRING` TEXT COLLATE utf8_general_ci NOT NULL, |
302 | 14 | `TRANSACTION_LENGTH` BIGINT NOT NULL | 16 | `TRANSACTION_LENGTH` BIGINT NOT NULL |
303 | 15 | ) ENGINE=FunctionEngine COLLATE = utf8_general_ci REPLICATE = FALSE | 17 | ) ENGINE=FunctionEngine COLLATE = utf8_general_ci REPLICATE = FALSE |
304 | 16 | 18 | ||
305 | === modified file 'plugin/schema_dictionary/tests/r/data_dictionary.result' | |||
306 | --- plugin/schema_dictionary/tests/r/data_dictionary.result 2011-03-18 16:53:27 +0000 | |||
307 | +++ plugin/schema_dictionary/tests/r/data_dictionary.result 2011-04-01 19:01:31 +0000 | |||
308 | @@ -1,7 +1,7 @@ | |||
309 | 1 | use data_dictionary; | 1 | use data_dictionary; |
310 | 2 | SELECT count(*) FROM columns; | 2 | SELECT count(*) FROM columns; |
311 | 3 | count(*) | 3 | count(*) |
313 | 4 | 580 | 4 | 584 |
314 | 5 | SELECT count(*) FROM indexes; | 5 | SELECT count(*) FROM indexes; |
315 | 6 | count(*) | 6 | count(*) |
316 | 7 | 2 | 7 | 2 |
317 | @@ -356,6 +356,10 @@ | |||
318 | 356 | ORDINAL_POSITION | 356 | ORDINAL_POSITION |
319 | 357 | ORDINAL_POSITION | 357 | ORDINAL_POSITION |
320 | 358 | ORDINAL_POSITION | 358 | ORDINAL_POSITION |
321 | 359 | ORIGINATING_COMMIT_ID | ||
322 | 360 | ORIGINATING_COMMIT_ID | ||
323 | 361 | ORIGINATING_SERVER_ID | ||
324 | 362 | ORIGINATING_SERVER_ID | ||
325 | 359 | OTHER_INDEX_SIZE | 363 | OTHER_INDEX_SIZE |
326 | 360 | PAGES_FREE | 364 | PAGES_FREE |
327 | 361 | PAGES_FREE | 365 | PAGES_FREE |
328 | @@ -869,6 +873,8 @@ | |||
329 | 869 | DATA_DICTIONARY INNODB_LOCK_WAITS REQUESTING_TRX_ID | 873 | DATA_DICTIONARY INNODB_LOCK_WAITS REQUESTING_TRX_ID |
330 | 870 | DATA_DICTIONARY INNODB_REPLICATION_LOG COMMIT_ID | 874 | DATA_DICTIONARY INNODB_REPLICATION_LOG COMMIT_ID |
331 | 871 | DATA_DICTIONARY INNODB_REPLICATION_LOG END_TIMESTAMP | 875 | DATA_DICTIONARY INNODB_REPLICATION_LOG END_TIMESTAMP |
332 | 876 | DATA_DICTIONARY INNODB_REPLICATION_LOG ORIGINATING_COMMIT_ID | ||
333 | 877 | DATA_DICTIONARY INNODB_REPLICATION_LOG ORIGINATING_SERVER_ID | ||
334 | 872 | DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_ID | 878 | DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_ID |
335 | 873 | DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_LENGTH | 879 | DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_LENGTH |
336 | 874 | DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_MESSAGE_STRING | 880 | DATA_DICTIONARY INNODB_REPLICATION_LOG TRANSACTION_MESSAGE_STRING |
337 | @@ -995,6 +1001,8 @@ | |||
338 | 995 | DATA_DICTIONARY SYS_REPLICATION_LOG ID | 1001 | DATA_DICTIONARY SYS_REPLICATION_LOG ID |
339 | 996 | DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE | 1002 | DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE |
340 | 997 | DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE_LEN | 1003 | DATA_DICTIONARY SYS_REPLICATION_LOG MESSAGE_LEN |
341 | 1004 | DATA_DICTIONARY SYS_REPLICATION_LOG ORIGINATING_COMMIT_ID | ||
342 | 1005 | DATA_DICTIONARY SYS_REPLICATION_LOG ORIGINATING_SERVER_ID | ||
343 | 998 | DATA_DICTIONARY SYS_REPLICATION_LOG SEGID | 1006 | DATA_DICTIONARY SYS_REPLICATION_LOG SEGID |
344 | 999 | DATA_DICTIONARY TABLES AUTO_INCREMENT | 1007 | DATA_DICTIONARY TABLES AUTO_INCREMENT |
345 | 1000 | DATA_DICTIONARY TABLES ENGINE | 1008 | DATA_DICTIONARY TABLES ENGINE |
346 | @@ -1532,6 +1540,8 @@ | |||
347 | 1532 | INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID | 1540 | INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID |
348 | 1533 | INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID | 1541 | INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID |
349 | 1534 | INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP | 1542 | INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP |
350 | 1543 | INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID | ||
351 | 1544 | INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID | ||
352 | 1535 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID | 1545 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID |
353 | 1536 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH | 1546 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH |
354 | 1537 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING | 1547 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING |
355 | @@ -1787,6 +1797,8 @@ | |||
356 | 1787 | SYS_REPLICATION_LOG DATA_DICTIONARY ID | 1797 | SYS_REPLICATION_LOG DATA_DICTIONARY ID |
357 | 1788 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE | 1798 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE |
358 | 1789 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN | 1799 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN |
359 | 1800 | SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID | ||
360 | 1801 | SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID | ||
361 | 1790 | SYS_REPLICATION_LOG DATA_DICTIONARY SEGID | 1802 | SYS_REPLICATION_LOG DATA_DICTIONARY SEGID |
362 | 1791 | TABLES DATA_DICTIONARY AUTO_INCREMENT | 1803 | TABLES DATA_DICTIONARY AUTO_INCREMENT |
363 | 1792 | TABLES DATA_DICTIONARY ENGINE | 1804 | TABLES DATA_DICTIONARY ENGINE |
364 | @@ -2115,6 +2127,8 @@ | |||
365 | 2115 | INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID | 2127 | INNODB_LOCK_WAITS DATA_DICTIONARY REQUESTING_TRX_ID |
366 | 2116 | INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID | 2128 | INNODB_REPLICATION_LOG DATA_DICTIONARY COMMIT_ID |
367 | 2117 | INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP | 2129 | INNODB_REPLICATION_LOG DATA_DICTIONARY END_TIMESTAMP |
368 | 2130 | INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID | ||
369 | 2131 | INNODB_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID | ||
370 | 2118 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID | 2132 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_ID |
371 | 2119 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH | 2133 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_LENGTH |
372 | 2120 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING | 2134 | INNODB_REPLICATION_LOG DATA_DICTIONARY TRANSACTION_MESSAGE_STRING |
373 | @@ -2370,6 +2384,8 @@ | |||
374 | 2370 | SYS_REPLICATION_LOG DATA_DICTIONARY ID | 2384 | SYS_REPLICATION_LOG DATA_DICTIONARY ID |
375 | 2371 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE | 2385 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE |
376 | 2372 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN | 2386 | SYS_REPLICATION_LOG DATA_DICTIONARY MESSAGE_LEN |
377 | 2387 | SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_COMMIT_ID | ||
378 | 2388 | SYS_REPLICATION_LOG DATA_DICTIONARY ORIGINATING_SERVER_ID | ||
379 | 2373 | SYS_REPLICATION_LOG DATA_DICTIONARY SEGID | 2389 | SYS_REPLICATION_LOG DATA_DICTIONARY SEGID |
380 | 2374 | TABLES DATA_DICTIONARY AUTO_INCREMENT | 2390 | TABLES DATA_DICTIONARY AUTO_INCREMENT |
381 | 2375 | TABLES DATA_DICTIONARY ENGINE | 2391 | TABLES DATA_DICTIONARY ENGINE |
382 | 2376 | 2392 | ||
383 | === modified file 'plugin/slave/queue_consumer.cc' | |||
384 | --- plugin/slave/queue_consumer.cc 2011-03-19 20:20:32 +0000 | |||
385 | +++ plugin/slave/queue_consumer.cc 2011-04-01 19:01:31 +0000 | |||
386 | @@ -75,6 +75,8 @@ | |||
387 | 75 | for (size_t x= 0; x < completedTransactionIds.size(); x++) | 75 | for (size_t x= 0; x < completedTransactionIds.size(); x++) |
388 | 76 | { | 76 | { |
389 | 77 | string commit_id; | 77 | string commit_id; |
390 | 78 | uint32_t originating_server_id= 0; | ||
391 | 79 | uint64_t originating_commit_id= 0; | ||
392 | 78 | uint64_t trx_id= completedTransactionIds[x]; | 80 | uint64_t trx_id= completedTransactionIds[x]; |
393 | 79 | 81 | ||
394 | 80 | vector<string> aggregate_sql; /* final SQL to execute */ | 82 | vector<string> aggregate_sql; /* final SQL to execute */ |
395 | @@ -83,7 +85,8 @@ | |||
396 | 83 | message::Transaction transaction; | 85 | message::Transaction transaction; |
397 | 84 | uint32_t segment_id= 1; | 86 | uint32_t segment_id= 1; |
398 | 85 | 87 | ||
400 | 86 | while (getMessage(transaction, commit_id, master_id, trx_id, segment_id++)) | 88 | while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_id, |
401 | 89 | originating_commit_id, segment_id++)) | ||
402 | 87 | { | 90 | { |
403 | 88 | convertToSQL(transaction, aggregate_sql, segmented_sql); | 91 | convertToSQL(transaction, aggregate_sql, segmented_sql); |
404 | 89 | transaction.Clear(); | 92 | transaction.Clear(); |
405 | @@ -132,7 +135,8 @@ | |||
406 | 132 | } | 135 | } |
407 | 133 | } | 136 | } |
408 | 134 | 137 | ||
410 | 135 | if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id)) | 138 | if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id, |
411 | 139 | originating_server_id, originating_commit_id)) | ||
412 | 136 | { | 140 | { |
413 | 137 | if (_ignore_errors) | 141 | if (_ignore_errors) |
414 | 138 | { | 142 | { |
415 | @@ -143,6 +147,10 @@ | |||
416 | 143 | string tmp("UPDATE `sys_replication`.`applier_state`" | 147 | string tmp("UPDATE `sys_replication`.`applier_state`" |
417 | 144 | " SET `last_applied_commit_id` = "); | 148 | " SET `last_applied_commit_id` = "); |
418 | 145 | tmp.append(commit_id); | 149 | tmp.append(commit_id); |
419 | 150 | tmp.append(", `originating_server_id` = "); | ||
420 | 151 | tmp.append(boost::lexical_cast<string>(originating_server_id)); | ||
421 | 152 | tmp.append(", `originating_commit_id` = "); | ||
422 | 153 | tmp.append(boost::lexical_cast<string>(originating_commit_id)); | ||
423 | 146 | tmp.append(" WHERE `master_id` = "); | 154 | tmp.append(" WHERE `master_id` = "); |
424 | 147 | tmp.append(master_id); | 155 | tmp.append(master_id); |
425 | 148 | sql.push_back(tmp); | 156 | sql.push_back(tmp); |
426 | @@ -168,9 +176,12 @@ | |||
427 | 168 | string &commit_id, | 176 | string &commit_id, |
428 | 169 | const string &master_id, | 177 | const string &master_id, |
429 | 170 | uint64_t trx_id, | 178 | uint64_t trx_id, |
430 | 179 | uint32_t &originating_server_id, | ||
431 | 180 | uint64_t &originating_commit_id, | ||
432 | 171 | uint32_t segment_id) | 181 | uint32_t segment_id) |
433 | 172 | { | 182 | { |
435 | 173 | string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`" | 183 | string sql("SELECT `msg`, `commit_order`, `originating_server_id`, " |
436 | 184 | " `originating_commit_id` FROM `sys_replication`.`queue`" | ||
437 | 174 | " WHERE `trx_id` = "); | 185 | " WHERE `trx_id` = "); |
438 | 175 | sql.append(boost::lexical_cast<string>(trx_id)); | 186 | sql.append(boost::lexical_cast<string>(trx_id)); |
439 | 176 | sql.append(" AND `seg_id` = ", 16); | 187 | sql.append(" AND `seg_id` = ", 16); |
440 | @@ -178,12 +189,12 @@ | |||
441 | 178 | sql.append(" AND `master_id` = ", 19), | 189 | sql.append(" AND `master_id` = ", 19), |
442 | 179 | sql.append(master_id); | 190 | sql.append(master_id); |
443 | 180 | 191 | ||
445 | 181 | sql::ResultSet result_set(2); | 192 | sql::ResultSet result_set(4); |
446 | 182 | Execute execute(*(_session.get()), true); | 193 | Execute execute(*(_session.get()), true); |
447 | 183 | 194 | ||
448 | 184 | execute.run(sql, result_set); | 195 | execute.run(sql, result_set); |
449 | 185 | 196 | ||
451 | 186 | assert(result_set.getMetaData().getColumnCount() == 2); | 197 | assert(result_set.getMetaData().getColumnCount() == 4); |
452 | 187 | 198 | ||
453 | 188 | /* Really should only be 1 returned row */ | 199 | /* Really should only be 1 returned row */ |
454 | 189 | uint32_t found_rows= 0; | 200 | uint32_t found_rows= 0; |
455 | @@ -191,6 +202,8 @@ | |||
456 | 191 | { | 202 | { |
457 | 192 | string msg= result_set.getString(0); | 203 | string msg= result_set.getString(0); |
458 | 193 | string com_id= result_set.getString(1); | 204 | string com_id= result_set.getString(1); |
459 | 205 | string orig_server_id= result_set.getString(2); | ||
460 | 206 | string orig_commit_id= result_set.getString(3); | ||
461 | 194 | 207 | ||
462 | 195 | if ((msg == "") || (found_rows == 1)) | 208 | if ((msg == "") || (found_rows == 1)) |
463 | 196 | break; | 209 | break; |
464 | @@ -198,10 +211,14 @@ | |||
465 | 198 | /* Neither column should be NULL */ | 211 | /* Neither column should be NULL */ |
466 | 199 | assert(result_set.isNull(0) == false); | 212 | assert(result_set.isNull(0) == false); |
467 | 200 | assert(result_set.isNull(1) == false); | 213 | assert(result_set.isNull(1) == false); |
468 | 214 | assert(result_set.isNull(2) == false); | ||
469 | 215 | assert(result_set.isNull(3) == false); | ||
470 | 201 | 216 | ||
471 | 202 | google::protobuf::TextFormat::ParseFromString(msg, &transaction); | 217 | google::protobuf::TextFormat::ParseFromString(msg, &transaction); |
472 | 203 | 218 | ||
473 | 204 | commit_id= com_id; | 219 | commit_id= com_id; |
474 | 220 | originating_server_id= boost::lexical_cast<uint32_t>(orig_server_id); | ||
475 | 221 | originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id); | ||
476 | 205 | found_rows++; | 222 | found_rows++; |
477 | 206 | } | 223 | } |
478 | 207 | 224 | ||
479 | @@ -404,15 +421,24 @@ | |||
480 | 404 | 421 | ||
481 | 405 | bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql, | 422 | bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql, |
482 | 406 | const string &commit_id, | 423 | const string &commit_id, |
484 | 407 | const string &master_id) | 424 | const string &master_id, |
485 | 425 | uint32_t originating_server_id, | ||
486 | 426 | uint64_t originating_commit_id) | ||
487 | 408 | { | 427 | { |
488 | 409 | string tmp("UPDATE `sys_replication`.`applier_state`" | 428 | string tmp("UPDATE `sys_replication`.`applier_state`" |
489 | 410 | " SET `last_applied_commit_id` = "); | 429 | " SET `last_applied_commit_id` = "); |
490 | 411 | tmp.append(commit_id); | 430 | tmp.append(commit_id); |
491 | 431 | tmp.append(", `originating_server_id` = "); | ||
492 | 432 | tmp.append(boost::lexical_cast<string>(originating_server_id)); | ||
493 | 433 | tmp.append(", `originating_commit_id` = "); | ||
494 | 434 | tmp.append(boost::lexical_cast<string>(originating_commit_id)); | ||
495 | 412 | tmp.append(" WHERE `master_id` = "); | 435 | tmp.append(" WHERE `master_id` = "); |
496 | 413 | tmp.append(master_id); | 436 | tmp.append(master_id); |
497 | 414 | sql.push_back(tmp); | 437 | sql.push_back(tmp); |
499 | 415 | 438 | ||
500 | 439 | _session->setOriginatingServerID(originating_server_id); | ||
501 | 440 | _session->setOriginatingCommitID(originating_commit_id); | ||
502 | 441 | |||
503 | 416 | return executeSQL(sql); | 442 | return executeSQL(sql); |
504 | 417 | } | 443 | } |
505 | 418 | 444 | ||
506 | 419 | 445 | ||
507 | === modified file 'plugin/slave/queue_consumer.h' | |||
508 | --- plugin/slave/queue_consumer.h 2011-03-19 20:20:32 +0000 | |||
509 | +++ plugin/slave/queue_consumer.h 2011-04-01 19:01:31 +0000 | |||
510 | @@ -111,6 +111,8 @@ | |||
511 | 111 | std::string &commit_id, | 111 | std::string &commit_id, |
512 | 112 | const std::string &master_id, | 112 | const std::string &master_id, |
513 | 113 | uint64_t trx_id, | 113 | uint64_t trx_id, |
514 | 114 | uint32_t &originating_server_id, | ||
515 | 115 | uint64_t &originating_commit_id, | ||
516 | 114 | uint32_t segment_id); | 116 | uint32_t segment_id); |
517 | 115 | 117 | ||
518 | 116 | /** | 118 | /** |
519 | @@ -133,14 +135,19 @@ | |||
520 | 133 | * @param sql Batch of SQL statements to execute. | 135 | * @param sql Batch of SQL statements to execute. |
521 | 134 | * @param commit_id Commit ID value to store in state table. | 136 | * @param commit_id Commit ID value to store in state table. |
522 | 135 | * @param master_id ID of the master this SQL came from. | 137 | * @param master_id ID of the master this SQL came from. |
523 | 138 | * @param originating_server_id Server ID of the master where | ||
524 | 139 | * this SQL was originally applied. | ||
525 | 140 | * @param originating_commit_id Commit ID of the master where | ||
526 | 141 | * this SQL was originally applied. | ||
527 | 136 | * | 142 | * |
528 | 137 | * @retval true Success | 143 | * @retval true Success |
529 | 138 | * @retval false Failure | 144 | * @retval false Failure |
530 | 139 | */ | 145 | */ |
531 | 140 | bool executeSQLWithCommitId(std::vector<std::string> &sql, | 146 | bool executeSQLWithCommitId(std::vector<std::string> &sql, |
532 | 141 | const std::string &commit_id, | 147 | const std::string &commit_id, |
535 | 142 | const std::string &master_id); | 148 | const std::string &master_id, |
536 | 143 | 149 | uint32_t originating_server_id, | |
537 | 150 | uint64_t originating_commit_id); | ||
538 | 144 | /** | 151 | /** |
539 | 145 | * Remove messages for a given transaction from the queue. | 152 | * Remove messages for a given transaction from the queue. |
540 | 146 | * | 153 | * |
541 | 147 | 154 | ||
542 | === modified file 'plugin/slave/queue_producer.cc' | |||
543 | --- plugin/slave/queue_producer.cc 2011-03-19 20:20:32 +0000 | |||
544 | +++ plugin/slave/queue_producer.cc 2011-04-01 19:01:31 +0000 | |||
545 | @@ -296,6 +296,8 @@ | |||
546 | 296 | bool QueueProducer::queueInsert(const char *trx_id, | 296 | bool QueueProducer::queueInsert(const char *trx_id, |
547 | 297 | const char *seg_id, | 297 | const char *seg_id, |
548 | 298 | const char *commit_id, | 298 | const char *commit_id, |
549 | 299 | const char *originating_server_id, | ||
550 | 300 | const char *originating_commit_id, | ||
551 | 299 | const char *msg, | 301 | const char *msg, |
552 | 300 | const char *msg_length) | 302 | const char *msg_length) |
553 | 301 | { | 303 | { |
554 | @@ -307,7 +309,8 @@ | |||
555 | 307 | * The SQL to insert our results into the local queue. | 309 | * The SQL to insert our results into the local queue. |
556 | 308 | */ | 310 | */ |
557 | 309 | string sql= "INSERT INTO `sys_replication`.`queue`" | 311 | string sql= "INSERT INTO `sys_replication`.`queue`" |
559 | 310 | " (`master_id`, `trx_id`, `seg_id`, `commit_order`, `msg`) VALUES ("; | 312 | " (`master_id`, `trx_id`, `seg_id`, `commit_order`," |
560 | 313 | " `originating_server_id`, `originating_commit_id`, `msg`) VALUES ("; | ||
561 | 311 | sql.append(boost::lexical_cast<string>(masterId())); | 314 | sql.append(boost::lexical_cast<string>(masterId())); |
562 | 312 | sql.append(", ", 2); | 315 | sql.append(", ", 2); |
563 | 313 | sql.append(trx_id); | 316 | sql.append(trx_id); |
564 | @@ -315,6 +318,10 @@ | |||
565 | 315 | sql.append(seg_id); | 318 | sql.append(seg_id); |
566 | 316 | sql.append(", ", 2); | 319 | sql.append(", ", 2); |
567 | 317 | sql.append(commit_id); | 320 | sql.append(commit_id); |
568 | 321 | sql.append(", ", 2); | ||
569 | 322 | sql.append(originating_server_id); | ||
570 | 323 | sql.append(", ", 2); | ||
571 | 324 | sql.append(originating_commit_id); | ||
572 | 318 | sql.append(", '", 3); | 325 | sql.append(", '", 3); |
573 | 319 | 326 | ||
574 | 320 | /* | 327 | /* |
575 | @@ -395,7 +402,8 @@ | |||
576 | 395 | /* | 402 | /* |
577 | 396 | * The SQL to pull everything we need from the master. | 403 | * The SQL to pull everything we need from the master. |
578 | 397 | */ | 404 | */ |
580 | 398 | string sql= "SELECT `id`, `segid`, `commit_id`, `message`, `message_len` " | 405 | string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_id`," |
581 | 406 | " `originating_commit_id`, `message`, `message_len` " | ||
582 | 399 | " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN ("; | 407 | " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN ("; |
583 | 400 | 408 | ||
584 | 401 | for (size_t x= 0; x < trx_id_list.size(); x++) | 409 | for (size_t x= 0; x < trx_id_list.size(); x++) |
585 | @@ -439,7 +447,7 @@ | |||
586 | 439 | 447 | ||
587 | 440 | while ((row= drizzle_row_next(&result)) != NULL) | 448 | while ((row= drizzle_row_next(&result)) != NULL) |
588 | 441 | { | 449 | { |
590 | 442 | if (not queueInsert(row[0], row[1], row[2], row[3], row[4])) | 450 | if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6])) |
591 | 443 | { | 451 | { |
592 | 444 | errmsg_printf(error::ERROR, | 452 | errmsg_printf(error::ERROR, |
593 | 445 | _("Replication slave: Unable to insert into queue.")); | 453 | _("Replication slave: Unable to insert into queue.")); |
594 | 446 | 454 | ||
595 | === modified file 'plugin/slave/queue_producer.h' | |||
596 | --- plugin/slave/queue_producer.h 2011-03-19 20:20:32 +0000 | |||
597 | +++ plugin/slave/queue_producer.h 2011-04-01 19:01:31 +0000 | |||
598 | @@ -184,6 +184,8 @@ | |||
599 | 184 | bool queueInsert(const char *trx_id, | 184 | bool queueInsert(const char *trx_id, |
600 | 185 | const char *seg_id, | 185 | const char *seg_id, |
601 | 186 | const char *commit_id, | 186 | const char *commit_id, |
602 | 187 | const char *originating_server_id, | ||
603 | 188 | const char *originating_commit_id, | ||
604 | 187 | const char *msg, | 189 | const char *msg, |
605 | 188 | const char *msg_length); | 190 | const char *msg_length); |
606 | 189 | 191 | ||
607 | 190 | 192 | ||
608 | === modified file 'plugin/slave/replication_schema.cc' | |||
609 | --- plugin/slave/replication_schema.cc 2011-03-19 20:20:32 +0000 | |||
610 | +++ plugin/slave/replication_schema.cc 2011-04-01 19:01:31 +0000 | |||
611 | @@ -73,6 +73,7 @@ | |||
612 | 73 | * Table: applier_state | 73 | * Table: applier_state |
613 | 74 | * Version 1.0: Initial definition | 74 | * Version 1.0: Initial definition |
614 | 75 | * Version 1.1: Added master_id and changed PK to master_id | 75 | * Version 1.1: Added master_id and changed PK to master_id |
615 | 76 | * Version 1.2: Added originating_server_id and originating_commit_id | ||
616 | 76 | */ | 77 | */ |
617 | 77 | 78 | ||
618 | 78 | sql.clear(); | 79 | sql.clear(); |
619 | @@ -80,10 +81,12 @@ | |||
620 | 80 | sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` (" | 81 | sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` (" |
621 | 81 | " `master_id` BIGINT NOT NULL," | 82 | " `master_id` BIGINT NOT NULL," |
622 | 82 | " `last_applied_commit_id` BIGINT NOT NULL," | 83 | " `last_applied_commit_id` BIGINT NOT NULL," |
623 | 84 | " `originating_server_id` BIGINT NOT NULL," | ||
624 | 85 | " `originating_commit_id` BIGINT NOT NULL," | ||
625 | 83 | " `status` VARCHAR(20) NOT NULL," | 86 | " `status` VARCHAR(20) NOT NULL," |
626 | 84 | " `error_msg` VARCHAR(250)," | 87 | " `error_msg` VARCHAR(250)," |
627 | 85 | " PRIMARY KEY(`master_id`))" | 88 | " PRIMARY KEY(`master_id`))" |
629 | 86 | " COMMENT = 'VERSION 1.1'"); | 89 | " COMMENT = 'VERSION 1.2'"); |
630 | 87 | 90 | ||
631 | 88 | if (not executeSQL(sql)) | 91 | if (not executeSQL(sql)) |
632 | 89 | return false; | 92 | return false; |
633 | @@ -101,6 +104,8 @@ | |||
634 | 101 | " `trx_id` BIGINT NOT NULL," | 104 | " `trx_id` BIGINT NOT NULL," |
635 | 102 | " `seg_id` INT NOT NULL," | 105 | " `seg_id` INT NOT NULL," |
636 | 103 | " `commit_order` BIGINT," | 106 | " `commit_order` BIGINT," |
637 | 107 | " `originating_server_id` INT NOT NULL," | ||
638 | 108 | " `originating_commit_id` BIGINT NOT NULL," | ||
639 | 104 | " `msg` BLOB," | 109 | " `msg` BLOB," |
640 | 105 | " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))" | 110 | " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))" |
641 | 106 | " COMMENT = 'VERSION 1.1'"); | 111 | " COMMENT = 'VERSION 1.1'"); |
642 | @@ -151,9 +156,11 @@ | |||
643 | 151 | { | 156 | { |
644 | 152 | sql.clear(); | 157 | sql.clear(); |
645 | 153 | sql.push_back("INSERT INTO `sys_replication`.`applier_state`" | 158 | sql.push_back("INSERT INTO `sys_replication`.`applier_state`" |
647 | 154 | " (`master_id`, `last_applied_commit_id`, `status`) VALUES (" | 159 | " (`master_id`, `last_applied_commit_id`," |
648 | 160 | " `originating_server_id`, `originating_commit_id`," | ||
649 | 161 | " `status`) VALUES (" | ||
650 | 155 | + boost::lexical_cast<string>(master_id) | 162 | + boost::lexical_cast<string>(master_id) |
652 | 156 | + ",0 , 'STOPPED')"); | 163 | + ",0 , 0, 0, 'STOPPED')"); |
653 | 157 | if (not executeSQL(sql)) | 164 | if (not executeSQL(sql)) |
654 | 158 | return false; | 165 | return false; |
655 | 159 | } | 166 | } |
on hold at the moment