Merge lp:~fallenpegasus/drizzle/multimaster into lp:~drizzle-trunk/drizzle/development

Proposed by Mark Atwood
Status: Merged
Approved by: Brian Aker
Approved revision: 2361
Merged at revision: 2361
Proposed branch: lp:~fallenpegasus/drizzle/multimaster
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 918 lines (+376/-106)
10 files modified
plugin/slave/module.cc (+0/-5)
plugin/slave/queue_consumer.cc (+65/-10)
plugin/slave/queue_consumer.h (+40/-4)
plugin/slave/queue_producer.cc (+13/-5)
plugin/slave/queue_producer.h (+18/-0)
plugin/slave/replication_schema.cc (+76/-6)
plugin/slave/replication_schema.h (+5/-1)
plugin/slave/replication_slave.cc (+95/-36)
plugin/slave/replication_slave.h (+62/-38)
tests/lib/server_mgmt/drizzled.py (+2/-1)
To merge this branch: bzr merge lp:~fallenpegasus/drizzle/multimaster
Reviewer Review Type Date Requested Status
Brian Aker Pending
Review via email: mp+67103@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'plugin/slave/module.cc'
2--- plugin/slave/module.cc 2011-04-02 01:32:02 +0000
3+++ plugin/slave/module.cc 2011-07-06 23:18:34 +0000
4@@ -45,8 +45,6 @@
5 const module::option_map &vm= context.getOptions();
6
7 ReplicationSlave *slave= new ReplicationSlave(vm["config-file"].as<string>());
8- if (vm.count("max-commit-id"))
9- slave->setMaxCommitId(vm["max-commit-id"].as<uint64_t>());
10 context.add(slave);
11 return 0;
12 }
13@@ -56,9 +54,6 @@
14 context("config-file",
15 po::value<string>()->default_value(DEFAULT_SLAVE_CFG_FILE.string()),
16 N_("Path to the slave configuration file"));
17- context("max-commit-id",
18- po::value<uint64_t>(),
19- N_("Value to use as the maximum commit ID stored on the slave"));
20 }
21
22 } /* namespace slave */
23
24=== modified file 'plugin/slave/queue_consumer.cc'
25--- plugin/slave/queue_consumer.cc 2011-04-27 18:16:25 +0000
26+++ plugin/slave/queue_consumer.cc 2011-07-06 23:18:34 +0000
27@@ -20,10 +20,11 @@
28
29 #include <config.h>
30 #include <plugin/slave/queue_consumer.h>
31+#include <drizzled/errmsg_print.h>
32+#include <drizzled/execute.h>
33 #include <drizzled/message/transaction.pb.h>
34 #include <drizzled/message/statement_transform.h>
35 #include <drizzled/sql/result_set.h>
36-#include <drizzled/execute.h>
37 #include <string>
38 #include <vector>
39 #include <boost/thread.hpp>
40@@ -51,9 +52,25 @@
41
42 bool QueueConsumer::process()
43 {
44+ for (size_t index= 0; index < _master_ids.size(); index++)
45+ {
46+ /* We go ahead and get the string version of the master ID
47+ * so we don't have to keep converting it from int to string.
48+ */
49+ const string master_id= boost::lexical_cast<string>(_master_ids[index]);
50+
51+ if (not processSingleMaster(master_id))
52+ return false;
53+ }
54+
55+ return true;
56+}
57+
58+bool QueueConsumer::processSingleMaster(const string &master_id)
59+{
60 TrxIdList completedTransactionIds;
61
62- getListOfCompletedTransactions(completedTransactionIds);
63+ getListOfCompletedTransactions(master_id, completedTransactionIds);
64
65 for (size_t x= 0; x < completedTransactionIds.size(); x++)
66 {
67@@ -68,7 +85,7 @@
68 message::Transaction transaction;
69 uint32_t segment_id= 1;
70
71- while (getMessage(transaction, commit_id, trx_id, originating_server_uuid,
72+ while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
73 originating_commit_id, segment_id++))
74 {
75 convertToSQL(transaction, aggregate_sql, segmented_sql);
76@@ -120,12 +137,30 @@
77
78 if (not executeSQLWithCommitId(aggregate_sql, commit_id,
79 originating_server_uuid,
80- originating_commit_id))
81+ originating_commit_id,
82+ master_id))
83 {
84- return false;
85+ if (_ignore_errors)
86+ {
87+ clearErrorState();
88+
89+ /* Still need to record that we handled this trx */
90+ vector<string> sql;
91+ string tmp("UPDATE `sys_replication`.`applier_state`"
92+ " SET `last_applied_commit_id` = ");
93+ tmp.append(commit_id);
94+ tmp.append(" WHERE `master_id` = ");
95+ tmp.append(master_id);
96+ sql.push_back(tmp);
97+ executeSQL(sql);
98+ }
99+ else
100+ {
101+ return false;
102+ }
103 }
104
105- if (not deleteFromQueue(trx_id))
106+ if (not deleteFromQueue(master_id, trx_id))
107 {
108 return false;
109 }
110@@ -137,6 +172,7 @@
111
112 bool QueueConsumer::getMessage(message::Transaction &transaction,
113 string &commit_id,
114+ const string &master_id,
115 uint64_t trx_id,
116 string &originating_server_uuid,
117 uint64_t &originating_commit_id,
118@@ -148,6 +184,8 @@
119 sql.append(boost::lexical_cast<string>(trx_id));
120 sql.append(" AND `seg_id` = ", 16);
121 sql.append(boost::lexical_cast<string>(segment_id));
122+ sql.append(" AND `master_id` = ", 19),
123+ sql.append(master_id);
124
125 sql::ResultSet result_set(4);
126 Execute execute(*(_session.get()), true);
127@@ -189,13 +227,16 @@
128 return true;
129 }
130
131-bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
132+bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
133+ TrxIdList &list)
134 {
135 Execute execute(*(_session.get()), true);
136
137 string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
138 " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
139- " ORDER BY `commit_order` ASC");
140+ " AND `master_id` = "
141+ + master_id
142+ + " ORDER BY `commit_order` ASC");
143
144 /* ResultSet size must match column count */
145 sql::ResultSet result_set(1);
146@@ -329,6 +370,13 @@
147 }
148
149
150+/*
151+ * TODO: This currently updates every row in the applier_state table.
152+ * This use to be a single row. With multi-master support, we now need
153+ * a row for every master so we can track the last applied commit ID
154+ * value for each. Eventually, we may want multiple consumer threads,
155+ * so then we'd need to update each row independently.
156+ */
157 void QueueConsumer::setApplierState(const string &err_msg, bool status)
158 {
159 vector<string> statements;
160@@ -373,7 +421,8 @@
161 bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
162 const string &commit_id,
163 const string &originating_server_uuid,
164- uint64_t originating_commit_id)
165+ uint64_t originating_commit_id,
166+ const string &master_id)
167 {
168 string tmp("UPDATE `sys_replication`.`applier_state`"
169 " SET `last_applied_commit_id` = ");
170@@ -383,6 +432,9 @@
171 tmp.append("' , `originating_commit_id` = ");
172 tmp.append(boost::lexical_cast<string>(originating_commit_id));
173
174+ tmp.append(" WHERE `master_id` = ");
175+ tmp.append(master_id);
176+
177 sql.push_back(tmp);
178
179 _session->setOriginatingServerUUID(originating_server_uuid);
180@@ -392,11 +444,14 @@
181 }
182
183
184-bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
185+bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
186 {
187 string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
188 sql.append(boost::lexical_cast<std::string>(trx_id));
189
190+ sql.append(" AND `master_id` = ");
191+ sql.append(master_id);
192+
193 vector<string> sql_vect;
194 sql_vect.push_back(sql);
195
196
197=== modified file 'plugin/slave/queue_consumer.h'
198--- plugin/slave/queue_consumer.h 2011-04-27 18:16:25 +0000
199+++ plugin/slave/queue_consumer.h 2011-07-06 23:18:34 +0000
200@@ -23,6 +23,7 @@
201 #include <plugin/slave/queue_thread.h>
202 #include <plugin/slave/sql_executor.h>
203 #include <drizzled/session.h>
204+#include <string>
205
206 namespace drizzled
207 {
208@@ -41,7 +42,8 @@
209 QueueConsumer() :
210 QueueThread(),
211 SQLExecutor("slave", "replication"),
212- _check_interval(5)
213+ _check_interval(5),
214+ _ignore_errors(false)
215 { }
216
217 bool init();
218@@ -59,6 +61,14 @@
219 }
220
221 /**
222+ * Determines if we should ignore errors from statements pulled from masters.
223+ */
224+ void setIgnoreErrors(bool value)
225+ {
226+ _ignore_errors= value;
227+ }
228+
229+ /**
230 * Update applier status in state table.
231 *
232 * @param err_msg Error message string
233@@ -66,16 +76,41 @@
234 */
235 void setApplierState(const std::string &err_msg, bool status);
236
237+ void addMasterId(uint32_t id)
238+ {
239+ _master_ids.push_back(id);
240+ }
241+
242+ bool processSingleMaster(const std::string &master_id);
243+
244 private:
245 typedef std::vector<uint64_t> TrxIdList;
246
247 /** Number of seconds to sleep between checking queue for messages */
248 uint32_t _check_interval;
249
250- bool getListOfCompletedTransactions(TrxIdList &list);
251+ std::vector<uint32_t> _master_ids;
252+
253+ bool _ignore_errors;
254+
255+ /**
256+ * Get a list of transaction IDs from the queue that are complete.
257+ *
258+ * A "complete" transaction is one in which we have received the end
259+ * segment of the transaction.
260+ *
261+ * @param[in] master_id Identifier of the master we are interested in.
262+ * @param[out] list The list to populate with transaction IDs.
263+ *
264+ * @retval true Success
265+ * @retval false Error
266+ */
267+ bool getListOfCompletedTransactions(const std::string &master_id,
268+ TrxIdList &list);
269
270 bool getMessage(drizzled::message::Transaction &transaction,
271 std::string &commit_id,
272+ const std::string &master_id,
273 uint64_t trx_id,
274 std::string &originating_server_uuid,
275 uint64_t &originating_commit_id,
276@@ -111,7 +146,8 @@
277 bool executeSQLWithCommitId(std::vector<std::string> &sql,
278 const std::string &commit_id,
279 const std::string &originating_server_uuid,
280- uint64_t originating_commit_id);
281+ uint64_t originating_commit_id,
282+ const std::string &master_id);
283
284 /**
285 * Remove messages for a given transaction from the queue.
286@@ -121,7 +157,7 @@
287 * @retval true Success
288 * @retval false Failure
289 */
290- bool deleteFromQueue(uint64_t trx_id);
291+ bool deleteFromQueue(const std::string &master_id, uint64_t trx_id);
292
293 /**
294 * Determine if a Statement message is an end message.
295
296=== modified file 'plugin/slave/queue_producer.cc'
297--- plugin/slave/queue_producer.cc 2011-04-27 18:16:25 +0000
298+++ plugin/slave/queue_producer.cc 2011-07-06 23:18:34 +0000
299@@ -18,7 +18,6 @@
300 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
301 */
302
303-#include <config.h>
304 #include <plugin/slave/queue_producer.h>
305 #include <drizzled/errmsg_print.h>
306 #include <drizzled/sql/result_set.h>
307@@ -27,6 +26,8 @@
308 #include <drizzled/message/transaction.pb.h>
309 #include <boost/lexical_cast.hpp>
310 #include <google/protobuf/text_format.h>
311+#include <string>
312+#include <vector>
313
314 using namespace std;
315 using namespace drizzled;
316@@ -199,8 +200,12 @@
317 */
318 string sql("SELECT MAX(x.cid) FROM"
319 " (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
320- " UNION ALL SELECT `last_applied_commit_id` AS cid"
321- " FROM `sys_replication`.`applier_state`) AS x");
322+ " WHERE `master_id` = "
323+ + boost::lexical_cast<string>(masterId())
324+ + " UNION ALL SELECT `last_applied_commit_id` AS cid"
325+ + " FROM `sys_replication`.`applier_state` WHERE `master_id` = "
326+ + boost::lexical_cast<string>(masterId())
327+ + ") AS x");
328
329 sql::ResultSet result_set(1);
330 Execute execute(*(_session.get()), true);
331@@ -304,8 +309,10 @@
332 * The SQL to insert our results into the local queue.
333 */
334 string sql= "INSERT INTO `sys_replication`.`queue`"
335- " (`trx_id`, `seg_id`, `commit_order`,"
336+ " (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
337 " `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES (";
338+ sql.append(boost::lexical_cast<string>(masterId()));
339+ sql.append(", ", 2);
340 sql.append(trx_id);
341 sql.append(", ", 2);
342 sql.append(seg_id);
343@@ -490,7 +497,8 @@
344 }
345
346 sql.append(msg);
347- sql.append("'", 1);
348+ sql.append("' WHERE `master_id` = ");
349+ sql.append(boost::lexical_cast<string>(masterId()));
350
351 statements.push_back(sql);
352 executeSQL(statements);
353
354=== modified file 'plugin/slave/queue_producer.h'
355--- plugin/slave/queue_producer.h 2011-04-26 18:08:44 +0000
356+++ plugin/slave/queue_producer.h 2011-07-06 23:18:34 +0000
357@@ -37,6 +37,7 @@
358 SQLExecutor("slave", "replication"),
359 _check_interval(5),
360 _master_port(3306),
361+ _master_id(0),
362 _last_return(DRIZZLE_RETURN_OK),
363 _is_connected(false),
364 _saved_max_commit_id(0),
365@@ -95,6 +96,21 @@
366 _saved_max_commit_id= value;
367 }
368
369+ uint64_t cachedMaxCommitId()
370+ {
371+ return _saved_max_commit_id;
372+ }
373+
374+ void setMasterId(uint32_t value)
375+ {
376+ _master_id= value;
377+ }
378+
379+ uint32_t masterId()
380+ {
381+ return _master_id;
382+ }
383+
384 private:
385 /** Number of seconds to sleep between checking queue for messages */
386 uint32_t _check_interval;
387@@ -105,6 +121,8 @@
388 std::string _master_user;
389 std::string _master_pass;
390
391+ uint32_t _master_id;
392+
393 drizzle_st _drizzle;
394 drizzle_con_st _connection;
395 drizzle_return_t _last_return;
396
397=== modified file 'plugin/slave/replication_schema.cc'
398--- plugin/slave/replication_schema.cc 2011-04-27 18:16:25 +0000
399+++ plugin/slave/replication_schema.cc 2011-07-06 23:18:34 +0000
400@@ -44,19 +44,28 @@
401 return false;
402
403 /*
404- * Create our IO thread state information table if we need to.
405+ Create our IO thread state information table if we need to.
406 */
407
408+ /*
409+ Table: io_state
410+ Version 1.0: Initial definition
411+ Version 1.1: Added master_id and PK on master_id
412+ */
413+
414+
415 sql.clear();
416 sql.push_back("COMMIT");
417 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
418+ " `master_id` BIGINT NOT NULL,"
419 " `status` VARCHAR(20) NOT NULL,"
420 " `error_msg` VARCHAR(250))"
421- " COMMENT = 'VERSION 1.0'");
422+ " COMMENT = 'VERSION 1.1'");
423
424 if (not executeSQL(sql))
425 return false;
426
427+#if 0
428 sql.clear();
429 sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
430
431@@ -77,6 +86,7 @@
432 return false;
433 }
434 }
435+#endif
436
437 /*
438 * Create our applier thread state information table if we need to.
439@@ -86,21 +96,24 @@
440 * Table: applier_state
441 * Version 1.0: Initial definition
442 * Version 1.1: Added originating_server_uuid and originating_commit_id
443+ * Version 1.2: Added master_id and changed PK to master_id
444 */
445
446 sql.clear();
447 sql.push_back("COMMIT");
448 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
449 " (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
450+ " `master_id` BIGINT NOT NULL,"
451 " `originating_server_uuid` VARCHAR(36) NOT NULL,"
452 " `originating_commit_id` BIGINT NOT NULL,"
453 " `status` VARCHAR(20) NOT NULL,"
454 " `error_msg` VARCHAR(250))"
455- " COMMENT = 'VERSION 1.1'");
456+ " COMMENT = 'VERSION 1.2'");
457
458 if (not executeSQL(sql))
459 return false;
460
461+#if 0
462 sql.clear();
463 sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
464
465@@ -123,11 +136,13 @@
466 return false;
467 }
468 }
469+#endif
470
471 /*
472 * Create our message queue table if we need to.
473 * Version 1.0: Initial definition
474 * Version 1.1: Added originating_server_uuid and originating_commit_id
475+ * Version 1.2: Added master_id and changed PK to (master_id, trx_id, seg_id)
476 */
477
478 sql.clear();
479@@ -138,21 +153,76 @@
480 " `originating_server_uuid` VARCHAR(36) NOT NULL,"
481 " `originating_commit_id` BIGINT NOT NULL,"
482 " `msg` BLOB,"
483+ " `master_id` BIGINT NOT NULL,"
484 " PRIMARY KEY(`trx_id`, `seg_id`))"
485- " COMMENT = 'VERSION 1.1'");
486+ " COMMENT = 'VERSION 1.2'");
487 if (not executeSQL(sql))
488 return false;
489
490 return true;
491 }
492
493-bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
494+bool ReplicationSchema::createInitialIORow(uint32_t master_id)
495+{
496+ vector<string> sql;
497+
498+ sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
499+
500+ sql::ResultSet result_set(1);
501+ Execute execute(*(_session.get()), true);
502+ execute.run(sql[0], result_set);
503+ result_set.next();
504+ string count= result_set.getString(0);
505+
506+ if (count == "0")
507+ {
508+ sql.clear();
509+ sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
510+ + boost::lexical_cast<string>(master_id)
511+ + ", 'STOPPED')");
512+ if (not executeSQL(sql))
513+ return false;
514+ }
515+
516+ return true;
517+}
518+
519+bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
520+{
521+ vector<string> sql;
522+
523+ sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
524+
525+ sql::ResultSet result_set(1);
526+ Execute execute(*(_session.get()), true);
527+ execute.run(sql[0], result_set);
528+ result_set.next();
529+ string count= result_set.getString(0);
530+
531+ if (count == "0")
532+ {
533+ sql.clear();
534+ sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
535+ " (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
536+ + boost::lexical_cast<string>(master_id)
537+ + ",0 , 'STOPPED')");
538+ if (not executeSQL(sql))
539+ return false;
540+ }
541+
542+ return true;
543+}
544+
545+
546+bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
547 {
548 vector<string> sql;
549
550 sql.push_back("UPDATE `sys_replication`.`applier_state`"
551 " SET `last_applied_commit_id` = "
552- + lexical_cast<string>(value));
553+ + lexical_cast<string>(value)
554+ + " WHERE `master_id` = "
555+ + lexical_cast<string>(master_id));
556
557 return executeSQL(sql);
558 }
559
560=== modified file 'plugin/slave/replication_schema.h'
561--- plugin/slave/replication_schema.h 2011-04-02 01:32:02 +0000
562+++ plugin/slave/replication_schema.h 2011-07-06 23:18:34 +0000
563@@ -44,9 +44,13 @@
564 * another event from the master (which causes persistence of the value).
565 * An edge case, but still possible.
566 *
567+ * @param[in] master_id Unique master identifier.
568 * @param[in] value The initial value.
569 */
570- bool setInitialMaxCommitId(uint64_t value);
571+ bool setInitialMaxCommitId(uint32_t master_id, uint64_t value);
572+
573+ bool createInitialApplierRow(uint32_t master_id);
574+ bool createInitialIORow(uint32_t master_id);
575 };
576
577 } /* namespace slave */
578
579=== modified file 'plugin/slave/replication_slave.cc'
580--- plugin/slave/replication_slave.cc 2011-04-02 01:32:02 +0000
581+++ plugin/slave/replication_slave.cc 2011-07-06 23:18:34 +0000
582@@ -20,8 +20,9 @@
583
584 #include <config.h>
585 #include <plugin/slave/replication_slave.h>
586+#include <drizzled/errmsg_print.h>
587 #include <drizzled/program_options/config_file.h>
588-#include <drizzled/errmsg_print.h>
589+#include <boost/lexical_cast.hpp>
590 #include <boost/program_options.hpp>
591 #include <fstream>
592 #include <drizzled/plugin.h>
593@@ -34,20 +35,26 @@
594 namespace slave
595 {
596
597-/* Gets called after all plugins are initialized. */
598 void ReplicationSlave::startup(Session &session)
599 {
600 (void)session;
601 if (not initWithConfig())
602 {
603- errmsg_printf(error::ERROR,
604- _("Could not start slave services: %s\n"),
605- getError().c_str());
606+ errmsg_printf(error::ERROR, _("Could not start slave services: %s\n"),
607+ _error.c_str());
608 }
609 else
610 {
611+ /* Start the IO threads */
612+ boost::unordered_map<uint32_t, Master *>::const_iterator it;
613+ for (it= _masters.begin(); it != _masters.end(); ++it)
614+ {
615+ it->second->start();
616+ /* Consumer must know server IDs */
617+ _consumer.addMasterId(it->first);
618+ }
619+
620 _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
621- _producer_thread= boost::thread(&QueueProducer::run, &_producer);
622 }
623 }
624
625@@ -56,15 +63,26 @@
626 po::variables_map vm;
627 po::options_description slave_options("Options for the slave plugin");
628
629+ /* Common options */
630 slave_options.add_options()
631- ("master-host", po::value<string>()->default_value(""))
632- ("master-port", po::value<uint16_t>()->default_value(3306))
633- ("master-user", po::value<string>()->default_value(""))
634- ("master-pass", po::value<string>()->default_value(""))
635- ("max-reconnects", po::value<uint32_t>()->default_value(10))
636 ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
637 ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
638- ("applier-thread-sleep", po::value<uint32_t>()->default_value(5));
639+ ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
640+ ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
641+
642+ /* Master defining options */
643+ for (size_t num= 1; num <= 10; num++)
644+ {
645+ string section("master");
646+ section.append(boost::lexical_cast<string>(num));
647+ slave_options.add_options()
648+ ((section + ".master-host").c_str(), po::value<string>()->default_value(""))
649+ ((section + ".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
650+ ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
651+ ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
652+ ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
653+ ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
654+ }
655
656 ifstream cf_stream(_config_file.c_str());
657
658@@ -79,29 +97,62 @@
659
660 po::notify(vm);
661
662- if (vm.count("master-host"))
663- _producer.setMasterHost(vm["master-host"].as<string>());
664-
665- if (vm.count("master-port"))
666- _producer.setMasterPort(vm["master-port"].as<uint16_t>());
667-
668- if (vm.count("master-user"))
669- _producer.setMasterUser(vm["master-user"].as<string>());
670-
671- if (vm.count("master-pass"))
672- _producer.setMasterPassword(vm["master-pass"].as<string>());
673-
674- if (vm.count("max-reconnects"))
675- _producer.setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
676-
677- if (vm.count("seconds-between-reconnects"))
678- _producer.setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
679-
680- if (vm.count("io-thread-sleep"))
681- _producer.setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
682+ /*
683+ * We will support 10 masters. This loope effectively creates the Master
684+ * objects as they are referenced.
685+ *
686+ * @todo Support a variable number of master hosts.
687+ */
688+ for (size_t num= 1; num <= 10; num++)
689+ {
690+ string section("master");
691+ section.append(boost::lexical_cast<string>(num));
692+
693+ /* WARNING! Hack!
694+ * We need to be able to determine when a master host is actually defined
695+ * by the user vs. we are just using defaults. So if the hostname is ever
696+ * the default value of "", then we'll assume that this section was not
697+ * user defined.
698+ */
699+ if (vm[section + ".master-host"].as<string>() == "")
700+ continue;
701+
702+ _masters[num]= new (std::nothrow) Master(num);
703+
704+ if (vm.count(section + ".master-host"))
705+ master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
706+
707+ if (vm.count(section + ".master-port"))
708+ master(num).producer().setMasterPort(vm[section + ".master-port"].as<uint16_t>());
709+
710+ if (vm.count(section + ".master-user"))
711+ master(num).producer().setMasterUser(vm[section + ".master-user"].as<string>());
712+
713+ if (vm.count(section + ".master-pass"))
714+ master(num).producer().setMasterPassword(vm[section + ".master-pass"].as<string>());
715+
716+ if (vm.count(section + ".max-commit-id"))
717+ master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
718+ }
719+
720+ boost::unordered_map<uint32_t, Master *>::const_iterator it;
721+
722+ for (it= _masters.begin(); it != _masters.end(); ++it)
723+ {
724+ if (vm.count("max-reconnects"))
725+ it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
726+
727+ if (vm.count("seconds-between-reconnects"))
728+ it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
729+
730+ if (vm.count("io-thread-sleep"))
731+ it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
732+ }
733
734 if (vm.count("applier-thread-sleep"))
735 _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
736+ if (vm.count("ignore-errors"))
737+ _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
738
739 /* setup schema and tables */
740 ReplicationSchema rs;
741@@ -111,12 +162,20 @@
742 return false;
743 }
744
745- if (_initial_max_commit_id)
746+ for (it= _masters.begin(); it != _masters.end(); ++it)
747 {
748- if (not rs.setInitialMaxCommitId(_initial_max_commit_id))
749+ /* make certain a row exists for each master */
750+ rs.createInitialApplierRow(it->first);
751+ rs.createInitialIORow(it->first);
752+
753+ uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
754+ if (cachedValue)
755 {
756- _error= rs.getErrorMessage();
757- return false;
758+ if (not rs.setInitialMaxCommitId(it->first, cachedValue))
759+ {
760+ _error= rs.getErrorMessage();
761+ return false;
762+ }
763 }
764 }
765
766
767=== modified file 'plugin/slave/replication_slave.h'
768--- plugin/slave/replication_slave.h 2011-04-02 01:32:02 +0000
769+++ plugin/slave/replication_slave.h 2011-07-06 23:18:34 +0000
770@@ -25,6 +25,7 @@
771 #include <plugin/slave/replication_schema.h>
772 #include <drizzled/plugin/daemon.h>
773 #include <boost/thread.hpp>
774+#include <boost/unordered_map.hpp>
775
776 namespace drizzled
777 {
778@@ -40,65 +41,88 @@
779
780 ReplicationSlave(const std::string &config)
781 : drizzled::plugin::Daemon("Replication Slave"),
782- _config_file(config),
783- _initial_max_commit_id(0)
784+ _config_file(config)
785 {}
786
787 ~ReplicationSlave()
788 {
789 _consumer_thread.interrupt();
790- _producer_thread.interrupt();
791+
792+ boost::unordered_map<uint32_t, Master *>::const_iterator it;
793+
794+ for (it= _masters.begin(); it != _masters.end(); ++it)
795+ {
796+ it->second->thread().interrupt();
797+ }
798 }
799
800+ /** Gets called after all plugins are initialized */
801 void startup(drizzled::Session &session);
802
803- /**
804- * Get the error message describing what went wrong during setup.
805- */
806- const std::string &getError() const
807- {
808- return _error;
809- }
810-
811- /**
812- * Set the initial value for the slave's maximum commit ID.
813- *
814- * This value basically determines where to start retrieving events from
815- * the master. Normally this is computed automatically based on the
816- * contents of the queue and/or the last applied commit ID. This allows
817- * us to override those values and start from another point. E.g., new
818- * slave provisioning or skipping a trouble statement.
819- *
820- * @param[in] value The commit ID value.
821- */
822- void setMaxCommitId(uint64_t value)
823- {
824- /* must tell producer to set its cached value */
825- _producer.setCachedMaxCommitId(value);
826- /* setting this indicates that we should store it permanently */
827- _initial_max_commit_id= value;
828- }
829-
830 private:
831+
832+ /**
833+ * Class representing a master server.
834+ */
835+ class Master
836+ {
837+ public:
838+ Master(uint32_t id)
839+ {
840+ _producer.setMasterId(id);
841+ }
842+
843+ QueueProducer &producer()
844+ {
845+ return _producer;
846+ }
847+
848+ boost::thread &thread()
849+ {
850+ return _producer_thread;
851+ }
852+
853+ void start()
854+ {
855+ _producer_thread= boost::thread(&QueueProducer::run, &_producer);
856+ }
857+
858+ private:
859+ /** Manages a single master */
860+ QueueProducer _producer;
861+
862+ /** I/O thread that will populate the work queue */
863+ boost::thread _producer_thread;
864+ };
865+
866+ /** Configuration file containing master info */
867 std::string _config_file;
868+
869 std::string _error;
870-
871+
872+ /** Object to use with the consumer thread */
873 QueueConsumer _consumer;
874- QueueProducer _producer;
875
876- /** Applier thread that will drain the work queue */
877+ /**
878+ * Applier thread that will drain the work queue.
879+ * @todo Support more than one consumer thread.
880+ */
881 boost::thread _consumer_thread;
882
883- /** I/O thread that will populate the work queue */
884- boost::thread _producer_thread;
885+ /** List of master objects, one per master */
886+ boost::unordered_map<uint32_t, Master *> _masters;
887
888- uint64_t _initial_max_commit_id;
889+ /** Convenience method to get object reference */
890+ Master &master(size_t index)
891+ {
892+ return *(_masters[index]);
893+ }
894
895 /**
896 * Initialize slave services with the given configuration file.
897 *
898- * In case of an error during initialization, you can call the getError()
899- * method to get a string describing what went wrong.
900+ * In case of an error during initialization, _error contains a
901+ * string describing what went wrong.
902 *
903 * @retval true Success
904 * @retval false Failure
905
906=== modified file 'tests/lib/server_mgmt/drizzled.py'
907--- tests/lib/server_mgmt/drizzled.py 2011-06-10 21:33:49 +0000
908+++ tests/lib/server_mgmt/drizzled.py 2011-07-06 23:18:34 +0000
909@@ -197,7 +197,8 @@
910
911 """
912
913- config_data = [ "master-host=127.0.0.1"
914+ config_data = [ "[master1]"
915+ , "master-host=127.0.0.1"
916 , "master-port=%d" %self.master_port
917 , "master-user=root"
918 , "master-pass=''"