Merge lp:~vlad-lesin/percona-playback/connections_pool into lp:percona-playback

Proposed by Vlad Lesin on 2012-11-19
Status: Merged
Approved by: Stewart Smith on 2012-12-07
Approved revision: 178
Merged at revision: 174
Proposed branch: lp:~vlad-lesin/percona-playback/connections_pool
Merge into: lp:percona-playback
Diff against target: 1557 lines (+617/-299)
24 files modified
Makefile.am (+6/-3)
percona_playback/db_thread.cc (+8/-10)
percona_playback/db_thread.h (+18/-23)
percona_playback/dispatcher.cc (+0/-105)
percona_playback/dispatcher.h (+0/-46)
percona_playback/libdrizzle_client/libdrizzle_client.cc (+4/-3)
percona_playback/mysql_client/mysql_client.cc (+69/-27)
percona_playback/mysql_client/mysql_client.h (+5/-2)
percona_playback/null_dbclient/null_dbclient.cc (+4/-3)
percona_playback/percona_playback.cc (+46/-1)
percona_playback/plugin.h (+29/-0)
percona_playback/query_entry.h (+2/-1)
percona_playback/query_log/query_log.cc (+25/-8)
percona_playback/tcpdump/connection_state.cc (+20/-15)
percona_playback/tcpdump/connection_state.h (+10/-12)
percona_playback/tcpdump/pcap_packets_parser.cc (+38/-24)
percona_playback/tcpdump/pcap_packets_parser.h (+10/-2)
percona_playback/tcpdump/tcpdump_query_entries.cc (+2/-12)
percona_playback/tcpdump/tcpdump_query_entries.h (+10/-2)
percona_playback/test/thread-pool-sysbench-slow.cc (+60/-0)
percona_playback/thread_per_connection/plugin.ini (+6/-0)
percona_playback/thread_per_connection/thread_per_connection.cc (+109/-0)
percona_playback/thread_pool/plugin.ini (+6/-0)
percona_playback/thread_pool/thread_pool.cc (+130/-0)
To merge this branch: bzr merge lp:~vlad-lesin/percona-playback/connections_pool
Reviewer Review Type Date Requested Status
Stewart Smith (community) 2012-11-19 Approve on 2012-12-07
Review via email: mp+134884@code.launchpad.net

Description of the change

A bunch of changes for implementing this https://blueprints.launchpad.net/percona-playback/+spec/pool-of-threads blueprint as well as fixes for some critical bugs.

To post a comment you must log in.
171. By Vlad Lesin on 2012-11-19

Fixes for bug #1070824 and bug #1080654

172. By Vlad Lesin on 2012-11-19

merged with trunk

173. By Vlad Lesin on 2012-11-20

Send init session query on reconnect

174. By Vlad Lesin on 2012-11-20

Initialize session init query with the value from cmd line options.

175. By Vlad Lesin on 2012-11-23

Thread-safety fixes for mysql client plugin.

176. By Vlad Lesin on 2012-11-23

Don't push empty query log queries for execution

177. By Vlad Lesin on 2012-11-23

Convert multiline query to single line to avoid syntax errors from server.

178. By Vlad Lesin on 2012-11-23

Allow multi-statement queries.

