Merge lp:~vlad-lesin/percona-playback/connections_pool into lp:percona-playback

Proposed by Vlad Lesin
Status: Merged
Approved by: Stewart Smith
Approved revision: 178
Merged at revision: 174
Proposed branch: lp:~vlad-lesin/percona-playback/connections_pool
Merge into: lp:percona-playback
Diff against target: 1557 lines (+617/-299)
24 files modified
Makefile.am (+6/-3)
percona_playback/db_thread.cc (+8/-10)
percona_playback/db_thread.h (+18/-23)
percona_playback/dispatcher.cc (+0/-105)
percona_playback/dispatcher.h (+0/-46)
percona_playback/libdrizzle_client/libdrizzle_client.cc (+4/-3)
percona_playback/mysql_client/mysql_client.cc (+69/-27)
percona_playback/mysql_client/mysql_client.h (+5/-2)
percona_playback/null_dbclient/null_dbclient.cc (+4/-3)
percona_playback/percona_playback.cc (+46/-1)
percona_playback/plugin.h (+29/-0)
percona_playback/query_entry.h (+2/-1)
percona_playback/query_log/query_log.cc (+25/-8)
percona_playback/tcpdump/connection_state.cc (+20/-15)
percona_playback/tcpdump/connection_state.h (+10/-12)
percona_playback/tcpdump/pcap_packets_parser.cc (+38/-24)
percona_playback/tcpdump/pcap_packets_parser.h (+10/-2)
percona_playback/tcpdump/tcpdump_query_entries.cc (+2/-12)
percona_playback/tcpdump/tcpdump_query_entries.h (+10/-2)
percona_playback/test/thread-pool-sysbench-slow.cc (+60/-0)
percona_playback/thread_per_connection/plugin.ini (+6/-0)
percona_playback/thread_per_connection/thread_per_connection.cc (+109/-0)
percona_playback/thread_pool/plugin.ini (+6/-0)
percona_playback/thread_pool/thread_pool.cc (+130/-0)
To merge this branch: bzr merge lp:~vlad-lesin/percona-playback/connections_pool
Reviewer Review Type Date Requested Status
Stewart Smith (community) Approve
Review via email: mp+134884@code.launchpad.net

Description of the change

A bunch of changes for implementing this https://blueprints.launchpad.net/percona-playback/+spec/pool-of-threads blueprint as well as fixes for some critical bugs.

To post a comment you must log in.
171. By Vlad Lesin

Fixes for bug #1070824 and bug #1080654

172. By Vlad Lesin

merged with trunk

173. By Vlad Lesin

Send init session query on reconnect

174. By Vlad Lesin

Initialize session init query with the value from cmd line options.

175. By Vlad Lesin

Thread-safety fixes for mysql client plugin.

176. By Vlad Lesin

Don't push empty query log queries for execution

177. By Vlad Lesin

Convert multiline query to single line to avoid syntax errors from server.

178. By Vlad Lesin

Allow multi-statement queries.

Revision history for this message
Stewart Smith (stewart) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'Makefile.am'
2--- Makefile.am 2012-11-08 22:17:07 +0000
3+++ Makefile.am 2012-11-23 20:15:24 +0000
4@@ -59,13 +59,11 @@
5 libpercona_playback_la_SOURCES = \
6 percona_playback/percona_playback.cc \
7 percona_playback/plugin.cc \
8- percona_playback/db_thread.cc \
9- percona_playback/dispatcher.cc
10+ percona_playback/db_thread.cc
11
12 nobase_include_HEADERS += \
13 percona_playback/percona_playback.h \
14 percona_playback/db_thread.h \
15- percona_playback/dispatcher.h \
16 percona_playback/query_result.h \
17 percona_playback/plugin.h \
18 percona_playback/tokenize.h \
19@@ -89,6 +87,7 @@
20 percona_playback/test/crashme-slow \
21 percona_playback/test/sqlbench-transactions-slow \
22 percona_playback/test/sysbench-slow \
23+ percona_playback/test/thread-pool-sysbench-slow \
24 percona_playback/test/preserve_query_time \
25 percona_playback/test/tcpdump_without_handshake \
26 percona_playback/test/tcpdump_fragmented_packet \
27@@ -104,6 +103,7 @@
28 percona_playback_test_crashme_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
29 percona_playback_test_sqlbench_transactions_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
30 percona_playback_test_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
31+percona_playback_test_thread_pool_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
32 percona_playback_test_preserve_query_time_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
33 percona_playback_test_tcpdump_without_handshake_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
34 percona_playback_test_tcpdump_fragmented_packet_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
35@@ -135,6 +135,9 @@
36 percona_playback_test_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc
37 percona_playback_test_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS)
38
39+percona_playback_test_thread_pool_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc
40+percona_playback_test_thread_pool_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS)
41+
42 percona_playback_test_preserve_query_time_SOURCES= percona_playback/test/preserve_query_time.cc
43 percona_playback_test_preserve_query_time_LDADD= $(LDADD) $(MYSQL_LIBS)
44
45
46=== modified file 'percona_playback/db_thread.cc'
47--- percona_playback/db_thread.cc 2012-11-14 11:44:59 +0000
48+++ percona_playback/db_thread.cc 2012-11-23 20:15:24 +0000
49@@ -24,25 +24,23 @@
50
51 extern std::string g_session_init_query;
52
53-void DBThread::connect_and_init_session()
54+void DBThread::init_session()
55 {
56- connect();
57- if (!g_session_init_query.empty())
58- {
59- QueryResult r;
60- QueryResult er;
61- execute_query(g_session_init_query, &r, er);
62- }
63+ if (g_session_init_query.empty())
64+ return;
65+ QueryResult r;
66+ QueryResult er;
67+ execute_query(g_session_init_query, &r, er);
68 }
69
70 void DBThread::run()
71 {
72 connect_and_init_session();
73
74- QueryEntryPtr query;
75 while (true)
76 {
77- queries.pop(query);
78+ QueryEntryPtr query;
79+ queries->pop(query);
80
81 if (query->is_shutdown())
82 break;
83
84=== modified file 'percona_playback/db_thread.h'
85--- percona_playback/db_thread.h 2012-11-14 11:44:59 +0000
86+++ percona_playback/db_thread.h 2012-11-23 20:15:24 +0000
87@@ -16,6 +16,7 @@
88 #ifndef PERCONA_PLAYBACK_DB_THREAD_H
89 #define PERCONA_PLAYBACK_DB_THREAD_H
90
91+#include <memory>
92 #include "percona_playback/visibility.h"
93 #include <boost/thread.hpp>
94 #include <boost/shared_ptr.hpp>
95@@ -33,55 +34,49 @@
96
97 class QueryResult;
98
99-class DBThreadState
100-{
101-public:
102- virtual ~DBThreadState(){}
103-};
104-
105 class DBThread
106 {
107
108 private:
109 boost::thread thread;
110 uint64_t thread_id;
111- boost::shared_ptr<DBThreadState> state;
112
113 public:
114 typedef tbb::concurrent_bounded_queue<QueryEntryPtr> Queries;
115- Queries queries;
116+ boost::shared_ptr<Queries> queries;
117
118- DBThread(uint64_t _thread_id) : thread_id(_thread_id) {
119- queries.set_capacity(g_db_thread_queue_depth);
120+ DBThread(uint64_t _thread_id,
121+ boost::shared_ptr<Queries> _queries) :
122+ thread_id(_thread_id), queries(_queries) {
123+ queries->set_capacity(g_db_thread_queue_depth);
124 }
125
126 virtual ~DBThread() {}
127
128- void set_state(boost::shared_ptr<DBThreadState> new_state)
129- {
130- state= new_state;
131- }
132-
133- boost::shared_ptr<DBThreadState> get_state() const
134- {
135- return state;
136- }
137-
138 void join()
139 {
140 thread.join();
141 }
142
143- void connect_and_init_session();
144+ bool connect_and_init_session()
145+ {
146+ if (connect())
147+ {
148+ init_session();
149+ return true;
150+ }
151+ return false;
152+ }
153
154- virtual void connect()= 0;
155+ void init_session();
156+ virtual bool connect()= 0;
157
158 virtual void disconnect()= 0;
159 virtual void execute_query(const std::string &query,
160 QueryResult *r,
161 const QueryResult &expected_result)= 0;
162
163- void run();
164+ virtual void run();
165
166 void start_thread();
167 };
168
169=== removed file 'percona_playback/dispatcher.cc'
170--- percona_playback/dispatcher.cc 2012-07-03 07:41:49 +0000
171+++ percona_playback/dispatcher.cc 1970-01-01 00:00:00 +0000
172@@ -1,105 +0,0 @@
173-/* BEGIN LICENSE
174- * Copyright (C) 2011-2012 Percona Inc.
175- * This program is free software: you can redistribute it and/or modify it
176- * under the terms of the GNU General Public License version 2, as published
177- * by the Free Software Foundation.
178- *
179- * This program is distributed in the hope that it will be useful, but
180- * WITHOUT ANY WARRANTY; without even the implied warranties of
181- * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
182- * PURPOSE. See the GNU General Public License for more details.
183- *
184- * You should have received a copy of the GNU General Public License along
185- * with this program. If not, see <http://www.gnu.org/licenses/>.
186- * END LICENSE */
187-#include "percona_playback/dispatcher.h"
188-#include "percona_playback/db_thread.h"
189-#include "percona_playback/plugin.h"
190-
191-#include <assert.h>
192-
193-Dispatcher g_dispatcher;
194-
195-extern percona_playback::DBClientPlugin *g_dbclient_plugin;
196-
197-boost::shared_ptr<DBThreadState>
198-Dispatcher::get_thread_state(
199- uint64_t thread_id,
200- boost::function1<void, DBThread *> run_on_db_thread_create)
201-{
202- DBExecutorsTable::accessor a;
203-
204- if (executors.insert(a, thread_id))
205- {
206- DBThread *db_thread= g_dbclient_plugin->create(thread_id);
207- assert(db_thread);
208- a->second= db_thread;
209- if (!run_on_db_thread_create.empty())
210- run_on_db_thread_create(db_thread);
211- db_thread->start_thread();
212- }
213-
214- return a->second->get_state();
215-}
216-
217-void
218-Dispatcher::dispatch(QueryEntryPtr query_entry)
219-{
220- uint64_t thread_id= query_entry->getThreadId();
221- {
222- DBExecutorsTable::accessor a;
223- if (executors.insert(a, thread_id))
224- {
225- DBThread *db_thread= g_dbclient_plugin->create(thread_id);
226- a->second= db_thread;
227- db_thread->start_thread();
228- }
229- a->second->queries.push(query_entry);
230- }
231-}
232-
233-bool
234-Dispatcher::finish_and_wait(uint64_t thread_id)
235-{
236- DBThread *db_thread= NULL;
237- {
238- DBExecutorsTable::accessor a;
239- if (executors.find(a, thread_id))
240- {
241- db_thread= a->second;
242- executors.erase(a);
243- }
244- }
245-
246- if (!db_thread)
247- return false;
248-
249- db_thread->queries.push(QueryEntryPtr(new FinishEntry()));
250- db_thread->join();
251-
252- delete db_thread;
253-
254- return true;
255-}
256-
257-void
258-Dispatcher::finish_all_and_wait()
259-{
260- QueryEntryPtr shutdown_command(new FinishEntry());
261- while(executors.size())
262- {
263- uint64_t thread_id;
264- DBThread *t;
265- {
266- DBExecutorsTable::const_iterator iter= executors.begin();
267- thread_id= (*iter).first;
268- t= (*iter).second;
269- }
270- executors.erase(thread_id);
271-
272- t->queries.push(shutdown_command);
273- t->join();
274-
275- delete t;
276- }
277-}
278
279=== removed file 'percona_playback/dispatcher.h'
280--- percona_playback/dispatcher.h 2012-07-06 07:15:28 +0000
281+++ percona_playback/dispatcher.h 1970-01-01 00:00:00 +0000
282@@ -1,46 +0,0 @@
283-/* BEGIN LICENSE
284- * Copyright (C) 2011 Percona Inc.
285- * This program is free software: you can redistribute it and/or modify it
286- * under the terms of the GNU General Public License version 2, as published
287- * by the Free Software Foundation.
288- *
289- * This program is distributed in the hope that it will be useful, but
290- * WITHOUT ANY WARRANTY; without even the implied warranties of
291- * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
292- * PURPOSE. See the GNU General Public License for more details.
293- *
294- * You should have received a copy of the GNU General Public License along
295- * with this program. If not, see <http://www.gnu.org/licenses/>.
296- * END LICENSE */
297-
298-#ifndef PERCONA_PLAYBACK_DISPATCHER_H
299-#define PERCONA_PLAYBACK_DISPATCHER_H
300-
301-#include <tbb/concurrent_hash_map.h>
302-#include <boost/function.hpp>
303-
304-#include "percona_playback/query_entry.h"
305-
306-class DBThread;
307-class DBThreadState;
308-
309-class Dispatcher
310-{
311- typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable;
312- DBExecutorsTable executors;
313- void db_thread_func(DBThread *thread);
314- void start_thread(DBThread *thread);
315-
316-public:
317- boost::shared_ptr<DBThreadState>
318- get_thread_state(uint64_t thread_id,
319- boost::function1<void, DBThread *>
320- run_on_db_thread_create);
321- void dispatch(QueryEntryPtr query_entry);
322- bool finish_and_wait(uint64_t thread_id);
323- void finish_all_and_wait();
324-};
325-
326-extern Dispatcher g_dispatcher;
327-
328-#endif /* PERCONA_PLAYBACK_DISPATCHER_H */
329
330=== modified file 'percona_playback/libdrizzle_client/libdrizzle_client.cc'
331--- percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-07-23 07:08:23 +0000
332+++ percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-11-23 20:15:24 +0000
333@@ -39,13 +39,13 @@
334
335 public:
336 LibDrizzleDBThread(uint64_t _thread_id, LibDrizzleOptions *opt) :
337- DBThread(_thread_id),
338+ DBThread(_thread_id, boost::shared_ptr<Queries>(new Queries())),
339 driz(NULL),
340 options(opt)
341 {
342 }
343
344- void connect();
345+ bool connect();
346 void disconnect();
347 void execute_query(const std::string &query, QueryResult *r,
348 const QueryResult &expected_result);
349@@ -70,7 +70,7 @@
350 unsigned int port;
351 };
352
353-void LibDrizzleDBThread::connect()
354+bool LibDrizzleDBThread::connect()
355 {
356 driz= drizzle_create(NULL);
357 assert(driz != NULL);
358@@ -81,6 +81,7 @@
359 options->user.c_str(),
360 options->password.c_str(),
361 options->schema.c_str(), DRIZZLE_CON_MYSQL);
362+ return true;
363 }
364
365 void LibDrizzleDBThread::disconnect()
366
367=== modified file 'percona_playback/mysql_client/mysql_client.cc'
368--- percona_playback/mysql_client/mysql_client.cc 2012-07-04 09:10:16 +0000
369+++ percona_playback/mysql_client/mysql_client.cc 2012-11-23 20:15:24 +0000
370@@ -42,17 +42,23 @@
371 unsigned int port;
372 };
373
374-void MySQLDBThread::connect()
375+bool MySQLDBThread::connect()
376 {
377 mysql_init(&handle);
378- mysql_real_connect(&handle,
379- options->host.c_str(),
380- options->user.c_str(),
381- options->password.c_str(),
382- options->schema.c_str(),
383- options->port,
384- "",
385- 0);
386+ if (!mysql_real_connect(&handle,
387+ options->host.c_str(),
388+ options->user.c_str(),
389+ options->password.c_str(),
390+ options->schema.c_str(),
391+ options->port,
392+ "",
393+ CLIENT_MULTI_STATEMENTS))
394+ {
395+ fprintf(stderr, "Can't connect to server: %s\n",
396+ mysql_error(&handle));
397+ return false;
398+ }
399+ return true;
400 }
401
402 void MySQLDBThread::disconnect()
403@@ -63,29 +69,50 @@
404 void MySQLDBThread::execute_query(const std::string &query, QueryResult *r,
405 const QueryResult &)
406 {
407- int mr= mysql_real_query(&handle, query.c_str(), query.length());
408- if(mr != 0)
409- {
410- r->setError(mr);
411- }
412- else
413- {
414- MYSQL_RES* mysql_res= NULL;
415-
416- r->setError(mr);
417- r->setWarningCount(mysql_warning_count(&handle));
418-
419- mysql_res= mysql_store_result(&handle);
420-
421- if (mysql_res != NULL)
422- r->setRowsSent(mysql_num_rows(mysql_res));
423+ int mr;
424+ for(unsigned i = 0; i < max_err_num; ++i)
425+ {
426+ mr= mysql_real_query(&handle, query.c_str(), query.length());
427+ r->setError(mr);
428+ if(mr != 0)
429+ {
430+ fprintf(stderr,
431+ "Error during query: %s, number of tries %u\n",
432+ mysql_error(&handle),
433+ i);
434+ disconnect();
435+ connect_and_init_session();
436+ }
437 else
438+ {
439+ r->setWarningCount(mysql_warning_count(&handle));
440 r->setRowsSent(0);
441-
442- mysql_free_result(mysql_res);
443+ do
444+ {
445+ MYSQL_RES* mysql_res= NULL;
446+
447+ mysql_res= mysql_store_result(&handle);
448+
449+ if (mysql_res != NULL)
450+ {
451+ r->setRowsSent(mysql_num_rows(mysql_res));
452+ mysql_free_result(mysql_res);
453+ }
454+
455+ } while(!mysql_next_result(&handle));
456+
457+ break;
458+ }
459 }
460 }
461
462+void MySQLDBThread::run()
463+{
464+ mysql_thread_init();
465+ DBThread::run();
466+ mysql_thread_end();
467+}
468+
469 class MySQLDBClientPlugin : public percona_playback::DBClientPlugin
470 {
471 private:
472@@ -126,6 +153,21 @@
473 return -1;
474 }
475
476+ if (!active)
477+ return 0;
478+
479+ if (!mysql_thread_safe())
480+ {
481+ fprintf(stderr, "libmysqlclient is not thread safe\n");
482+ return -1;
483+ }
484+
485+ if (mysql_library_init(0, NULL, NULL))
486+ {
487+ fprintf(stderr, "could not initialize mysql library\n");
488+ return -1;
489+ }
490+
491 if (vm.count("mysql-host"))
492 {
493 options.host= vm["mysql-host"].as<std::string>();
494
495=== modified file 'percona_playback/mysql_client/mysql_client.h'
496--- percona_playback/mysql_client/mysql_client.h 2012-06-15 10:53:23 +0000
497+++ percona_playback/mysql_client/mysql_client.h 2012-11-23 20:15:24 +0000
498@@ -10,16 +10,19 @@
499 private:
500 MYSQL handle;
501 MySQLOptions *options;
502+ static const unsigned max_err_num = 10;
503
504 public:
505 MySQLDBThread(uint64_t _thread_id, MySQLOptions *opt) :
506- DBThread(_thread_id),
507+ DBThread(_thread_id,
508+ boost::shared_ptr<Queries>(new Queries())),
509 options(opt)
510 {
511 }
512
513- void connect();
514+ bool connect();
515 void disconnect();
516 void execute_query(const std::string &query, QueryResult *r,
517 const QueryResult &expected_result);
518+ void run();
519 };
520
521=== modified file 'percona_playback/null_dbclient/null_dbclient.cc'
522--- percona_playback/null_dbclient/null_dbclient.cc 2012-04-05 04:47:09 +0000
523+++ percona_playback/null_dbclient/null_dbclient.cc 2012-11-23 20:15:24 +0000
524@@ -20,10 +20,11 @@
525 class NULLDBThread : public DBThread
526 {
527 public:
528- NULLDBThread(uint64_t _thread_id) : DBThread(_thread_id) {
529- }
530+ NULLDBThread(uint64_t _thread_id) :
531+ DBThread(_thread_id,
532+ boost::shared_ptr<Queries>(new Queries())) {}
533
534- void connect() {};
535+ bool connect() { return true; };
536 void disconnect() {};
537 void execute_query(const std::string &, QueryResult *r,
538 const QueryResult &expected_result) {
539
540=== modified file 'percona_playback/percona_playback.cc'
541--- percona_playback/percona_playback.cc 2012-11-14 11:44:59 +0000
542+++ percona_playback/percona_playback.cc 2012-11-23 20:15:24 +0000
543@@ -40,6 +40,7 @@
544
545 percona_playback::DBClientPlugin *g_dbclient_plugin= NULL;
546 percona_playback::InputPlugin *g_input_plugin= NULL;
547+percona_playback::DispatcherPlugin *g_dispatcher_plugin= NULL;
548 unsigned int g_db_thread_queue_depth;
549 std::string g_session_init_query;
550
551@@ -132,6 +133,22 @@
552 std::cerr << _("Selected Input Plugin: ")
553 << g_input_plugin->name
554 << std::endl;
555+
556+ std::cerr << std::endl << _("Loaded Dispatcher Plugins: ");
557+
558+ BOOST_FOREACH(const PluginRegistry::DispatcherPluginPair &pp,
559+ PluginRegistry::singleton().dispatcher_plugins)
560+ {
561+ std::cerr << pp.first << " ";
562+ }
563+
564+ std::cerr << std::endl;
565+ std::cerr << std::endl;
566+
567+ assert(g_dispatcher_plugin);
568+ std::cerr << _("Selected dispatcher Plugin: ")
569+ << g_dispatcher_plugin->name
570+ << std::endl;
571 }
572
573 int percona_playback_argv(percona_playback_st *the_percona_playback,
574@@ -152,12 +169,13 @@
575 db_options.add_options()
576 ("db-plugin", po::value<std::string>(), _("Database plugin"))
577 ("input-plugin", po::value<std::string>(), _("Input plugin"))
578+ ("dispatcher-plugin", po::value<std::string>(), _("Dispatcher plugin"))
579 ("queue-depth", po::value<unsigned int>(),
580 _("Queue depth for DB executor (thread). The larger this number the"
581 " greater the played-back workload can deviate from the original workload"
582 " as some connections may be up to queue-depth behind. (default 1)"))
583 ("session-init-query",
584- po::value<std::string>()->default_value(""),
585+ po::value<std::string>(&g_session_init_query)->default_value(""),
586 _("This query will be executed just after each connect to db"))
587 ;
588
589@@ -237,6 +255,32 @@
590 }
591 g_input_plugin->active= true;
592
593+ if (vm.count("dispatcher-plugin"))
594+ {
595+ PluginRegistry::DispatcherPluginMap::iterator it;
596+ it= PluginRegistry::singleton().dispatcher_plugins.find(
597+ vm["dispatcher-plugin"].as<std::string>());
598+ if (it == PluginRegistry::singleton().dispatcher_plugins.end())
599+ {
600+ fprintf(stderr, _("Invalid Dispatcher Plugin\n"));
601+ return -1;
602+ }
603+ g_dispatcher_plugin= it->second;
604+ }
605+ else
606+ {
607+ PluginRegistry::DispatcherPluginMap::iterator it;
608+ it= PluginRegistry::singleton().dispatcher_plugins.
609+ find("thread-per-connection");
610+ if (it == PluginRegistry::singleton().dispatcher_plugins.end())
611+ {
612+ fprintf(stderr, _("Invalid Dispatcher plugin\n"));
613+ return -1;
614+ }
615+ g_dispatcher_plugin= it->second;
616+ }
617+ g_dispatcher_plugin->active= true;
618+
619 if (vm.count("help") || argc==1)
620 {
621 help(options_description);
622@@ -299,6 +343,7 @@
623 std::cerr << _("Database Plugin: ") << g_dbclient_plugin->name << std::endl;
624 std::cerr << _(" Running...") << std::endl;
625
626+ g_dispatcher_plugin->run();
627 g_input_plugin->run(*r);
628
629 BOOST_FOREACH(const PluginRegistry::ReportPluginPair pp,
630
631=== modified file 'percona_playback/plugin.h'
632--- percona_playback/plugin.h 2012-06-24 04:28:41 +0000
633+++ percona_playback/plugin.h 2012-11-23 20:15:24 +0000
634@@ -20,6 +20,9 @@
635 #include <vector>
636 #include <string>
637 #include <map>
638+#include <boost/shared_ptr.hpp>
639+#include <boost/function.hpp>
640+#include "percona_playback/query_entry.h"
641 #include <percona_playback/visibility.h>
642 #include <percona_playback/version.h>
643
644@@ -31,6 +34,7 @@
645 }
646
647 class DBThread;
648+class DBThreadState;
649 class QueryResult;
650 class percona_playback_run_result;
651
652@@ -102,6 +106,21 @@
653
654 };
655
656+class DispatcherPlugin : public plugin
657+{
658+ public:
659+ std::string name;
660+
661+ DispatcherPlugin(const std::string &_name) : name(_name) {}
662+
663+ virtual void dispatch(QueryEntryPtr query_entry)= 0;
664+ virtual bool finish_and_wait(uint64_t thread_id)= 0;
665+ virtual void finish_all_and_wait()= 0;
666+
667+ virtual void run() {};
668+
669+};
670+
671 class PluginRegistry
672 {
673 public:
674@@ -122,6 +141,9 @@
675 typedef std::map<std::string, InputPlugin*> InputPluginMap;
676 typedef std::pair<std::string, InputPlugin*> InputPluginPair;
677
678+ typedef std::map<std::string, DispatcherPlugin*> DispatcherPluginMap;
679+ typedef std::pair<std::string, DispatcherPlugin*> DispatcherPluginPair;
680+
681 typedef std::map<std::string, plugin*> PluginMap;
682 typedef std::pair<std::string, plugin*> PluginPair;
683
684@@ -129,6 +151,7 @@
685 DBClientPluginMap dbclient_plugins;
686 ReportPluginMap report_plugins;
687 InputPluginMap input_plugins;
688+ DispatcherPluginMap dispatcher_plugins;
689
690 void add(const std::string &name, plugin* plugin_object)
691 {
692@@ -154,6 +177,12 @@
693 all_plugins.insert(PluginPair(name, input_plugin));
694 }
695
696+ void add(const std::string &name, DispatcherPlugin* dispatcher_plugin)
697+ {
698+ dispatcher_plugins.insert(DispatcherPluginPair(name, dispatcher_plugin));
699+ all_plugins.insert(PluginPair(name, dispatcher_plugin));
700+ }
701+
702 };
703
704
705
706=== modified file 'percona_playback/query_entry.h'
707--- percona_playback/query_entry.h 2012-07-03 02:37:51 +0000
708+++ percona_playback/query_entry.h 2012-11-23 20:15:24 +0000
709@@ -45,7 +45,8 @@
710 class FinishEntry : public QueryEntry
711 {
712 public:
713- FinishEntry() { set_shutdown(); }
714+ FinishEntry(uint64_t _thread_id) :
715+ QueryEntry (_thread_id, true) {}
716 bool is_quit() { return false; }
717
718 void execute(DBThread *) {}
719
720=== modified file 'percona_playback/query_log/query_log.cc'
721--- percona_playback/query_log/query_log.cc 2012-11-08 22:17:07 +0000
722+++ percona_playback/query_log/query_log.cc 2012-11-23 20:15:24 +0000
723@@ -39,7 +39,6 @@
724 #include <percona_playback/db_thread.h>
725 #include <percona_playback/query_log/query_log.h>
726 #include <percona_playback/query_result.h>
727-#include "percona_playback/dispatcher.h"
728 #include <percona_playback/gettext.h>
729
730 #include <boost/foreach.hpp>
731@@ -51,6 +50,8 @@
732 static bool g_run_set_timestamp;
733 static bool g_preserve_query_time;
734
735+extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
736+
737 class ParseQueryLogFunc: public tbb::filter {
738 public:
739 ParseQueryLogFunc(FILE *input_file_,
740@@ -83,7 +84,7 @@
741 new std::vector<boost::shared_ptr<QueryLogEntry> >();
742
743 boost::shared_ptr<QueryLogEntry> tmp_entry(new QueryLogEntry());
744- entries->push_back(tmp_entry);
745+ // entries->push_back(tmp_entry);
746
747 char *line= NULL;
748 size_t buflen = 0;
749@@ -130,9 +131,10 @@
750
751 if (strncmp(p, "# User@Host", strlen("# User@Host")) == 0)
752 {
753+ if (!tmp_entry->getQuery().empty())
754+ entries->push_back(tmp_entry);
755 count++;
756 tmp_entry.reset(new QueryLogEntry());
757- entries->push_back(tmp_entry);
758 (*this->nr_entries)++;
759 }
760
761@@ -172,6 +174,9 @@
762 p= line;
763 }
764
765+ if (!tmp_entry->getQuery().empty())
766+ entries->push_back(tmp_entry);
767+
768 free(line);
769 return entries;
770 }
771@@ -239,7 +244,19 @@
772 && s.compare(0, timestamp_query.length(), timestamp_query) == 0)
773 set_timestamp_query= s;
774 else
775- query.append(s);
776+ {
777+ //Append space insead of \r\n
778+ std::string::const_iterator end = s.end() - 1;
779+ if (s.length() >= 2 && *(s.end() - 2) == '\r')
780+ --end;
781+ //Remove initial spaces for best query viewing in reports
782+ std::string::const_iterator begin;
783+ for (begin = s.begin(); begin != end; ++begin)
784+ if (*begin != ' ' && *begin != '\t')
785+ break;
786+ query.append(begin, end);
787+ query.append(" ");
788+ }
789 }
790
791 bool QueryLogEntry::parse_metadata(const std::string &s)
792@@ -281,13 +298,13 @@
793 r= true;
794 }
795 }
796-
797+/*
798 if (s[0] == '#' && strncmp(s.c_str(), "# administrator", strlen("# administrator")))
799 {
800 query.append(s);
801 r= true;
802 }
803-
804+*/
805 return r;
806 }
807
808@@ -300,7 +317,7 @@
809 for (unsigned int i=0; i< input->size(); i++)
810 {
811 // usleep(10);
812- g_dispatcher.dispatch((*input)[i]);
813+ g_dispatcher_plugin->dispatch((*input)[i]);
814 }
815 delete input;
816 return NULL;
817@@ -330,7 +347,7 @@
818 p.add_filter(f4);
819 p.run(2);
820
821- g_dispatcher.finish_all_and_wait();
822+ g_dispatcher_plugin->finish_all_and_wait();
823
824 r->n_log_entries= entries;
825 r->n_queries= queries;
826
827=== modified file 'percona_playback/tcpdump/connection_state.cc'
828--- percona_playback/tcpdump/connection_state.cc 2012-07-02 17:51:47 +0000
829+++ percona_playback/tcpdump/connection_state.cc 2012-11-23 20:15:24 +0000
830@@ -25,9 +25,11 @@
831 #include "percona_playback/plugin.h"
832
833 extern percona_playback::DBClientPlugin *g_dbclient_plugin;
834+extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
835
836 void
837-ConnectionState::ProcessMysqlPkts(const u_char *pkts,
838+ConnectionState::ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr,
839+ const u_char *pkts,
840 u_int pkts_len,
841 const timeval &ts,
842 const AddrPort &addr_port,
843@@ -76,13 +78,13 @@
844 {
845 case PKT_QUERY:
846 if (!query.empty())
847- DispatchQuery(ts, query, addr_port);
848+ DispatchQuery(cs_ptr, ts, query, addr_port);
849 ++stats.nr_of_parsed_queries;
850 ++stats.nr_of_parsed_packets;
851 break;
852
853 case PKT_RESULT:
854- DispatchResult(ts, addr_port);
855+ DispatchResult(cs_ptr, ts, addr_port);
856 ++stats.nr_of_parsed_packets;
857 break;
858
859@@ -107,12 +109,6 @@
860
861 }
862
863-void
864-ConnectionState::ProcessFinishConnection()
865-{
866- db_thread->queries.push(QueryEntryPtr(new FinishEntry()));
867-}
868-
869 PktResult
870 ConnectionState::ParseMysqlPkt(IN UCharBuffer &buff,
871 OUT std::string &query)
872@@ -303,23 +299,32 @@
873 }
874
875 void
876-ConnectionState::DispatchQuery(const timeval &ts,
877+ConnectionState::DispatchQuery(/* Having shared pointer to "this" as an argument
878+ is just temporary step. We need a shared pointer
879+ to connection state inside of query entry. The
880+ better way to do this is to move code that creates
881+ query entries to the upper layer.
882+ */
883+ boost::shared_ptr<ConnectionState> cs_ptr,
884+ const timeval &ts,
885 const std::string &query,
886 const AddrPort &addr_port)
887 {
888 boost::shared_ptr<TcpdumpQueryEntry>
889- query_entry(new TcpdumpQueryEntry(ts,
890+ query_entry(new TcpdumpQueryEntry(cs_ptr,
891+ ts,
892 query,
893 addr_port));
894- db_thread->queries.push(query_entry);
895+ g_dispatcher_plugin->dispatch(query_entry);
896 }
897
898 void
899-ConnectionState::DispatchResult(const timeval &ts,
900+ConnectionState::DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr,
901+ const timeval &ts,
902 const AddrPort &addr_port)
903 {
904 boost::shared_ptr<TcpdumpResultEntry>
905- result_entry(new TcpdumpResultEntry(addr_port, ts, last_query_result));
906- db_thread->queries.push(result_entry);
907+ result_entry(new TcpdumpResultEntry(cs_ptr, addr_port, ts, last_query_result));
908+ g_dispatcher_plugin->dispatch(result_entry);
909 }
910
911
912=== modified file 'percona_playback/tcpdump/connection_state.h'
913--- percona_playback/tcpdump/connection_state.h 2012-07-03 07:41:49 +0000
914+++ percona_playback/tcpdump/connection_state.h 2012-11-23 20:15:24 +0000
915@@ -94,8 +94,7 @@
916 };
917
918
919-class ConnectionState : public DBThreadState
920-
921+class ConnectionState
922 {
923
924 public:
925@@ -107,7 +106,7 @@
926 SERVER
927 };
928
929- ConnectionState() :
930+ ConnectionState(uint64_t _thread_id) :
931 current_origin(UNDEF),
932 fragmented(false),
933 handshake_from_client(false),
934@@ -126,7 +125,7 @@
935 (drizzle_con_options_t)
936 DRIZZLE_CON_MYSQL),
937 drizzle_con_free),
938- db_thread(NULL)
939+ thread_id(_thread_id)
940 {
941 drizzle_con->result= NULL;
942 }
943@@ -136,18 +135,15 @@
944 drizzle_result_free_all(drizzle_con.get());
945 }
946
947- void ProcessMysqlPkts(const u_char *pkts,
948+ void ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr,
949+ const u_char *pkts,
950 u_int total_len,
951 const timeval &ts,
952 const AddrPort &addr_port,
953 OUT TcpdumpMysqlParserStats &stats);
954
955- void ProcessFinishConnection();
956-
957 void SetCurrentOrigin(Origin o) { current_origin = o; }
958
959- void SetDBThread(DBThread *t) { db_thread= t; }
960-
961 LastExecutedQueryInfo last_executed_query_info;
962
963 private:
964@@ -159,11 +155,13 @@
965 PktResult ServerPacket(IN UCharBuffer &buff);
966 PktResult ClientPacket(IN UCharBuffer &buff, OUT std::string &query);
967
968- void DispatchQuery(const timeval &ts,
969+ void DispatchQuery(boost::shared_ptr<ConnectionState> cs_ptr,
970+ const timeval &ts,
971 const std::string &query,
972 const AddrPort &addr_port);
973
974- void DispatchResult(const timeval &ts,
975+ void DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr,
976+ const timeval &ts,
977 const AddrPort &addr_port);
978
979 Origin current_origin;
980@@ -179,7 +177,7 @@
981
982 QueryResult last_query_result;
983
984- DBThread *db_thread;
985+ uint64_t thread_id;
986
987 };
988
989
990=== modified file 'percona_playback/tcpdump/pcap_packets_parser.cc'
991--- percona_playback/tcpdump/pcap_packets_parser.cc 2012-07-03 07:41:49 +0000
992+++ percona_playback/tcpdump/pcap_packets_parser.cc 2012-11-23 20:15:24 +0000
993@@ -18,7 +18,6 @@
994
995 #include "percona_playback/db_thread.h"
996 #include "percona_playback/plugin.h"
997-#include "percona_playback/dispatcher.h"
998
999 #include <netinet/in.h>
1000 #include <arpa/inet.h>
1001@@ -29,6 +28,7 @@
1002 #define SNAP_LEN 16500 // pcap's max capture size
1003
1004 extern percona_playback::DBClientPlugin *g_dbclient_plugin;
1005+extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
1006
1007 void
1008 PcapPacketsParser::ParsePkt(const struct pcap_pkthdr *header,
1009@@ -81,7 +81,8 @@
1010 if (size_mysql == 0 &&
1011 ((tcp->th_flags & TH_FIN) || (tcp->th_flags & TH_RST)))
1012 {
1013- g_dispatcher.finish_and_wait(addr_port.ThreadId());
1014+ g_dispatcher_plugin->finish_and_wait(addr_port.ThreadId());
1015+ RemoveConnectionState(addr_port.ThreadId());
1016 return;
1017 }
1018
1019@@ -97,37 +98,50 @@
1020 }
1021
1022 /* If there is no DBThread with such id create it */
1023- boost::shared_ptr<DBThreadState>
1024- state= g_dispatcher.get_thread_state(addr_port.ThreadId(),
1025- boost::bind(&PcapPacketsParser::CreateConnectionState,
1026- this,
1027- _1));
1028+ boost::shared_ptr<ConnectionState>
1029+ state= GetConnectionState(addr_port.ThreadId());
1030
1031 assert(state.get());
1032
1033- ((ConnectionState *)state.get())->SetCurrentOrigin(origin);
1034- ((ConnectionState *)state.get())->ProcessMysqlPkts(mysql,
1035- size_mysql,
1036- header->ts,
1037- addr_port,
1038- stats);
1039+ state->SetCurrentOrigin(origin);
1040+ state->ProcessMysqlPkts(state,
1041+ mysql,
1042+ size_mysql,
1043+ header->ts,
1044+ addr_port,
1045+ stats);
1046 }
1047
1048 void
1049 PcapPacketsParser::WaitForUnfinishedTasks()
1050 {
1051- g_dispatcher.finish_all_and_wait();
1052-}
1053-
1054-void
1055-PcapPacketsParser::CreateConnectionState(DBThread *db_thread)
1056-{
1057- assert(db_thread);
1058- boost::shared_ptr<ConnectionState> state(new ConnectionState());
1059+ g_dispatcher_plugin->finish_all_and_wait();
1060+}
1061+
1062+boost::shared_ptr<ConnectionState>
1063+PcapPacketsParser::GetConnectionState(uint64_t thread_id)
1064+{
1065+ Connections::iterator it = connections.find(thread_id);
1066+ if (it == connections.end())
1067+ return CreateConnectionState(thread_id);
1068+ return it->second;
1069+}
1070+
1071+boost::shared_ptr<ConnectionState>
1072+PcapPacketsParser::CreateConnectionState(uint64_t thread_id)
1073+{
1074+ boost::shared_ptr<ConnectionState> state(new ConnectionState(thread_id));
1075 state->last_executed_query_info.end_pcap_timestamp=
1076 first_packet_pcap_timestamp;
1077 state->last_executed_query_info.end_timestamp=
1078 first_packet_timestamp;
1079- db_thread->set_state(state);
1080- state->SetDBThread(db_thread);
1081-}
1082+ connections[thread_id] = state;
1083+ return state;
1084+}
1085+
1086+void
1087+PcapPacketsParser::RemoveConnectionState(uint64_t thread_id)
1088+{
1089+ connections.erase(thread_id);
1090+}
1091+
1092
1093=== modified file 'percona_playback/tcpdump/pcap_packets_parser.h'
1094--- percona_playback/tcpdump/pcap_packets_parser.h 2012-06-19 15:26:53 +0000
1095+++ percona_playback/tcpdump/pcap_packets_parser.h 2012-11-23 20:15:24 +0000
1096@@ -23,6 +23,8 @@
1097 #include <tbb/concurrent_queue.h>
1098 #include <boost/thread.hpp>
1099 #include <boost/bind.hpp>
1100+#include <boost/unordered_map.hpp>
1101+#include <boost/shared_ptr.hpp>
1102
1103 class DBThread;
1104
1105@@ -32,10 +34,14 @@
1106 timeval first_packet_timestamp;
1107 timeval first_packet_pcap_timestamp;
1108 bool was_first_packet;
1109+ typedef boost::unordered_map<uint64_t,
1110+ boost::shared_ptr<ConnectionState> >
1111+ Connections;
1112+ Connections connections;
1113
1114 public:
1115
1116- PcapPacketsParser() : was_first_packet(false)
1117+ PcapPacketsParser() : was_first_packet(false), connections(10000)
1118 {
1119 first_packet_timestamp.tv_sec= first_packet_timestamp.tv_usec= 0;
1120 first_packet_pcap_timestamp.tv_sec= first_packet_pcap_timestamp.tv_usec= 0;
1121@@ -52,9 +58,11 @@
1122
1123 void WaitForUnfinishedTasks();
1124
1125- void CreateConnectionState(DBThread *db_thread);
1126
1127 private:
1128+ boost::shared_ptr<ConnectionState> CreateConnectionState(uint64_t thread_id);
1129+ boost::shared_ptr<ConnectionState> GetConnectionState(uint64_t thread_id);
1130+ void RemoveConnectionState(uint64_t thread_id);
1131
1132 void ParsePkt(const struct pcap_pkthdr *header,
1133 const u_char *packet);
1134
1135=== modified file 'percona_playback/tcpdump/tcpdump_query_entries.cc'
1136--- percona_playback/tcpdump/tcpdump_query_entries.cc 2012-07-09 11:54:52 +0000
1137+++ percona_playback/tcpdump/tcpdump_query_entries.cc 2012-11-23 20:15:24 +0000
1138@@ -30,15 +30,10 @@
1139 QueryResult expected_result;
1140 timeval start_time;
1141 timeval end_time;
1142- boost::shared_ptr<DBThreadState> thr_state;
1143
1144 assert(t);
1145
1146- thr_state= t->get_state();
1147-
1148- assert(thr_state.get());
1149-
1150- ConnectionState &state= (ConnectionState &)*thr_state;
1151+ ConnectionState &state= *connection_state;
1152
1153 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;
1154
1155@@ -96,12 +91,7 @@
1156
1157 assert(t);
1158
1159- thr_state= t->get_state();
1160-
1161- assert(thr_state.get());
1162-
1163-
1164- ConnectionState &state= (ConnectionState &)*thr_state;
1165+ ConnectionState &state= *connection_state;
1166 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;
1167
1168 timersub(&pcap_timestamp,
1169
1170=== modified file 'percona_playback/tcpdump/tcpdump_query_entries.h'
1171--- percona_playback/tcpdump/tcpdump_query_entries.h 2012-07-03 07:41:49 +0000
1172+++ percona_playback/tcpdump/tcpdump_query_entries.h 2012-11-23 20:15:24 +0000
1173@@ -21,9 +21,12 @@
1174 #include "connection_state.h"
1175
1176 #include <sys/time.h>
1177+#include <boost/shared_ptr.hpp>
1178
1179 class TcpdumpQueryEntry : public QueryEntry
1180 {
1181+
1182+ boost::shared_ptr<ConnectionState> connection_state;
1183 timeval pcap_timestamp;
1184 std::string query;
1185
1186@@ -34,10 +37,12 @@
1187 pcap_timestamp.tv_sec= pcap_timestamp.tv_usec= 0;
1188 }
1189
1190- TcpdumpQueryEntry(const timeval &_pcap_timestamp,
1191+ TcpdumpQueryEntry(boost::shared_ptr<ConnectionState> _connection_state,
1192+ const timeval &_pcap_timestamp,
1193 const std::string &_query,
1194 const AddrPort &addr_port) :
1195 QueryEntry(addr_port.ThreadId(), false),
1196+ connection_state(_connection_state),
1197 pcap_timestamp(_pcap_timestamp),
1198 query(_query)
1199 {}
1200@@ -50,15 +55,18 @@
1201 class TcpdumpResultEntry : public QueryEntry
1202 {
1203
1204+ boost::shared_ptr<ConnectionState> connection_state;
1205 timeval pcap_timestamp;
1206 QueryResult expected_result;
1207
1208 public:
1209 TcpdumpResultEntry() {}
1210- TcpdumpResultEntry(const AddrPort &addr_port,
1211+ TcpdumpResultEntry(boost::shared_ptr<ConnectionState> _connection_state,
1212+ const AddrPort &addr_port,
1213 const timeval &_pcap_timestamp,
1214 const QueryResult &_expected_result) :
1215 QueryEntry(addr_port.ThreadId(), false),
1216+ connection_state(_connection_state),
1217 pcap_timestamp(_pcap_timestamp),
1218 expected_result(_expected_result)
1219 {}
1220
1221=== added file 'percona_playback/test/thread-pool-sysbench-slow.cc'
1222--- percona_playback/test/thread-pool-sysbench-slow.cc 1970-01-01 00:00:00 +0000
1223+++ percona_playback/test/thread-pool-sysbench-slow.cc 2012-11-23 20:15:24 +0000
1224@@ -0,0 +1,60 @@
1225+/* BEGIN LICENSE
1226+ * Copyright (C) 2011 Stewart Smith <stewart@flamingspork.com>
1227+ * This program is free software: you can redistribute it and/or modify it
1228+ * under the terms of the GNU General Public License version 3, as published
1229+ * by the Free Software Foundation.
1230+ *
1231+ * This program is distributed in the hope that it will be useful, but
1232+ * WITHOUT ANY WARRANTY; without even the implied warranties of
1233+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1234+ * PURPOSE. See the GNU General Public License for more details.
1235+ *
1236+ * You should have received a copy of the GNU General Public License along
1237+ * with this program. If not, see <http://www.gnu.org/licenses/>.
1238+ * END LICENSE */
1239+
1240+#include "config.h"
1241+
1242+#include <iostream>
1243+#include <cstdio>
1244+#include <string>
1245+#include <cstring>
1246+#include <assert.h>
1247+#include <unistd.h>
1248+
1249+#include <percona_playback/percona_playback.h>
1250+
1251+/**
1252+ * @TODO Actually write a real test suite here
1253+ */
1254+int main(int argc, char **argv)
1255+{
1256+ (void)argc; (void)argv;
1257+
1258+ fprintf(stderr, "Working dir: %s\n\n", get_current_dir_name());
1259+
1260+ percona_playback_st *the_percona_playback= percona_playback_create("test_Percona Playback");
1261+ assert(the_percona_playback);
1262+
1263+ char dbplugin[]="--db-plugin=null";
1264+ char querylog[]="--query-log-file="SRCDIR"/percona_playback/test/sysbench-slow.log";
1265+ char threadpool[]="--dispatcher-plugin=thread-pool"
1266+
1267+ char **dbplugin_argv= new char*[4];
1268+ dbplugin_argv[0]= argv[0];
1269+ dbplugin_argv[1]= dbplugin;
1270+ dbplugin_argv[2]= querylog;
1271+ dbplugin_argv[3]= threadpool;
1272+
1273+
1274+ assert(0 == percona_playback_argv(the_percona_playback, 3, dbplugin_argv));
1275+
1276+ struct percona_playback_run_result *r= percona_playback_run(the_percona_playback);
1277+
1278+ assert(r->err == 0);
1279+ assert(r->n_queries == 10150);
1280+ assert(r->n_log_entries = 10149);
1281+
1282+ percona_playback_destroy(&the_percona_playback);
1283+ return r->err;
1284+}
1285
1286=== added directory 'percona_playback/thread_per_connection'
1287=== added file 'percona_playback/thread_per_connection/plugin.ini'
1288--- percona_playback/thread_per_connection/plugin.ini 1970-01-01 00:00:00 +0000
1289+++ percona_playback/thread_per_connection/plugin.ini 2012-11-23 20:15:24 +0000
1290@@ -0,0 +1,6 @@
1291+[plugin]
1292+title=Thread-per-connection dispatcher plugin
1293+description=Creates thread per each DB connection
1294+version=0.1
1295+static=yes
1296+load_by_default=yes
1297
1298=== added file 'percona_playback/thread_per_connection/thread_per_connection.cc'
1299--- percona_playback/thread_per_connection/thread_per_connection.cc 1970-01-01 00:00:00 +0000
1300+++ percona_playback/thread_per_connection/thread_per_connection.cc 2012-11-23 20:15:24 +0000
1301@@ -0,0 +1,109 @@
1302+/* BEGIN LICENSE
1303+ * Copyright (C) 2011-2012 Percona Inc.
1304+ * This program is free software: you can redistribute it and/or modify it
1305+ * under the terms of the GNU General Public License version 2, as published
1306+ * by the Free Software Foundation.
1307+ *
1308+ * This program is distributed in the hope that it will be useful, but
1309+ * WITHOUT ANY WARRANTY; without even the implied warranties of
1310+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1311+ * PURPOSE. See the GNU General Public License for more details.
1312+ *
1313+ * You should have received a copy of the GNU General Public License along
1314+ * with this program. If not, see <http://www.gnu.org/licenses/>.
1315+ * END LICENSE */
1316+
1317+#include "percona_playback/plugin.h"
1318+#include "percona_playback/db_thread.h"
1319+
1320+#include <tbb/concurrent_hash_map.h>
1321+
1322+class ThreadPerConnectionDispatcher :
1323+ public percona_playback::DispatcherPlugin
1324+{
1325+ typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable;
1326+ DBExecutorsTable executors;
1327+ void db_thread_func(DBThread *thread);
1328+ void start_thread(DBThread *thread);
1329+
1330+public:
1331+ ThreadPerConnectionDispatcher(std::string _name) :
1332+ DispatcherPlugin(_name) {}
1333+
1334+ void dispatch(QueryEntryPtr query_entry);
1335+ bool finish_and_wait(uint64_t thread_id);
1336+ void finish_all_and_wait();
1337+};
1338+
1339+extern percona_playback::DBClientPlugin *g_dbclient_plugin;
1340+
1341+void
1342+ThreadPerConnectionDispatcher::dispatch(QueryEntryPtr query_entry)
1343+{
1344+ uint64_t thread_id= query_entry->getThreadId();
1345+ {
1346+ DBExecutorsTable::accessor a;
1347+ if (executors.insert(a, thread_id))
1348+ {
1349+ DBThread *db_thread= g_dbclient_plugin->create(thread_id);
1350+ a->second= db_thread;
1351+ db_thread->start_thread();
1352+ }
1353+ a->second->queries->push(query_entry);
1354+ }
1355+}
1356+
1357+bool
1358+ThreadPerConnectionDispatcher::finish_and_wait(uint64_t thread_id)
1359+{
1360+ DBThread *db_thread= NULL;
1361+ {
1362+ DBExecutorsTable::accessor a;
1363+ if (executors.find(a, thread_id))
1364+ {
1365+ db_thread= a->second;
1366+ executors.erase(a);
1367+ }
1368+ }
1369+
1370+ if (!db_thread)
1371+ return false;
1372+
1373+ db_thread->queries->push(QueryEntryPtr(new FinishEntry(thread_id)));
1374+ db_thread->join();
1375+
1376+ delete db_thread;
1377+
1378+ return true;
1379+}
1380+
1381+void
1382+ThreadPerConnectionDispatcher::finish_all_and_wait()
1383+{
1384+ QueryEntryPtr shutdown_command(new FinishEntry(0));
1385+
1386+ while(executors.size())
1387+ {
1388+ uint64_t thread_id;
1389+ DBThread *t;
1390+ {
1391+ DBExecutorsTable::const_iterator iter= executors.begin();
1392+ thread_id= (*iter).first;
1393+ t= (*iter).second;
1394+ }
1395+ executors.erase(thread_id);
1396+
1397+ t->queries->push(shutdown_command);
1398+ t->join();
1399+
1400+ delete t;
1401+ }
1402+}
1403+
1404+static void init_plugin(percona_playback::PluginRegistry &r)
1405+{
1406+ r.add("thread-per-connection",
1407+ new ThreadPerConnectionDispatcher("thread-per-connection"));
1408+}
1409+
1410+PERCONA_PLAYBACK_PLUGIN(init_plugin);
1411
1412=== added directory 'percona_playback/thread_pool'
1413=== added file 'percona_playback/thread_pool/plugin.ini'
1414--- percona_playback/thread_pool/plugin.ini 1970-01-01 00:00:00 +0000
1415+++ percona_playback/thread_pool/plugin.ini 2012-11-23 20:15:24 +0000
1416@@ -0,0 +1,6 @@
1417+[plugin]
1418+title=Thread-pool dispatcher plugin
1419+description=Creates threads pool to process connections to server
1420+version=0.1
1421+static=yes
1422+load_by_default=yes
1423
1424=== added file 'percona_playback/thread_pool/thread_pool.cc'
1425--- percona_playback/thread_pool/thread_pool.cc 1970-01-01 00:00:00 +0000
1426+++ percona_playback/thread_pool/thread_pool.cc 2012-11-23 20:15:24 +0000
1427@@ -0,0 +1,130 @@
1428+/* BEGIN LICENSE
1429+ * Copyright (C) 2011-2012 Percona Inc.
1430+ * This program is free software: you can redistribute it and/or modify it
1431+ * under the terms of the GNU General Public License version 2, as published
1432+ * by the Free Software Foundation.
1433+ *
1434+ * This program is distributed in the hope that it will be useful, but
1435+ * WITHOUT ANY WARRANTY; without even the implied warranties of
1436+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
1437+ * PURPOSE. See the GNU General Public License for more details.
1438+ *
1439+ * You should have received a copy of the GNU General Public License along
1440+ * with this program. If not, see <http://www.gnu.org/licenses/>.
1441+ * END LICENSE */
1442+
1443+#include "percona_playback/plugin.h"
1444+#include "percona_playback/db_thread.h"
1445+#include <percona_playback/gettext.h>
1446+#include <boost/program_options.hpp>
1447+#include <boost/shared_ptr.hpp>
1448+#include <boost/crc.hpp>
1449+#include <vector>
1450+
1451+#include <tbb/concurrent_hash_map.h>
1452+#include <stdio.h>
1453+
1454+extern percona_playback::DBClientPlugin *g_dbclient_plugin;
1455+
1456+class ThreadPoolDispatcher :
1457+ public percona_playback::DispatcherPlugin
1458+{
1459+ typedef std::vector<boost::shared_ptr<DBThread> > Workers;
1460+
1461+ unsigned threads_count;
1462+ boost::
1463+ program_options::
1464+ options_description options;
1465+ Workers workers;
1466+
1467+public:
1468+ ThreadPoolDispatcher(std::string _name) :
1469+ DispatcherPlugin(_name),
1470+ threads_count(0),
1471+ options("Threads-pool Options") {}
1472+
1473+ virtual void dispatch(QueryEntryPtr query_entry);
1474+ virtual bool finish_and_wait(uint64_t) { return true; }
1475+ virtual void finish_all_and_wait();
1476+ virtual void run();
1477+
1478+ boost::program_options::options_description* getProgramOptions();
1479+ int processOptions(boost::program_options::variables_map &vm);
1480+
1481+};
1482+
1483+void ThreadPoolDispatcher::run()
1484+{
1485+ for (unsigned i = 0; i < threads_count; ++i)
1486+ {
1487+ boost::shared_ptr<DBThread> db_thread(g_dbclient_plugin->create(i));
1488+ workers.push_back(db_thread);
1489+ db_thread->start_thread();
1490+ }
1491+}
1492+
1493+void ThreadPoolDispatcher::dispatch(QueryEntryPtr query_entry)
1494+{
1495+ /*
1496+ Each worker has its own queue. For some types of input plugins
1497+ it is important to execute query entries with the same thread id
1498+ by the same worker. That is why we choose worker by simple hash from
1499+ thread id.
1500+ */
1501+ uint64_t thread_id= query_entry->getThreadId();
1502+ boost::crc_32_type crc;
1503+ crc.process_bytes(&thread_id, sizeof(thread_id));
1504+ uint32_t worker_index = crc.checksum() % workers.size();
1505+ workers[worker_index]->queries->push(query_entry);
1506+}
1507+
1508+void ThreadPoolDispatcher::finish_all_and_wait()
1509+{
1510+ QueryEntryPtr shutdown_command(new FinishEntry(0));
1511+ for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i)
1512+ (*i)->queries->push(shutdown_command);
1513+ for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i)
1514+ (*i)->join();
1515+ workers.clear();
1516+}
1517+
1518+
1519+boost::program_options::options_description*
1520+ThreadPoolDispatcher::getProgramOptions()
1521+{
1522+ unsigned default_threads_count =
1523+ boost::thread::hardware_concurrency();
1524+ options.add_options()
1525+ ("thread-pool-threads-count",
1526+ boost::program_options::value<unsigned>(&threads_count)
1527+ ->default_value(default_threads_count),
1528+ _("The number of threads in thread pool. If this options is omitted "
1529+ "the number of threads equals to hardware concurency."))
1530+ ;
1531+
1532+ return &options;
1533+
1534+}
1535+
1536+int
1537+ThreadPoolDispatcher::processOptions(boost::program_options::variables_map &vm)
1538+{
1539+ if (!active &&
1540+ !vm["thread-pool-threads-count"].defaulted())
1541+ {
1542+ fprintf(stderr, _("thread-pool plugin is not selected, "
1543+ "you shouldn't use this plugin-related "
1544+ "command line options\n"));
1545+ return -1;
1546+ }
1547+
1548+ return 0;
1549+}
1550+
1551+static void init_plugin(percona_playback::PluginRegistry &r)
1552+{
1553+ r.add("thread-pool",
1554+ new ThreadPoolDispatcher("thread-pool"));
1555+}
1556+
1557+PERCONA_PLAYBACK_PLUGIN(init_plugin);

Subscribers

People subscribed via source and target branches

to all changes: