Merge lp:~daniel-nichter/drizzle/fix-slave-reconnect-bug-956200 into lp:drizzle

Proposed by Daniel Nichter
Status: Work in progress
Proposed branch: lp:~daniel-nichter/drizzle/fix-slave-reconnect-bug-956200
Merge into: lp:drizzle
Diff against target: 267 lines (+84/-65)
3 files modified
plugin/slave/queue_producer.cc (+55/-41)
plugin/slave/queue_producer.h (+2/-2)
plugin/slave/replication_slave.cc (+27/-22)
To merge this branch: bzr merge lp:~daniel-nichter/drizzle/fix-slave-reconnect-bug-956200
Reviewer                      Review Type  Date Requested  Status
David Shrewsbury (community)                               Disapprove
Drizzle Trunk                                              Pending
Review via email: mp+104046@code.launchpad.net

Description of the change

Fixes bug 956200 (slave config file option max-reconnects doesn't work) and bug 956214 (slave doesn't set status to STOPPED after max-reconnects failures).

David Shrewsbury (dshrews) wrote :

Although I whole-heartedly agree with the changes you are trying to make, I think parts of it need to be reworked a bit.

We need to keep the concept of when a thread is actually alive (i.e., RUNNING) vs. when the thread has terminated (STOPPED). You've essentially redefined those states here, and that's the part I disagree with. What we need instead is to add additional thread states. E.g.,

  * running and initializing (INITIALIZING). This would likely replace RUNNING.
  * running and disconnected and will not attempt reconnect (DISCONNECTED)
  * running and awaiting reconnect (RECONNECTING)
  * running and connected normally (CONNECTED)
  * running but in some awful error state (ERROR)
  * thread has terminated normally (STOPPED)
  * etc.

Something along those lines. Those states are just off the top of my head and I'm open to what is defined.
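
For illustration, the states listed above could be modeled roughly as follows. This is a minimal sketch, not code from the branch: the names `IOThreadState`, `isThreadAlive`, and `toString` are hypothetical, and the set of states is only the one suggested in this comment.

```cpp
#include <string>

// Hypothetical enum; the members mirror the states suggested in the review.
enum class IOThreadState
{
  INITIALIZING,  // thread running, still setting up
  CONNECTED,     // thread running and connected normally
  RECONNECTING,  // thread running, waiting to retry the connection
  DISCONNECTED,  // thread running, will not attempt another reconnect
  ERROR,         // thread running but in an error state
  STOPPED        // thread has terminated normally
};

// Only STOPPED means the thread itself has terminated; every other
// state describes a thread that is still alive.
inline bool isThreadAlive(IOThreadState state)
{
  return state != IOThreadState::STOPPED;
}

// Text form of a state, e.g. for a status column.
inline std::string toString(IOThreadState state)
{
  switch (state)
  {
    case IOThreadState::INITIALIZING: return "INITIALIZING";
    case IOThreadState::CONNECTED:    return "CONNECTED";
    case IOThreadState::RECONNECTING: return "RECONNECTING";
    case IOThreadState::DISCONNECTED: return "DISCONNECTED";
    case IOThreadState::ERROR:        return "ERROR";
    case IOThreadState::STOPPED:      return "STOPPED";
  }
  return "UNKNOWN";
}
```

Keeping "is the thread alive" as a separate question from "is it connected" is the distinction this comment asks to preserve.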

And although I was the one to move max-reconnects to each [masterN] section for some inexplicable reason, wouldn't we want the max-reconnects, seconds-between-reconnects, and io-thread-sleep values to be common to all defined masters, rather than defined separately for each master?
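
To make the two alternatives concrete, here is a small sketch of one possible lookup order, assuming a per-master value would override a common one when both are given. Nothing in this thread settles on that behavior; `OptionMap` and `resolveOption` are hypothetical names and do not exist in the slave plugin.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical container: option name -> value, one map per config section.
typedef std::map<std::string, uint32_t> OptionMap;

// Look in the [masterN] section first, then in the common section,
// and fall back to the built-in default if neither defines the option.
inline uint32_t resolveOption(const OptionMap &common,
                              const OptionMap &master_section,
                              const std::string &name,
                              uint32_t default_value)
{
  OptionMap::const_iterator it = master_section.find(name);
  if (it != master_section.end())
    return it->second;

  it = common.find(name);
  if (it != common.end())
    return it->second;

  return default_value;
}
```

With an empty `master_section`, this behaves like purely common options; with an empty `common` map, it behaves like purely per-master options.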

review: Disapprove
Daniel Nichter (daniel-nichter) wrote :

Dave,

I agree: we need additional states, and I had already planned to tackle that too after further discussion. For now, however, the slave is broken and this patch is just a first step toward fixing it. More work is needed, but who knows when that work will be done. Since the patch doesn't break or regress any features, I think it's worth accepting now, and we'll continue to improve it when we can.

In reply to David Shrewsbury's review of 1 May 2012, 10:23.

Brian Aker (brianaker) wrote :

Any thoughts on how to proceed?

Daniel Nichter (daniel-nichter) wrote :

Yes, I sent another email to the discussion list about the additional status states. We can put this merge on hold for now until I implement these additional states.

In reply to Brian Aker's message of 3 May 2012, 23:39.

2552. By Daniel Nichter

Add extra IO thread states: INITIALIZING, DISCONNECTED, RECONNECTING, CONNECTING, OK, STOPPED, ERROR. Change setIOState() params. Use C++ str instead of C str for err msg.

Unmerged revisions

2552. By Daniel Nichter

Add extra IO thread states: INITIALIZING, DISCONNECTED, RECONNECTING, CONNECTING, OK, STOPPED, ERROR. Change setIOState() params. Use C++ str instead of C str for err msg.

2551. By Daniel Nichter

Make status=STOPPED until IO thread connects. Set error_msg if all reconnects fail.

2550. By Daniel Nichter

Fix max-reconnects. Make seconds-between-reconnects and io-thread-sleep per-master options.
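
The reconnect behavior these revisions deal with (a bounded number of connection attempts with a pause between them) can be sketched in isolation as below. This is a simplified illustration, not the code in the branch: `connectWithRetries` and its callback parameters are hypothetical stand-ins for the connection attempt and the sleep between attempts.

```cpp
#include <cstdint>
#include <functional>

// Try to connect up to max_attempts times, calling wait_before_retry
// between failed attempts. Returns true as soon as an attempt succeeds.
inline bool connectWithRetries(const std::function<bool()> &try_connect,
                               uint32_t max_attempts,
                               const std::function<void()> &wait_before_retry)
{
  uint32_t attempts = 0;
  while (!try_connect())
  {
    // Count the failure; give up once the limit has been reached.
    if (++attempts >= max_attempts)
      return false;

    wait_before_retry();
  }
  return true;
}
```

The sketch compares with `>=` so that a limit of 0 still terminates after the first failed attempt. A caller would then record a state such as DISCONNECTED when the function returns false.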

Preview Diff

=== modified file 'plugin/slave/queue_producer.cc'
--- plugin/slave/queue_producer.cc 2012-04-14 20:43:20 +0000
+++ plugin/slave/queue_producer.cc 2012-05-06 02:51:19 +0000
@@ -27,6 +27,7 @@
 #include <drizzled/gettext.h>
 #include <drizzled/message/transaction.pb.h>
 #include <boost/lexical_cast.hpp>
+#include <boost/format.hpp>
 #include <google/protobuf/text_format.h>
 #include <string>
 #include <vector>
@@ -45,7 +46,7 @@

 bool QueueProducer::init()
 {
-  setIOState("", true);
+  setIOState("INITIALIZING");
   return reconnect(true);
 }

@@ -104,7 +105,11 @@

 void QueueProducer::shutdown()
 {
-  setIOState(_last_error_message, false);
+  if (_last_error_message.empty())
+    setIOState("STOPPED");
+  else
+    setIOState("ERROR", _last_error_message);
+
   if (_is_connected)
     closeConnection();
 }
@@ -121,28 +126,45 @@
   _last_error_message.clear();
   boost::posix_time::seconds duration(_seconds_between_reconnects);

-  uint32_t attempts= 1;
-
-  setIOState("Connecting...", true);
-
+  uint32_t attempts= 0;
   while (not openConnection())
   {
-    char buf[250];
-    snprintf(buf, sizeof(buf),_("Connection attempt %d of %d failed, sleeping for %d seconds and retrying. %s"), attempts, _max_reconnects, _seconds_between_reconnects, _last_error_message.c_str());
+    if (++attempts == _max_reconnects)
+      break;

-    setIOState(buf, true);
-    if (attempts++ == _max_reconnects)
-      break;
+    std::string err_msg= boost::str(
+      boost::format(_("Connection attempt %1% of %2% failed. "
+                      "Sleeping for %3% seconds and retrying. %4%"))
+      % attempts
+      % _max_reconnects
+      % _seconds_between_reconnects
+      % _last_error_message
+    );
+    setIOState("RECONNECTING", err_msg);
     boost::this_thread::sleep(duration);
   }

-  setIOState(_is_connected ? _("Connected") : _("Disconnected"), true);
+  if (_is_connected)
+  {
+    setIOState("OK");
+  }
+  else
+  {
+    std::string err_msg= boost::str(
+      boost::format(_("Failed to connect to master after %1% attempts. %2%"))
+      % _max_reconnects
+      % _last_error_message.c_str()
+    );
+    setIOState("DISCONNECTED", err_msg);
+  }

   return _is_connected;
 }

 bool QueueProducer::openConnection()
 {
+  setIOState("CONNECTING");
+
   if ((_drizzle= drizzle_create()) == NULL)
   {
     _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
@@ -476,41 +498,33 @@
   return EE_OK;
 }

-
-void QueueProducer::setIOState(const string &err_msg, bool status)
+void QueueProducer::setIOState(const std::string &state, const std::string &err_msg)
 {
   vector<string> statements;
-  string sql;
-  string msg(err_msg);
-
-  if (not status)
-  {
-    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
-  }
-  else
-  {
-    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
-  }
-
-  sql.append(", `error_msg` = '", 17);
+  string sql= "UPDATE `sys_replication`.`io_state` SET `status` = '"
+    + state + "', `error_msg` = '";

   /* Escape embedded quotes and statement terminators */
-  string::iterator it;
-  for (it= msg.begin(); it != msg.end(); ++it)
+  if (not err_msg.empty())
   {
-    if (*it == '\'')
-    {
-      it= msg.insert(it, '\'');
-      ++it; /* advance back to the quote */
-    }
-    else if (*it == ';')
-    {
-      it= msg.insert(it, '\\');
-      ++it; /* advance back to the semicolon */
-    }
+    string msg(err_msg);
+    string::iterator it;
+    for (it= msg.begin(); it != msg.end(); ++it)
+    {
+      if (*it == '\'')
+      {
+        it= msg.insert(it, '\'');
+        ++it; /* advance back to the quote */
+      }
+      else if (*it == ';')
+      {
+        it= msg.insert(it, '\\');
+        ++it; /* advance back to the semicolon */
+      }
+    }
+    sql.append(msg);
   }
-
-  sql.append(msg);
+
   sql.append("' WHERE `master_id` = ");
   sql.append(boost::lexical_cast<string>(masterId()));


=== modified file 'plugin/slave/queue_producer.h'
--- plugin/slave/queue_producer.h 2011-11-26 23:14:59 +0000
+++ plugin/slave/queue_producer.h 2012-05-06 02:51:19 +0000
@@ -193,10 +193,10 @@
   /**
    * Update IO thread status in state table.
    *
+   * @param state State of IO thread
    * @param err_msg Error message string
-   * @param status false = STOPPED, true = RUNNING
    */
-  void setIOState(const std::string &err_msg, bool status);
+  void setIOState(const std::string &state, const std::string &err_msg="");

 };


=== modified file 'plugin/slave/replication_slave.cc'
--- plugin/slave/replication_slave.cc 2011-07-06 23:13:00 +0000
+++ plugin/slave/replication_slave.cc 2012-05-06 02:51:19 +0000
@@ -63,14 +63,19 @@
   po::variables_map vm;
   po::options_description slave_options("Options for the slave plugin");

-  /* Common options */
+  /*
+   * Define the slave's applier (consumer) thread options. These are
+   * the valid options at the start of the slave config file before
+   * any [masterN] headers.
+   */
   slave_options.add_options()
-    ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
-    ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
     ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
     ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());

-  /* Master defining options */
+  /*
+   * Define per-master IO (producer) thread options. These are
+   * the valid options under each [masterN] header in the slave config file.
+   */
   for (size_t num= 1; num <= 10; num++)
   {
     string section("master");
@@ -81,6 +86,8 @@
       ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
       ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
       ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
+      ((section + ".seconds-between-reconnects").c_str(), po::value<uint32_t>()->default_value(30))
+      ((section + ".io-thread-sleep").c_str(), po::value<uint32_t>()->default_value(5))
       ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
   }

@@ -119,6 +126,7 @@

     _masters[num]= new (std::nothrow) Master(num);

+    /* Set the IO (consumer) thread options for this master. */
     if (vm.count(section + ".master-host"))
       master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());

@@ -133,28 +141,24 @@

     if (vm.count(section + ".max-commit-id"))
       master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
-  }
-
-  boost::unordered_map<uint32_t, Master *>::const_iterator it;
-
-  for (it= _masters.begin(); it != _masters.end(); ++it)
-  {
-    if (vm.count("max-reconnects"))
-      it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
-
-    if (vm.count("seconds-between-reconnects"))
-      it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
-
-    if (vm.count("io-thread-sleep"))
-      it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
-  }
-
+
+    if (vm.count(section + ".max-reconnects"))
+      master(num).producer().setMaxReconnectAttempts(vm[section + ".max-reconnects"].as<uint32_t>());
+
+    if (vm.count(section + ".seconds-between-reconnects"))
+      master(num).producer().setSecondsBetweenReconnects(vm[section + ".seconds-between-reconnects"].as<uint32_t>());
+
+    if (vm.count(section + ".io-thread-sleep"))
+      master(num).producer().setSleepInterval(vm[section + ".io-thread-sleep"].as<uint32_t>());
+  }
+
+  /* Set the slave's applier (consumer) thread options. */
   if (vm.count("applier-thread-sleep"))
     _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
   if (vm.count("ignore-errors"))
     _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());

-  /* setup schema and tables */
+  /* Create the sys_replication schema and its tables. */
   ReplicationSchema rs;
   if (not rs.create())
   {
@@ -162,9 +166,10 @@
     return false;
   }

+  /* Create a row in the sys_replication tables for each master. */
+  boost::unordered_map<uint32_t, Master *>::const_iterator it;
   for (it= _masters.begin(); it != _masters.end(); ++it)
   {
-    /* make certain a row exists for each master */
     rs.createInitialApplierRow(it->first);
     rs.createInitialIORow(it->first);

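
As a reading aid for the escaping loop in setIOState() above, the same transformation (double each single quote, put a backslash before each semicolon) can be written as a standalone function. This is only a sketch: `escapeStateMessage` is a hypothetical name, and it builds a new string instead of inserting into the message in place as the diff does.

```cpp
#include <string>

// Return a copy of msg with each single quote doubled and each
// semicolon preceded by a backslash; all other characters are kept.
inline std::string escapeStateMessage(const std::string &msg)
{
  std::string out;
  out.reserve(msg.size());
  for (char c : msg)
  {
    if (c == '\'')
      out += '\'';   // emit the extra quote first
    else if (c == ';')
      out += '\\';   // emit the escaping backslash first
    out += c;        // then the original character
  }
  return out;
}
```

Building the result in a separate string avoids the iterator bookkeeping needed when inserting into the string being scanned.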
