Merge lp:~jaypipes/drizzle/replication-group-commit into lp:~drizzle-trunk/drizzle/development

Proposed by Jay Pipes
Status: Merged
Merged at revision: not available
Proposed branch: lp:~jaypipes/drizzle/replication-group-commit
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 523 lines
12 files modified
drizzled/message/statement_transform.cc (+18/-78)
drizzled/message/statement_transform.h (+0/-26)
drizzled/message/transaction.proto (+3/-2)
drizzled/message/transaction_writer.cc (+3/-4)
drizzled/replication_services.cc (+52/-26)
tests/suite/transaction_log/r/filtered_replicator.result (+21/-0)
tests/suite/transaction_log/r/no_modification.result (+20/-0)
tests/suite/transaction_log/r/update.result (+21/-0)
tests/suite/transaction_log/t/no_modification-master.opt (+1/-0)
tests/suite/transaction_log/t/no_modification.inc (+26/-0)
tests/suite/transaction_log/t/no_modification.test (+13/-0)
tests/suite/transaction_log/t/update.inc (+16/-0)
To merge this branch: bzr merge lp:~jaypipes/drizzle/replication-group-commit
Reviewer Review Type Date Requested Status
Eric Lambert (community) Approve
Review via email: mp+14380@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Jay Pipes (jaypipes) wrote :

    This patch fixes a bug in the replication service and transaction
    proto file that was occurring when an UPDATE statement was SETting
    a field to a value by referencing the field itself.

    For instance, imagine the following scenario:

    CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, counter INT NOT NULL);
    INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
    UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);

    Previously, the UpdateHeader message contained a single set_value
    byte array containing the new value to set the field to. However,
    in scenarios such as the above, for each record being updated, the
    value that the field would be set to is different. Therefore, this
    patch moves the set_value byte array from the UpdateHeader message
    into the UpdateRecord message and names it after_value to match the
    existing before_value byte array.

    This patch adds a test case to the existing update.test case in the
    transaction log suite and modifies the statement_transform library to
    properly handle the above scenario.

1177. By Jay Pipes <jpipes@serialcoder>

Updated filtered_replicator test due to new tests for UPDATE.

Revision history for this message
Eric Lambert (elambert) wrote :

looks good to me

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'drizzled/message/statement_transform.cc'
2--- drizzled/message/statement_transform.cc 2009-10-13 20:10:07 +0000
3+++ drizzled/message/statement_transform.cc 2009-11-03 19:11:13 +0000
4@@ -344,6 +344,23 @@
5 destination->push_back(quoted_identifier);
6 destination->append(" SET ");
7
8+ return NONE;
9+}
10+
11+enum message::TransformSqlError
12+message::transformUpdateRecordToSql(const message::UpdateHeader &header,
13+ const message::UpdateRecord &record,
14+ std::string *destination,
15+ enum message::TransformSqlVariant sql_variant)
16+{
17+ enum message::TransformSqlError error= transformUpdateHeaderToSql(header,
18+ destination,
19+ sql_variant);
20+
21+ char quoted_identifier= '`';
22+ if (sql_variant == ANSI)
23+ quoted_identifier= '"';
24+
25 /* Add field SET list to SQL string... */
26 size_t num_set_fields= header.set_field_metadata_size();
27 size_t x;
28@@ -365,32 +382,13 @@
29 if (should_quote_field_value)
30 destination->push_back('\'');
31
32- destination->append(header.set_value(x));
33+ destination->append(record.after_value(x));
34
35 if (should_quote_field_value)
36 destination->push_back('\'');
37 }
38
39- return NONE;
40-}
41-
42-enum message::TransformSqlError
43-message::transformUpdateRecordToSql(const message::UpdateHeader &header,
44- const message::UpdateRecord &record,
45- std::string *destination,
46- enum message::TransformSqlVariant sql_variant)
47-{
48- enum message::TransformSqlError error= transformUpdateHeaderToSql(header,
49- destination,
50- sql_variant);
51-
52- char quoted_identifier= '`';
53- if (sql_variant == ANSI)
54- quoted_identifier= '"';
55-
56 size_t num_key_fields= header.key_field_metadata_size();
57- size_t x;
58- bool should_quote_field_value= false;
59
60 destination->append(" WHERE ");
61 for (x= 0; x < num_key_fields; ++x)
62@@ -423,64 +421,6 @@
63 }
64
65 enum message::TransformSqlError
66-message::transformUpdateStatementToSql(const message::UpdateHeader &header,
67- const message::UpdateData &data,
68- std::string *destination,
69- enum message::TransformSqlVariant sql_variant)
70-{
71- enum message::TransformSqlError error= transformUpdateHeaderToSql(header,
72- destination,
73- sql_variant);
74-
75- char quoted_identifier= '`';
76- if (sql_variant == ANSI)
77- quoted_identifier= '"';
78-
79- /* Add WHERE clause to SQL string... */
80- size_t num_key_fields= header.key_field_metadata_size();
81- size_t num_key_records= data.record_size();
82- size_t x, y;
83- bool should_quote_field_value= false;
84-
85- destination->append(" WHERE ");
86- for (x= 0; x < num_key_records; ++x)
87- {
88- if (x != 0)
89- destination->append(" OR "); /* Always OR condition for multiple key records */
90-
91- if (num_key_fields > 1)
92- destination->push_back('(');
93-
94- for (y= 0; y < num_key_fields; ++y)
95- {
96- const message::FieldMetadata &field_metadata= header.key_field_metadata(y);
97-
98- if (y != 0)
99- destination->append(" AND "); /* Always AND condition with a multi-column PK */
100-
101- destination->push_back(quoted_identifier);
102- destination->append(field_metadata.name());
103- destination->push_back(quoted_identifier);
104-
105- destination->push_back('=');
106-
107- should_quote_field_value= message::shouldQuoteFieldValue(field_metadata.type());
108-
109- if (should_quote_field_value)
110- destination->push_back('\'');
111-
112- destination->append(data.record(x).key_value(y));
113-
114- if (should_quote_field_value)
115- destination->push_back('\'');
116- }
117- if (num_key_fields > 1)
118- destination->push_back(')');
119- }
120- return error;
121-}
122-
123-enum message::TransformSqlError
124 message::transformDeleteHeaderToSql(const message::DeleteHeader &header,
125 std::string *destination,
126 enum message::TransformSqlVariant sql_variant)
127
128=== modified file 'drizzled/message/statement_transform.h'
129--- drizzled/message/statement_transform.h 2009-10-06 01:38:40 +0000
130+++ drizzled/message/statement_transform.h 2009-11-03 19:11:13 +0000
131@@ -154,32 +154,6 @@
132 transformInsertHeaderToSql(const InsertHeader &header,
133 std::string *destination,
134 enum TransformSqlVariant sql_variant= DRIZZLE);
135-/**
136- * This function looks at a supplied UpdateHeader
137- * and UpdateData message and constructs a correctly-formatted SQL
138- * statement to the supplied destination string.
139- *
140- * @note
141- *
142- * This function constructs a <strong>single SQL statement</strong>
143- * that contains all the update keys represented in all records in
144- * the UpdateData message.
145- *
146- * @param UpdateHeader message to transform
147- * @param UpdateData message to transform
148- * @param Destination string to append SQL to
149- * @param Variation of SQL to generate
150- *
151- * @retval
152- * NONE if successful transformation
153- * @retval
154- * Error code (see enum TransformSqlError definition) if failure
155- */
156-enum TransformSqlError
157-transformUpdateStatementToSql(const UpdateHeader &header,
158- const UpdateData &data,
159- std::string *destination,
160- enum TransformSqlVariant sql_variant= DRIZZLE);
161
162 /**
163 * Helper function to construct the header portion of an UPDATE
164
165=== modified file 'drizzled/message/transaction.proto'
166--- drizzled/message/transaction.proto 2009-10-06 01:38:40 +0000
167+++ drizzled/message/transaction.proto 2009-11-03 19:11:13 +0000
168@@ -364,7 +364,8 @@
169 message UpdateRecord
170 {
171 repeated bytes key_value = 1; /* The value of keys of updated records (unique or primary key) */
172- repeated bytes before_value = 2; /* The value of the record before the update (optional) */
173+ repeated bytes after_value = 2; /* The value of the field after the update */
174+ repeated bytes before_value = 3; /* The value of the field before the update (optional) */
175 }
176
177 /*
178@@ -372,6 +373,7 @@
179 *
180 * INSERT ... ON DUPLICATE KEY UPDATE
181 * UPDATE
182+ * REPLACE INTO when the UPDATE optimization occurs.
183 *
184 * The statement is composed of a header (UpdateHeader) containing
185 * metadata about affected tables and fields, as well as one or more data
186@@ -388,7 +390,6 @@
187 required TableMetadata table_metadata = 1; /* Minimal metadata about the table affected */
188 repeated FieldMetadata key_field_metadata = 2; /* Collection of metadata about key fields */
189 repeated FieldMetadata set_field_metadata = 3; /* Collection of metadata about fields affected */
190- repeated bytes set_value = 4; /* The value of the field after the update */
191 }
192
193 message UpdateData
194
195=== modified file 'drizzled/message/transaction_writer.cc'
196--- drizzled/message/transaction_writer.cc 2009-10-19 19:19:38 +0000
197+++ drizzled/message/transaction_writer.cc 2009-11-03 19:11:13 +0000
198@@ -227,8 +227,6 @@
199 sf_meta->set_name("a");
200 sf_meta->set_type(message::Table::Field::VARCHAR);
201
202- header->add_set_value("5");
203-
204 /* Add new values... */
205 message::UpdateData *data= statement->mutable_update_data();
206 data->set_segment_id(1);
207@@ -236,6 +234,7 @@
208
209 message::UpdateRecord *record1= data->add_record();
210
211+ record1->add_after_value("5");
212 record1->add_key_value("1");
213
214 statement->set_end_timestamp(getNanoTimestamp());
215@@ -266,8 +265,6 @@
216 sf_meta->set_name("a");
217 sf_meta->set_type(message::Table::Field::VARCHAR);
218
219- header->add_set_value("5");
220-
221 /* Add new values... */
222 message::UpdateData *data= statement->mutable_update_data();
223 data->set_segment_id(1);
224@@ -276,7 +273,9 @@
225 message::UpdateRecord *record1= data->add_record();
226 message::UpdateRecord *record2= data->add_record();
227
228+ record1->add_after_value("5");
229 record1->add_key_value("1");
230+ record2->add_after_value("5");
231 record2->add_key_value("2");
232
233 statement->set_end_timestamp(getNanoTimestamp());
234
235=== modified file 'drizzled/replication_services.cc'
236--- drizzled/replication_services.cc 2009-10-19 19:19:38 +0000
237+++ drizzled/replication_services.cc 2009-11-03 19:11:12 +0000
238@@ -241,6 +241,8 @@
239 {
240 finalizeStatement(*statement, in_session);
241 }
242+ else
243+ return; /* No data modification occurred inside the transaction */
244
245 message::Transaction* transaction= getActiveTransaction(in_session);
246
247@@ -463,7 +465,53 @@
248 field_metadata= header->add_set_field_metadata();
249 field_metadata->set_name(current_field->field_name);
250 field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
251-
252+ }
253+ }
254+}
255+void ReplicationServices::updateRecord(Session *in_session,
256+ Table *in_table,
257+ const unsigned char *old_record,
258+ const unsigned char *new_record)
259+{
260+ if (! is_active)
261+ return;
262+
263+ message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
264+
265+ message::UpdateData *data= statement.mutable_update_data();
266+ data->set_segment_id(1);
267+ data->set_end_segment(true);
268+ message::UpdateRecord *record= data->add_record();
269+
270+ Field *current_field;
271+ Field **table_fields= in_table->field;
272+ String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
273+ string_value->set_charset(system_charset_info);
274+
275+ while ((current_field= *table_fields++) != NULL)
276+ {
277+ /*
278+ * Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
279+ * but then realized that an UPDATE statement could potentially have different values for
280+ * the SET field. For instance, imagine this SQL scenario:
281+ *
282+ * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
283+ * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
284+ * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
285+ *
286+ * We will generate two UpdateRecord messages with different set_value byte arrays.
287+ *
288+ * The below really should be moved into the Field API and Record API. But for now
289+ * we do this crazy pointer fiddling to figure out if the current field
290+ * has been updated in the supplied record raw byte pointers.
291+ */
292+ const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
293+ const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
294+
295+ uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
296+
297+ if (memcmp(old_ptr, new_ptr, field_length) != 0)
298+ {
299 /* Store the original "read bit" for this field */
300 bool is_read_set= current_field->isReadSet();
301
302@@ -479,33 +527,10 @@
303 */
304 current_field->setReadSet(is_read_set);
305
306- header->add_set_value(string_value->c_ptr());
307+ record->add_after_value(string_value->c_ptr());
308 string_value->free();
309 }
310- }
311-}
312-void ReplicationServices::updateRecord(Session *in_session,
313- Table *in_table,
314- const unsigned char *old_record,
315- const unsigned char *new_record)
316-{
317- if (! is_active)
318- return;
319-
320- message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
321-
322- message::UpdateData *data= statement.mutable_update_data();
323- data->set_segment_id(1);
324- data->set_end_segment(true);
325- message::UpdateRecord *record= data->add_record();
326-
327- Field *current_field;
328- Field **table_fields= in_table->field;
329- String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
330- string_value->set_charset(system_charset_info);
331-
332- while ((current_field= *table_fields++) != NULL)
333- {
334+
335 /*
336 * Add the WHERE clause values now...the fields which return true
337 * for isReadSet() are in the WHERE clause. For tables with no
338@@ -520,6 +545,7 @@
339 */
340 string_value->free();
341 }
342+
343 }
344 }
345
346
347=== modified file 'tests/suite/transaction_log/r/filtered_replicator.result'
348--- tests/suite/transaction_log/r/filtered_replicator.result 2009-10-18 15:09:52 +0000
349+++ tests/suite/transaction_log/r/filtered_replicator.result 2009-11-03 19:11:13 +0000
350@@ -72,6 +72,15 @@
351 INSERT INTO t1 (name,alias) VALUES ("jeff lebowski","dude");
352 UPDATE t1 SET alias = "the dude" WHERE alias = "dude";
353 DROP TABLE t1;
354+CREATE TABLE t1 (
355+id INT NOT NULL
356+, counter INT NOT NULL
357+, PRIMARY KEY (id)
358+);
359+INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
360+UPDATE t1 SET counter = counter + 1 WHERE id = 1;
361+UPDATE t1 SET counter = counter + 1 WHERE id IN (2,3);
362+DROP TABLE t1;
363 DROP TABLE IF EXISTS `t1`;
364 CREATE TABLE t1 ( id INT NOT NULL , padding VARCHAR(200) NOT NULL );
365 INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (1,'I love testing.');
366@@ -118,4 +127,16 @@
367 INSERT INTO `test`.`t1` (`id`,`name`,`alias`) VALUES (1,'jeff lebowski','dude');
368 UPDATE `test`.`t1` SET `alias`='the dude' WHERE `id`=1;
369 DROP TABLE `t1`;
370+CREATE TABLE t1 ( id INT NOT NULL , counter INT NOT NULL , PRIMARY KEY (id) );
371+START TRANSACTION;
372+INSERT INTO `test`.`t1` (`id`,`counter`) VALUES (1,1);
373+INSERT INTO `test`.`t1` (`id`,`counter`) VALUES (2,2);
374+INSERT INTO `test`.`t1` (`id`,`counter`) VALUES (3,3);
375+COMMIT;
376+UPDATE `test`.`t1` SET `counter`=2 WHERE `id`=1;
377+START TRANSACTION;
378+UPDATE `test`.`t1` SET `counter`=3 WHERE `id`=2;
379+UPDATE `test`.`t1` SET `counter`=4 WHERE `id`=3;
380+COMMIT;
381+DROP TABLE `t1`;
382 SET GLOBAL transaction_log_truncate_debug= true;
383
384=== added file 'tests/suite/transaction_log/r/no_modification.result'
385--- tests/suite/transaction_log/r/no_modification.result 1970-01-01 00:00:00 +0000
386+++ tests/suite/transaction_log/r/no_modification.result 2009-11-03 19:11:13 +0000
387@@ -0,0 +1,20 @@
388+DROP TABLE IF EXISTS t1;
389+CREATE TABLE t1 (
390+id INT NOT NULL
391+, padding VARCHAR(200) NOT NULL
392+);
393+INSERT INTO t1 VALUES (1, "I love testing.");
394+INSERT INTO t1 VALUES (2, "I hate testing.");
395+START TRANSACTION;
396+SELECT * FROM t1 WHERE id = 1;
397+id padding
398+1 I love testing.
399+SELECT * FROM t1 WHERE id = 2;
400+id padding
401+2 I hate testing.
402+COMMIT;
403+DROP TABLE IF EXISTS `t1`;
404+CREATE TABLE t1 ( id INT NOT NULL , padding VARCHAR(200) NOT NULL );
405+INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (1,'I love testing.');
406+INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (2,'I hate testing.');
407+SET GLOBAL transaction_log_truncate_debug= true;
408
409=== modified file 'tests/suite/transaction_log/r/update.result'
410--- tests/suite/transaction_log/r/update.result 2009-10-18 15:09:52 +0000
411+++ tests/suite/transaction_log/r/update.result 2009-11-03 19:11:13 +0000
412@@ -17,6 +17,15 @@
413 INSERT INTO t1 (name,alias) VALUES ("jeff lebowski","dude");
414 UPDATE t1 SET alias = "the dude" WHERE alias = "dude";
415 DROP TABLE t1;
416+CREATE TABLE t1 (
417+id INT NOT NULL
418+, counter INT NOT NULL
419+, PRIMARY KEY (id)
420+);
421+INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
422+UPDATE t1 SET counter = counter + 1 WHERE id = 1;
423+UPDATE t1 SET counter = counter + 1 WHERE id IN (2,3);
424+DROP TABLE t1;
425 DROP TABLE IF EXISTS `t1`;
426 CREATE TABLE t1 ( id INT NOT NULL , padding VARCHAR(200) NOT NULL , PRIMARY KEY (id) );
427 INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (1,'I love testing.');
428@@ -31,4 +40,16 @@
429 INSERT INTO `test`.`t1` (`id`,`name`,`alias`) VALUES (1,'jeff lebowski','dude');
430 UPDATE `test`.`t1` SET `alias`='the dude' WHERE `id`=1;
431 DROP TABLE `t1`;
432+CREATE TABLE t1 ( id INT NOT NULL , counter INT NOT NULL , PRIMARY KEY (id) );
433+START TRANSACTION;
434+INSERT INTO `test`.`t1` (`id`,`counter`) VALUES (1,1);
435+INSERT INTO `test`.`t1` (`id`,`counter`) VALUES (2,2);
436+INSERT INTO `test`.`t1` (`id`,`counter`) VALUES (3,3);
437+COMMIT;
438+UPDATE `test`.`t1` SET `counter`=2 WHERE `id`=1;
439+START TRANSACTION;
440+UPDATE `test`.`t1` SET `counter`=3 WHERE `id`=2;
441+UPDATE `test`.`t1` SET `counter`=4 WHERE `id`=3;
442+COMMIT;
443+DROP TABLE `t1`;
444 SET GLOBAL transaction_log_truncate_debug= true;
445
446=== added file 'tests/suite/transaction_log/t/no_modification-master.opt'
447--- tests/suite/transaction_log/t/no_modification-master.opt 1970-01-01 00:00:00 +0000
448+++ tests/suite/transaction_log/t/no_modification-master.opt 2009-11-03 19:11:13 +0000
449@@ -0,0 +1,1 @@
450+--default-replicator-enable --transaction-log-enable --scheduler=multi_thread
451
452=== added file 'tests/suite/transaction_log/t/no_modification.inc'
453--- tests/suite/transaction_log/t/no_modification.inc 1970-01-01 00:00:00 +0000
454+++ tests/suite/transaction_log/t/no_modification.inc 2009-11-03 19:11:13 +0000
455@@ -0,0 +1,26 @@
456+#
457+# Simple test of the transaction log for when a START TRANSACTION ... COMMIT
458+# is issued, but no data modifications actually occurred.
459+#
460+# This situation occurs in, for instance, sysbench, which wraps even SELECTs
461+# in a transaction.
462+#
463+
464+--disable_warnings
465+DROP TABLE IF EXISTS t1;
466+--enable_warnings
467+
468+CREATE TABLE t1 (
469+ id INT NOT NULL
470+, padding VARCHAR(200) NOT NULL
471+);
472+
473+INSERT INTO t1 VALUES (1, "I love testing.");
474+INSERT INTO t1 VALUES (2, "I hate testing.");
475+
476+START TRANSACTION;
477+
478+SELECT * FROM t1 WHERE id = 1;
479+SELECT * FROM t1 WHERE id = 2;
480+
481+COMMIT;
482
483=== added file 'tests/suite/transaction_log/t/no_modification.test'
484--- tests/suite/transaction_log/t/no_modification.test 1970-01-01 00:00:00 +0000
485+++ tests/suite/transaction_log/t/no_modification.test 2009-11-03 19:11:13 +0000
486@@ -0,0 +1,13 @@
487+#
488+# Tests when a transaction does not modify data.
489+#
490+
491+# Populate log with some records...
492+--source suite/transaction_log/t/no_modification.inc
493+
494+# Read in the transaction.log.
495+
496+--exec ../drizzled/message/transaction_reader var/master-data/transaction.log
497+
498+# Truncate the log file to reset for the next test
499+--source suite/transaction_log/t/truncate_log.inc
500
501=== modified file 'tests/suite/transaction_log/t/update.inc'
502--- tests/suite/transaction_log/t/update.inc 2009-10-18 15:09:52 +0000
503+++ tests/suite/transaction_log/t/update.inc 2009-11-03 19:11:13 +0000
504@@ -43,3 +43,19 @@
505 UPDATE t1 SET alias = "the dude" WHERE alias = "dude";
506
507 DROP TABLE t1;
508+
509+# Tests UPDATE statement which changes an existing row
510+# by referencing the changed field.
511+
512+CREATE TABLE t1 (
513+ id INT NOT NULL
514+, counter INT NOT NULL
515+, PRIMARY KEY (id)
516+);
517+
518+INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
519+
520+UPDATE t1 SET counter = counter + 1 WHERE id = 1;
521+UPDATE t1 SET counter = counter + 1 WHERE id IN (2,3);
522+
523+DROP TABLE t1;