Merge lp:~vlad-lesin/percona-playback/connections_pool into lp:percona-playback

Proposed by Vlad Lesin
Status: Merged
Approved by: Stewart Smith
Approved revision: 178
Merged at revision: 174
Proposed branch: lp:~vlad-lesin/percona-playback/connections_pool
Merge into: lp:percona-playback
Diff against target: 1557 lines (+617/-299)
24 files modified
Makefile.am (+6/-3)
percona_playback/db_thread.cc (+8/-10)
percona_playback/db_thread.h (+18/-23)
percona_playback/dispatcher.cc (+0/-105)
percona_playback/dispatcher.h (+0/-46)
percona_playback/libdrizzle_client/libdrizzle_client.cc (+4/-3)
percona_playback/mysql_client/mysql_client.cc (+69/-27)
percona_playback/mysql_client/mysql_client.h (+5/-2)
percona_playback/null_dbclient/null_dbclient.cc (+4/-3)
percona_playback/percona_playback.cc (+46/-1)
percona_playback/plugin.h (+29/-0)
percona_playback/query_entry.h (+2/-1)
percona_playback/query_log/query_log.cc (+25/-8)
percona_playback/tcpdump/connection_state.cc (+20/-15)
percona_playback/tcpdump/connection_state.h (+10/-12)
percona_playback/tcpdump/pcap_packets_parser.cc (+38/-24)
percona_playback/tcpdump/pcap_packets_parser.h (+10/-2)
percona_playback/tcpdump/tcpdump_query_entries.cc (+2/-12)
percona_playback/tcpdump/tcpdump_query_entries.h (+10/-2)
percona_playback/test/thread-pool-sysbench-slow.cc (+60/-0)
percona_playback/thread_per_connection/plugin.ini (+6/-0)
percona_playback/thread_per_connection/thread_per_connection.cc (+109/-0)
percona_playback/thread_pool/plugin.ini (+6/-0)
percona_playback/thread_pool/thread_pool.cc (+130/-0)
To merge this branch: bzr merge lp:~vlad-lesin/percona-playback/connections_pool
Reviewer Review Type Date Requested Status
Stewart Smith (community) Approve
Review via email: mp+134884@code.launchpad.net

Description of the change

A bunch of changes for implementing this https://blueprints.launchpad.net/percona-playback/+spec/pool-of-threads blueprint as well as fixes for some critical bugs.

To post a comment you must log in.
171. By Vlad Lesin

Fixes for bug #1070824 and bug #1080654

172. By Vlad Lesin

merged with trunk

173. By Vlad Lesin

Send init session query on reconnect

174. By Vlad Lesin

Initialize session init query with the value from cmd line options.

175. By Vlad Lesin

Thread-safety fixes for mysql client plugin.

176. By Vlad Lesin

Don't push empty query log queries for execution

177. By Vlad Lesin

Convert multiline query to single line to avoid syntax errors from server.

178. By Vlad Lesin

Allow multi-statement queries.