Stewart Smith (stewart) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'Makefile.am'
2--- Makefile.am 2012-11-08 22:17:07 +0000
3+++ Makefile.am 2012-11-23 20:15:24 +0000
4@@ -59,13 +59,11 @@
5 libpercona_playback_la_SOURCES = \
6 percona_playback/percona_playback.cc \
7 percona_playback/plugin.cc \
8- percona_playback/db_thread.cc \
9- percona_playback/dispatcher.cc
10+ percona_playback/db_thread.cc
11
12 nobase_include_HEADERS += \
13 percona_playback/percona_playback.h \
14 percona_playback/db_thread.h \
15- percona_playback/dispatcher.h \
16 percona_playback/query_result.h \
17 percona_playback/plugin.h \
18 percona_playback/tokenize.h \
19@@ -89,6 +87,7 @@
20 percona_playback/test/crashme-slow \
21 percona_playback/test/sqlbench-transactions-slow \
22 percona_playback/test/sysbench-slow \
23+ percona_playback/test/thread-pool-sysbench-slow \
24 percona_playback/test/preserve_query_time \
25 percona_playback/test/tcpdump_without_handshake \
26 percona_playback/test/tcpdump_fragmented_packet \
27@@ -104,6 +103,7 @@
28 percona_playback_test_crashme_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
29 percona_playback_test_sqlbench_transactions_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
30 percona_playback_test_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
31+percona_playback_test_thread_pool_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
32 percona_playback_test_preserve_query_time_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
33 percona_playback_test_tcpdump_without_handshake_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
34 percona_playback_test_tcpdump_fragmented_packet_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
35@@ -135,6 +135,9 @@
36 percona_playback_test_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc
37 percona_playback_test_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS)
38
39+percona_playback_test_thread_pool_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc
40+percona_playback_test_thread_pool_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS)
41+
42 percona_playback_test_preserve_query_time_SOURCES= percona_playback/test/preserve_query_time.cc
43 percona_playback_test_preserve_query_time_LDADD= $(LDADD) $(MYSQL_LIBS)
44
45
46=== modified file 'percona_playback/db_thread.cc'
47--- percona_playback/db_thread.cc 2012-11-14 11:44:59 +0000
48+++ percona_playback/db_thread.cc 2012-11-23 20:15:24 +0000
49@@ -24,25 +24,23 @@
50
51 extern std::string g_session_init_query;
52
53-void DBThread::connect_and_init_session()
54+void DBThread::init_session()
55 {
56- connect();
57- if (!g_session_init_query.empty())
58- {
59- QueryResult r;
60- QueryResult er;
61- execute_query(g_session_init_query, &r, er);
62- }
63+ if (g_session_init_query.empty())
64+ return;
65+ QueryResult r;
66+ QueryResult er;
67+ execute_query(g_session_init_query, &r, er);
68 }
69
70 void DBThread::run()
71 {
72 connect_and_init_session();
73
74- QueryEntryPtr query;
75 while (true)
76 {
77- queries.pop(query);
78+ QueryEntryPtr query;
79+ queries->pop(query);
80
81 if (query->is_shutdown())
82 break;
83
84=== modified file 'percona_playback/db_thread.h'
85--- percona_playback/db_thread.h 2012-11-14 11:44:59 +0000
86+++ percona_playback/db_thread.h 2012-11-23 20:15:24 +0000
87@@ -16,6 +16,7 @@
88 #ifndef PERCONA_PLAYBACK_DB_THREAD_H
89 #define PERCONA_PLAYBACK_DB_THREAD_H
90
91+#include <memory>
92 #include "percona_playback/visibility.h"
93 #include <boost/thread.hpp>
94 #include <boost/shared_ptr.hpp>
95@@ -33,55 +34,49 @@
96
97 class QueryResult;
98
99-class DBThreadState
100-{
101-public:
102- virtual ~DBThreadState(){}
103-};
104-
105 class DBThread
106 {
107
108 private:
109 boost::thread thread;
110 uint64_t thread_id;
111- boost::shared_ptr<DBThreadState> state;
112
113 public:
114 typedef tbb::concurrent_bounded_queue<QueryEntryPtr> Queries;
115- Queries queries;
116+ boost::shared_ptr<Queries> queries;
117
118- DBThread(uint64_t _thread_id) : thread_id(_thread_id) {
119- queries.set_capacity(g_db_thread_queue_depth);
120+ DBThread(uint64_t _thread_id,
121+ boost::shared_ptr<Queries> _queries) :
122+ thread_id(_thread_id), queries(_queries) {
123+ queries->set_capacity(g_db_thread_queue_depth);
124 }
125
126 virtual ~DBThread() {}
127
128- void set_state(boost::shared_ptr<DBThreadState> new_state)
129- {
130- state= new_state;
131- }
132-
133- boost::shared_ptr<DBThreadState> get_state() const
134- {
135- return state;
136- }
137-
138 void join()
139 {
140 thread.join();
141 }
142
143- void connect_and_init_session();
144+ bool connect_and_init_session()
145+ {
146+ if (connect())
147+ {
148+ init_session();
149+ return true;
150+ }
151+ return false;
152+ }
153
154- virtual void connect()= 0;
155+ void init_session();
156+ virtual bool connect()= 0;
157
158 virtual void disconnect()= 0;
159 virtual void execute_query(const std::string &query,
160 QueryResult *r,
161 const QueryResult &expected_result)= 0;
162
163- void run();
164+ virtual void run();
165
166 void start_thread();
167 };
168
169=== removed file 'percona_playback/dispatcher.cc'
170--- percona_playback/dispatcher.cc 2012-07-03 07:41:49 +0000
171+++ percona_playback/dispatcher.cc 1970-01-01 00:00:00 +0000
172@@ -1,105 +0,0 @@
173-/* BEGIN LICENSE
174- * Copyright (C) 2011-2012 Percona Inc.
175- * This program is free software: you can redistribute it and/or modify it
176- * under the terms of the GNU General Public License version 2, as published
177- * by the Free Software Foundation.
178- *
179- * This program is distributed in the hope that it will be useful, but
180- * WITHOUT ANY WARRANTY; without even the implied warranties of
181- * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
182- * PURPOSE. See the GNU General Public License for more details.
183- *
184- * You should have received a copy of the GNU General Public License along
185- * with this program. If not, see <http://www.gnu.org/licenses/>.
186- * END LICENSE */
187-#include "percona_playback/dispatcher.h"
188-#include "percona_playback/db_thread.h"
189-#include "percona_playback/plugin.h"
190-
191-#include <assert.h>
192-
193-Dispatcher g_dispatcher;
194-
195-extern percona_playback::DBClientPlugin *g_dbclient_plugin;
196-
197-boost::shared_ptr<DBThreadState>
198-Dispatcher::get_thread_state(
199- uint64_t thread_id,
200- boost::function1<void, DBThread *> run_on_db_thread_create)
201-{
202- DBExecutorsTable::accessor a;
203-
204- if (executors.insert(a, thread_id))
205- {
206- DBThread *db_thread= g_dbclient_plugin->create(thread_id);
207- assert(db_thread);
208- a->second= db_thread;
209- if (!run_on_db_thread_create.empty())
210- run_on_db_thread_create(db_thread);
211- db_thread->start_thread();
212- }
213-
214- return a->second->get_state();
215-}
216-
217-void
218-Dispatcher::dispatch(QueryEntryPtr query_entry)
219-{
220- uint64_t thread_id= query_entry->getThreadId();
221- {
222- DBExecutorsTable::accessor a;
223- if (executors.insert(a, thread_id))
224- {
225- DBThread *db_thread= g_dbclient_plugin->create(thread_id);
226- a->second= db_thread;
227- db_thread->start_thread();
228- }
229- a->second->queries.push(query_entry);
230- }
231-}
232-
233-bool
234-Dispatcher::finish_and_wait(uint64_t thread_id)
235-{
236- DBThread *db_thread= NULL;
237- {
238- DBExecutorsTable::accessor a;
239- if (executors.find(a, thread_id))
240- {
241- db_thread= a->second;
242- executors.erase(a);
243- }
244- }
245-
246- if (!db_thread)
247- return false;
248-
249- db_thread->queries.push(QueryEntryPtr(new FinishEntry()));
250- db_thread->join();
251-
252- delete db_thread;
253-
254- return true;
255-}
256-
257-void
258-Dispatcher::finish_all_and_wait()
259-{
260- QueryEntryPtr shutdown_command(new FinishEntry());
261- while(executors.size())
262- {
263- uint64_t thread_id;
264- DBThread *t;
265- {
266- DBExecutorsTable::const_iterator iter= executors.begin();
267- thread_id= (*iter).first;
268- t= (*iter).second;
269- }
270- executors.erase(thread_id);
271-
272- t->queries.push(shutdown_command);
273- t->join();
274-
275- delete t;
276- }
277-}
278
279=== removed file 'percona_playback/dispatcher.h'
280--- percona_playback/dispatcher.h 2012-07-06 07:15:28 +0000
281+++ percona_playback/dispatcher.h 1970-01-01 00:00:00 +0000
282@@ -1,46 +0,0 @@
283-/* BEGIN LICENSE
284- * Copyright (C) 2011 Percona Inc.
285- * This program is free software: you can redistribute it and/or modify it
286- * under the terms of the GNU General Public License version 2, as published
287- * by the Free Software Foundation.
288- *
289- * This program is distributed in the hope that it will be useful, but
290- * WITHOUT ANY WARRANTY; without even the implied warranties of
291- * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
292- * PURPOSE. See the GNU General Public License for more details.
293- *
294- * You should have received a copy of the GNU General Public License along
295- * with this program. If not, see <http://www.gnu.org/licenses/>.
296- * END LICENSE */
297-
298-#ifndef PERCONA_PLAYBACK_DISPATCHER_H
299-#define PERCONA_PLAYBACK_DISPATCHER_H
300-
301-#include <tbb/concurrent_hash_map.h>
302-#include <boost/function.hpp>
303-
304-#include "percona_playback/query_entry.h"
305-
306-class DBThread;
307-class DBThreadState;
308-
309-class Dispatcher
310-{
311- typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable;
312- DBExecutorsTable executors;
313- void db_thread_func(DBThread *thread);
314- void start_thread(DBThread *thread);
315-
316-public:
317- boost::shared_ptr<DBThreadState>
318- get_thread_state(uint64_t thread_id,
319- boost::function1<void, DBThread *>
320- run_on_db_thread_create);
321- void dispatch(QueryEntryPtr query_entry);
322- bool finish_and_wait(uint64_t thread_id);
323- void finish_all_and_wait();
324-};
325-
326-extern Dispatcher g_dispatcher;
327-
328-#endif /* PERCONA_PLAYBACK_DISPATCHER_H */
329
330=== modified file 'percona_playback/libdrizzle_client/libdrizzle_client.cc'
331--- percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-07-23 07:08:23 +0000
332+++ percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-11-23 20:15:24 +0000
333@@ -39,13 +39,13 @@
334
335 public:
336 LibDrizzleDBThread(uint64_t _thread_id, LibDrizzleOptions *opt) :
337- DBThread(_thread_id),
338+ DBThread(_thread_id, boost::shared_ptr<Queries>(new Queries())),
339 driz(NULL),
340 options(opt)
341 {
342 }
343
344- void connect();
345+ bool connect();
346 void disconnect();
347 void execute_query(const std::string &query, QueryResult *r,
348 const QueryResult &expected_result);
349@@ -70,7 +70,7 @@
350 unsigned int port;
351 };
352
353-void LibDrizzleDBThread::connect()
354+bool LibDrizzleDBThread::connect()
355 {
356 driz= drizzle_create(NULL);
357 assert(driz != NULL);
358@@ -81,6 +81,7 @@
359 options->user.c_str(),
360 options->password.c_str(),
361 options->schema.c_str(), DRIZZLE_CON_MYSQL);
362+ return true;
363 }
364
365 void LibDrizzleDBThread::disconnect()
366
367=== modified file 'percona_playback/mysql_client/mysql_client.cc'
368--- percona_playback/mysql_client/mysql_client.cc 2012-07-04 09:10:16 +0000
369+++ percona_playback/mysql_client/mysql_client.cc 2012-11-23 20:15:24 +0000
370@@ -42,17 +42,23 @@
371 unsigned int port;
372 };
373
374-void MySQLDBThread::connect()
375+bool MySQLDBThread::connect()
376 {
377 mysql_init(&handle);
378- mysql_real_connect(&handle,
379- options->host.c_str(),
380- options->user.c_str(),
381- options->password.c_str(),
382- options->schema.c_str(),
383- options->port,
384- "",
385- 0);
386+ if (!mysql_real_connect(&handle,
387+ options->host.c_str(),
388+ options->user.c_str(),
389+ options->password.c_str(),
390+ options->schema.c_str(),
391+ options->port,
392+ "",
393+ CLIENT_MULTI_STATEMENTS))
394+ {
395+ fprintf(stderr, "Can't connect to server: %s\n",
396+ mysql_error(&handle));
397+ return false;
398+ }
399+ return true;
400 }
401
402 void MySQLDBThread::disconnect()
403@@ -63,29 +69,50 @@
404 void MySQLDBThread::execute_query(const std::string &query, QueryResult *r,
405 const QueryResult &)
406 {
407- int mr= mysql_real_query(&handle, query.c_str(), query.length());
408- if(mr != 0)
409- {
410- r->setError(mr);
411- }
412- else
413- {
414- MYSQL_RES* mysql_res= NULL;
415-
416- r->setError(mr);
417- r->setWarningCount(mysql_warning_count(&handle));
418-
419- mysql_res= mysql_store_result(&handle);
420-
421- if (mysql_res != NULL)
422- r->setRowsSent(mysql_num_rows(mysql_res));
423+ int mr;
424+ for(unsigned i = 0; i < max_err_num; ++i)
425+ {
426+ mr= mysql_real_query(&handle, query.c_str(), query.length());
427+ r->setError(mr);
428+ if(mr != 0)
429+ {
430+ fprintf(stderr,
431+ "Error during query: %s, number of tries %u\n",
432+ mysql_error(&handle),
433+ i);
434+ disconnect();
435+ connect_and_init_session();
436+ }
437 else
438+ {
439+ r->setWarningCount(mysql_warning_count(&handle));
440 r->setRowsSent(0);
441-
442- mysql_free_result(mysql_res);
443+ do
444+ {
445+ MYSQL_RES* mysql_res= NULL;
446+
447+ mysql_res= mysql_store_result(&handle);
448+
449+ if (mysql_res != NULL)
450+ {
451+ r->setRowsSent(mysql_num_rows(mysql_res));
452+ mysql_free_result(mysql_res);
453+ }
454+
455+ } while(!mysql_next_result(&handle));
456+
457+ break;
458+ }
459 }
460 }
461
462+void MySQLDBThread::run()
463+{
464+ mysql_thread_init();
465+ DBThread::run();
466+ mysql_thread_end();
467+}
468+
469 class MySQLDBClientPlugin : public percona_playback::DBClientPlugin
470 {
471 private:
472@@ -126,6 +153,21 @@
473 return -1;
474 }
475
476+ if (!active)
477+ return 0;
478+
479+ if (!mysql_thread_safe())
480+ {
481+ fprintf(stderr, "libmysqlclient is not thread safe\n");
482+ return -1;
483+ }
484+
485+ if (mysql_library_init(0, NULL, NULL))
486+ {
487+ fprintf(stderr, "could not initialize mysql library\n");
488+ return -1;
489+ }
490+
491 if (vm.count("mysql-host"))
492 {
493 options.host= vm["mysql-host"].as<std::string>();
494
495=== modified file 'percona_playback/mysql_client/mysql_client.h'
496--- percona_playback/mysql_client/mysql_client.h 2012-06-15 10:53:23 +0000
497+++ percona_playback/mysql_client/mysql_client.h 2012-11-23 20:15:24 +0000
498@@ -10,16 +10,19 @@
499 private:
500 MYSQL handle;
501 MySQLOptions *options;
502+ static const unsigned max_err_num = 10;
503
504 public:
505 MySQLDBThread(uint64_t _thread_id, MySQLOptions *opt) :
506- DBThread(_thread_id),
507+ DBThread(_thread_id,
508+ boost::shared_ptr<Queries>(new Queries())),
509 options(opt)
510 {
511 }
512
513- void connect();
514+ bool connect();
515 void disconnect();
516 void execute_query(const std::string &query, QueryResult *r,
517 const QueryResult &expected_result);
518+ void run();
519 };
520
521=== modified file 'percona_playback/null_dbclient/null_dbclient.cc'
522--- percona_playback/null_dbclient/null_dbclient.cc 2012-04-05 04:47:09 +0000
523+++ percona_playback/null_dbclient/null_dbclient.cc 2012-11-23 20:15:24 +0000
524@@ -20,10 +20,11 @@
525 class NULLDBThread : public DBThread
526 {
527 public:
528- NULLDBThread(uint64_t _thread_id) : DBThread(_thread_id) {
529- }
530+ NULLDBThread(uint64_t _thread_id) :
531+ DBThread(_thread_id,
532+ boost::shared_ptr<Queries>(new Queries())) {}
533
534- void connect() {};
535+ bool connect() { return true; };
536 void disconnect() {};
537 void execute_query(const std::string &, QueryResult *r,
538 const QueryResult &expected_result) {
539
540=== modified file 'percona_playback/percona_playback.cc'
541--- percona_playback/percona_playback.cc 2012-11-14 11:44:59 +0000
542+++ percona_playback/percona_playback.cc 2012-11-23 20:15:24 +0000
543@@ -40,6 +40,7 @@
544
545 percona_playback::DBClientPlugin *g_dbclient_plugin= NULL;
546 percona_playback::InputPlugin *g_input_plugin= NULL;
547+percona_playback::DispatcherPlugin *g_dispatcher_plugin= NULL;
548 unsigned int g_db_thread_queue_depth;
549 std::string g_session_init_query;
550
551@@ -132,6 +133,22 @@
552 std::cerr << _("Selected Input Plugin: ")
553 << g_input_plugin->name
554 << std::endl;
555+
556+ std::cerr << std::endl << _("Loaded Dispatcher Plugins: ");
557+
558+ BOOST_FOREACH(const PluginRegistry::DispatcherPluginPair &pp,
559+ PluginRegistry::singleton().dispatcher_plugins)
560+ {
561+ std::cerr << pp.first << " ";
562+ }
563+
564+ std::cerr << std::endl;
565+ std::cerr << std::endl;
566+
567+ assert(g_dispatcher_plugin);
568+ std::cerr << _("Selected dispatcher Plugin: ")
569+ << g_dispatcher_plugin->name
570+ << std::endl;
571 }
572
573 int percona_playback_argv(percona_playback_st *the_percona_playback,
574@@ -152,12 +169,13 @@
575 db_options.add_options()
576 ("db-plugin", po::value<std::string>(), _("Database plugin"))
577 ("input-plugin", po::value<std::string>(), _("Input plugin"))
578+ ("dispatcher-plugin", po::value<std::string>(), _("Dispatcher plugin"))
579 ("queue-depth", po::value<unsigned int>(),
580 _("Queue depth for DB executor (thread). The larger this number the"
581 " greater the played-back workload can deviate from the original workload"
582 " as some connections may be up to queue-depth behind. (default 1)"))
583 ("session-init-query",
584- po::value<std::string>()->default_value(""),
585+ po::value<std::string>(&g_session_init_query)->default_value(""),
586 _("This query will be executed just after each connect to db"))
587 ;
588
589@@ -237,6 +255,32 @@
590 }
591 g_input_plugin->active= true;
592
593+ if (vm.count("dispatcher-plugin"))
594+ {
595+ PluginRegistry::DispatcherPluginMap::iterator it;
596+ it= PluginRegistry::singleton().dispatcher_plugins.find(
597+ vm["dispatcher-plugin"].as<std::string>());
598+ if (it == PluginRegistry::singleton().dispatcher_plugins.end())
599+ {
600+ fprintf(stderr, _("Invalid Dispatcher Plugin\n"));
601+ return -1;
602+ }
603+ g_dispatcher_plugin= it->second;
604+ }
605+ else
606+ {
607+ PluginRegistry::DispatcherPluginMap::iterator it;
608+ it= PluginRegistry::singleton().dispatcher_plugins.
609+ find("thread-per-connection");
610+ if (it == PluginRegistry::singleton().dispatcher_plugins.end())
611+ {
612+ fprintf(stderr, _("Invalid Dispatcher plugin\n"));
613+ return -1;
614+ }
615+ g_dispatcher_plugin= it->second;
616+ }
617+ g_dispatcher_plugin->active= true;
618+
619 if (vm.count("help") || argc==1)
620 {
621 help(options_description);
622@@ -299,6 +343,7 @@
623 std::cerr << _("Database Plugin: ") << g_dbclient_plugin->name << std::endl;
624 std::cerr << _(" Running...") << std::endl;
625
626+ g_dispatcher_plugin->run();
627 g_input_plugin->run(*r);
628
629 BOOST_FOREACH(const PluginRegistry::ReportPluginPair pp,
630
631=== modified file 'percona_playback/plugin.h'
632--- percona_playback/plugin.h 2012-06-24 04:28:41 +0000
633+++ percona_playback/plugin.h 2012-11-23 20:15:24 +0000
634@@ -20,6 +20,9 @@
635 #include <vector>
636 #include <string>
637 #include <map>
638+#include <boost/shared_ptr.hpp>
639+#include <boost/function.hpp>
640+#include "percona_playback/query_entry.h"
641 #include <percona_playback/visibility.h>
642 #include <percona_playback/version.h>
643
644@@ -31,6 +34,7 @@
645 }
646
647 class DBThread;
648+class DBThreadState;
649 class QueryResult;
650 class percona_playback_run_result;
651
652@@ -102,6 +106,21 @@
653
654 };
655
656+class DispatcherPlugin : public plugin
657+{
658+ public:
659+ std::string name;
660+
661+ DispatcherPlugin(const std::string &_name) : name(_name) {}
662+
663+ virtual void dispatch(QueryEntryPtr query_entry)= 0;
664+ virtual bool finish_and_wait(uint64_t thread_id)= 0;
665+ virtual void finish_all_and_wait()= 0;
666+
667+ virtual void run() {};
668+
669+};
670+
671 class PluginRegistry
672 {
673 public:
674@@ -122,6 +141,9 @@
675 typedef std::map<std::string, InputPlugin*> InputPluginMap;
676 typedef std::pair<std::string, InputPlugin*> InputPluginPair;
677
678+ typedef std::map<std::string, DispatcherPlugin*> DispatcherPluginMap;
679+ typedef std::pair<std::string, DispatcherPlugin*> DispatcherPluginPair;
680+
681 typedef std::map<std::string, plugin*> PluginMap;
682 typedef std::pair<std::string, plugin*> PluginPair;
683
684@@ -129,6 +151,7 @@
685 DBClientPluginMap dbclient_plugins;
686 ReportPluginMap report_plugins;
687 InputPluginMap input_plugins;
688+ DispatcherPluginMap dispatcher_plugins;
689
690 void add(const std::string &name, plugin* plugin_object)
691 {
692@@ -154,6 +177,12 @@
693 all_plugins.insert(PluginPair(name, input_plugin));
694 }
695
696+ void add(const std::string &name, DispatcherPlugin* dispatcher_plugin)
697+ {
698+ dispatcher_plugins.insert(DispatcherPluginPair(name, dispatcher_plugin));
699+ all_plugins.insert(PluginPair(name, dispatcher_plugin));
700+ }
701+
702 };
703
704
705
706=== modified file 'percona_playback/query_entry.h'
707--- percona_playback/query_entry.h 2012-07-03 02:37:51 +0000
708+++ percona_playback/query_entry.h 2012-11-23 20:15:24 +0000
709@@ -45,7 +45,8 @@
710 class FinishEntry : public QueryEntry
711 {
712 public:
713- FinishEntry() { set_shutdown(); }
714+ FinishEntry(uint64_t _thread_id) :
715+ QueryEntry (_thread_id, true) {}
716 bool is_quit() { return false; }
717
718 void execute(DBThread *) {}
719
720=== modified file 'percona_playback/query_log/query_log.cc'
721--- percona_playback/query_log/query_log.cc 2012-11-08 22:17:07 +0000
722+++ percona_playback/query_log/query_log.cc 2012-11-23 20:15:24 +0000
723@@ -39,7 +39,6 @@
724 #include <percona_playback/db_thread.h>
725 #include <percona_playback/query_log/query_log.h>
726 #include <percona_playback/query_result.h>
727-#include "percona_playback/dispatcher.h"
728 #include <percona_playback/gettext.h>
729
730 #include <boost/foreach.hpp>
731@@ -51,6 +50,8 @@
732 static bool g_run_set_timestamp;
733 static bool g_preserve_query_time;
734
735+extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
736+
737 class ParseQueryLogFunc: public tbb::filter {
738 public:
739 ParseQueryLogFunc(FILE *input_file_,
740@@ -83,7 +84,7 @@
741 new std::vector<boost::shared_ptr<QueryLogEntry> >();
742
743 boost::shared_ptr<QueryLogEntry> tmp_entry(new QueryLogEntry());
744- entries->push_back(tmp_entry);
745+ // entries->push_back(tmp_entry);
746
747 char *line= NULL;
748 size_t buflen = 0;
749@@ -130,9 +131,10 @@
750
751 if (strncmp(p, "# User@Host", strlen("# User@Host")) == 0)
752 {
753+ if (!tmp_entry->getQuery().empty())
754+ entries->push_back(tmp_entry);
755 count++;
756 tmp_entry.reset(new QueryLogEntry());
757- entries->push_back(tmp_entry);
758 (*this->nr_entries)++;
759 }
760
761@@ -172,6 +174,9 @@
762 p= line;
763 }
764
765+ if (!tmp_entry->getQuery().empty())
766+ entries->push_back(tmp_entry);
767+
768 free(line);
769 return entries;
770 }
771@@ -239,7 +244,19 @@
772 && s.compare(0, timestamp_query.length(), timestamp_query) == 0)
773 set_timestamp_query= s;
774 else
775- query.append(s);
776+ {
777+ //Append space insead of \r\n
778+ std::string::const_iterator end = s.end() - 1;
779+ if (s.length() >= 2 && *(s.end() - 2) == '\r')
780+ --end;
781+ //Remove initial spaces for best query viewing in reports
782+ std::string::const_iterator begin;
783+ for (begin = s.begin(); begin != end; ++begin)
784+ if (*begin != ' ' && *begin != '\t')
785+ break;
786+ query.append(begin, end);
787+ query.append(" ");
788+ }
789 }
790
791 bool QueryLogEntry::parse_metadata(const std::string &s)
792@@ -281,13 +298,13 @@
793 r= true;
794 }
795 }
796-
797+/*
798 if (s[0] == '#' && strncmp(s.c_str(), "# administrator", strlen("# administrator")))
799 {
800 query.append(s);
801 r= true;
802 }
803-
804+*/
805 return r;
806 }
807
808@@ -300,7 +317,7 @@
809 for (unsigned int i=0; i< input->size(); i++)
810 {
811 // usleep(10);
812- g_dispatcher.dispatch((*input)[i]);
813+ g_dispatcher_plugin->dispatch((*input)[i]);
814 }
815 delete input;
816 return NULL;
817@@ -330,7 +347,7 @@
818 p.add_filter(f4);
819 p.run(2);
820
821- g_dispatcher.finish_all_and_wait();
822+ g_dispatcher_plugin->finish_all_and_wait();
823
824 r->n_log_entries= entries;
825 r->n_queries= queries;
826
827=== modified file 'percona_playback/tcpdump/connection_state.cc'
828--- percona_playback/tcpdump/connection_state.cc 2012-07-02 17:51:47 +0000
829+++ percona_playback/tcpdump/connection_state.cc 2012-11-23 20:15:24 +0000
830@@ -25,9 +25,11 @@
831 #include "percona_playback/plugin.h"
832
833 extern percona_playback::DBClientPlugin *g_dbclient_plugin;
834+extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
835
836 void
837-ConnectionState::ProcessMysqlPkts(const u_char *pkts,
838+ConnectionState::ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr,
839+ const u_char *pkts,
840 u_int pkts_len,
841 const timeval &ts,
842 const AddrPort &addr_port,
843@@ -76,13 +78,13 @@
844 {
845 case PKT_QUERY:
846 if (!query.empty())
847- DispatchQuery(ts, query, addr_port);
848+ DispatchQuery(cs_ptr, ts, query, addr_port);
849 ++stats.nr_of_parsed_queries;
850 ++stats.nr_of_parsed_packets;
851 break;
852
853 case PKT_RESULT:
854- DispatchResult(ts, addr_port);
855+ DispatchResult(cs_ptr, ts, addr_port);
856 ++stats.nr_of_parsed_packets;
857 break;
858
859@@ -107,12 +109,6 @@
860
861 }
862
863-void
864-ConnectionState::ProcessFinishConnection()
865-{
866- db_thread->queries.push(QueryEntryPtr(new FinishEntry()));
867-}
868-
869 PktResult
870 ConnectionState::ParseMysqlPkt(IN UCharBuffer &buff,
871 OUT std::string &query)
872@@ -303,23 +299,32 @@
873 }
874
875 void
876-ConnectionState::DispatchQuery(const timeval &ts,
877+ConnectionState::DispatchQuery(/* Having shared pointer to "this" as an argument
878+ is just temporary step. We need a shared pointer
879+ to connection state inside of query entry. The
880+ better way to do this is to move code that creates
881+ query entries to the upper layer.
882+ */
883+ boost::shared_ptr<ConnectionState> cs_ptr,
884+ const timeval &ts,
885 const std::string &query,
886 const AddrPort &addr_port)
887 {
888 boost::shared_ptr<TcpdumpQueryEntry>
889- query_entry(new TcpdumpQueryEntry(ts,
890+ query_entry(new TcpdumpQueryEntry(cs_ptr,
891+ ts,
892 query,
893 addr_port));
894- db_thread->queries.push(query_entry);
895+ g_dispatcher_plugin->dispatch(query_entry);
896 }
897
898 void
899-ConnectionState::DispatchResult(const timeval &ts,
900+ConnectionState::DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr,
901+ const timeval &ts,
902 const AddrPort &addr_port)
903 {
904 boost::shared_ptr<TcpdumpResultEntry>
905- result_entry(new TcpdumpResultEntry(addr_port, ts, last_query_result));
906- db_thread->queries.push(result_entry);
907+ result_entry(new TcpdumpResultEntry(cs_ptr, addr_port, ts, last_query_result));
908+ g_dispatcher_plugin->dispatch(result_entry);
909 }
910
911
912=== modified file 'percona_playback/tcpdump/connection_state.h'
913--- percona_playback/tcpdump/connection_state.h 2012-07-03 07:41:49 +0000
914+++ percona_playback/tcpdump/connection_state.h 2012-11-23 20:15:24 +0000
915@@ -94,8 +94,7 @@
916 };
917
918
919-class ConnectionState : public DBThreadState
920-
921+class ConnectionState
922 {
923
924 public:
925@@ -107,7 +106,7 @@
926 SERVER
927 };
928
929- ConnectionState() :
930+ ConnectionState(uint64_t _thread_id) :
931 current_origin(UNDEF),
932 fragmented(false),
933 handshake_from_client(false),
934@@ -126,7 +125,7 @@
935 (drizzle_con_options_t)
936 DRIZZLE_CON_MYSQL),
937 drizzle_con_free),
938- db_thread(NULL)
939+ thread_id(_thread_id)
940 {
941 drizzle_con->result= NULL;
942 }
943@@ -136,18 +135,15 @@
944 drizzle_result_free_all(drizzle_con.get());
945 }
946
947- void ProcessMysqlPkts(const u_char *pkts,
948+ void ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr,
949+ const u_char *pkts,
950 u_int total_len,
951 const timeval &ts,
952 const AddrPort &addr_port,
953 OUT TcpdumpMysqlParserStats &stats);
954
955- void ProcessFinishConnection();
956-
957 void SetCurrentOrigin(Origin o) { current_origin = o; }
958
959- void SetDBThread(DBThread *t) { db_thread= t; }
960-
961 LastExecutedQueryInfo last_executed_query_info;
962
963 private:
964@@ -159,11 +155,13 @@
965 PktResult ServerPacket(IN UCharBuffer &buff);
966 PktResult ClientPacket(IN UCharBuffer &buff, OUT std::string &query);
967
968- void DispatchQuery(const timeval &ts,
969+ void DispatchQuery(boost::shared_ptr<ConnectionState> cs_ptr,
970+ const timeval &ts,
971 const std::string &query,
972 const AddrPort &addr_port);
973
974- void DispatchResult(const timeval &ts,
975+ void DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr,
976+ const timeval &ts,
977 const AddrPort &addr_port);
978
979 Origin current_origin;
980@@ -179,7 +177,7 @@
981
982 QueryResult last_query_result;
983
984- DBThread *db_thread;
985+ uint64_t thread_id;
986
987 };
988
989
990=== modified file 'percona_playback/tcpdump/pcap_packets_parser.cc'
991--- percona_playback/tcpdump/pcap_packets_parser.cc 2012-07-03 07:41:49 +0000
992+++ percona_playback/tcpdump/pcap_packets_parser.cc 2012-11-23 20:15:24 +0000
993@@ -18,7 +18,6 @@
994
995 #include "percona_playback/db_thread.h"
996 #include "percona_playback/plugin.h"
997-#include "percona_playback/dispatcher.h"
998
999 #include <netinet/in.h>
1000 #include <arpa/inet.h>
1001@@ -29,6 +28,7 @@
1002 #define SNAP_LEN 16500 // pcap's max capture size
1003
1004 extern percona_playback::DBClientPlugin *g_dbclient_plugin;
1005+extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
1006
1007 void
1008 PcapPacketsParser::ParsePkt(const struct pcap_pkthdr *header,
1009@@ -81,7 +81,8 @@
1010 if (size_mysql == 0 &&
1011 ((tcp->th_flags & TH_FIN) || (tcp->th_flags & TH_RST)))
1012 {
1013- g_dispatcher.finish_and_wait(addr_port.ThreadId());
1014+ g_dispatcher_plugin->finish_and_wait(addr_port.ThreadId());
1015+ RemoveConnectionState(addr_port.ThreadId());
1016 return;
1017 }
1018
1019@@ -97,37 +98,50 @@
1020 }
1021
1022 /* If there is no DBThread with such id create it */
1023- boost::shared_ptr<DBThreadState>
1024- state= g_dispatcher.get_thread_state(addr_port.ThreadId(),
1025- boost::bind(&PcapPacketsParser::CreateConnectionState,
1026- this,
1027- _1));
1028+ boost::shared_ptr<ConnectionState>
1029+ state= GetConnectionState(addr_port.ThreadId());
1030
1031 assert(state.get());
1032
1033- ((ConnectionState *)state.get())->SetCurrentOrigin(origin);
1034- ((ConnectionState *)state.get())->ProcessMysqlPkts(mysql,
1035- size_mysql,
1036- header->ts,
1037- addr_port,
1038- stats);
1039+ state->SetCurrentOrigin(origin);
1040+ state->ProcessMysqlPkts(state,
1041+ mysql,
1042+ size_mysql,
1043+ header->ts,
1044+ addr_port,
1045+ stats);
1046 }
1047
1048 void
1049 PcapPacketsParser::WaitForUnfinishedTasks()
1050 {
1051- g_dispatcher.finish_all_and_wait();
1052-}
1053-
1054-void
1055-PcapPacketsParser::CreateConnectionState(DBThread *db_thread)
1056-{
1057- assert(db_thread);
1058- boost::shared_ptr<ConnectionState> state(new ConnectionState());
1059+ g_dispatcher_plugin->finish_all_and_wait();
1060+}
1061+
1062+boost::shared_ptr<ConnectionState>
1063+PcapPacketsParser::GetConnectionState(uint64_t thread_id)
1064+{
1065+ Connections::iterator it = connections.find(thread_id);
1066+ if (it == connections.end())
1067+ return CreateConnectionState(thread_id);
1068+ return it->second;
1069+}
1070+
1071+boost::shared_ptr<ConnectionState>
1072+PcapPacketsParser::CreateConnectionState(uint64_t thread_id)
1073+{
1074+ boost::shared_ptr<ConnectionState> state(new ConnectionState(thread_id));
1075 state->last_executed_query_info.end_pcap_timestamp=
1076 first_packet_pcap_timestamp;
1077 state->last_executed_query_info.end_timestamp=
1078 first_packet_timestamp;
1079- db_thread->set_state(state);
1080- state->SetDBThread(db_thread);
1081-}
1082+ connections[thread_id] = state;
1083+ return state;
1084+}
1085+
1086+void
1087+PcapPacketsParser::RemoveConnectionState(uint64_t thread_id)
1088+{
1089+ connections.erase(thread_id);
1090+}
1091+
1092
1093=== modified file 'percona_playback/tcpdump/pcap_packets_parser.h'
1094--- percona_playback/tcpdump/pcap_packets_parser.h 2012-06-19 15:26:53 +0000
1095+++ percona_playback/tcpdump/pcap_packets_parser.h 2012-11-23 20:15:24 +0000
1096@@ -23,6 +23,8 @@
1097 #include <tbb/concurrent_queue.h>
1098 #include <boost/thread.hpp>
1099 #include <boost/bind.hpp>
1100+#include <boost/unordered_map.hpp>
1101+#include <boost/shared_ptr.hpp>
1102
1103 class DBThread;
1104
1105@@ -32,10 +34,14 @@
1106 timeval first_packet_timestamp;
1107 timeval first_packet_pcap_timestamp;
1108 bool was_first_packet;
1109+ typedef boost::unordered_map<uint64_t,
1110+ boost::shared_ptr<ConnectionState> >
1111+ Connections;
1112+ Connections connections;
1113
1114 public:
1115
1116- PcapPacketsParser() : was_first_packet(false)
1117+ PcapPacketsParser() : was_first_packet(false), connections(10000)
1118 {
1119 first_packet_timestamp.tv_sec= first_packet_timestamp.tv_usec= 0;
1120 first_packet_pcap_timestamp.tv_sec= first_packet_pcap_timestamp.tv_usec= 0;
1121@@ -52,9 +58,11 @@
1122
1123 void WaitForUnfinishedTasks();
1124
1125- void CreateConnectionState(DBThread *db_thread);
1126
1127 private:
1128+ boost::shared_ptr<ConnectionState> CreateConnectionState(uint64_t thread_id);
1129+ boost::shared_ptr<ConnectionState> GetConnectionState(uint64_t thread_id);
1130+ void RemoveConnectionState(uint64_t thread_id);
1131
1132 void ParsePkt(const struct pcap_pkthdr *header,
1133 const u_char *packet);
1134
1135=== modified file 'percona_playback/tcpdump/tcpdump_query_entries.cc'
1136--- percona_playback/tcpdump/tcpdump_query_entries.cc 2012-07-09 11:54:52 +0000
1137+++ percona_playback/tcpdump/tcpdump_query_entries.cc 2012-11-23 20:15:24 +0000
1138@@ -30,15 +30,10 @@
1139 QueryResult expected_result;
1140 timeval start_time;
1141 timeval end_time;
1142- boost::shared_ptr<DBThreadState> thr_state;
1143
1144 assert(t);
1145
1146- thr_state= t->get_state();
1147-
1148- assert(thr_state.get());
1149-
1150- ConnectionState &state= (ConnectionState &)*thr_state;
1151+ ConnectionState &state= *connection_state;
1152
1153 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;
1154
1155@@ -96,12 +91,7 @@
1156
1157 assert(t);
1158
1159- thr_state= t->get_state();
1160-
1161- assert(thr_state.get());
1162-
1163-
1164- ConnectionState &state= (ConnectionState &)*thr_state;
1165+ ConnectionState &state= *connection_state;
1166 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;
1167
1168 timersub(&pcap_timestamp,
1169
1170=== modified file 'percona_playback/tcpdump/tcpdump_query_entries.h'
1171--- percona_playback/tcpdump/tcpdump_query_entries.h 2012-07-03 07:41:49 +0000
1172+++ percona_playback/tcpdump/tcpdump_query_entries.h 2012-11-23 20:15:24 +0000
1173@@ -21,9 +21,12 @@
1174 #include "connection_state.h"
1175
1176 #include <sys/time.h>
1177+#include <boost/shared_ptr.hpp>
1178
1179 class TcpdumpQueryEntry : public QueryEntry
1180 {
1181+
1182+ boost::shared_ptr<ConnectionState> connection_state;
1183 timeval pcap_timestamp;
1184 std::string query;
1185
1186@@ -34,10 +37,12 @@
1187 pcap_timestamp.tv_sec= pcap_timestamp.tv_usec= 0;
1188 }
1189
1190- TcpdumpQueryEntry(const timeval &_pcap_timestamp,
1191+ TcpdumpQueryEntry(boost::shared_ptr<ConnectionState> _connection_state,
1192+ const timeval &_pcap_timestamp,
1193 const std::string &_query,
1194 const AddrPort &addr_port) :
1195 QueryEntry(addr_port.ThreadId(), false),
1196+ connection_state(_connection_state),
1197 pcap_timestamp(_pcap_timestamp),
1198 query(_query)
1199 {}
1200@@ -50,15 +55,18 @@
1201 class TcpdumpResultEntry : public QueryEntry
1202 {
1203
1204+ boost::shared_ptr<ConnectionState> connection_state;
1205 timeval pcap_timestamp;
1206 QueryResult expected_result;
1207
1208 public:
1209 TcpdumpResultEntry() {}
1210- TcpdumpResultEntry(const AddrPort &addr_port,
1211+ TcpdumpResultEntry(boost::shared_ptr<ConnectionState> _connection_state,
1212+ const AddrPort &addr_port,
1213 const timeval &_pcap_timestamp,
1214 const QueryResult &_expected_result) :
1215 QueryEntry(addr_port.ThreadId(), false),
1216+ connection_state(_connection_state),
1217 pcap_timestamp(_pcap_timestamp),
1218 expected_result(_expected_result)
1219 {}
1220
1221=== added file 'percona_playback/test/thread-pool-sysbench-slow.cc'
1222--- percona_playback/test/thread-pool-sysbench-slow.cc 1970-01-01 00:00:00 +0000
1223+++ percona_playback/test/thread-pool-sysbench-slow.cc 2012-11-23 20:15:24 +0000
1224@@ -0,0 +1,60 @@
1225+/* BEGIN LICENSE
1226+ * Copyright (C) 2011 Stewart Smith <stewart@flamingspork.com>
1227+ * This program is free software: you can redistribute it and/or modify it
1228+ * under the terms of the GNU General Public License version 3, as published
1229+ * by the Free Software Foundation.
1230+ *
1231+ * This program is distributed in the hope that it will be useful, but
1232+ * WITHOUT ANY WARRANTY; without even the implied warranties of
1233+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1234+ * PURPOSE. See the GNU General Public License for more details.
1235+ *
1236+ * You should have received a copy of the GNU General Public License along
1237+ * with this program. If not, see <http://www.gnu.org/licenses/>.
1238+ * END LICENSE */
1239+
1240+#include "config.h"
1241+
1242+#include <iostream>
1243+#include <cstdio>
1244+#include <string>
1245+#include <cstring>
1246+#include <assert.h>
1247+#include <unistd.h>
1248+
1249+#include <percona_playback/percona_playback.h>
1250+
1251+/**
1252+ * @TODO Actually write a real test suite here
1253+ */
1254+int main(int argc, char **argv)
1255+{
1256+ (void)argc; (void)argv;
1257+
1258+ fprintf(stderr, "Working dir: %s\n\n", get_current_dir_name());
1259+
1260+ percona_playback_st *the_percona_playback= percona_playback_create("test_Percona Playback");
1261+ assert(the_percona_playback);
1262+
1263+ char dbplugin[]="--db-plugin=null";
1264+ char querylog[]="--query-log-file="SRCDIR"/percona_playback/test/sysbench-slow.log";
1265+ char threadpool[]="--dispatcher-plugin=thread-pool"
1266+
1267+ char **dbplugin_argv= new char*[4];
1268+ dbplugin_argv[0]= argv[0];
1269+ dbplugin_argv[1]= dbplugin;
1270+ dbplugin_argv[2]= querylog;
1271+ dbplugin_argv[3]= threadpool;
1272+
1273+
1274+ assert(0 == percona_playback_argv(the_percona_playback, 3, dbplugin_argv));
1275+
1276+ struct percona_playback_run_result *r= percona_playback_run(the_percona_playback);
1277+
1278+ assert(r->err == 0);
1279+ assert(r->n_queries == 10150);
1280+ assert(r->n_log_entries = 10149);
1281+
1282+ percona_playback_destroy(&the_percona_playback);
1283+ return r->err;
1284+}
1285
1286=== added directory 'percona_playback/thread_per_connection'
1287=== added file 'percona_playback/thread_per_connection/plugin.ini'
1288--- percona_playback/thread_per_connection/plugin.ini 1970-01-01 00:00:00 +0000
1289+++ percona_playback/thread_per_connection/plugin.ini 2012-11-23 20:15:24 +0000
1290@@ -0,0 +1,6 @@
1291+[plugin]
1292+title=Thread-per-connection dispatcher plugin
1293+description=Creates thread per each DB connection
1294+version=0.1
1295+static=yes
1296+load_by_default=yes
1297
1298=== added file 'percona_playback/thread_per_connection/thread_per_connection.cc'
1299--- percona_playback/thread_per_connection/thread_per_connection.cc 1970-01-01 00:00:00 +0000
1300+++ percona_playback/thread_per_connection/thread_per_connection.cc 2012-11-23 20:15:24 +0000
1301@@ -0,0 +1,109 @@
1302+/* BEGIN LICENSE
1303+ * Copyright (C) 2011-2012 Percona Inc.
1304+ * This program is free software: you can redistribute it and/or modify it
1305+ * under the terms of the GNU General Public License version 2, as published
1306+ * by the Free Software Foundation.
1307+ *
1308+ * This program is distributed in the hope that it will be useful, but
1309+ * WITHOUT ANY WARRANTY; without even the implied warranties of
1310+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1311+ * PURPOSE. See the GNU General Public License for more details.
1312+ *
1313+ * You should have received a copy of the GNU General Public License along
1314+ * with this program. If not, see <http://www.gnu.org/licenses/>.
1315+ * END LICENSE */
1316+
1317+#include "percona_playback/plugin.h"
1318+#include "percona_playback/db_thread.h"
1319+
1320+#include <tbb/concurrent_hash_map.h>
1321+
1322+class ThreadPerConnectionDispatcher :
1323+ public percona_playback::DispatcherPlugin
1324+{
1325+ typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable;
1326+ DBExecutorsTable executors;
1327+ void db_thread_func(DBThread *thread);
1328+ void start_thread(DBThread *thread);
1329+
1330+public:
1331+ ThreadPerConnectionDispatcher(std::string _name) :
1332+ DispatcherPlugin(_name) {}
1333+
1334+ void dispatch(QueryEntryPtr query_entry);
1335+ bool finish_and_wait(uint64_t thread_id);
1336+ void finish_all_and_wait();
1337+};
1338+
1339+extern percona_playback::DBClientPlugin *g_dbclient_plugin;
1340+
1341+void
1342+ThreadPerConnectionDispatcher::dispatch(QueryEntryPtr query_entry)
1343+{
1344+ uint64_t thread_id= query_entry->getThreadId();
1345+ {
1346+ DBExecutorsTable::accessor a;
1347+ if (executors.insert(a, thread_id))
1348+ {
1349+ DBThread *db_thread= g_dbclient_plugin->create(thread_id);
1350+ a->second= db_thread;
1351+ db_thread->start_thread();
1352+ }
1353+ a->second->queries->push(query_entry);
1354+ }
1355+}
1356+
1357+bool
1358+ThreadPerConnectionDispatcher::finish_and_wait(uint64_t thread_id)
1359+{
1360+ DBThread *db_thread= NULL;
1361+ {
1362+ DBExecutorsTable::accessor a;
1363+ if (executors.find(a, thread_id))
1364+ {
1365+ db_thread= a->second;
1366+ executors.erase(a);
1367+ }
1368+ }
1369+
1370+ if (!db_thread)
1371+ return false;
1372+
1373+ db_thread->queries->push(QueryEntryPtr(new FinishEntry(thread_id)));
1374+ db_thread->join();
1375+
1376+ delete db_thread;
1377+
1378+ return true;
1379+}
1380+
1381+void
1382+ThreadPerConnectionDispatcher::finish_all_and_wait()
1383+{
1384+ QueryEntryPtr shutdown_command(new FinishEntry(0));
1385+
1386+ while(executors.size())
1387+ {
1388+ uint64_t thread_id;
1389+ DBThread *t;
1390+ {
1391+ DBExecutorsTable::const_iterator iter= executors.begin();
1392+ thread_id= (*iter).first;
1393+ t= (*iter).second;
1394+ }
1395+ executors.erase(thread_id);
1396+
1397+ t->queries->push(shutdown_command);
1398+ t->join();
1399+
1400+ delete t;
1401+ }
1402+}
1403+
1404+static void init_plugin(percona_playback::PluginRegistry &r)
1405+{
1406+ r.add("thread-per-connection",
1407+ new ThreadPerConnectionDispatcher("thread-per-connection"));
1408+}
1409+
1410+PERCONA_PLAYBACK_PLUGIN(init_plugin);
1411
1412=== added directory 'percona_playback/thread_pool'
1413=== added file 'percona_playback/thread_pool/plugin.ini'
1414--- percona_playback/thread_pool/plugin.ini 1970-01-01 00:00:00 +0000
1415+++ percona_playback/thread_pool/plugin.ini 2012-11-23 20:15:24 +0000
1416@@ -0,0 +1,6 @@
1417+[plugin]
1418+title=Thread-pool dispatcher plugin
1419+description=Creates threads pool to process connections to server
1420+version=0.1
1421+static=yes
1422+load_by_default=yes
1423
1424=== added file 'percona_playback/thread_pool/thread_pool.cc'
1425--- percona_playback/thread_pool/thread_pool.cc 1970-01-01 00:00:00 +0000
1426+++ percona_playback/thread_pool/thread_pool.cc 2012-11-23 20:15:24 +0000
1427@@ -0,0 +1,130 @@
1428+/* BEGIN LICENSE
1429+ * Copyright (C) 2011-2012 Percona Inc.
1430+ * This program is free software: you can redistribute it and/or modify it
1431+ * under the terms of the GNU General Public License version 2, as published
1432+ * by the Free Software Foundation.
1433+ *
1434+ * This program is distributed in the hope that it will be useful, but
1435+ * WITHOUT ANY WARRANTY; without even the implied warranties of
1436+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1437+ * PURPOSE. See the GNU General Public License for more details.
1438+ *
1439+ * You should have received a copy of the GNU General Public License along
1440+ * with this program. If not, see <http://www.gnu.org/licenses/>.
1441+ * END LICENSE */
1442+
1443+#include "percona_playback/plugin.h"
1444+#include "percona_playback/db_thread.h"
1445+#include <percona_playback/gettext.h>
1446+#include <boost/program_options.hpp>
1447+#include <boost/shared_ptr.hpp>
1448+#include <boost/crc.hpp>
1449+#include <vector>
1450+
1451+#include <tbb/concurrent_hash_map.h>
1452+#include <stdio.h>
1453+
1454+extern percona_playback::DBClientPlugin *g_dbclient_plugin;
1455+
1456+class ThreadPoolDispatcher :
1457+ public percona_playback::DispatcherPlugin
1458+{
1459+ typedef std::vector<boost::shared_ptr<DBThread> > Workers;
1460+
1461+ unsigned threads_count;
1462+ boost::
1463+ program_options::
1464+ options_description options;
1465+ Workers workers;
1466+
1467+public:
1468+ ThreadPoolDispatcher(std::string _name) :
1469+ DispatcherPlugin(_name),
1470+ threads_count(0),
1471+ options("Threads-pool Options") {}
1472+
1473+ virtual void dispatch(QueryEntryPtr query_entry);
1474+ virtual bool finish_and_wait(uint64_t) { return true; }
1475+ virtual void finish_all_and_wait();
1476+ virtual void run();
1477+
1478+ boost::program_options::options_description* getProgramOptions();
1479+ int processOptions(boost::program_options::variables_map &vm);
1480+
1481+};
1482+
1483+void ThreadPoolDispatcher::run()
1484+{
1485+ for (unsigned i = 0; i < threads_count; ++i)
1486+ {
1487+ boost::shared_ptr<DBThread> db_thread(g_dbclient_plugin->create(i));
1488+ workers.push_back(db_thread);
1489+ db_thread->start_thread();
1490+ }
1491+}
1492+
1493+void ThreadPoolDispatcher::dispatch(QueryEntryPtr query_entry)
1494+{
1495+ /*
1496+ Each worker has its own queue. For some types of input plugins
1497+ it is important to execute query entries with the same thread id
1498+ by the same worker. That is why we choose worker by simple hash from
1499+ thread id.
1500+ */
1501+ uint64_t thread_id= query_entry->getThreadId();
1502+ boost::crc_32_type crc;
1503+ crc.process_bytes(&thread_id, sizeof(thread_id));
1504+ uint32_t worker_index = crc.checksum() % workers.size();
1505+ workers[worker_index]->queries->push(query_entry);
1506+}
1507+
1508+void ThreadPoolDispatcher::finish_all_and_wait()
1509+{
1510+ QueryEntryPtr shutdown_command(new FinishEntry(0));
1511+ for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i)
1512+ (*i)->queries->push(shutdown_command);
1513+ for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i)
1514+ (*i)->join();
1515+ workers.clear();
1516+}
1517+
1518+
1519+boost::program_options::options_description*
1520+ThreadPoolDispatcher::getProgramOptions()
1521+{
1522+ unsigned default_threads_count =
1523+ boost::thread::hardware_concurrency();
1524+ options.add_options()
1525+ ("thread-pool-threads-count",
1526+ boost::program_options::value<unsigned>(&threads_count)
1527+ ->default_value(default_threads_count),
1528+ _("The number of threads in thread pool. If this options is omitted "
1529+ "the number of threads equals to hardware concurency."))
1530+ ;
1531+
1532+ return &options;
1533+
1534+}
1535+
1536+int
1537+ThreadPoolDispatcher::processOptions(boost::program_options::variables_map &vm)
1538+{
1539+ if (!active &&
1540+ !vm["thread-pool-threads-count"].defaulted())
1541+ {
1542+ fprintf(stderr, _("thread-pool plugin is not selected, "
1543+ "you shouldn't use this plugin-related "
1544+ "command line options\n"));
1545+ return -1;
1546+ }
1547+
1548+ return 0;
1549+}
1550+
1551+static void init_plugin(percona_playback::PluginRegistry &r)
1552+{
1553+ r.add("thread-pool",
1554+ new ThreadPoolDispatcher("thread-pool"));
1555+}
1556+
1557+PERCONA_PLAYBACK_PLUGIN(init_plugin);

Subscribers

People subscribed via source and target branches

to all changes: