Merge lp:~daniel-nichter/drizzle/fix-slave-reconnect-bug-956200 into lp:drizzle

Proposed by Daniel Nichter
Status: Work in progress
Proposed branch: lp:~daniel-nichter/drizzle/fix-slave-reconnect-bug-956200
Merge into: lp:drizzle
Diff against target: 267 lines (+84/-65)
3 files modified
plugin/slave/queue_producer.cc (+55/-41)
plugin/slave/queue_producer.h (+2/-2)
plugin/slave/replication_slave.cc (+27/-22)
To merge this branch: bzr merge lp:~daniel-nichter/drizzle/fix-slave-reconnect-bug-956200
Reviewer                      Review Type  Date Requested  Status
David Shrewsbury (community)                               Disapprove
Drizzle Trunk                                              Pending
Review via email: mp+104046@code.launchpad.net

Description of the change

Fixes bug 956200 (slave config file option max-reconnects doesn't work) and bug 956214 (slave doesn't set status to STOPPED after max-reconnects failures).

David Shrewsbury (dshrews) wrote :

Although I whole-heartedly agree with the changes you are trying to make, I think parts of it need to be reworked a bit.

We need to keep the concept of when a thread is actually alive (i.e., RUNNING) vs. when the thread has terminated (STOPPED). You've essentially redefined those states here, and that's the part I disagree with. What we need instead is to add additional thread states. E.g.,

  * running and initializing (INITIALIZING). This would likely replace RUNNING.
  * running and disconnected and will not attempt reconnect (DISCONNECTED)
  * running and awaiting reconnect (RECONNECTING)
  * running and connected normally (CONNECTED)
  * running but in some awful error state (ERROR)
  * thread has terminated normally (STOPPED)
  * etc.

Something along those lines. Those states are just off the top of my head and I'm open to what is defined.
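
The state list above could be modeled roughly as follows. This is only a sketch: the `IOThreadState` enum and the `toStatusString()` and `isThreadAlive()` helpers are hypothetical names chosen for illustration and do not come from the Drizzle source. The one rule it encodes is the distinction drawn in the review: every state except STOPPED means the thread is still alive.

```cpp
#include <string>

// Hypothetical enum: the names follow the list in the review,
// not any identifier in the Drizzle source.
enum class IOThreadState
{
  INITIALIZING,   // running and initializing
  DISCONNECTED,   // running, disconnected, will not attempt reconnect
  RECONNECTING,   // running and awaiting reconnect
  CONNECTED,      // running and connected normally
  ERROR,          // running but in an error state
  STOPPED         // thread has terminated normally
};

// Map a state to the string that would be stored in a status column.
std::string toStatusString(IOThreadState state)
{
  switch (state)
  {
    case IOThreadState::INITIALIZING: return "INITIALIZING";
    case IOThreadState::DISCONNECTED: return "DISCONNECTED";
    case IOThreadState::RECONNECTING: return "RECONNECTING";
    case IOThreadState::CONNECTED:    return "CONNECTED";
    case IOThreadState::ERROR:        return "ERROR";
    case IOThreadState::STOPPED:      return "STOPPED";
  }
  return "UNKNOWN";
}

// Per the review, only STOPPED means the thread has terminated.
bool isThreadAlive(IOThreadState state)
{
  return state != IOThreadState::STOPPED;
}
```

With a mapping like this, a monitoring query can tell "alive but not connected" apart from "terminated" without redefining RUNNING or STOPPED.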

And although I was the one to move max-reconnects to each [masterN] section for some inexplicable reason, wouldn't we want the max-reconnects, seconds-between-reconnects, and io-thread-sleep values to be common values among all defined masters, rather than defined separately for each master?
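
One possible middle ground between common and per-master values is a lookup that prefers a per-master setting and falls back to a common one. The sketch below is an assumption for discussion, not what the branch does: `OptionMap` and `resolveOption()` are hypothetical, and the flat `"masterN.option"` key naming simply mirrors the `section + ".option"` pattern used in the diff.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical flat option store: "max-reconnects" is a common value,
// "master2.max-reconnects" is a per-master override.
typedef std::map<std::string, uint32_t> OptionMap;

// Look up section.name first, then the common name, then a fallback.
uint32_t resolveOption(const OptionMap &options,
                       const std::string &section,
                       const std::string &name,
                       uint32_t fallback)
{
  OptionMap::const_iterator it= options.find(section + "." + name);
  if (it != options.end())
    return it->second;

  it= options.find(name);
  if (it != options.end())
    return it->second;

  return fallback;
}
```

For example, with `max-reconnects` set to 10 as a common value and `master2.max-reconnects` set to 3, a lookup for master1 yields 10 and a lookup for master2 yields 3.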

review: Disapprove

Daniel Nichter (daniel-nichter) wrote :

Dave,

I agree: we need additional states, and I had already planned to tackle that too after further discussion. For right now, however, the slave is broken and this patch is just a first step to fix it. More work is needed, but who knows when that work will be done. Since the patch doesn't break or regress any features, I think it's worth accepting now, and we'll continue to improve it when we can.

Brian Aker (brianaker) wrote :

Any thoughts on how to proceed?

Daniel Nichter (daniel-nichter) wrote :

Yes, I sent another email to the discussion list about the additional status states. We can put this merge on hold for now until I implement these additional states.

Unmerged revisions

2552. By Daniel Nichter

Add extra IO thread states: INITIALIZING, DISCONNECTED, RECONNECTING, CONNECTING, OK, STOPPED, ERROR. Change setIOState() params. Use C++ str instead of C str for err msg.

2551. By Daniel Nichter

Make status=STOPPED until IO thread connects. Set error_msg if all reconnects fail.

2550. By Daniel Nichter

Fix max-reconnects. Make seconds-between-reconnects and io-thread-sleep per-master options.
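
The max-reconnects behavior these revisions aim for amounts to a bounded retry loop: stop after a fixed number of failed attempts and report whether a connection was made. The following is a minimal sketch under stated assumptions, not the code in the branch. `connectWithRetries()` and `ReconnectResult` are hypothetical names, and the connection attempt and the wait between attempts are passed in as callbacks so the loop can be exercised without a real master.

```cpp
#include <cstdint>
#include <functional>

// Hypothetical result type for illustration.
struct ReconnectResult
{
  bool connected;            // true if some attempt succeeded
  uint32_t failed_attempts;  // number of attempts that failed
};

// Try to connect at most max_attempts times, calling wait() between
// failed attempts (but not after the last one).
ReconnectResult connectWithRetries(const std::function<bool()> &try_connect,
                                   uint32_t max_attempts,
                                   const std::function<void()> &wait)
{
  uint32_t failed= 0;
  while (failed < max_attempts)
  {
    if (try_connect())
      return ReconnectResult{true, failed};

    ++failed;
    if (failed < max_attempts)
      wait();
  }
  return ReconnectResult{false, failed};
}
```

A caller would map the result onto thread states: a successful result corresponds to the connected state, and a failed result after `max_attempts` tries corresponds to the disconnected state with an error message.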

Preview Diff

=== modified file 'plugin/slave/queue_producer.cc'
--- plugin/slave/queue_producer.cc 2012-04-14 20:43:20 +0000
+++ plugin/slave/queue_producer.cc 2012-05-06 02:51:19 +0000
@@ -27,6 +27,7 @@
 #include <drizzled/gettext.h>
 #include <drizzled/message/transaction.pb.h>
 #include <boost/lexical_cast.hpp>
+#include <boost/format.hpp>
 #include <google/protobuf/text_format.h>
 #include <string>
 #include <vector>
@@ -45,7 +46,7 @@
 
 bool QueueProducer::init()
 {
-  setIOState("", true);
+  setIOState("INITIALIZING");
   return reconnect(true);
 }
 
@@ -104,7 +105,11 @@
 
 void QueueProducer::shutdown()
 {
-  setIOState(_last_error_message, false);
+  if (_last_error_message.empty())
+    setIOState("STOPPED");
+  else
+    setIOState("ERROR", _last_error_message);
+
   if (_is_connected)
     closeConnection();
 }
@@ -121,28 +126,45 @@
   _last_error_message.clear();
   boost::posix_time::seconds duration(_seconds_between_reconnects);
 
-  uint32_t attempts= 1;
-
-  setIOState("Connecting...", true);
-
+  uint32_t attempts= 0;
   while (not openConnection())
   {
-    char buf[250];
-    snprintf(buf, sizeof(buf),_("Connection attempt %d of %d failed, sleeping for %d seconds and retrying. %s"), attempts, _max_reconnects, _seconds_between_reconnects, _last_error_message.c_str());
+    if (++attempts == _max_reconnects)
+      break;
 
-    setIOState(buf, true);
-    if (attempts++ == _max_reconnects)
-      break;
+    std::string err_msg= boost::str(
+      boost::format(_("Connection attempt %1% of %2% failed. "
+                      "Sleeping for %3% seconds and retrying. %4%"))
+      % attempts
+      % _max_reconnects
+      % _seconds_between_reconnects
+      % _last_error_message
+    );
+    setIOState("RECONNECTING", err_msg);
     boost::this_thread::sleep(duration);
   }
 
-  setIOState(_is_connected ? _("Connected") : _("Disconnected"), true);
+  if (_is_connected)
+  {
+    setIOState("OK");
+  }
+  else
+  {
+    std::string err_msg= boost::str(
+      boost::format(_("Failed to connect to master after %1% attempts. %2%"))
+      % _max_reconnects
+      % _last_error_message.c_str()
+    );
+    setIOState("DISCONNECTED", err_msg);
+  }
 
   return _is_connected;
 }
 
 bool QueueProducer::openConnection()
 {
+  setIOState("CONNECTING");
+
   if ((_drizzle= drizzle_create()) == NULL)
   {
     _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
@@ -476,41 +498,33 @@
   return EE_OK;
 }
 
-
-void QueueProducer::setIOState(const string &err_msg, bool status)
+void QueueProducer::setIOState(const std::string &state, const std::string &err_msg)
 {
   vector<string> statements;
-  string sql;
-  string msg(err_msg);
-
-  if (not status)
-  {
-    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
-  }
-  else
-  {
-    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
-  }
-
-  sql.append(", `error_msg` = '", 17);
+  string sql= "UPDATE `sys_replication`.`io_state` SET `status` = '"
+    + state + "', `error_msg` = '";
 
   /* Escape embedded quotes and statement terminators */
-  string::iterator it;
-  for (it= msg.begin(); it != msg.end(); ++it)
+  if (not err_msg.empty())
   {
-    if (*it == '\'')
-    {
-      it= msg.insert(it, '\'');
-      ++it; /* advance back to the quote */
-    }
-    else if (*it == ';')
-    {
-      it= msg.insert(it, '\\');
-      ++it; /* advance back to the semicolon */
-    }
+    string msg(err_msg);
+    string::iterator it;
+    for (it= msg.begin(); it != msg.end(); ++it)
+    {
+      if (*it == '\'')
+      {
+        it= msg.insert(it, '\'');
+        ++it; /* advance back to the quote */
+      }
+      else if (*it == ';')
+      {
+        it= msg.insert(it, '\\');
+        ++it; /* advance back to the semicolon */
+      }
+    }
+    sql.append(msg);
   }
 
-  sql.append(msg);
   sql.append("' WHERE `master_id` = ");
   sql.append(boost::lexical_cast<string>(masterId()));
 
=== modified file 'plugin/slave/queue_producer.h'
--- plugin/slave/queue_producer.h 2011-11-26 23:14:59 +0000
+++ plugin/slave/queue_producer.h 2012-05-06 02:51:19 +0000
@@ -193,10 +193,10 @@
   /**
    * Update IO thread status in state table.
    *
+   * @param state State of IO thread
    * @param err_msg Error message string
-   * @param status false = STOPPED, true = RUNNING
    */
-  void setIOState(const std::string &err_msg, bool status);
+  void setIOState(const std::string &state, const std::string &err_msg="");
 
 };
 
=== modified file 'plugin/slave/replication_slave.cc'
--- plugin/slave/replication_slave.cc 2011-07-06 23:13:00 +0000
+++ plugin/slave/replication_slave.cc 2012-05-06 02:51:19 +0000
@@ -63,14 +63,19 @@
   po::variables_map vm;
   po::options_description slave_options("Options for the slave plugin");
 
-  /* Common options */
+  /*
+   * Define the slave's applier (consumer) thread options. These are
+   * the valid options at the start of the slave config file before
+   * any [masterN] headers.
+   */
   slave_options.add_options()
-    ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
-    ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
     ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
     ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
 
-  /* Master defining options */
+  /*
+   * Define per-master IO (producer) thread options. These are
+   * the valid options under each [masterN] header in the slave config file.
+   */
   for (size_t num= 1; num <= 10; num++)
   {
     string section("master");
@@ -81,6 +86,8 @@
       ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
       ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
       ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
+      ((section + ".seconds-between-reconnects").c_str(), po::value<uint32_t>()->default_value(30))
+      ((section + ".io-thread-sleep").c_str(), po::value<uint32_t>()->default_value(5))
       ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
   }
 
@@ -119,6 +126,7 @@
 
     _masters[num]= new (std::nothrow) Master(num);
 
+    /* Set the IO (consumer) thread options for this master. */
     if (vm.count(section + ".master-host"))
       master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
 
@@ -133,28 +141,24 @@
 
     if (vm.count(section + ".max-commit-id"))
       master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
-  }
-
-  boost::unordered_map<uint32_t, Master *>::const_iterator it;
+
+    if (vm.count(section + ".max-reconnects"))
+      master(num).producer().setMaxReconnectAttempts(vm[section + ".max-reconnects"].as<uint32_t>());
 
-  for (it= _masters.begin(); it != _masters.end(); ++it)
-  {
-    if (vm.count("max-reconnects"))
-      it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
-
-    if (vm.count("seconds-between-reconnects"))
-      it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
-
-    if (vm.count("io-thread-sleep"))
-      it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
-  }
-
+    if (vm.count(section + ".seconds-between-reconnects"))
+      master(num).producer().setSecondsBetweenReconnects(vm[section + ".seconds-between-reconnects"].as<uint32_t>());
+
+    if (vm.count(section + ".io-thread-sleep"))
+      master(num).producer().setSleepInterval(vm[section + ".io-thread-sleep"].as<uint32_t>());
+  }
+
+  /* Set the slave's applier (consumer) thread options. */
   if (vm.count("applier-thread-sleep"))
     _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
   if (vm.count("ignore-errors"))
     _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
 
-  /* setup schema and tables */
+  /* Create the sys_replication schema and its tables. */
   ReplicationSchema rs;
   if (not rs.create())
   {
@@ -162,9 +166,10 @@
     return false;
   }
 
+  /* Create a row in the sys_replication tables for each master. */
+  boost::unordered_map<uint32_t, Master *>::const_iterator it;
   for (it= _masters.begin(); it != _masters.end(); ++it)
   {
-    /* make certain a row exists for each master */
     rs.createInitialApplierRow(it->first);
     rs.createInitialIORow(it->first);
 
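
The escaping step inside setIOState() in the diff above doubles each single quote and puts a backslash before each semicolon, editing the string in place through an iterator. The same rule can be written as a standalone function that builds a new string, which is easier to test in isolation. This is a sketch only: `escapeForStatement()` is a hypothetical helper, not a function in the Drizzle tree, and it covers only the two characters the original handles.

```cpp
#include <string>

// Hypothetical helper: applies the same two rules as the loop in
// setIOState(): ' becomes '' and ; becomes \;
std::string escapeForStatement(const std::string &msg)
{
  std::string out;
  out.reserve(msg.size());

  for (std::string::const_iterator it= msg.begin(); it != msg.end(); ++it)
  {
    if (*it == '\'')
      out.push_back('\'');   /* double the quote */
    else if (*it == ';')
      out.push_back('\\');   /* escape the statement terminator */

    out.push_back(*it);
  }
  return out;
}
```

Building a copy avoids the iterator bookkeeping that in-place insertion requires. Like the original, it does not escape the new `state` argument, so it assumes that value only ever comes from fixed string literals.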