Revision history for this message
Stewart Smith (stewart) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'Makefile.am'
--- Makefile.am 2012-11-08 22:17:07 +0000
+++ Makefile.am 2012-11-23 20:15:24 +0000
@@ -59,13 +59,11 @@
59libpercona_playback_la_SOURCES = \59libpercona_playback_la_SOURCES = \
60 percona_playback/percona_playback.cc \60 percona_playback/percona_playback.cc \
61 percona_playback/plugin.cc \61 percona_playback/plugin.cc \
62 percona_playback/db_thread.cc \62 percona_playback/db_thread.cc
63 percona_playback/dispatcher.cc
6463
65nobase_include_HEADERS += \64nobase_include_HEADERS += \
66 percona_playback/percona_playback.h \65 percona_playback/percona_playback.h \
67 percona_playback/db_thread.h \66 percona_playback/db_thread.h \
68 percona_playback/dispatcher.h \
69 percona_playback/query_result.h \67 percona_playback/query_result.h \
70 percona_playback/plugin.h \68 percona_playback/plugin.h \
71 percona_playback/tokenize.h \69 percona_playback/tokenize.h \
@@ -89,6 +87,7 @@
89 percona_playback/test/crashme-slow \87 percona_playback/test/crashme-slow \
90 percona_playback/test/sqlbench-transactions-slow \88 percona_playback/test/sqlbench-transactions-slow \
91 percona_playback/test/sysbench-slow \89 percona_playback/test/sysbench-slow \
90 percona_playback/test/thread-pool-sysbench-slow \
92 percona_playback/test/preserve_query_time \91 percona_playback/test/preserve_query_time \
93 percona_playback/test/tcpdump_without_handshake \92 percona_playback/test/tcpdump_without_handshake \
94 percona_playback/test/tcpdump_fragmented_packet \93 percona_playback/test/tcpdump_fragmented_packet \
@@ -104,6 +103,7 @@
104percona_playback_test_crashme_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"103percona_playback_test_crashme_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
105percona_playback_test_sqlbench_transactions_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"104percona_playback_test_sqlbench_transactions_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
106percona_playback_test_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"105percona_playback_test_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
106percona_playback_test_thread_pool_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
107percona_playback_test_preserve_query_time_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"107percona_playback_test_preserve_query_time_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
108percona_playback_test_tcpdump_without_handshake_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"108percona_playback_test_tcpdump_without_handshake_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
109percona_playback_test_tcpdump_fragmented_packet_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"109percona_playback_test_tcpdump_fragmented_packet_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\"
@@ -135,6 +135,9 @@
135percona_playback_test_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc135percona_playback_test_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc
136percona_playback_test_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS)136percona_playback_test_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS)
137137
138percona_playback_test_thread_pool_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc
139percona_playback_test_thread_pool_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS)
140
138percona_playback_test_preserve_query_time_SOURCES= percona_playback/test/preserve_query_time.cc141percona_playback_test_preserve_query_time_SOURCES= percona_playback/test/preserve_query_time.cc
139percona_playback_test_preserve_query_time_LDADD= $(LDADD) $(MYSQL_LIBS)142percona_playback_test_preserve_query_time_LDADD= $(LDADD) $(MYSQL_LIBS)
140143
141144
=== modified file 'percona_playback/db_thread.cc'
--- percona_playback/db_thread.cc 2012-11-14 11:44:59 +0000
+++ percona_playback/db_thread.cc 2012-11-23 20:15:24 +0000
@@ -24,25 +24,23 @@
2424
25extern std::string g_session_init_query;25extern std::string g_session_init_query;
2626
27void DBThread::connect_and_init_session()27void DBThread::init_session()
28{28{
29 connect();29 if (g_session_init_query.empty())
30 if (!g_session_init_query.empty())30 return;
31 {31 QueryResult r;
32 QueryResult r;32 QueryResult er;
33 QueryResult er;33 execute_query(g_session_init_query, &r, er);
34 execute_query(g_session_init_query, &r, er);
35 }
36}34}
3735
38void DBThread::run()36void DBThread::run()
39{37{
40 connect_and_init_session();38 connect_and_init_session();
4139
42 QueryEntryPtr query;
43 while (true)40 while (true)
44 {41 {
45 queries.pop(query);42 QueryEntryPtr query;
43 queries->pop(query);
4644
47 if (query->is_shutdown())45 if (query->is_shutdown())
48 break;46 break;
4947
=== modified file 'percona_playback/db_thread.h'
--- percona_playback/db_thread.h 2012-11-14 11:44:59 +0000
+++ percona_playback/db_thread.h 2012-11-23 20:15:24 +0000
@@ -16,6 +16,7 @@
16#ifndef PERCONA_PLAYBACK_DB_THREAD_H16#ifndef PERCONA_PLAYBACK_DB_THREAD_H
17#define PERCONA_PLAYBACK_DB_THREAD_H17#define PERCONA_PLAYBACK_DB_THREAD_H
1818
19#include <memory>
19#include "percona_playback/visibility.h"20#include "percona_playback/visibility.h"
20#include <boost/thread.hpp>21#include <boost/thread.hpp>
21#include <boost/shared_ptr.hpp>22#include <boost/shared_ptr.hpp>
@@ -33,55 +34,49 @@
3334
34class QueryResult;35class QueryResult;
3536
36class DBThreadState
37{
38public:
39 virtual ~DBThreadState(){}
40};
41
42class DBThread37class DBThread
43{38{
4439
45private:40private:
46 boost::thread thread;41 boost::thread thread;
47 uint64_t thread_id;42 uint64_t thread_id;
48 boost::shared_ptr<DBThreadState> state;
4943
50public:44public:
51 typedef tbb::concurrent_bounded_queue<QueryEntryPtr> Queries;45 typedef tbb::concurrent_bounded_queue<QueryEntryPtr> Queries;
52 Queries queries;46 boost::shared_ptr<Queries> queries;
5347
54 DBThread(uint64_t _thread_id) : thread_id(_thread_id) {48 DBThread(uint64_t _thread_id,
55 queries.set_capacity(g_db_thread_queue_depth);49 boost::shared_ptr<Queries> _queries) :
50 thread_id(_thread_id), queries(_queries) {
51 queries->set_capacity(g_db_thread_queue_depth);
56 }52 }
5753
58 virtual ~DBThread() {}54 virtual ~DBThread() {}
5955
60 void set_state(boost::shared_ptr<DBThreadState> new_state)
61 {
62 state= new_state;
63 }
64
65 boost::shared_ptr<DBThreadState> get_state() const
66 {
67 return state;
68 }
69
70 void join()56 void join()
71 {57 {
72 thread.join();58 thread.join();
73 }59 }
7460
75 void connect_and_init_session();61 bool connect_and_init_session()
62 {
63 if (connect())
64 {
65 init_session();
66 return true;
67 }
68 return false;
69 }
7670
77 virtual void connect()= 0;71 void init_session();
72 virtual bool connect()= 0;
7873
79 virtual void disconnect()= 0;74 virtual void disconnect()= 0;
80 virtual void execute_query(const std::string &query,75 virtual void execute_query(const std::string &query,
81 QueryResult *r,76 QueryResult *r,
82 const QueryResult &expected_result)= 0;77 const QueryResult &expected_result)= 0;
8378
84 void run();79 virtual void run();
8580
86 void start_thread();81 void start_thread();
87};82};
8883
=== removed file 'percona_playback/dispatcher.cc'
--- percona_playback/dispatcher.cc 2012-07-03 07:41:49 +0000
+++ percona_playback/dispatcher.cc 1970-01-01 00:00:00 +0000
@@ -1,105 +0,0 @@
1/* BEGIN LICENSE
2 * Copyright (C) 2011-2012 Percona Inc.
3 * This program is free software: you can redistribute it and/or modify it
4 * under the terms of the GNU General Public License version 2, as published
5 * by the Free Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful, but
8 * WITHOUT ANY WARRANTY; without even the implied warranties of
9 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
10 * PURPOSE. See the GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License along
13 * with this program. If not, see <http://www.gnu.org/licenses/>.
14 * END LICENSE */
15#include "percona_playback/dispatcher.h"
16#include "percona_playback/db_thread.h"
17#include "percona_playback/plugin.h"
18
19#include <assert.h>
20
21Dispatcher g_dispatcher;
22
23extern percona_playback::DBClientPlugin *g_dbclient_plugin;
24
25boost::shared_ptr<DBThreadState>
26Dispatcher::get_thread_state(
27 uint64_t thread_id,
28 boost::function1<void, DBThread *> run_on_db_thread_create)
29{
30 DBExecutorsTable::accessor a;
31
32 if (executors.insert(a, thread_id))
33 {
34 DBThread *db_thread= g_dbclient_plugin->create(thread_id);
35 assert(db_thread);
36 a->second= db_thread;
37 if (!run_on_db_thread_create.empty())
38 run_on_db_thread_create(db_thread);
39 db_thread->start_thread();
40 }
41
42 return a->second->get_state();
43}
44
45void
46Dispatcher::dispatch(QueryEntryPtr query_entry)
47{
48 uint64_t thread_id= query_entry->getThreadId();
49 {
50 DBExecutorsTable::accessor a;
51 if (executors.insert(a, thread_id))
52 {
53 DBThread *db_thread= g_dbclient_plugin->create(thread_id);
54 a->second= db_thread;
55 db_thread->start_thread();
56 }
57 a->second->queries.push(query_entry);
58 }
59}
60
61bool
62Dispatcher::finish_and_wait(uint64_t thread_id)
63{
64 DBThread *db_thread= NULL;
65 {
66 DBExecutorsTable::accessor a;
67 if (executors.find(a, thread_id))
68 {
69 db_thread= a->second;
70 executors.erase(a);
71 }
72 }
73
74 if (!db_thread)
75 return false;
76
77 db_thread->queries.push(QueryEntryPtr(new FinishEntry()));
78 db_thread->join();
79
80 delete db_thread;
81
82 return true;
83}
84
85void
86Dispatcher::finish_all_and_wait()
87{
88 QueryEntryPtr shutdown_command(new FinishEntry());
89 while(executors.size())
90 {
91 uint64_t thread_id;
92 DBThread *t;
93 {
94 DBExecutorsTable::const_iterator iter= executors.begin();
95 thread_id= (*iter).first;
96 t= (*iter).second;
97 }
98 executors.erase(thread_id);
99
100 t->queries.push(shutdown_command);
101 t->join();
102
103 delete t;
104 }
105}
1060
=== removed file 'percona_playback/dispatcher.h'
--- percona_playback/dispatcher.h 2012-07-06 07:15:28 +0000
+++ percona_playback/dispatcher.h 1970-01-01 00:00:00 +0000
@@ -1,46 +0,0 @@
1/* BEGIN LICENSE
2 * Copyright (C) 2011 Percona Inc.
3 * This program is free software: you can redistribute it and/or modify it
4 * under the terms of the GNU General Public License version 2, as published
5 * by the Free Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful, but
8 * WITHOUT ANY WARRANTY; without even the implied warranties of
9 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
10 * PURPOSE. See the GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License along
13 * with this program. If not, see <http://www.gnu.org/licenses/>.
14 * END LICENSE */
15
16#ifndef PERCONA_PLAYBACK_DISPATCHER_H
17#define PERCONA_PLAYBACK_DISPATCHER_H
18
19#include <tbb/concurrent_hash_map.h>
20#include <boost/function.hpp>
21
22#include "percona_playback/query_entry.h"
23
24class DBThread;
25class DBThreadState;
26
27class Dispatcher
28{
29 typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable;
30 DBExecutorsTable executors;
31 void db_thread_func(DBThread *thread);
32 void start_thread(DBThread *thread);
33
34public:
35 boost::shared_ptr<DBThreadState>
36 get_thread_state(uint64_t thread_id,
37 boost::function1<void, DBThread *>
38 run_on_db_thread_create);
39 void dispatch(QueryEntryPtr query_entry);
40 bool finish_and_wait(uint64_t thread_id);
41 void finish_all_and_wait();
42};
43
44extern Dispatcher g_dispatcher;
45
46#endif /* PERCONA_PLAYBACK_DISPATCHER_H */
470
=== modified file 'percona_playback/libdrizzle_client/libdrizzle_client.cc'
--- percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-07-23 07:08:23 +0000
+++ percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-11-23 20:15:24 +0000
@@ -39,13 +39,13 @@
3939
40 public:40 public:
41 LibDrizzleDBThread(uint64_t _thread_id, LibDrizzleOptions *opt) :41 LibDrizzleDBThread(uint64_t _thread_id, LibDrizzleOptions *opt) :
42 DBThread(_thread_id),42 DBThread(_thread_id, boost::shared_ptr<Queries>(new Queries())),
43 driz(NULL),43 driz(NULL),
44 options(opt)44 options(opt)
45 {45 {
46 }46 }
4747
48 void connect();48 bool connect();
49 void disconnect();49 void disconnect();
50 void execute_query(const std::string &query, QueryResult *r,50 void execute_query(const std::string &query, QueryResult *r,
51 const QueryResult &expected_result);51 const QueryResult &expected_result);
@@ -70,7 +70,7 @@
70 unsigned int port;70 unsigned int port;
71};71};
7272
73void LibDrizzleDBThread::connect()73bool LibDrizzleDBThread::connect()
74{74{
75 driz= drizzle_create(NULL);75 driz= drizzle_create(NULL);
76 assert(driz != NULL);76 assert(driz != NULL);
@@ -81,6 +81,7 @@
81 options->user.c_str(),81 options->user.c_str(),
82 options->password.c_str(),82 options->password.c_str(),
83 options->schema.c_str(), DRIZZLE_CON_MYSQL);83 options->schema.c_str(), DRIZZLE_CON_MYSQL);
84 return true;
84}85}
8586
86void LibDrizzleDBThread::disconnect()87void LibDrizzleDBThread::disconnect()
8788
=== modified file 'percona_playback/mysql_client/mysql_client.cc'
--- percona_playback/mysql_client/mysql_client.cc 2012-07-04 09:10:16 +0000
+++ percona_playback/mysql_client/mysql_client.cc 2012-11-23 20:15:24 +0000
@@ -42,17 +42,23 @@
42 unsigned int port;42 unsigned int port;
43};43};
4444
45void MySQLDBThread::connect()45bool MySQLDBThread::connect()
46{46{
47 mysql_init(&handle);47 mysql_init(&handle);
48 mysql_real_connect(&handle,48 if (!mysql_real_connect(&handle,
49 options->host.c_str(),49 options->host.c_str(),
50 options->user.c_str(),50 options->user.c_str(),
51 options->password.c_str(),51 options->password.c_str(),
52 options->schema.c_str(),52 options->schema.c_str(),
53 options->port,53 options->port,
54 "",54 "",
55 0);55 CLIENT_MULTI_STATEMENTS))
56 {
57 fprintf(stderr, "Can't connect to server: %s\n",
58 mysql_error(&handle));
59 return false;
60 }
61 return true;
56}62}
5763
58void MySQLDBThread::disconnect()64void MySQLDBThread::disconnect()
@@ -63,29 +69,50 @@
63void MySQLDBThread::execute_query(const std::string &query, QueryResult *r,69void MySQLDBThread::execute_query(const std::string &query, QueryResult *r,
64 const QueryResult &)70 const QueryResult &)
65{71{
66 int mr= mysql_real_query(&handle, query.c_str(), query.length());72 int mr;
67 if(mr != 0)73 for(unsigned i = 0; i < max_err_num; ++i)
68 {74 {
69 r->setError(mr);75 mr= mysql_real_query(&handle, query.c_str(), query.length());
70 }76 r->setError(mr);
71 else77 if(mr != 0)
72 {78 {
73 MYSQL_RES* mysql_res= NULL;79 fprintf(stderr,
7480 "Error during query: %s, number of tries %u\n",
75 r->setError(mr);81 mysql_error(&handle),
76 r->setWarningCount(mysql_warning_count(&handle));82 i);
7783 disconnect();
78 mysql_res= mysql_store_result(&handle);84 connect_and_init_session();
7985 }
80 if (mysql_res != NULL)
81 r->setRowsSent(mysql_num_rows(mysql_res));
82 else86 else
87 {
88 r->setWarningCount(mysql_warning_count(&handle));
83 r->setRowsSent(0);89 r->setRowsSent(0);
8490 do
85 mysql_free_result(mysql_res);91 {
92 MYSQL_RES* mysql_res= NULL;
93
94 mysql_res= mysql_store_result(&handle);
95
96 if (mysql_res != NULL)
97 {
98 r->setRowsSent(mysql_num_rows(mysql_res));
99 mysql_free_result(mysql_res);
100 }
101
102 } while(!mysql_next_result(&handle));
103
104 break;
105 }
86 }106 }
87}107}
88108
109void MySQLDBThread::run()
110{
111 mysql_thread_init();
112 DBThread::run();
113 mysql_thread_end();
114}
115
89class MySQLDBClientPlugin : public percona_playback::DBClientPlugin116class MySQLDBClientPlugin : public percona_playback::DBClientPlugin
90{117{
91private:118private:
@@ -126,6 +153,21 @@
126 return -1;153 return -1;
127 }154 }
128155
156 if (!active)
157 return 0;
158
159 if (!mysql_thread_safe())
160 {
161 fprintf(stderr, "libmysqlclient is not thread safe\n");
162 return -1;
163 }
164
165 if (mysql_library_init(0, NULL, NULL))
166 {
167 fprintf(stderr, "could not initialize mysql library\n");
168 return -1;
169 }
170
129 if (vm.count("mysql-host"))171 if (vm.count("mysql-host"))
130 {172 {
131 options.host= vm["mysql-host"].as<std::string>();173 options.host= vm["mysql-host"].as<std::string>();
132174
=== modified file 'percona_playback/mysql_client/mysql_client.h'
--- percona_playback/mysql_client/mysql_client.h 2012-06-15 10:53:23 +0000
+++ percona_playback/mysql_client/mysql_client.h 2012-11-23 20:15:24 +0000
@@ -10,16 +10,19 @@
10 private:10 private:
11 MYSQL handle;11 MYSQL handle;
12 MySQLOptions *options;12 MySQLOptions *options;
13 static const unsigned max_err_num = 10;
1314
14 public:15 public:
15 MySQLDBThread(uint64_t _thread_id, MySQLOptions *opt) :16 MySQLDBThread(uint64_t _thread_id, MySQLOptions *opt) :
16 DBThread(_thread_id),17 DBThread(_thread_id,
18 boost::shared_ptr<Queries>(new Queries())),
17 options(opt)19 options(opt)
18 {20 {
19 }21 }
2022
21 void connect();23 bool connect();
22 void disconnect();24 void disconnect();
23 void execute_query(const std::string &query, QueryResult *r,25 void execute_query(const std::string &query, QueryResult *r,
24 const QueryResult &expected_result);26 const QueryResult &expected_result);
27 void run();
25};28};
2629
=== modified file 'percona_playback/null_dbclient/null_dbclient.cc'
--- percona_playback/null_dbclient/null_dbclient.cc 2012-04-05 04:47:09 +0000
+++ percona_playback/null_dbclient/null_dbclient.cc 2012-11-23 20:15:24 +0000
@@ -20,10 +20,11 @@
20class NULLDBThread : public DBThread20class NULLDBThread : public DBThread
21{21{
22 public:22 public:
23 NULLDBThread(uint64_t _thread_id) : DBThread(_thread_id) {23 NULLDBThread(uint64_t _thread_id) :
24 }24 DBThread(_thread_id,
25 boost::shared_ptr<Queries>(new Queries())) {}
2526
26 void connect() {};27 bool connect() { return true; };
27 void disconnect() {};28 void disconnect() {};
28 void execute_query(const std::string &, QueryResult *r,29 void execute_query(const std::string &, QueryResult *r,
29 const QueryResult &expected_result) {30 const QueryResult &expected_result) {
3031
=== modified file 'percona_playback/percona_playback.cc'
--- percona_playback/percona_playback.cc 2012-11-14 11:44:59 +0000
+++ percona_playback/percona_playback.cc 2012-11-23 20:15:24 +0000
@@ -40,6 +40,7 @@
4040
41percona_playback::DBClientPlugin *g_dbclient_plugin= NULL;41percona_playback::DBClientPlugin *g_dbclient_plugin= NULL;
42percona_playback::InputPlugin *g_input_plugin= NULL;42percona_playback::InputPlugin *g_input_plugin= NULL;
43percona_playback::DispatcherPlugin *g_dispatcher_plugin= NULL;
43unsigned int g_db_thread_queue_depth;44unsigned int g_db_thread_queue_depth;
44std::string g_session_init_query;45std::string g_session_init_query;
4546
@@ -132,6 +133,22 @@
132 std::cerr << _("Selected Input Plugin: ")133 std::cerr << _("Selected Input Plugin: ")
133 << g_input_plugin->name134 << g_input_plugin->name
134 << std::endl;135 << std::endl;
136
137 std::cerr << std::endl << _("Loaded Dispatcher Plugins: ");
138
139 BOOST_FOREACH(const PluginRegistry::DispatcherPluginPair &pp,
140 PluginRegistry::singleton().dispatcher_plugins)
141 {
142 std::cerr << pp.first << " ";
143 }
144
145 std::cerr << std::endl;
146 std::cerr << std::endl;
147
148 assert(g_dispatcher_plugin);
149 std::cerr << _("Selected dispatcher Plugin: ")
150 << g_dispatcher_plugin->name
151 << std::endl;
135}152}
136153
137int percona_playback_argv(percona_playback_st *the_percona_playback,154int percona_playback_argv(percona_playback_st *the_percona_playback,
@@ -152,12 +169,13 @@
152 db_options.add_options()169 db_options.add_options()
153 ("db-plugin", po::value<std::string>(), _("Database plugin"))170 ("db-plugin", po::value<std::string>(), _("Database plugin"))
154 ("input-plugin", po::value<std::string>(), _("Input plugin"))171 ("input-plugin", po::value<std::string>(), _("Input plugin"))
172 ("dispatcher-plugin", po::value<std::string>(), _("Dispatcher plugin"))
155 ("queue-depth", po::value<unsigned int>(),173 ("queue-depth", po::value<unsigned int>(),
156 _("Queue depth for DB executor (thread). The larger this number the"174 _("Queue depth for DB executor (thread). The larger this number the"
157 " greater the played-back workload can deviate from the original workload"175 " greater the played-back workload can deviate from the original workload"
158 " as some connections may be up to queue-depth behind. (default 1)"))176 " as some connections may be up to queue-depth behind. (default 1)"))
159 ("session-init-query",177 ("session-init-query",
160 po::value<std::string>()->default_value(""),178 po::value<std::string>(&g_session_init_query)->default_value(""),
161 _("This query will be executed just after each connect to db"))179 _("This query will be executed just after each connect to db"))
162 ;180 ;
163181
@@ -237,6 +255,32 @@
237 }255 }
238 g_input_plugin->active= true;256 g_input_plugin->active= true;
239257
258 if (vm.count("dispatcher-plugin"))
259 {
260 PluginRegistry::DispatcherPluginMap::iterator it;
261 it= PluginRegistry::singleton().dispatcher_plugins.find(
262 vm["dispatcher-plugin"].as<std::string>());
263 if (it == PluginRegistry::singleton().dispatcher_plugins.end())
264 {
265 fprintf(stderr, _("Invalid Dispatcher Plugin\n"));
266 return -1;
267 }
268 g_dispatcher_plugin= it->second;
269 }
270 else
271 {
272 PluginRegistry::DispatcherPluginMap::iterator it;
273 it= PluginRegistry::singleton().dispatcher_plugins.
274 find("thread-per-connection");
275 if (it == PluginRegistry::singleton().dispatcher_plugins.end())
276 {
277 fprintf(stderr, _("Invalid Dispatcher plugin\n"));
278 return -1;
279 }
280 g_dispatcher_plugin= it->second;
281 }
282 g_dispatcher_plugin->active= true;
283
240 if (vm.count("help") || argc==1)284 if (vm.count("help") || argc==1)
241 {285 {
242 help(options_description);286 help(options_description);
@@ -299,6 +343,7 @@
299 std::cerr << _("Database Plugin: ") << g_dbclient_plugin->name << std::endl;343 std::cerr << _("Database Plugin: ") << g_dbclient_plugin->name << std::endl;
300 std::cerr << _(" Running...") << std::endl;344 std::cerr << _(" Running...") << std::endl;
301345
346 g_dispatcher_plugin->run();
302 g_input_plugin->run(*r);347 g_input_plugin->run(*r);
303348
304 BOOST_FOREACH(const PluginRegistry::ReportPluginPair pp,349 BOOST_FOREACH(const PluginRegistry::ReportPluginPair pp,
305350
=== modified file 'percona_playback/plugin.h'
--- percona_playback/plugin.h 2012-06-24 04:28:41 +0000
+++ percona_playback/plugin.h 2012-11-23 20:15:24 +0000
@@ -20,6 +20,9 @@
20#include <vector>20#include <vector>
21#include <string>21#include <string>
22#include <map>22#include <map>
23#include <boost/shared_ptr.hpp>
24#include <boost/function.hpp>
25#include "percona_playback/query_entry.h"
23#include <percona_playback/visibility.h>26#include <percona_playback/visibility.h>
24#include <percona_playback/version.h>27#include <percona_playback/version.h>
2528
@@ -31,6 +34,7 @@
31}34}
3235
33class DBThread;36class DBThread;
37class DBThreadState;
34class QueryResult;38class QueryResult;
35class percona_playback_run_result;39class percona_playback_run_result;
3640
@@ -102,6 +106,21 @@
102106
103};107};
104108
109class DispatcherPlugin : public plugin
110{
111 public:
112 std::string name;
113
114 DispatcherPlugin(const std::string &_name) : name(_name) {}
115
116 virtual void dispatch(QueryEntryPtr query_entry)= 0;
117 virtual bool finish_and_wait(uint64_t thread_id)= 0;
118 virtual void finish_all_and_wait()= 0;
119
120 virtual void run() {};
121
122};
123
105class PluginRegistry124class PluginRegistry
106{125{
107 public:126 public:
@@ -122,6 +141,9 @@
122 typedef std::map<std::string, InputPlugin*> InputPluginMap;141 typedef std::map<std::string, InputPlugin*> InputPluginMap;
123 typedef std::pair<std::string, InputPlugin*> InputPluginPair;142 typedef std::pair<std::string, InputPlugin*> InputPluginPair;
124143
144 typedef std::map<std::string, DispatcherPlugin*> DispatcherPluginMap;
145 typedef std::pair<std::string, DispatcherPlugin*> DispatcherPluginPair;
146
125 typedef std::map<std::string, plugin*> PluginMap;147 typedef std::map<std::string, plugin*> PluginMap;
126 typedef std::pair<std::string, plugin*> PluginPair;148 typedef std::pair<std::string, plugin*> PluginPair;
127149
@@ -129,6 +151,7 @@
129 DBClientPluginMap dbclient_plugins;151 DBClientPluginMap dbclient_plugins;
130 ReportPluginMap report_plugins;152 ReportPluginMap report_plugins;
131 InputPluginMap input_plugins;153 InputPluginMap input_plugins;
154 DispatcherPluginMap dispatcher_plugins;
132155
133 void add(const std::string &name, plugin* plugin_object)156 void add(const std::string &name, plugin* plugin_object)
134 {157 {
@@ -154,6 +177,12 @@
154 all_plugins.insert(PluginPair(name, input_plugin));177 all_plugins.insert(PluginPair(name, input_plugin));
155 }178 }
156179
180 void add(const std::string &name, DispatcherPlugin* dispatcher_plugin)
181 {
182 dispatcher_plugins.insert(DispatcherPluginPair(name, dispatcher_plugin));
183 all_plugins.insert(PluginPair(name, dispatcher_plugin));
184 }
185
157};186};
158187
159188
160189
=== modified file 'percona_playback/query_entry.h'
--- percona_playback/query_entry.h 2012-07-03 02:37:51 +0000
+++ percona_playback/query_entry.h 2012-11-23 20:15:24 +0000
@@ -45,7 +45,8 @@
45class FinishEntry : public QueryEntry45class FinishEntry : public QueryEntry
46{46{
47public:47public:
48 FinishEntry() { set_shutdown(); }48 FinishEntry(uint64_t _thread_id) :
49 QueryEntry (_thread_id, true) {}
49 bool is_quit() { return false; }50 bool is_quit() { return false; }
5051
51 void execute(DBThread *) {}52 void execute(DBThread *) {}
5253
=== modified file 'percona_playback/query_log/query_log.cc'
--- percona_playback/query_log/query_log.cc 2012-11-08 22:17:07 +0000
+++ percona_playback/query_log/query_log.cc 2012-11-23 20:15:24 +0000
@@ -39,7 +39,6 @@
39#include <percona_playback/db_thread.h>39#include <percona_playback/db_thread.h>
40#include <percona_playback/query_log/query_log.h>40#include <percona_playback/query_log/query_log.h>
41#include <percona_playback/query_result.h>41#include <percona_playback/query_result.h>
42#include "percona_playback/dispatcher.h"
43#include <percona_playback/gettext.h>42#include <percona_playback/gettext.h>
4443
45#include <boost/foreach.hpp>44#include <boost/foreach.hpp>
@@ -51,6 +50,8 @@
51static bool g_run_set_timestamp;50static bool g_run_set_timestamp;
52static bool g_preserve_query_time;51static bool g_preserve_query_time;
5352
53extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
54
54class ParseQueryLogFunc: public tbb::filter {55class ParseQueryLogFunc: public tbb::filter {
55public:56public:
56 ParseQueryLogFunc(FILE *input_file_,57 ParseQueryLogFunc(FILE *input_file_,
@@ -83,7 +84,7 @@
83 new std::vector<boost::shared_ptr<QueryLogEntry> >();84 new std::vector<boost::shared_ptr<QueryLogEntry> >();
8485
85 boost::shared_ptr<QueryLogEntry> tmp_entry(new QueryLogEntry());86 boost::shared_ptr<QueryLogEntry> tmp_entry(new QueryLogEntry());
86 entries->push_back(tmp_entry);87 // entries->push_back(tmp_entry);
8788
88 char *line= NULL;89 char *line= NULL;
89 size_t buflen = 0;90 size_t buflen = 0;
@@ -130,9 +131,10 @@
130131
131 if (strncmp(p, "# User@Host", strlen("# User@Host")) == 0)132 if (strncmp(p, "# User@Host", strlen("# User@Host")) == 0)
132 {133 {
134 if (!tmp_entry->getQuery().empty())
135 entries->push_back(tmp_entry);
133 count++;136 count++;
134 tmp_entry.reset(new QueryLogEntry());137 tmp_entry.reset(new QueryLogEntry());
135 entries->push_back(tmp_entry);
136 (*this->nr_entries)++;138 (*this->nr_entries)++;
137 }139 }
138140
@@ -172,6 +174,9 @@
172 p= line;174 p= line;
173 }175 }
174176
177 if (!tmp_entry->getQuery().empty())
178 entries->push_back(tmp_entry);
179
175 free(line);180 free(line);
176 return entries;181 return entries;
177}182}
@@ -239,7 +244,19 @@
239 && s.compare(0, timestamp_query.length(), timestamp_query) == 0)244 && s.compare(0, timestamp_query.length(), timestamp_query) == 0)
240 set_timestamp_query= s;245 set_timestamp_query= s;
241 else246 else
242 query.append(s);247 {
248 //Append space insead of \r\n
249 std::string::const_iterator end = s.end() - 1;
250 if (s.length() >= 2 && *(s.end() - 2) == '\r')
251 --end;
252 //Remove initial spaces for best query viewing in reports
253 std::string::const_iterator begin;
254 for (begin = s.begin(); begin != end; ++begin)
255 if (*begin != ' ' && *begin != '\t')
256 break;
257 query.append(begin, end);
258 query.append(" ");
259 }
243}260}
244261
245bool QueryLogEntry::parse_metadata(const std::string &s)262bool QueryLogEntry::parse_metadata(const std::string &s)
@@ -281,13 +298,13 @@
281 r= true;298 r= true;
282 }299 }
283 }300 }
284301/*
285 if (s[0] == '#' && strncmp(s.c_str(), "# administrator", strlen("# administrator")))302 if (s[0] == '#' && strncmp(s.c_str(), "# administrator", strlen("# administrator")))
286 {303 {
287 query.append(s);304 query.append(s);
288 r= true;305 r= true;
289 }306 }
290307*/
291 return r;308 return r;
292}309}
293310
@@ -300,7 +317,7 @@
300 for (unsigned int i=0; i< input->size(); i++)317 for (unsigned int i=0; i< input->size(); i++)
301 {318 {
302 // usleep(10);319 // usleep(10);
303 g_dispatcher.dispatch((*input)[i]);320 g_dispatcher_plugin->dispatch((*input)[i]);
304 }321 }
305 delete input;322 delete input;
306 return NULL;323 return NULL;
@@ -330,7 +347,7 @@
330 p.add_filter(f4);347 p.add_filter(f4);
331 p.run(2);348 p.run(2);
332349
333 g_dispatcher.finish_all_and_wait();350 g_dispatcher_plugin->finish_all_and_wait();
334351
335 r->n_log_entries= entries;352 r->n_log_entries= entries;
336 r->n_queries= queries;353 r->n_queries= queries;
337354
=== modified file 'percona_playback/tcpdump/connection_state.cc'
--- percona_playback/tcpdump/connection_state.cc 2012-07-02 17:51:47 +0000
+++ percona_playback/tcpdump/connection_state.cc 2012-11-23 20:15:24 +0000
@@ -25,9 +25,11 @@
25#include "percona_playback/plugin.h"25#include "percona_playback/plugin.h"
2626
27extern percona_playback::DBClientPlugin *g_dbclient_plugin;27extern percona_playback::DBClientPlugin *g_dbclient_plugin;
28extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
2829
29void30void
30ConnectionState::ProcessMysqlPkts(const u_char *pkts,31ConnectionState::ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr,
32 const u_char *pkts,
31 u_int pkts_len,33 u_int pkts_len,
32 const timeval &ts,34 const timeval &ts,
33 const AddrPort &addr_port,35 const AddrPort &addr_port,
@@ -76,13 +78,13 @@
76 {78 {
77 case PKT_QUERY:79 case PKT_QUERY:
78 if (!query.empty())80 if (!query.empty())
79 DispatchQuery(ts, query, addr_port);81 DispatchQuery(cs_ptr, ts, query, addr_port);
80 ++stats.nr_of_parsed_queries;82 ++stats.nr_of_parsed_queries;
81 ++stats.nr_of_parsed_packets;83 ++stats.nr_of_parsed_packets;
82 break;84 break;
8385
84 case PKT_RESULT:86 case PKT_RESULT:
85 DispatchResult(ts, addr_port);87 DispatchResult(cs_ptr, ts, addr_port);
86 ++stats.nr_of_parsed_packets;88 ++stats.nr_of_parsed_packets;
87 break;89 break;
8890
@@ -107,12 +109,6 @@
107109
108}110}
109111
110void
111ConnectionState::ProcessFinishConnection()
112{
113 db_thread->queries.push(QueryEntryPtr(new FinishEntry()));
114}
115
116PktResult112PktResult
117ConnectionState::ParseMysqlPkt(IN UCharBuffer &buff,113ConnectionState::ParseMysqlPkt(IN UCharBuffer &buff,
118 OUT std::string &query)114 OUT std::string &query)
@@ -303,23 +299,32 @@
303}299}
304300
305void301void
306ConnectionState::DispatchQuery(const timeval &ts,302ConnectionState::DispatchQuery(/* Having shared pointer to "this" as an argument
303 is just temporary step. We need a shared pointer
304 to connection state inside of query entry. The
305 better way to do this is to move code that creates
306 query entries to the upper layer.
307 */
308 boost::shared_ptr<ConnectionState> cs_ptr,
309 const timeval &ts,
307 const std::string &query,310 const std::string &query,
308 const AddrPort &addr_port)311 const AddrPort &addr_port)
309{312{
310 boost::shared_ptr<TcpdumpQueryEntry> 313 boost::shared_ptr<TcpdumpQueryEntry>
311 query_entry(new TcpdumpQueryEntry(ts,314 query_entry(new TcpdumpQueryEntry(cs_ptr,
315 ts,
312 query,316 query,
313 addr_port));317 addr_port));
314 db_thread->queries.push(query_entry);318 g_dispatcher_plugin->dispatch(query_entry);
315}319}
316320
317void321void
318ConnectionState::DispatchResult(const timeval &ts,322ConnectionState::DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr,
323 const timeval &ts,
319 const AddrPort &addr_port)324 const AddrPort &addr_port)
320{325{
321 boost::shared_ptr<TcpdumpResultEntry> 326 boost::shared_ptr<TcpdumpResultEntry>
322 result_entry(new TcpdumpResultEntry(addr_port, ts, last_query_result));327 result_entry(new TcpdumpResultEntry(cs_ptr, addr_port, ts, last_query_result));
323 db_thread->queries.push(result_entry);328 g_dispatcher_plugin->dispatch(result_entry);
324}329}
325330
326331
=== modified file 'percona_playback/tcpdump/connection_state.h'
--- percona_playback/tcpdump/connection_state.h 2012-07-03 07:41:49 +0000
+++ percona_playback/tcpdump/connection_state.h 2012-11-23 20:15:24 +0000
@@ -94,8 +94,7 @@
94};94};
9595
9696
97class ConnectionState : public DBThreadState97class ConnectionState
98
99{98{
10099
101public:100public:
@@ -107,7 +106,7 @@
107 SERVER106 SERVER
108 };107 };
109108
110 ConnectionState() :109 ConnectionState(uint64_t _thread_id) :
111 current_origin(UNDEF),110 current_origin(UNDEF),
112 fragmented(false),111 fragmented(false),
113 handshake_from_client(false),112 handshake_from_client(false),
@@ -126,7 +125,7 @@
126 (drizzle_con_options_t)125 (drizzle_con_options_t)
127 DRIZZLE_CON_MYSQL),126 DRIZZLE_CON_MYSQL),
128 drizzle_con_free),127 drizzle_con_free),
129 db_thread(NULL)128 thread_id(_thread_id)
130 {129 {
131 drizzle_con->result= NULL;130 drizzle_con->result= NULL;
132 }131 }
@@ -136,18 +135,15 @@
136 drizzle_result_free_all(drizzle_con.get());135 drizzle_result_free_all(drizzle_con.get());
137 }136 }
138137
139 void ProcessMysqlPkts(const u_char *pkts,138 void ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr,
139 const u_char *pkts,
140 u_int total_len,140 u_int total_len,
141 const timeval &ts,141 const timeval &ts,
142 const AddrPort &addr_port,142 const AddrPort &addr_port,
143 OUT TcpdumpMysqlParserStats &stats);143 OUT TcpdumpMysqlParserStats &stats);
144144
145 void ProcessFinishConnection();
146
147 void SetCurrentOrigin(Origin o) { current_origin = o; }145 void SetCurrentOrigin(Origin o) { current_origin = o; }
148146
149 void SetDBThread(DBThread *t) { db_thread= t; }
150
151 LastExecutedQueryInfo last_executed_query_info;147 LastExecutedQueryInfo last_executed_query_info;
152148
153private:149private:
@@ -159,11 +155,13 @@
159 PktResult ServerPacket(IN UCharBuffer &buff);155 PktResult ServerPacket(IN UCharBuffer &buff);
160 PktResult ClientPacket(IN UCharBuffer &buff, OUT std::string &query);156 PktResult ClientPacket(IN UCharBuffer &buff, OUT std::string &query);
161157
162 void DispatchQuery(const timeval &ts,158 void DispatchQuery(boost::shared_ptr<ConnectionState> cs_ptr,
159 const timeval &ts,
163 const std::string &query,160 const std::string &query,
164 const AddrPort &addr_port);161 const AddrPort &addr_port);
165162
166 void DispatchResult(const timeval &ts,163 void DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr,
164 const timeval &ts,
167 const AddrPort &addr_port);165 const AddrPort &addr_port);
168166
169 Origin current_origin;167 Origin current_origin;
@@ -179,7 +177,7 @@
179177
180 QueryResult last_query_result;178 QueryResult last_query_result;
181179
182 DBThread *db_thread;180 uint64_t thread_id;
183181
184};182};
185183
186184
=== modified file 'percona_playback/tcpdump/pcap_packets_parser.cc'
--- percona_playback/tcpdump/pcap_packets_parser.cc 2012-07-03 07:41:49 +0000
+++ percona_playback/tcpdump/pcap_packets_parser.cc 2012-11-23 20:15:24 +0000
@@ -18,7 +18,6 @@
1818
19#include "percona_playback/db_thread.h"19#include "percona_playback/db_thread.h"
20#include "percona_playback/plugin.h"20#include "percona_playback/plugin.h"
21#include "percona_playback/dispatcher.h"
2221
23#include <netinet/in.h>22#include <netinet/in.h>
24#include <arpa/inet.h>23#include <arpa/inet.h>
@@ -29,6 +28,7 @@
29#define SNAP_LEN 16500 // pcap's max capture size28#define SNAP_LEN 16500 // pcap's max capture size
3029
31extern percona_playback::DBClientPlugin *g_dbclient_plugin;30extern percona_playback::DBClientPlugin *g_dbclient_plugin;
31extern percona_playback::DispatcherPlugin *g_dispatcher_plugin;
3232
33void33void
34PcapPacketsParser::ParsePkt(const struct pcap_pkthdr *header,34PcapPacketsParser::ParsePkt(const struct pcap_pkthdr *header,
@@ -81,7 +81,8 @@
81 if (size_mysql == 0 &&81 if (size_mysql == 0 &&
82 ((tcp->th_flags & TH_FIN) || (tcp->th_flags & TH_RST)))82 ((tcp->th_flags & TH_FIN) || (tcp->th_flags & TH_RST)))
83 {83 {
84 g_dispatcher.finish_and_wait(addr_port.ThreadId());84 g_dispatcher_plugin->finish_and_wait(addr_port.ThreadId());
85 RemoveConnectionState(addr_port.ThreadId());
85 return;86 return;
86 }87 }
8788
@@ -97,37 +98,50 @@
97 }98 }
9899
99 /* If there is no DBThread with such id create it */100 /* If there is no DBThread with such id create it */
100 boost::shared_ptr<DBThreadState>101 boost::shared_ptr<ConnectionState>
101 state= g_dispatcher.get_thread_state(addr_port.ThreadId(),102 state= GetConnectionState(addr_port.ThreadId());
102 boost::bind(&PcapPacketsParser::CreateConnectionState,
103 this,
104 _1));
105103
106 assert(state.get());104 assert(state.get());
107105
108 ((ConnectionState *)state.get())->SetCurrentOrigin(origin);106 state->SetCurrentOrigin(origin);
109 ((ConnectionState *)state.get())->ProcessMysqlPkts(mysql,107 state->ProcessMysqlPkts(state,
110 size_mysql,108 mysql,
111 header->ts,109 size_mysql,
112 addr_port,110 header->ts,
113 stats);111 addr_port,
112 stats);
114}113}
115114
116void115void
117PcapPacketsParser::WaitForUnfinishedTasks()116PcapPacketsParser::WaitForUnfinishedTasks()
118{117{
119 g_dispatcher.finish_all_and_wait();118 g_dispatcher_plugin->finish_all_and_wait();
120}119}
121120
122void121boost::shared_ptr<ConnectionState>
123PcapPacketsParser::CreateConnectionState(DBThread *db_thread)122PcapPacketsParser::GetConnectionState(uint64_t thread_id)
124{123{
125 assert(db_thread);124 Connections::iterator it = connections.find(thread_id);
126 boost::shared_ptr<ConnectionState> state(new ConnectionState());125 if (it == connections.end())
126 return CreateConnectionState(thread_id);
127 return it->second;
128}
129
130boost::shared_ptr<ConnectionState>
131PcapPacketsParser::CreateConnectionState(uint64_t thread_id)
132{
133 boost::shared_ptr<ConnectionState> state(new ConnectionState(thread_id));
127 state->last_executed_query_info.end_pcap_timestamp=134 state->last_executed_query_info.end_pcap_timestamp=
128 first_packet_pcap_timestamp;135 first_packet_pcap_timestamp;
129 state->last_executed_query_info.end_timestamp=136 state->last_executed_query_info.end_timestamp=
130 first_packet_timestamp;137 first_packet_timestamp;
131 db_thread->set_state(state);138 connections[thread_id] = state;
132 state->SetDBThread(db_thread);139 return state;
133}140}
141
142void
143PcapPacketsParser::RemoveConnectionState(uint64_t thread_id)
144{
145 connections.erase(thread_id);
146}
147
134148
=== modified file 'percona_playback/tcpdump/pcap_packets_parser.h'
--- percona_playback/tcpdump/pcap_packets_parser.h 2012-06-19 15:26:53 +0000
+++ percona_playback/tcpdump/pcap_packets_parser.h 2012-11-23 20:15:24 +0000
@@ -23,6 +23,8 @@
23#include <tbb/concurrent_queue.h>23#include <tbb/concurrent_queue.h>
24#include <boost/thread.hpp>24#include <boost/thread.hpp>
25#include <boost/bind.hpp>25#include <boost/bind.hpp>
26#include <boost/unordered_map.hpp>
27#include <boost/shared_ptr.hpp>
2628
27class DBThread;29class DBThread;
2830
@@ -32,10 +34,14 @@
32 timeval first_packet_timestamp;34 timeval first_packet_timestamp;
33 timeval first_packet_pcap_timestamp;35 timeval first_packet_pcap_timestamp;
34 bool was_first_packet;36 bool was_first_packet;
37 typedef boost::unordered_map<uint64_t,
38 boost::shared_ptr<ConnectionState> >
39 Connections;
40 Connections connections;
3541
36public:42public:
3743
38 PcapPacketsParser() : was_first_packet(false)44 PcapPacketsParser() : was_first_packet(false), connections(10000)
39 {45 {
40 first_packet_timestamp.tv_sec= first_packet_timestamp.tv_usec= 0;46 first_packet_timestamp.tv_sec= first_packet_timestamp.tv_usec= 0;
41 first_packet_pcap_timestamp.tv_sec= first_packet_pcap_timestamp.tv_usec= 0;47 first_packet_pcap_timestamp.tv_sec= first_packet_pcap_timestamp.tv_usec= 0;
@@ -52,9 +58,11 @@
5258
53 void WaitForUnfinishedTasks();59 void WaitForUnfinishedTasks();
5460
55 void CreateConnectionState(DBThread *db_thread);
5661
57private:62private:
63 boost::shared_ptr<ConnectionState> CreateConnectionState(uint64_t thread_id);
64 boost::shared_ptr<ConnectionState> GetConnectionState(uint64_t thread_id);
65 void RemoveConnectionState(uint64_t thread_id);
5866
59 void ParsePkt(const struct pcap_pkthdr *header,67 void ParsePkt(const struct pcap_pkthdr *header,
60 const u_char *packet);68 const u_char *packet);
6169
=== modified file 'percona_playback/tcpdump/tcpdump_query_entries.cc'
--- percona_playback/tcpdump/tcpdump_query_entries.cc 2012-07-09 11:54:52 +0000
+++ percona_playback/tcpdump/tcpdump_query_entries.cc 2012-11-23 20:15:24 +0000
@@ -30,15 +30,10 @@
30 QueryResult expected_result;30 QueryResult expected_result;
31 timeval start_time;31 timeval start_time;
32 timeval end_time;32 timeval end_time;
33 boost::shared_ptr<DBThreadState> thr_state;
3433
35 assert(t);34 assert(t);
3635
37 thr_state= t->get_state();36 ConnectionState &state= *connection_state;
38
39 assert(thr_state.get());
40
41 ConnectionState &state= (ConnectionState &)*thr_state;
4237
43 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;38 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;
4439
@@ -96,12 +91,7 @@
9691
97 assert(t);92 assert(t);
9893
99 thr_state= t->get_state();94 ConnectionState &state= *connection_state;
100
101 assert(thr_state.get());
102
103
104 ConnectionState &state= (ConnectionState &)*thr_state;
105 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;95 LastExecutedQueryInfo &last_query_info= state.last_executed_query_info;
10696
107 timersub(&pcap_timestamp,97 timersub(&pcap_timestamp,
10898
=== modified file 'percona_playback/tcpdump/tcpdump_query_entries.h'
--- percona_playback/tcpdump/tcpdump_query_entries.h 2012-07-03 07:41:49 +0000
+++ percona_playback/tcpdump/tcpdump_query_entries.h 2012-11-23 20:15:24 +0000
@@ -21,9 +21,12 @@
21#include "connection_state.h"21#include "connection_state.h"
2222
23#include <sys/time.h>23#include <sys/time.h>
24#include <boost/shared_ptr.hpp>
2425
25class TcpdumpQueryEntry : public QueryEntry26class TcpdumpQueryEntry : public QueryEntry
26{27{
28
29 boost::shared_ptr<ConnectionState> connection_state;
27 timeval pcap_timestamp;30 timeval pcap_timestamp;
28 std::string query;31 std::string query;
2932
@@ -34,10 +37,12 @@
34 pcap_timestamp.tv_sec= pcap_timestamp.tv_usec= 0;37 pcap_timestamp.tv_sec= pcap_timestamp.tv_usec= 0;
35 }38 }
3639
37 TcpdumpQueryEntry(const timeval &_pcap_timestamp,40 TcpdumpQueryEntry(boost::shared_ptr<ConnectionState> _connection_state,
41 const timeval &_pcap_timestamp,
38 const std::string &_query,42 const std::string &_query,
39 const AddrPort &addr_port) :43 const AddrPort &addr_port) :
40 QueryEntry(addr_port.ThreadId(), false),44 QueryEntry(addr_port.ThreadId(), false),
45 connection_state(_connection_state),
41 pcap_timestamp(_pcap_timestamp),46 pcap_timestamp(_pcap_timestamp),
42 query(_query)47 query(_query)
43 {}48 {}
@@ -50,15 +55,18 @@
50class TcpdumpResultEntry : public QueryEntry55class TcpdumpResultEntry : public QueryEntry
51{56{
5257
58 boost::shared_ptr<ConnectionState> connection_state;
53 timeval pcap_timestamp;59 timeval pcap_timestamp;
54 QueryResult expected_result;60 QueryResult expected_result;
5561
56public:62public:
57 TcpdumpResultEntry() {}63 TcpdumpResultEntry() {}
58 TcpdumpResultEntry(const AddrPort &addr_port,64 TcpdumpResultEntry(boost::shared_ptr<ConnectionState> _connection_state,
65 const AddrPort &addr_port,
59 const timeval &_pcap_timestamp,66 const timeval &_pcap_timestamp,
60 const QueryResult &_expected_result) :67 const QueryResult &_expected_result) :
61 QueryEntry(addr_port.ThreadId(), false),68 QueryEntry(addr_port.ThreadId(), false),
69 connection_state(_connection_state),
62 pcap_timestamp(_pcap_timestamp),70 pcap_timestamp(_pcap_timestamp),
63 expected_result(_expected_result)71 expected_result(_expected_result)
64 {}72 {}
6573
=== added file 'percona_playback/test/thread-pool-sysbench-slow.cc'
--- percona_playback/test/thread-pool-sysbench-slow.cc 1970-01-01 00:00:00 +0000
+++ percona_playback/test/thread-pool-sysbench-slow.cc 2012-11-23 20:15:24 +0000
@@ -0,0 +1,60 @@
1/* BEGIN LICENSE
2 * Copyright (C) 2011 Stewart Smith <stewart@flamingspork.com>
3 * This program is free software: you can redistribute it and/or modify it
4 * under the terms of the GNU General Public License version 3, as published
5 * by the Free Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful, but
8 * WITHOUT ANY WARRANTY; without even the implied warranties of
9 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
10 * PURPOSE. See the GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License along
13 * with this program. If not, see <http://www.gnu.org/licenses/>.
14 * END LICENSE */
15
16#include "config.h"
17
18#include <iostream>
19#include <cstdio>
20#include <string>
21#include <cstring>
22#include <assert.h>
23#include <unistd.h>
24
25#include <percona_playback/percona_playback.h>
26
27/**
28 * @TODO Actually write a real test suite here
29 */
30int main(int argc, char **argv)
31{
32 (void)argc; (void)argv;
33
34 fprintf(stderr, "Working dir: %s\n\n", get_current_dir_name());
35
36 percona_playback_st *the_percona_playback= percona_playback_create("test_Percona Playback");
37 assert(the_percona_playback);
38
39 char dbplugin[]="--db-plugin=null";
40 char querylog[]="--query-log-file="SRCDIR"/percona_playback/test/sysbench-slow.log";
41 char threadpool[]="--dispatcher-plugin=thread-pool"
42
43 char **dbplugin_argv= new char*[4];
44 dbplugin_argv[0]= argv[0];
45 dbplugin_argv[1]= dbplugin;
46 dbplugin_argv[2]= querylog;
47 dbplugin_argv[3]= threadpool;
48
49
50 assert(0 == percona_playback_argv(the_percona_playback, 3, dbplugin_argv));
51
52 struct percona_playback_run_result *r= percona_playback_run(the_percona_playback);
53
54 assert(r->err == 0);
55 assert(r->n_queries == 10150);
56 assert(r->n_log_entries = 10149);
57
58 percona_playback_destroy(&the_percona_playback);
59 return r->err;
60}
061
=== added directory 'percona_playback/thread_per_connection'
=== added file 'percona_playback/thread_per_connection/plugin.ini'
--- percona_playback/thread_per_connection/plugin.ini 1970-01-01 00:00:00 +0000
+++ percona_playback/thread_per_connection/plugin.ini 2012-11-23 20:15:24 +0000
@@ -0,0 +1,6 @@
1[plugin]
2title=Thread-per-connection dispatcher plugin
3description=Creates thread per each DB connection
4version=0.1
5static=yes
6load_by_default=yes
07
=== added file 'percona_playback/thread_per_connection/thread_per_connection.cc'
--- percona_playback/thread_per_connection/thread_per_connection.cc 1970-01-01 00:00:00 +0000
+++ percona_playback/thread_per_connection/thread_per_connection.cc 2012-11-23 20:15:24 +0000
@@ -0,0 +1,109 @@
1/* BEGIN LICENSE
2 * Copyright (C) 2011-2012 Percona Inc.
3 * This program is free software: you can redistribute it and/or modify it
4 * under the terms of the GNU General Public License version 2, as published
5 * by the Free Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful, but
8 * WITHOUT ANY WARRANTY; without even the implied warranties of
9 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
10 * PURPOSE. See the GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License along
13 * with this program. If not, see <http://www.gnu.org/licenses/>.
14 * END LICENSE */
15
16#include "percona_playback/plugin.h"
17#include "percona_playback/db_thread.h"
18
19#include <tbb/concurrent_hash_map.h>
20
21class ThreadPerConnectionDispatcher :
22 public percona_playback::DispatcherPlugin
23{
24 typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable;
25 DBExecutorsTable executors;
26 void db_thread_func(DBThread *thread);
27 void start_thread(DBThread *thread);
28
29public:
30 ThreadPerConnectionDispatcher(std::string _name) :
31 DispatcherPlugin(_name) {}
32
33 void dispatch(QueryEntryPtr query_entry);
34 bool finish_and_wait(uint64_t thread_id);
35 void finish_all_and_wait();
36};
37
38extern percona_playback::DBClientPlugin *g_dbclient_plugin;
39
40void
41ThreadPerConnectionDispatcher::dispatch(QueryEntryPtr query_entry)
42{
43 uint64_t thread_id= query_entry->getThreadId();
44 {
45 DBExecutorsTable::accessor a;
46 if (executors.insert(a, thread_id))
47 {
48 DBThread *db_thread= g_dbclient_plugin->create(thread_id);
49 a->second= db_thread;
50 db_thread->start_thread();
51 }
52 a->second->queries->push(query_entry);
53 }
54}
55
56bool
57ThreadPerConnectionDispatcher::finish_and_wait(uint64_t thread_id)
58{
59 DBThread *db_thread= NULL;
60 {
61 DBExecutorsTable::accessor a;
62 if (executors.find(a, thread_id))
63 {
64 db_thread= a->second;
65 executors.erase(a);
66 }
67 }
68
69 if (!db_thread)
70 return false;
71
72 db_thread->queries->push(QueryEntryPtr(new FinishEntry(thread_id)));
73 db_thread->join();
74
75 delete db_thread;
76
77 return true;
78}
79
80void
81ThreadPerConnectionDispatcher::finish_all_and_wait()
82{
83 QueryEntryPtr shutdown_command(new FinishEntry(0));
84
85 while(executors.size())
86 {
87 uint64_t thread_id;
88 DBThread *t;
89 {
90 DBExecutorsTable::const_iterator iter= executors.begin();
91 thread_id= (*iter).first;
92 t= (*iter).second;
93 }
94 executors.erase(thread_id);
95
96 t->queries->push(shutdown_command);
97 t->join();
98
99 delete t;
100 }
101}
102
103static void init_plugin(percona_playback::PluginRegistry &r)
104{
105 r.add("thread-per-connection",
106 new ThreadPerConnectionDispatcher("thread-per-connection"));
107}
108
109PERCONA_PLAYBACK_PLUGIN(init_plugin);
0110
=== added directory 'percona_playback/thread_pool'
=== added file 'percona_playback/thread_pool/plugin.ini'
--- percona_playback/thread_pool/plugin.ini 1970-01-01 00:00:00 +0000
+++ percona_playback/thread_pool/plugin.ini 2012-11-23 20:15:24 +0000
@@ -0,0 +1,6 @@
1[plugin]
2title=Thread-pool dispatcher plugin
3description=Creates threads pool to process connections to server
4version=0.1
5static=yes
6load_by_default=yes
07
=== added file 'percona_playback/thread_pool/thread_pool.cc'
--- percona_playback/thread_pool/thread_pool.cc 1970-01-01 00:00:00 +0000
+++ percona_playback/thread_pool/thread_pool.cc 2012-11-23 20:15:24 +0000
@@ -0,0 +1,130 @@
1/* BEGIN LICENSE
2 * Copyright (C) 2011-2012 Percona Inc.
3 * This program is free software: you can redistribute it and/or modify it
4 * under the terms of the GNU General Public License version 2, as published
5 * by the Free Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful, but
8 * WITHOUT ANY WARRANTY; without even the implied warranties of
9 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
10 * PURPOSE. See the GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License along
13 * with this program. If not, see <http://www.gnu.org/licenses/>.
14 * END LICENSE */
15
16#include "percona_playback/plugin.h"
17#include "percona_playback/db_thread.h"
18#include <percona_playback/gettext.h>
19#include <boost/program_options.hpp>
20#include <boost/shared_ptr.hpp>
21#include <boost/crc.hpp>
22#include <vector>
23
24#include <tbb/concurrent_hash_map.h>
25#include <stdio.h>
26
27extern percona_playback::DBClientPlugin *g_dbclient_plugin;
28
29class ThreadPoolDispatcher :
30 public percona_playback::DispatcherPlugin
31{
32 typedef std::vector<boost::shared_ptr<DBThread> > Workers;
33
34 unsigned threads_count;
35 boost::
36 program_options::
37 options_description options;
38 Workers workers;
39
40public:
41 ThreadPoolDispatcher(std::string _name) :
42 DispatcherPlugin(_name),
43 threads_count(0),
44 options("Threads-pool Options") {}
45
46 virtual void dispatch(QueryEntryPtr query_entry);
47 virtual bool finish_and_wait(uint64_t) { return true; }
48 virtual void finish_all_and_wait();
49 virtual void run();
50
51 boost::program_options::options_description* getProgramOptions();
52 int processOptions(boost::program_options::variables_map &vm);
53
54};
55
56void ThreadPoolDispatcher::run()
57{
58 for (unsigned i = 0; i < threads_count; ++i)
59 {
60 boost::shared_ptr<DBThread> db_thread(g_dbclient_plugin->create(i));
61 workers.push_back(db_thread);
62 db_thread->start_thread();
63 }
64}
65
66void ThreadPoolDispatcher::dispatch(QueryEntryPtr query_entry)
67{
68 /*
69 Each worker has its own queue. For some types of input plugins
70 it is important to execute query entries with the same thread id
71 by the same worker. That is why we choose worker by simple hash from
72 thread id.
73 */
74 uint64_t thread_id= query_entry->getThreadId();
75 boost::crc_32_type crc;
76 crc.process_bytes(&thread_id, sizeof(thread_id));
77 uint32_t worker_index = crc.checksum() % workers.size();
78 workers[worker_index]->queries->push(query_entry);
79}
80
81void ThreadPoolDispatcher::finish_all_and_wait()
82{
83 QueryEntryPtr shutdown_command(new FinishEntry(0));
84 for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i)
85 (*i)->queries->push(shutdown_command);
86 for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i)
87 (*i)->join();
88 workers.clear();
89}
90
91
92boost::program_options::options_description*
93ThreadPoolDispatcher::getProgramOptions()
94{
95 unsigned default_threads_count =
96 boost::thread::hardware_concurrency();
97 options.add_options()
98 ("thread-pool-threads-count",
99 boost::program_options::value<unsigned>(&threads_count)
100 ->default_value(default_threads_count),
101 _("The number of threads in thread pool. If this options is omitted "
102 "the number of threads equals to hardware concurency."))
103 ;
104
105 return &options;
106
107}
108
109int
110ThreadPoolDispatcher::processOptions(boost::program_options::variables_map &vm)
111{
112 if (!active &&
113 !vm["thread-pool-threads-count"].defaulted())
114 {
115 fprintf(stderr, _("thread-pool plugin is not selected, "
116 "you shouldn't use this plugin-related "
117 "command line options\n"));
118 return -1;
119 }
120
121 return 0;
122}
123
124static void init_plugin(percona_playback::PluginRegistry &r)
125{
126 r.add("thread-pool",
127 new ThreadPoolDispatcher("thread-pool"));
128}
129
130PERCONA_PLAYBACK_PLUGIN(init_plugin);

Subscribers

People subscribed via source and target branches

to all changes: