Merge lp:~vlad-lesin/percona-playback/connections_pool into lp:percona-playback
- connections_pool
- Merge into trunk
Proposed by
Vlad Lesin
Status: | Merged |
---|---|
Approved by: | Stewart Smith |
Approved revision: | 178 |
Merged at revision: | 174 |
Proposed branch: | lp:~vlad-lesin/percona-playback/connections_pool |
Merge into: | lp:percona-playback |
Diff against target: |
1557 lines (+617/-299) 24 files modified
Makefile.am (+6/-3) percona_playback/db_thread.cc (+8/-10) percona_playback/db_thread.h (+18/-23) percona_playback/dispatcher.cc (+0/-105) percona_playback/dispatcher.h (+0/-46) percona_playback/libdrizzle_client/libdrizzle_client.cc (+4/-3) percona_playback/mysql_client/mysql_client.cc (+69/-27) percona_playback/mysql_client/mysql_client.h (+5/-2) percona_playback/null_dbclient/null_dbclient.cc (+4/-3) percona_playback/percona_playback.cc (+46/-1) percona_playback/plugin.h (+29/-0) percona_playback/query_entry.h (+2/-1) percona_playback/query_log/query_log.cc (+25/-8) percona_playback/tcpdump/connection_state.cc (+20/-15) percona_playback/tcpdump/connection_state.h (+10/-12) percona_playback/tcpdump/pcap_packets_parser.cc (+38/-24) percona_playback/tcpdump/pcap_packets_parser.h (+10/-2) percona_playback/tcpdump/tcpdump_query_entries.cc (+2/-12) percona_playback/tcpdump/tcpdump_query_entries.h (+10/-2) percona_playback/test/thread-pool-sysbench-slow.cc (+60/-0) percona_playback/thread_per_connection/plugin.ini (+6/-0) percona_playback/thread_per_connection/thread_per_connection.cc (+109/-0) percona_playback/thread_pool/plugin.ini (+6/-0) percona_playback/thread_pool/thread_pool.cc (+130/-0) |
To merge this branch: | bzr merge lp:~vlad-lesin/percona-playback/connections_pool |
Related bugs: | |
Related blueprints: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Stewart Smith (community) | Approve | ||
Review via email: mp+134884@code.launchpad.net |
Commit message
Description of the change
A bunch of changes for implementing this https:/
To post a comment you must log in.
- 171. By Vlad Lesin
-
Fixes for bug #1070824 and bug #1080654
- 172. By Vlad Lesin
-
merged with trunk
- 173. By Vlad Lesin
-
Send init session query on reconnect
- 174. By Vlad Lesin
-
Initialize session init query with the value from cmd line options.
- 175. By Vlad Lesin
-
Thread-safety fixes for mysql client plugin.
- 176. By Vlad Lesin
-
Don't push empty query log queries for execution
- 177. By Vlad Lesin
-
Convert multiline query to single line to avoid syntax errors from server.
- 178. By Vlad Lesin
-
Allow multi-statement queries.
Revision history for this message
Stewart Smith (stewart) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'Makefile.am' | |||
2 | --- Makefile.am 2012-11-08 22:17:07 +0000 | |||
3 | +++ Makefile.am 2012-11-23 20:15:24 +0000 | |||
4 | @@ -59,13 +59,11 @@ | |||
5 | 59 | libpercona_playback_la_SOURCES = \ | 59 | libpercona_playback_la_SOURCES = \ |
6 | 60 | percona_playback/percona_playback.cc \ | 60 | percona_playback/percona_playback.cc \ |
7 | 61 | percona_playback/plugin.cc \ | 61 | percona_playback/plugin.cc \ |
10 | 62 | percona_playback/db_thread.cc \ | 62 | percona_playback/db_thread.cc |
9 | 63 | percona_playback/dispatcher.cc | ||
11 | 64 | 63 | ||
12 | 65 | nobase_include_HEADERS += \ | 64 | nobase_include_HEADERS += \ |
13 | 66 | percona_playback/percona_playback.h \ | 65 | percona_playback/percona_playback.h \ |
14 | 67 | percona_playback/db_thread.h \ | 66 | percona_playback/db_thread.h \ |
15 | 68 | percona_playback/dispatcher.h \ | ||
16 | 69 | percona_playback/query_result.h \ | 67 | percona_playback/query_result.h \ |
17 | 70 | percona_playback/plugin.h \ | 68 | percona_playback/plugin.h \ |
18 | 71 | percona_playback/tokenize.h \ | 69 | percona_playback/tokenize.h \ |
19 | @@ -89,6 +87,7 @@ | |||
20 | 89 | percona_playback/test/crashme-slow \ | 87 | percona_playback/test/crashme-slow \ |
21 | 90 | percona_playback/test/sqlbench-transactions-slow \ | 88 | percona_playback/test/sqlbench-transactions-slow \ |
22 | 91 | percona_playback/test/sysbench-slow \ | 89 | percona_playback/test/sysbench-slow \ |
23 | 90 | percona_playback/test/thread-pool-sysbench-slow \ | ||
24 | 92 | percona_playback/test/preserve_query_time \ | 91 | percona_playback/test/preserve_query_time \ |
25 | 93 | percona_playback/test/tcpdump_without_handshake \ | 92 | percona_playback/test/tcpdump_without_handshake \ |
26 | 94 | percona_playback/test/tcpdump_fragmented_packet \ | 93 | percona_playback/test/tcpdump_fragmented_packet \ |
27 | @@ -104,6 +103,7 @@ | |||
28 | 104 | percona_playback_test_crashme_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" | 103 | percona_playback_test_crashme_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
29 | 105 | percona_playback_test_sqlbench_transactions_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" | 104 | percona_playback_test_sqlbench_transactions_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
30 | 106 | percona_playback_test_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" | 105 | percona_playback_test_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
31 | 106 | percona_playback_test_thread_pool_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" | ||
32 | 107 | percona_playback_test_preserve_query_time_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" | 107 | percona_playback_test_preserve_query_time_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
33 | 108 | percona_playback_test_tcpdump_without_handshake_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" | 108 | percona_playback_test_tcpdump_without_handshake_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
34 | 109 | percona_playback_test_tcpdump_fragmented_packet_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" | 109 | percona_playback_test_tcpdump_fragmented_packet_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
35 | @@ -135,6 +135,9 @@ | |||
36 | 135 | percona_playback_test_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc | 135 | percona_playback_test_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc |
37 | 136 | percona_playback_test_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS) | 136 | percona_playback_test_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS) |
38 | 137 | 137 | ||
39 | 138 | percona_playback_test_thread_pool_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc | ||
40 | 139 | percona_playback_test_thread_pool_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS) | ||
41 | 140 | |||
42 | 138 | percona_playback_test_preserve_query_time_SOURCES= percona_playback/test/preserve_query_time.cc | 141 | percona_playback_test_preserve_query_time_SOURCES= percona_playback/test/preserve_query_time.cc |
43 | 139 | percona_playback_test_preserve_query_time_LDADD= $(LDADD) $(MYSQL_LIBS) | 142 | percona_playback_test_preserve_query_time_LDADD= $(LDADD) $(MYSQL_LIBS) |
44 | 140 | 143 | ||
45 | 141 | 144 | ||
46 | === modified file 'percona_playback/db_thread.cc' | |||
47 | --- percona_playback/db_thread.cc 2012-11-14 11:44:59 +0000 | |||
48 | +++ percona_playback/db_thread.cc 2012-11-23 20:15:24 +0000 | |||
49 | @@ -24,25 +24,23 @@ | |||
50 | 24 | 24 | ||
51 | 25 | extern std::string g_session_init_query; | 25 | extern std::string g_session_init_query; |
52 | 26 | 26 | ||
54 | 27 | void DBThread::connect_and_init_session() | 27 | void DBThread::init_session() |
55 | 28 | { | 28 | { |
63 | 29 | connect(); | 29 | if (g_session_init_query.empty()) |
64 | 30 | if (!g_session_init_query.empty()) | 30 | return; |
65 | 31 | { | 31 | QueryResult r; |
66 | 32 | QueryResult r; | 32 | QueryResult er; |
67 | 33 | QueryResult er; | 33 | execute_query(g_session_init_query, &r, er); |
61 | 34 | execute_query(g_session_init_query, &r, er); | ||
62 | 35 | } | ||
68 | 36 | } | 34 | } |
69 | 37 | 35 | ||
70 | 38 | void DBThread::run() | 36 | void DBThread::run() |
71 | 39 | { | 37 | { |
72 | 40 | connect_and_init_session(); | 38 | connect_and_init_session(); |
73 | 41 | 39 | ||
74 | 42 | QueryEntryPtr query; | ||
75 | 43 | while (true) | 40 | while (true) |
76 | 44 | { | 41 | { |
78 | 45 | queries.pop(query); | 42 | QueryEntryPtr query; |
79 | 43 | queries->pop(query); | ||
80 | 46 | 44 | ||
81 | 47 | if (query->is_shutdown()) | 45 | if (query->is_shutdown()) |
82 | 48 | break; | 46 | break; |
83 | 49 | 47 | ||
84 | === modified file 'percona_playback/db_thread.h' | |||
85 | --- percona_playback/db_thread.h 2012-11-14 11:44:59 +0000 | |||
86 | +++ percona_playback/db_thread.h 2012-11-23 20:15:24 +0000 | |||
87 | @@ -16,6 +16,7 @@ | |||
88 | 16 | #ifndef PERCONA_PLAYBACK_DB_THREAD_H | 16 | #ifndef PERCONA_PLAYBACK_DB_THREAD_H |
89 | 17 | #define PERCONA_PLAYBACK_DB_THREAD_H | 17 | #define PERCONA_PLAYBACK_DB_THREAD_H |
90 | 18 | 18 | ||
91 | 19 | #include <memory> | ||
92 | 19 | #include "percona_playback/visibility.h" | 20 | #include "percona_playback/visibility.h" |
93 | 20 | #include <boost/thread.hpp> | 21 | #include <boost/thread.hpp> |
94 | 21 | #include <boost/shared_ptr.hpp> | 22 | #include <boost/shared_ptr.hpp> |
95 | @@ -33,55 +34,49 @@ | |||
96 | 33 | 34 | ||
97 | 34 | class QueryResult; | 35 | class QueryResult; |
98 | 35 | 36 | ||
99 | 36 | class DBThreadState | ||
100 | 37 | { | ||
101 | 38 | public: | ||
102 | 39 | virtual ~DBThreadState(){} | ||
103 | 40 | }; | ||
104 | 41 | |||
105 | 42 | class DBThread | 37 | class DBThread |
106 | 43 | { | 38 | { |
107 | 44 | 39 | ||
108 | 45 | private: | 40 | private: |
109 | 46 | boost::thread thread; | 41 | boost::thread thread; |
110 | 47 | uint64_t thread_id; | 42 | uint64_t thread_id; |
111 | 48 | boost::shared_ptr<DBThreadState> state; | ||
112 | 49 | 43 | ||
113 | 50 | public: | 44 | public: |
114 | 51 | typedef tbb::concurrent_bounded_queue<QueryEntryPtr> Queries; | 45 | typedef tbb::concurrent_bounded_queue<QueryEntryPtr> Queries; |
116 | 52 | Queries queries; | 46 | boost::shared_ptr<Queries> queries; |
117 | 53 | 47 | ||
120 | 54 | DBThread(uint64_t _thread_id) : thread_id(_thread_id) { | 48 | DBThread(uint64_t _thread_id, |
121 | 55 | queries.set_capacity(g_db_thread_queue_depth); | 49 | boost::shared_ptr<Queries> _queries) : |
122 | 50 | thread_id(_thread_id), queries(_queries) { | ||
123 | 51 | queries->set_capacity(g_db_thread_queue_depth); | ||
124 | 56 | } | 52 | } |
125 | 57 | 53 | ||
126 | 58 | virtual ~DBThread() {} | 54 | virtual ~DBThread() {} |
127 | 59 | 55 | ||
128 | 60 | void set_state(boost::shared_ptr<DBThreadState> new_state) | ||
129 | 61 | { | ||
130 | 62 | state= new_state; | ||
131 | 63 | } | ||
132 | 64 | |||
133 | 65 | boost::shared_ptr<DBThreadState> get_state() const | ||
134 | 66 | { | ||
135 | 67 | return state; | ||
136 | 68 | } | ||
137 | 69 | |||
138 | 70 | void join() | 56 | void join() |
139 | 71 | { | 57 | { |
140 | 72 | thread.join(); | 58 | thread.join(); |
141 | 73 | } | 59 | } |
142 | 74 | 60 | ||
144 | 75 | void connect_and_init_session(); | 61 | bool connect_and_init_session() |
145 | 62 | { | ||
146 | 63 | if (connect()) | ||
147 | 64 | { | ||
148 | 65 | init_session(); | ||
149 | 66 | return true; | ||
150 | 67 | } | ||
151 | 68 | return false; | ||
152 | 69 | } | ||
153 | 76 | 70 | ||
155 | 77 | virtual void connect()= 0; | 71 | void init_session(); |
156 | 72 | virtual bool connect()= 0; | ||
157 | 78 | 73 | ||
158 | 79 | virtual void disconnect()= 0; | 74 | virtual void disconnect()= 0; |
159 | 80 | virtual void execute_query(const std::string &query, | 75 | virtual void execute_query(const std::string &query, |
160 | 81 | QueryResult *r, | 76 | QueryResult *r, |
161 | 82 | const QueryResult &expected_result)= 0; | 77 | const QueryResult &expected_result)= 0; |
162 | 83 | 78 | ||
164 | 84 | void run(); | 79 | virtual void run(); |
165 | 85 | 80 | ||
166 | 86 | void start_thread(); | 81 | void start_thread(); |
167 | 87 | }; | 82 | }; |
168 | 88 | 83 | ||
169 | === removed file 'percona_playback/dispatcher.cc' | |||
170 | --- percona_playback/dispatcher.cc 2012-07-03 07:41:49 +0000 | |||
171 | +++ percona_playback/dispatcher.cc 1970-01-01 00:00:00 +0000 | |||
172 | @@ -1,105 +0,0 @@ | |||
173 | 1 | /* BEGIN LICENSE | ||
174 | 2 | * Copyright (C) 2011-2012 Percona Inc. | ||
175 | 3 | * This program is free software: you can redistribute it and/or modify it | ||
176 | 4 | * under the terms of the GNU General Public License version 2, as published | ||
177 | 5 | * by the Free Software Foundation. | ||
178 | 6 | * | ||
179 | 7 | * This program is distributed in the hope that it will be useful, but | ||
180 | 8 | * WITHOUT ANY WARRANTY; without even the implied warranties of | ||
181 | 9 | * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | ||
182 | 10 | * PURPOSE. See the GNU General Public License for more details. | ||
183 | 11 | * | ||
184 | 12 | * You should have received a copy of the GNU General Public License along | ||
185 | 13 | * with this program. If not, see <http://www.gnu.org/licenses/>. | ||
186 | 14 | * END LICENSE */ | ||
187 | 15 | #include "percona_playback/dispatcher.h" | ||
188 | 16 | #include "percona_playback/db_thread.h" | ||
189 | 17 | #include "percona_playback/plugin.h" | ||
190 | 18 | |||
191 | 19 | #include <assert.h> | ||
192 | 20 | |||
193 | 21 | Dispatcher g_dispatcher; | ||
194 | 22 | |||
195 | 23 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; | ||
196 | 24 | |||
197 | 25 | boost::shared_ptr<DBThreadState> | ||
198 | 26 | Dispatcher::get_thread_state( | ||
199 | 27 | uint64_t thread_id, | ||
200 | 28 | boost::function1<void, DBThread *> run_on_db_thread_create) | ||
201 | 29 | { | ||
202 | 30 | DBExecutorsTable::accessor a; | ||
203 | 31 | |||
204 | 32 | if (executors.insert(a, thread_id)) | ||
205 | 33 | { | ||
206 | 34 | DBThread *db_thread= g_dbclient_plugin->create(thread_id); | ||
207 | 35 | assert(db_thread); | ||
208 | 36 | a->second= db_thread; | ||
209 | 37 | if (!run_on_db_thread_create.empty()) | ||
210 | 38 | run_on_db_thread_create(db_thread); | ||
211 | 39 | db_thread->start_thread(); | ||
212 | 40 | } | ||
213 | 41 | |||
214 | 42 | return a->second->get_state(); | ||
215 | 43 | } | ||
216 | 44 | |||
217 | 45 | void | ||
218 | 46 | Dispatcher::dispatch(QueryEntryPtr query_entry) | ||
219 | 47 | { | ||
220 | 48 | uint64_t thread_id= query_entry->getThreadId(); | ||
221 | 49 | { | ||
222 | 50 | DBExecutorsTable::accessor a; | ||
223 | 51 | if (executors.insert(a, thread_id)) | ||
224 | 52 | { | ||
225 | 53 | DBThread *db_thread= g_dbclient_plugin->create(thread_id); | ||
226 | 54 | a->second= db_thread; | ||
227 | 55 | db_thread->start_thread(); | ||
228 | 56 | } | ||
229 | 57 | a->second->queries.push(query_entry); | ||
230 | 58 | } | ||
231 | 59 | } | ||
232 | 60 | |||
233 | 61 | bool | ||
234 | 62 | Dispatcher::finish_and_wait(uint64_t thread_id) | ||
235 | 63 | { | ||
236 | 64 | DBThread *db_thread= NULL; | ||
237 | 65 | { | ||
238 | 66 | DBExecutorsTable::accessor a; | ||
239 | 67 | if (executors.find(a, thread_id)) | ||
240 | 68 | { | ||
241 | 69 | db_thread= a->second; | ||
242 | 70 | executors.erase(a); | ||
243 | 71 | } | ||
244 | 72 | } | ||
245 | 73 | |||
246 | 74 | if (!db_thread) | ||
247 | 75 | return false; | ||
248 | 76 | |||
249 | 77 | db_thread->queries.push(QueryEntryPtr(new FinishEntry())); | ||
250 | 78 | db_thread->join(); | ||
251 | 79 | |||
252 | 80 | delete db_thread; | ||
253 | 81 | |||
254 | 82 | return true; | ||
255 | 83 | } | ||
256 | 84 | |||
257 | 85 | void | ||
258 | 86 | Dispatcher::finish_all_and_wait() | ||
259 | 87 | { | ||
260 | 88 | QueryEntryPtr shutdown_command(new FinishEntry()); | ||
261 | 89 | while(executors.size()) | ||
262 | 90 | { | ||
263 | 91 | uint64_t thread_id; | ||
264 | 92 | DBThread *t; | ||
265 | 93 | { | ||
266 | 94 | DBExecutorsTable::const_iterator iter= executors.begin(); | ||
267 | 95 | thread_id= (*iter).first; | ||
268 | 96 | t= (*iter).second; | ||
269 | 97 | } | ||
270 | 98 | executors.erase(thread_id); | ||
271 | 99 | |||
272 | 100 | t->queries.push(shutdown_command); | ||
273 | 101 | t->join(); | ||
274 | 102 | |||
275 | 103 | delete t; | ||
276 | 104 | } | ||
277 | 105 | } | ||
278 | 106 | 0 | ||
279 | === removed file 'percona_playback/dispatcher.h' | |||
280 | --- percona_playback/dispatcher.h 2012-07-06 07:15:28 +0000 | |||
281 | +++ percona_playback/dispatcher.h 1970-01-01 00:00:00 +0000 | |||
282 | @@ -1,46 +0,0 @@ | |||
283 | 1 | /* BEGIN LICENSE | ||
284 | 2 | * Copyright (C) 2011 Percona Inc. | ||
285 | 3 | * This program is free software: you can redistribute it and/or modify it | ||
286 | 4 | * under the terms of the GNU General Public License version 2, as published | ||
287 | 5 | * by the Free Software Foundation. | ||
288 | 6 | * | ||
289 | 7 | * This program is distributed in the hope that it will be useful, but | ||
290 | 8 | * WITHOUT ANY WARRANTY; without even the implied warranties of | ||
291 | 9 | * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | ||
292 | 10 | * PURPOSE. See the GNU General Public License for more details. | ||
293 | 11 | * | ||
294 | 12 | * You should have received a copy of the GNU General Public License along | ||
295 | 13 | * with this program. If not, see <http://www.gnu.org/licenses/>. | ||
296 | 14 | * END LICENSE */ | ||
297 | 15 | |||
298 | 16 | #ifndef PERCONA_PLAYBACK_DISPATCHER_H | ||
299 | 17 | #define PERCONA_PLAYBACK_DISPATCHER_H | ||
300 | 18 | |||
301 | 19 | #include <tbb/concurrent_hash_map.h> | ||
302 | 20 | #include <boost/function.hpp> | ||
303 | 21 | |||
304 | 22 | #include "percona_playback/query_entry.h" | ||
305 | 23 | |||
306 | 24 | class DBThread; | ||
307 | 25 | class DBThreadState; | ||
308 | 26 | |||
309 | 27 | class Dispatcher | ||
310 | 28 | { | ||
311 | 29 | typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable; | ||
312 | 30 | DBExecutorsTable executors; | ||
313 | 31 | void db_thread_func(DBThread *thread); | ||
314 | 32 | void start_thread(DBThread *thread); | ||
315 | 33 | |||
316 | 34 | public: | ||
317 | 35 | boost::shared_ptr<DBThreadState> | ||
318 | 36 | get_thread_state(uint64_t thread_id, | ||
319 | 37 | boost::function1<void, DBThread *> | ||
320 | 38 | run_on_db_thread_create); | ||
321 | 39 | void dispatch(QueryEntryPtr query_entry); | ||
322 | 40 | bool finish_and_wait(uint64_t thread_id); | ||
323 | 41 | void finish_all_and_wait(); | ||
324 | 42 | }; | ||
325 | 43 | |||
326 | 44 | extern Dispatcher g_dispatcher; | ||
327 | 45 | |||
328 | 46 | #endif /* PERCONA_PLAYBACK_DISPATCHER_H */ | ||
329 | 47 | 0 | ||
330 | === modified file 'percona_playback/libdrizzle_client/libdrizzle_client.cc' | |||
331 | --- percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-07-23 07:08:23 +0000 | |||
332 | +++ percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-11-23 20:15:24 +0000 | |||
333 | @@ -39,13 +39,13 @@ | |||
334 | 39 | 39 | ||
335 | 40 | public: | 40 | public: |
336 | 41 | LibDrizzleDBThread(uint64_t _thread_id, LibDrizzleOptions *opt) : | 41 | LibDrizzleDBThread(uint64_t _thread_id, LibDrizzleOptions *opt) : |
338 | 42 | DBThread(_thread_id), | 42 | DBThread(_thread_id, boost::shared_ptr<Queries>(new Queries())), |
339 | 43 | driz(NULL), | 43 | driz(NULL), |
340 | 44 | options(opt) | 44 | options(opt) |
341 | 45 | { | 45 | { |
342 | 46 | } | 46 | } |
343 | 47 | 47 | ||
345 | 48 | void connect(); | 48 | bool connect(); |
346 | 49 | void disconnect(); | 49 | void disconnect(); |
347 | 50 | void execute_query(const std::string &query, QueryResult *r, | 50 | void execute_query(const std::string &query, QueryResult *r, |
348 | 51 | const QueryResult &expected_result); | 51 | const QueryResult &expected_result); |
349 | @@ -70,7 +70,7 @@ | |||
350 | 70 | unsigned int port; | 70 | unsigned int port; |
351 | 71 | }; | 71 | }; |
352 | 72 | 72 | ||
354 | 73 | void LibDrizzleDBThread::connect() | 73 | bool LibDrizzleDBThread::connect() |
355 | 74 | { | 74 | { |
356 | 75 | driz= drizzle_create(NULL); | 75 | driz= drizzle_create(NULL); |
357 | 76 | assert(driz != NULL); | 76 | assert(driz != NULL); |
358 | @@ -81,6 +81,7 @@ | |||
359 | 81 | options->user.c_str(), | 81 | options->user.c_str(), |
360 | 82 | options->password.c_str(), | 82 | options->password.c_str(), |
361 | 83 | options->schema.c_str(), DRIZZLE_CON_MYSQL); | 83 | options->schema.c_str(), DRIZZLE_CON_MYSQL); |
362 | 84 | return true; | ||
363 | 84 | } | 85 | } |
364 | 85 | 86 | ||
365 | 86 | void LibDrizzleDBThread::disconnect() | 87 | void LibDrizzleDBThread::disconnect() |
366 | 87 | 88 | ||
367 | === modified file 'percona_playback/mysql_client/mysql_client.cc' | |||
368 | --- percona_playback/mysql_client/mysql_client.cc 2012-07-04 09:10:16 +0000 | |||
369 | +++ percona_playback/mysql_client/mysql_client.cc 2012-11-23 20:15:24 +0000 | |||
370 | @@ -42,17 +42,23 @@ | |||
371 | 42 | unsigned int port; | 42 | unsigned int port; |
372 | 43 | }; | 43 | }; |
373 | 44 | 44 | ||
375 | 45 | void MySQLDBThread::connect() | 45 | bool MySQLDBThread::connect() |
376 | 46 | { | 46 | { |
377 | 47 | mysql_init(&handle); | 47 | mysql_init(&handle); |
386 | 48 | mysql_real_connect(&handle, | 48 | if (!mysql_real_connect(&handle, |
387 | 49 | options->host.c_str(), | 49 | options->host.c_str(), |
388 | 50 | options->user.c_str(), | 50 | options->user.c_str(), |
389 | 51 | options->password.c_str(), | 51 | options->password.c_str(), |
390 | 52 | options->schema.c_str(), | 52 | options->schema.c_str(), |
391 | 53 | options->port, | 53 | options->port, |
392 | 54 | "", | 54 | "", |
393 | 55 | 0); | 55 | CLIENT_MULTI_STATEMENTS)) |
394 | 56 | { | ||
395 | 57 | fprintf(stderr, "Can't connect to server: %s\n", | ||
396 | 58 | mysql_error(&handle)); | ||
397 | 59 | return false; | ||
398 | 60 | } | ||
399 | 61 | return true; | ||
400 | 56 | } | 62 | } |
401 | 57 | 63 | ||
402 | 58 | void MySQLDBThread::disconnect() | 64 | void MySQLDBThread::disconnect() |
403 | @@ -63,29 +69,50 @@ | |||
404 | 63 | void MySQLDBThread::execute_query(const std::string &query, QueryResult *r, | 69 | void MySQLDBThread::execute_query(const std::string &query, QueryResult *r, |
405 | 64 | const QueryResult &) | 70 | const QueryResult &) |
406 | 65 | { | 71 | { |
423 | 66 | int mr= mysql_real_query(&handle, query.c_str(), query.length()); | 72 | int mr; |
424 | 67 | if(mr != 0) | 73 | for(unsigned i = 0; i < max_err_num; ++i) |
425 | 68 | { | 74 | { |
426 | 69 | r->setError(mr); | 75 | mr= mysql_real_query(&handle, query.c_str(), query.length()); |
427 | 70 | } | 76 | r->setError(mr); |
428 | 71 | else | 77 | if(mr != 0) |
429 | 72 | { | 78 | { |
430 | 73 | MYSQL_RES* mysql_res= NULL; | 79 | fprintf(stderr, |
431 | 74 | 80 | "Error during query: %s, number of tries %u\n", | |
432 | 75 | r->setError(mr); | 81 | mysql_error(&handle), |
433 | 76 | r->setWarningCount(mysql_warning_count(&handle)); | 82 | i); |
434 | 77 | 83 | disconnect(); | |
435 | 78 | mysql_res= mysql_store_result(&handle); | 84 | connect_and_init_session(); |
436 | 79 | 85 | } | |
421 | 80 | if (mysql_res != NULL) | ||
422 | 81 | r->setRowsSent(mysql_num_rows(mysql_res)); | ||
437 | 82 | else | 86 | else |
438 | 87 | { | ||
439 | 88 | r->setWarningCount(mysql_warning_count(&handle)); | ||
440 | 83 | r->setRowsSent(0); | 89 | r->setRowsSent(0); |
443 | 84 | 90 | do | |
444 | 85 | mysql_free_result(mysql_res); | 91 | { |
445 | 92 | MYSQL_RES* mysql_res= NULL; | ||
446 | 93 | |||
447 | 94 | mysql_res= mysql_store_result(&handle); | ||
448 | 95 | |||
449 | 96 | if (mysql_res != NULL) | ||
450 | 97 | { | ||
451 | 98 | r->setRowsSent(mysql_num_rows(mysql_res)); | ||
452 | 99 | mysql_free_result(mysql_res); | ||
453 | 100 | } | ||
454 | 101 | |||
455 | 102 | } while(!mysql_next_result(&handle)); | ||
456 | 103 | |||
457 | 104 | break; | ||
458 | 105 | } | ||
459 | 86 | } | 106 | } |
460 | 87 | } | 107 | } |
461 | 88 | 108 | ||
462 | 109 | void MySQLDBThread::run() | ||
463 | 110 | { | ||
464 | 111 | mysql_thread_init(); | ||
465 | 112 | DBThread::run(); | ||
466 | 113 | mysql_thread_end(); | ||
467 | 114 | } | ||
468 | 115 | |||
469 | 89 | class MySQLDBClientPlugin : public percona_playback::DBClientPlugin | 116 | class MySQLDBClientPlugin : public percona_playback::DBClientPlugin |
470 | 90 | { | 117 | { |
471 | 91 | private: | 118 | private: |
472 | @@ -126,6 +153,21 @@ | |||
473 | 126 | return -1; | 153 | return -1; |
474 | 127 | } | 154 | } |
475 | 128 | 155 | ||
476 | 156 | if (!active) | ||
477 | 157 | return 0; | ||
478 | 158 | |||
479 | 159 | if (!mysql_thread_safe()) | ||
480 | 160 | { | ||
481 | 161 | fprintf(stderr, "libmysqlclient is not thread safe\n"); | ||
482 | 162 | return -1; | ||
483 | 163 | } | ||
484 | 164 | |||
485 | 165 | if (mysql_library_init(0, NULL, NULL)) | ||
486 | 166 | { | ||
487 | 167 | fprintf(stderr, "could not initialize mysql library\n"); | ||
488 | 168 | return -1; | ||
489 | 169 | } | ||
490 | 170 | |||
491 | 129 | if (vm.count("mysql-host")) | 171 | if (vm.count("mysql-host")) |
492 | 130 | { | 172 | { |
493 | 131 | options.host= vm["mysql-host"].as<std::string>(); | 173 | options.host= vm["mysql-host"].as<std::string>(); |
494 | 132 | 174 | ||
495 | === modified file 'percona_playback/mysql_client/mysql_client.h' | |||
496 | --- percona_playback/mysql_client/mysql_client.h 2012-06-15 10:53:23 +0000 | |||
497 | +++ percona_playback/mysql_client/mysql_client.h 2012-11-23 20:15:24 +0000 | |||
498 | @@ -10,16 +10,19 @@ | |||
499 | 10 | private: | 10 | private: |
500 | 11 | MYSQL handle; | 11 | MYSQL handle; |
501 | 12 | MySQLOptions *options; | 12 | MySQLOptions *options; |
502 | 13 | static const unsigned max_err_num = 10; | ||
503 | 13 | 14 | ||
504 | 14 | public: | 15 | public: |
505 | 15 | MySQLDBThread(uint64_t _thread_id, MySQLOptions *opt) : | 16 | MySQLDBThread(uint64_t _thread_id, MySQLOptions *opt) : |
507 | 16 | DBThread(_thread_id), | 17 | DBThread(_thread_id, |
508 | 18 | boost::shared_ptr<Queries>(new Queries())), | ||
509 | 17 | options(opt) | 19 | options(opt) |
510 | 18 | { | 20 | { |
511 | 19 | } | 21 | } |
512 | 20 | 22 | ||
514 | 21 | void connect(); | 23 | bool connect(); |
515 | 22 | void disconnect(); | 24 | void disconnect(); |
516 | 23 | void execute_query(const std::string &query, QueryResult *r, | 25 | void execute_query(const std::string &query, QueryResult *r, |
517 | 24 | const QueryResult &expected_result); | 26 | const QueryResult &expected_result); |
518 | 27 | void run(); | ||
519 | 25 | }; | 28 | }; |
520 | 26 | 29 | ||
521 | === modified file 'percona_playback/null_dbclient/null_dbclient.cc' | |||
522 | --- percona_playback/null_dbclient/null_dbclient.cc 2012-04-05 04:47:09 +0000 | |||
523 | +++ percona_playback/null_dbclient/null_dbclient.cc 2012-11-23 20:15:24 +0000 | |||
524 | @@ -20,10 +20,11 @@ | |||
525 | 20 | class NULLDBThread : public DBThread | 20 | class NULLDBThread : public DBThread |
526 | 21 | { | 21 | { |
527 | 22 | public: | 22 | public: |
530 | 23 | NULLDBThread(uint64_t _thread_id) : DBThread(_thread_id) { | 23 | NULLDBThread(uint64_t _thread_id) : |
531 | 24 | } | 24 | DBThread(_thread_id, |
532 | 25 | boost::shared_ptr<Queries>(new Queries())) {} | ||
533 | 25 | 26 | ||
535 | 26 | void connect() {}; | 27 | bool connect() { return true; }; |
536 | 27 | void disconnect() {}; | 28 | void disconnect() {}; |
537 | 28 | void execute_query(const std::string &, QueryResult *r, | 29 | void execute_query(const std::string &, QueryResult *r, |
538 | 29 | const QueryResult &expected_result) { | 30 | const QueryResult &expected_result) { |
539 | 30 | 31 | ||
540 | === modified file 'percona_playback/percona_playback.cc' | |||
541 | --- percona_playback/percona_playback.cc 2012-11-14 11:44:59 +0000 | |||
542 | +++ percona_playback/percona_playback.cc 2012-11-23 20:15:24 +0000 | |||
543 | @@ -40,6 +40,7 @@ | |||
544 | 40 | 40 | ||
545 | 41 | percona_playback::DBClientPlugin *g_dbclient_plugin= NULL; | 41 | percona_playback::DBClientPlugin *g_dbclient_plugin= NULL; |
546 | 42 | percona_playback::InputPlugin *g_input_plugin= NULL; | 42 | percona_playback::InputPlugin *g_input_plugin= NULL; |
547 | 43 | percona_playback::DispatcherPlugin *g_dispatcher_plugin= NULL; | ||
548 | 43 | unsigned int g_db_thread_queue_depth; | 44 | unsigned int g_db_thread_queue_depth; |
549 | 44 | std::string g_session_init_query; | 45 | std::string g_session_init_query; |
550 | 45 | 46 | ||
551 | @@ -132,6 +133,22 @@ | |||
552 | 132 | std::cerr << _("Selected Input Plugin: ") | 133 | std::cerr << _("Selected Input Plugin: ") |
553 | 133 | << g_input_plugin->name | 134 | << g_input_plugin->name |
554 | 134 | << std::endl; | 135 | << std::endl; |
555 | 136 | |||
556 | 137 | std::cerr << std::endl << _("Loaded Dispatcher Plugins: "); | ||
557 | 138 | |||
558 | 139 | BOOST_FOREACH(const PluginRegistry::DispatcherPluginPair &pp, | ||
559 | 140 | PluginRegistry::singleton().dispatcher_plugins) | ||
560 | 141 | { | ||
561 | 142 | std::cerr << pp.first << " "; | ||
562 | 143 | } | ||
563 | 144 | |||
564 | 145 | std::cerr << std::endl; | ||
565 | 146 | std::cerr << std::endl; | ||
566 | 147 | |||
567 | 148 | assert(g_dispatcher_plugin); | ||
568 | 149 | std::cerr << _("Selected dispatcher Plugin: ") | ||
569 | 150 | << g_dispatcher_plugin->name | ||
570 | 151 | << std::endl; | ||
571 | 135 | } | 152 | } |
572 | 136 | 153 | ||
573 | 137 | int percona_playback_argv(percona_playback_st *the_percona_playback, | 154 | int percona_playback_argv(percona_playback_st *the_percona_playback, |
574 | @@ -152,12 +169,13 @@ | |||
575 | 152 | db_options.add_options() | 169 | db_options.add_options() |
576 | 153 | ("db-plugin", po::value<std::string>(), _("Database plugin")) | 170 | ("db-plugin", po::value<std::string>(), _("Database plugin")) |
577 | 154 | ("input-plugin", po::value<std::string>(), _("Input plugin")) | 171 | ("input-plugin", po::value<std::string>(), _("Input plugin")) |
578 | 172 | ("dispatcher-plugin", po::value<std::string>(), _("Dispatcher plugin")) | ||
579 | 155 | ("queue-depth", po::value<unsigned int>(), | 173 | ("queue-depth", po::value<unsigned int>(), |
580 | 156 | _("Queue depth for DB executor (thread). The larger this number the" | 174 | _("Queue depth for DB executor (thread). The larger this number the" |
581 | 157 | " greater the played-back workload can deviate from the original workload" | 175 | " greater the played-back workload can deviate from the original workload" |
582 | 158 | " as some connections may be up to queue-depth behind. (default 1)")) | 176 | " as some connections may be up to queue-depth behind. (default 1)")) |
583 | 159 | ("session-init-query", | 177 | ("session-init-query", |
585 | 160 | po::value<std::string>()->default_value(""), | 178 | po::value<std::string>(&g_session_init_query)->default_value(""), |
586 | 161 | _("This query will be executed just after each connect to db")) | 179 | _("This query will be executed just after each connect to db")) |
587 | 162 | ; | 180 | ; |
588 | 163 | 181 | ||
589 | @@ -237,6 +255,32 @@ | |||
590 | 237 | } | 255 | } |
591 | 238 | g_input_plugin->active= true; | 256 | g_input_plugin->active= true; |
592 | 239 | 257 | ||
593 | 258 | if (vm.count("dispatcher-plugin")) | ||
594 | 259 | { | ||
595 | 260 | PluginRegistry::DispatcherPluginMap::iterator it; | ||
596 | 261 | it= PluginRegistry::singleton().dispatcher_plugins.find( | ||
597 | 262 | vm["dispatcher-plugin"].as<std::string>()); | ||
598 | 263 | if (it == PluginRegistry::singleton().dispatcher_plugins.end()) | ||
599 | 264 | { | ||
600 | 265 | fprintf(stderr, _("Invalid Dispatcher Plugin\n")); | ||
601 | 266 | return -1; | ||
602 | 267 | } | ||
603 | 268 | g_dispatcher_plugin= it->second; | ||
604 | 269 | } | ||
605 | 270 | else | ||
606 | 271 | { | ||
607 | 272 | PluginRegistry::DispatcherPluginMap::iterator it; | ||
608 | 273 | it= PluginRegistry::singleton().dispatcher_plugins. | ||
609 | 274 | find("thread-per-connection"); | ||
610 | 275 | if (it == PluginRegistry::singleton().dispatcher_plugins.end()) | ||
611 | 276 | { | ||
612 | 277 | fprintf(stderr, _("Invalid Dispatcher plugin\n")); | ||
613 | 278 | return -1; | ||
614 | 279 | } | ||
615 | 280 | g_dispatcher_plugin= it->second; | ||
616 | 281 | } | ||
617 | 282 | g_dispatcher_plugin->active= true; | ||
618 | 283 | |||
619 | 240 | if (vm.count("help") || argc==1) | 284 | if (vm.count("help") || argc==1) |
620 | 241 | { | 285 | { |
621 | 242 | help(options_description); | 286 | help(options_description); |
622 | @@ -299,6 +343,7 @@ | |||
623 | 299 | std::cerr << _("Database Plugin: ") << g_dbclient_plugin->name << std::endl; | 343 | std::cerr << _("Database Plugin: ") << g_dbclient_plugin->name << std::endl; |
624 | 300 | std::cerr << _(" Running...") << std::endl; | 344 | std::cerr << _(" Running...") << std::endl; |
625 | 301 | 345 | ||
626 | 346 | g_dispatcher_plugin->run(); | ||
627 | 302 | g_input_plugin->run(*r); | 347 | g_input_plugin->run(*r); |
628 | 303 | 348 | ||
629 | 304 | BOOST_FOREACH(const PluginRegistry::ReportPluginPair pp, | 349 | BOOST_FOREACH(const PluginRegistry::ReportPluginPair pp, |
630 | 305 | 350 | ||
631 | === modified file 'percona_playback/plugin.h' | |||
632 | --- percona_playback/plugin.h 2012-06-24 04:28:41 +0000 | |||
633 | +++ percona_playback/plugin.h 2012-11-23 20:15:24 +0000 | |||
634 | @@ -20,6 +20,9 @@ | |||
635 | 20 | #include <vector> | 20 | #include <vector> |
636 | 21 | #include <string> | 21 | #include <string> |
637 | 22 | #include <map> | 22 | #include <map> |
638 | 23 | #include <boost/shared_ptr.hpp> | ||
639 | 24 | #include <boost/function.hpp> | ||
640 | 25 | #include "percona_playback/query_entry.h" | ||
641 | 23 | #include <percona_playback/visibility.h> | 26 | #include <percona_playback/visibility.h> |
642 | 24 | #include <percona_playback/version.h> | 27 | #include <percona_playback/version.h> |
643 | 25 | 28 | ||
644 | @@ -31,6 +34,7 @@ | |||
645 | 31 | } | 34 | } |
646 | 32 | 35 | ||
647 | 33 | class DBThread; | 36 | class DBThread; |
648 | 37 | class DBThreadState; | ||
649 | 34 | class QueryResult; | 38 | class QueryResult; |
650 | 35 | class percona_playback_run_result; | 39 | class percona_playback_run_result; |
651 | 36 | 40 | ||
652 | @@ -102,6 +106,21 @@ | |||
653 | 102 | 106 | ||
654 | 103 | }; | 107 | }; |
655 | 104 | 108 | ||
656 | 109 | class DispatcherPlugin : public plugin | ||
657 | 110 | { | ||
658 | 111 | public: | ||
659 | 112 | std::string name; | ||
660 | 113 | |||
661 | 114 | DispatcherPlugin(const std::string &_name) : name(_name) {} | ||
662 | 115 | |||
663 | 116 | virtual void dispatch(QueryEntryPtr query_entry)= 0; | ||
664 | 117 | virtual bool finish_and_wait(uint64_t thread_id)= 0; | ||
665 | 118 | virtual void finish_all_and_wait()= 0; | ||
666 | 119 | |||
667 | 120 | virtual void run() {}; | ||
668 | 121 | |||
669 | 122 | }; | ||
670 | 123 | |||
671 | 105 | class PluginRegistry | 124 | class PluginRegistry |
672 | 106 | { | 125 | { |
673 | 107 | public: | 126 | public: |
674 | @@ -122,6 +141,9 @@ | |||
675 | 122 | typedef std::map<std::string, InputPlugin*> InputPluginMap; | 141 | typedef std::map<std::string, InputPlugin*> InputPluginMap; |
676 | 123 | typedef std::pair<std::string, InputPlugin*> InputPluginPair; | 142 | typedef std::pair<std::string, InputPlugin*> InputPluginPair; |
677 | 124 | 143 | ||
678 | 144 | typedef std::map<std::string, DispatcherPlugin*> DispatcherPluginMap; | ||
679 | 145 | typedef std::pair<std::string, DispatcherPlugin*> DispatcherPluginPair; | ||
680 | 146 | |||
681 | 125 | typedef std::map<std::string, plugin*> PluginMap; | 147 | typedef std::map<std::string, plugin*> PluginMap; |
682 | 126 | typedef std::pair<std::string, plugin*> PluginPair; | 148 | typedef std::pair<std::string, plugin*> PluginPair; |
683 | 127 | 149 | ||
684 | @@ -129,6 +151,7 @@ | |||
685 | 129 | DBClientPluginMap dbclient_plugins; | 151 | DBClientPluginMap dbclient_plugins; |
686 | 130 | ReportPluginMap report_plugins; | 152 | ReportPluginMap report_plugins; |
687 | 131 | InputPluginMap input_plugins; | 153 | InputPluginMap input_plugins; |
688 | 154 | DispatcherPluginMap dispatcher_plugins; | ||
689 | 132 | 155 | ||
690 | 133 | void add(const std::string &name, plugin* plugin_object) | 156 | void add(const std::string &name, plugin* plugin_object) |
691 | 134 | { | 157 | { |
692 | @@ -154,6 +177,12 @@ | |||
693 | 154 | all_plugins.insert(PluginPair(name, input_plugin)); | 177 | all_plugins.insert(PluginPair(name, input_plugin)); |
694 | 155 | } | 178 | } |
695 | 156 | 179 | ||
696 | 180 | void add(const std::string &name, DispatcherPlugin* dispatcher_plugin) | ||
697 | 181 | { | ||
698 | 182 | dispatcher_plugins.insert(DispatcherPluginPair(name, dispatcher_plugin)); | ||
699 | 183 | all_plugins.insert(PluginPair(name, dispatcher_plugin)); | ||
700 | 184 | } | ||
701 | 185 | |||
702 | 157 | }; | 186 | }; |
703 | 158 | 187 | ||
704 | 159 | 188 | ||
705 | 160 | 189 | ||
706 | === modified file 'percona_playback/query_entry.h' | |||
707 | --- percona_playback/query_entry.h 2012-07-03 02:37:51 +0000 | |||
708 | +++ percona_playback/query_entry.h 2012-11-23 20:15:24 +0000 | |||
709 | @@ -45,7 +45,8 @@ | |||
710 | 45 | class FinishEntry : public QueryEntry | 45 | class FinishEntry : public QueryEntry |
711 | 46 | { | 46 | { |
712 | 47 | public: | 47 | public: |
714 | 48 | FinishEntry() { set_shutdown(); } | 48 | FinishEntry(uint64_t _thread_id) : |
715 | 49 | QueryEntry (_thread_id, true) {} | ||
716 | 49 | bool is_quit() { return false; } | 50 | bool is_quit() { return false; } |
717 | 50 | 51 | ||
718 | 51 | void execute(DBThread *) {} | 52 | void execute(DBThread *) {} |
719 | 52 | 53 | ||
720 | === modified file 'percona_playback/query_log/query_log.cc' | |||
721 | --- percona_playback/query_log/query_log.cc 2012-11-08 22:17:07 +0000 | |||
722 | +++ percona_playback/query_log/query_log.cc 2012-11-23 20:15:24 +0000 | |||
723 | @@ -39,7 +39,6 @@ | |||
724 | 39 | #include <percona_playback/db_thread.h> | 39 | #include <percona_playback/db_thread.h> |
725 | 40 | #include <percona_playback/query_log/query_log.h> | 40 | #include <percona_playback/query_log/query_log.h> |
726 | 41 | #include <percona_playback/query_result.h> | 41 | #include <percona_playback/query_result.h> |
727 | 42 | #include "percona_playback/dispatcher.h" | ||
728 | 43 | #include <percona_playback/gettext.h> | 42 | #include <percona_playback/gettext.h> |
729 | 44 | 43 | ||
730 | 45 | #include <boost/foreach.hpp> | 44 | #include <boost/foreach.hpp> |
731 | @@ -51,6 +50,8 @@ | |||
732 | 51 | static bool g_run_set_timestamp; | 50 | static bool g_run_set_timestamp; |
733 | 52 | static bool g_preserve_query_time; | 51 | static bool g_preserve_query_time; |
734 | 53 | 52 | ||
735 | 53 | extern percona_playback::DispatcherPlugin *g_dispatcher_plugin; | ||
736 | 54 | |||
737 | 54 | class ParseQueryLogFunc: public tbb::filter { | 55 | class ParseQueryLogFunc: public tbb::filter { |
738 | 55 | public: | 56 | public: |
739 | 56 | ParseQueryLogFunc(FILE *input_file_, | 57 | ParseQueryLogFunc(FILE *input_file_, |
740 | @@ -83,7 +84,7 @@ | |||
741 | 83 | new std::vector<boost::shared_ptr<QueryLogEntry> >(); | 84 | new std::vector<boost::shared_ptr<QueryLogEntry> >(); |
742 | 84 | 85 | ||
743 | 85 | boost::shared_ptr<QueryLogEntry> tmp_entry(new QueryLogEntry()); | 86 | boost::shared_ptr<QueryLogEntry> tmp_entry(new QueryLogEntry()); |
745 | 86 | entries->push_back(tmp_entry); | 87 | // entries->push_back(tmp_entry); |
746 | 87 | 88 | ||
747 | 88 | char *line= NULL; | 89 | char *line= NULL; |
748 | 89 | size_t buflen = 0; | 90 | size_t buflen = 0; |
749 | @@ -130,9 +131,10 @@ | |||
750 | 130 | 131 | ||
751 | 131 | if (strncmp(p, "# User@Host", strlen("# User@Host")) == 0) | 132 | if (strncmp(p, "# User@Host", strlen("# User@Host")) == 0) |
752 | 132 | { | 133 | { |
753 | 134 | if (!tmp_entry->getQuery().empty()) | ||
754 | 135 | entries->push_back(tmp_entry); | ||
755 | 133 | count++; | 136 | count++; |
756 | 134 | tmp_entry.reset(new QueryLogEntry()); | 137 | tmp_entry.reset(new QueryLogEntry()); |
757 | 135 | entries->push_back(tmp_entry); | ||
758 | 136 | (*this->nr_entries)++; | 138 | (*this->nr_entries)++; |
759 | 137 | } | 139 | } |
760 | 138 | 140 | ||
761 | @@ -172,6 +174,9 @@ | |||
762 | 172 | p= line; | 174 | p= line; |
763 | 173 | } | 175 | } |
764 | 174 | 176 | ||
765 | 177 | if (!tmp_entry->getQuery().empty()) | ||
766 | 178 | entries->push_back(tmp_entry); | ||
767 | 179 | |||
768 | 175 | free(line); | 180 | free(line); |
769 | 176 | return entries; | 181 | return entries; |
770 | 177 | } | 182 | } |
771 | @@ -239,7 +244,19 @@ | |||
772 | 239 | && s.compare(0, timestamp_query.length(), timestamp_query) == 0) | 244 | && s.compare(0, timestamp_query.length(), timestamp_query) == 0) |
773 | 240 | set_timestamp_query= s; | 245 | set_timestamp_query= s; |
774 | 241 | else | 246 | else |
776 | 242 | query.append(s); | 247 | { |
777 | 248 | //Append space insead of \r\n | ||
778 | 249 | std::string::const_iterator end = s.end() - 1; | ||
779 | 250 | if (s.length() >= 2 && *(s.end() - 2) == '\r') | ||
780 | 251 | --end; | ||
781 | 252 | //Remove initial spaces for best query viewing in reports | ||
782 | 253 | std::string::const_iterator begin; | ||
783 | 254 | for (begin = s.begin(); begin != end; ++begin) | ||
784 | 255 | if (*begin != ' ' && *begin != '\t') | ||
785 | 256 | break; | ||
786 | 257 | query.append(begin, end); | ||
787 | 258 | query.append(" "); | ||
788 | 259 | } | ||
789 | 243 | } | 260 | } |
790 | 244 | 261 | ||
791 | 245 | bool QueryLogEntry::parse_metadata(const std::string &s) | 262 | bool QueryLogEntry::parse_metadata(const std::string &s) |
792 | @@ -281,13 +298,13 @@ | |||
793 | 281 | r= true; | 298 | r= true; |
794 | 282 | } | 299 | } |
795 | 283 | } | 300 | } |
797 | 284 | 301 | /* | |
798 | 285 | if (s[0] == '#' && strncmp(s.c_str(), "# administrator", strlen("# administrator"))) | 302 | if (s[0] == '#' && strncmp(s.c_str(), "# administrator", strlen("# administrator"))) |
799 | 286 | { | 303 | { |
800 | 287 | query.append(s); | 304 | query.append(s); |
801 | 288 | r= true; | 305 | r= true; |
802 | 289 | } | 306 | } |
804 | 290 | 307 | */ | |
805 | 291 | return r; | 308 | return r; |
806 | 292 | } | 309 | } |
807 | 293 | 310 | ||
808 | @@ -300,7 +317,7 @@ | |||
809 | 300 | for (unsigned int i=0; i< input->size(); i++) | 317 | for (unsigned int i=0; i< input->size(); i++) |
810 | 301 | { | 318 | { |
811 | 302 | // usleep(10); | 319 | // usleep(10); |
813 | 303 | g_dispatcher.dispatch((*input)[i]); | 320 | g_dispatcher_plugin->dispatch((*input)[i]); |
814 | 304 | } | 321 | } |
815 | 305 | delete input; | 322 | delete input; |
816 | 306 | return NULL; | 323 | return NULL; |
817 | @@ -330,7 +347,7 @@ | |||
818 | 330 | p.add_filter(f4); | 347 | p.add_filter(f4); |
819 | 331 | p.run(2); | 348 | p.run(2); |
820 | 332 | 349 | ||
822 | 333 | g_dispatcher.finish_all_and_wait(); | 350 | g_dispatcher_plugin->finish_all_and_wait(); |
823 | 334 | 351 | ||
824 | 335 | r->n_log_entries= entries; | 352 | r->n_log_entries= entries; |
825 | 336 | r->n_queries= queries; | 353 | r->n_queries= queries; |
826 | 337 | 354 | ||
827 | === modified file 'percona_playback/tcpdump/connection_state.cc' | |||
828 | --- percona_playback/tcpdump/connection_state.cc 2012-07-02 17:51:47 +0000 | |||
829 | +++ percona_playback/tcpdump/connection_state.cc 2012-11-23 20:15:24 +0000 | |||
830 | @@ -25,9 +25,11 @@ | |||
831 | 25 | #include "percona_playback/plugin.h" | 25 | #include "percona_playback/plugin.h" |
832 | 26 | 26 | ||
833 | 27 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; | 27 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; |
834 | 28 | extern percona_playback::DispatcherPlugin *g_dispatcher_plugin; | ||
835 | 28 | 29 | ||
836 | 29 | void | 30 | void |
838 | 30 | ConnectionState::ProcessMysqlPkts(const u_char *pkts, | 31 | ConnectionState::ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr, |
839 | 32 | const u_char *pkts, | ||
840 | 31 | u_int pkts_len, | 33 | u_int pkts_len, |
841 | 32 | const timeval &ts, | 34 | const timeval &ts, |
842 | 33 | const AddrPort &addr_port, | 35 | const AddrPort &addr_port, |
843 | @@ -76,13 +78,13 @@ | |||
844 | 76 | { | 78 | { |
845 | 77 | case PKT_QUERY: | 79 | case PKT_QUERY: |
846 | 78 | if (!query.empty()) | 80 | if (!query.empty()) |
848 | 79 | DispatchQuery(ts, query, addr_port); | 81 | DispatchQuery(cs_ptr, ts, query, addr_port); |
849 | 80 | ++stats.nr_of_parsed_queries; | 82 | ++stats.nr_of_parsed_queries; |
850 | 81 | ++stats.nr_of_parsed_packets; | 83 | ++stats.nr_of_parsed_packets; |
851 | 82 | break; | 84 | break; |
852 | 83 | 85 | ||
853 | 84 | case PKT_RESULT: | 86 | case PKT_RESULT: |
855 | 85 | DispatchResult(ts, addr_port); | 87 | DispatchResult(cs_ptr, ts, addr_port); |
856 | 86 | ++stats.nr_of_parsed_packets; | 88 | ++stats.nr_of_parsed_packets; |
857 | 87 | break; | 89 | break; |
858 | 88 | 90 | ||
859 | @@ -107,12 +109,6 @@ | |||
860 | 107 | 109 | ||
861 | 108 | } | 110 | } |
862 | 109 | 111 | ||
863 | 110 | void | ||
864 | 111 | ConnectionState::ProcessFinishConnection() | ||
865 | 112 | { | ||
866 | 113 | db_thread->queries.push(QueryEntryPtr(new FinishEntry())); | ||
867 | 114 | } | ||
868 | 115 | |||
869 | 116 | PktResult | 112 | PktResult |
870 | 117 | ConnectionState::ParseMysqlPkt(IN UCharBuffer &buff, | 113 | ConnectionState::ParseMysqlPkt(IN UCharBuffer &buff, |
871 | 118 | OUT std::string &query) | 114 | OUT std::string &query) |
872 | @@ -303,23 +299,32 @@ | |||
873 | 303 | } | 299 | } |
874 | 304 | 300 | ||
875 | 305 | void | 301 | void |
877 | 306 | ConnectionState::DispatchQuery(const timeval &ts, | 302 | ConnectionState::DispatchQuery(/* Having shared pointer to "this" as an argument |
878 | 303 | is just temporary step. We need a shared pointer | ||
879 | 304 | to connection state inside of query entry. The | ||
880 | 305 | better way to do this is to move code that creates | ||
881 | 306 | query entries to the upper layer. | ||
882 | 307 | */ | ||
883 | 308 | boost::shared_ptr<ConnectionState> cs_ptr, | ||
884 | 309 | const timeval &ts, | ||
885 | 307 | const std::string &query, | 310 | const std::string &query, |
886 | 308 | const AddrPort &addr_port) | 311 | const AddrPort &addr_port) |
887 | 309 | { | 312 | { |
888 | 310 | boost::shared_ptr<TcpdumpQueryEntry> | 313 | boost::shared_ptr<TcpdumpQueryEntry> |
890 | 311 | query_entry(new TcpdumpQueryEntry(ts, | 314 | query_entry(new TcpdumpQueryEntry(cs_ptr, |
891 | 315 | ts, | ||
892 | 312 | query, | 316 | query, |
893 | 313 | addr_port)); | 317 | addr_port)); |
895 | 314 | db_thread->queries.push(query_entry); | 318 | g_dispatcher_plugin->dispatch(query_entry); |
896 | 315 | } | 319 | } |
897 | 316 | 320 | ||
898 | 317 | void | 321 | void |
900 | 318 | ConnectionState::DispatchResult(const timeval &ts, | 322 | ConnectionState::DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr, |
901 | 323 | const timeval &ts, | ||
902 | 319 | const AddrPort &addr_port) | 324 | const AddrPort &addr_port) |
903 | 320 | { | 325 | { |
904 | 321 | boost::shared_ptr<TcpdumpResultEntry> | 326 | boost::shared_ptr<TcpdumpResultEntry> |
907 | 322 | result_entry(new TcpdumpResultEntry(addr_port, ts, last_query_result)); | 327 | result_entry(new TcpdumpResultEntry(cs_ptr, addr_port, ts, last_query_result)); |
908 | 323 | db_thread->queries.push(result_entry); | 328 | g_dispatcher_plugin->dispatch(result_entry); |
909 | 324 | } | 329 | } |
910 | 325 | 330 | ||
911 | 326 | 331 | ||
912 | === modified file 'percona_playback/tcpdump/connection_state.h' | |||
913 | --- percona_playback/tcpdump/connection_state.h 2012-07-03 07:41:49 +0000 | |||
914 | +++ percona_playback/tcpdump/connection_state.h 2012-11-23 20:15:24 +0000 | |||
915 | @@ -94,8 +94,7 @@ | |||
916 | 94 | }; | 94 | }; |
917 | 95 | 95 | ||
918 | 96 | 96 | ||
921 | 97 | class ConnectionState : public DBThreadState | 97 | class ConnectionState |
920 | 98 | |||
922 | 99 | { | 98 | { |
923 | 100 | 99 | ||
924 | 101 | public: | 100 | public: |
925 | @@ -107,7 +106,7 @@ | |||
926 | 107 | SERVER | 106 | SERVER |
927 | 108 | }; | 107 | }; |
928 | 109 | 108 | ||
930 | 110 | ConnectionState() : | 109 | ConnectionState(uint64_t _thread_id) : |
931 | 111 | current_origin(UNDEF), | 110 | current_origin(UNDEF), |
932 | 112 | fragmented(false), | 111 | fragmented(false), |
933 | 113 | handshake_from_client(false), | 112 | handshake_from_client(false), |
934 | @@ -126,7 +125,7 @@ | |||
935 | 126 | (drizzle_con_options_t) | 125 | (drizzle_con_options_t) |
936 | 127 | DRIZZLE_CON_MYSQL), | 126 | DRIZZLE_CON_MYSQL), |
937 | 128 | drizzle_con_free), | 127 | drizzle_con_free), |
939 | 129 | db_thread(NULL) | 128 | thread_id(_thread_id) |
940 | 130 | { | 129 | { |
941 | 131 | drizzle_con->result= NULL; | 130 | drizzle_con->result= NULL; |
942 | 132 | } | 131 | } |
943 | @@ -136,18 +135,15 @@ | |||
944 | 136 | drizzle_result_free_all(drizzle_con.get()); | 135 | drizzle_result_free_all(drizzle_con.get()); |
945 | 137 | } | 136 | } |
946 | 138 | 137 | ||
948 | 139 | void ProcessMysqlPkts(const u_char *pkts, | 138 | void ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr, |
949 | 139 | const u_char *pkts, | ||
950 | 140 | u_int total_len, | 140 | u_int total_len, |
951 | 141 | const timeval &ts, | 141 | const timeval &ts, |
952 | 142 | const AddrPort &addr_port, | 142 | const AddrPort &addr_port, |
953 | 143 | OUT TcpdumpMysqlParserStats &stats); | 143 | OUT TcpdumpMysqlParserStats &stats); |
954 | 144 | 144 | ||
955 | 145 | void ProcessFinishConnection(); | ||
956 | 146 | |||
957 | 147 | void SetCurrentOrigin(Origin o) { current_origin = o; } | 145 | void SetCurrentOrigin(Origin o) { current_origin = o; } |
958 | 148 | 146 | ||
959 | 149 | void SetDBThread(DBThread *t) { db_thread= t; } | ||
960 | 150 | |||
961 | 151 | LastExecutedQueryInfo last_executed_query_info; | 147 | LastExecutedQueryInfo last_executed_query_info; |
962 | 152 | 148 | ||
963 | 153 | private: | 149 | private: |
964 | @@ -159,11 +155,13 @@ | |||
965 | 159 | PktResult ServerPacket(IN UCharBuffer &buff); | 155 | PktResult ServerPacket(IN UCharBuffer &buff); |
966 | 160 | PktResult ClientPacket(IN UCharBuffer &buff, OUT std::string &query); | 156 | PktResult ClientPacket(IN UCharBuffer &buff, OUT std::string &query); |
967 | 161 | 157 | ||
969 | 162 | void DispatchQuery(const timeval &ts, | 158 | void DispatchQuery(boost::shared_ptr<ConnectionState> cs_ptr, |
970 | 159 | const timeval &ts, | ||
971 | 163 | const std::string &query, | 160 | const std::string &query, |
972 | 164 | const AddrPort &addr_port); | 161 | const AddrPort &addr_port); |
973 | 165 | 162 | ||
975 | 166 | void DispatchResult(const timeval &ts, | 163 | void DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr, |
976 | 164 | const timeval &ts, | ||
977 | 167 | const AddrPort &addr_port); | 165 | const AddrPort &addr_port); |
978 | 168 | 166 | ||
979 | 169 | Origin current_origin; | 167 | Origin current_origin; |
980 | @@ -179,7 +177,7 @@ | |||
981 | 179 | 177 | ||
982 | 180 | QueryResult last_query_result; | 178 | QueryResult last_query_result; |
983 | 181 | 179 | ||
985 | 182 | DBThread *db_thread; | 180 | uint64_t thread_id; |
986 | 183 | 181 | ||
987 | 184 | }; | 182 | }; |
988 | 185 | 183 | ||
989 | 186 | 184 | ||
990 | === modified file 'percona_playback/tcpdump/pcap_packets_parser.cc' | |||
991 | --- percona_playback/tcpdump/pcap_packets_parser.cc 2012-07-03 07:41:49 +0000 | |||
992 | +++ percona_playback/tcpdump/pcap_packets_parser.cc 2012-11-23 20:15:24 +0000 | |||
993 | @@ -18,7 +18,6 @@ | |||
994 | 18 | 18 | ||
995 | 19 | #include "percona_playback/db_thread.h" | 19 | #include "percona_playback/db_thread.h" |
996 | 20 | #include "percona_playback/plugin.h" | 20 | #include "percona_playback/plugin.h" |
997 | 21 | #include "percona_playback/dispatcher.h" | ||
998 | 22 | 21 | ||
999 | 23 | #include <netinet/in.h> | 22 | #include <netinet/in.h> |
1000 | 24 | #include <arpa/inet.h> | 23 | #include <arpa/inet.h> |
1001 | @@ -29,6 +28,7 @@ | |||
1002 | 29 | #define SNAP_LEN 16500 // pcap's max capture size | 28 | #define SNAP_LEN 16500 // pcap's max capture size |
1003 | 30 | 29 | ||
1004 | 31 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; | 30 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; |
1005 | 31 | extern percona_playback::DispatcherPlugin *g_dispatcher_plugin; | ||
1006 | 32 | 32 | ||
1007 | 33 | void | 33 | void |
1008 | 34 | PcapPacketsParser::ParsePkt(const struct pcap_pkthdr *header, | 34 | PcapPacketsParser::ParsePkt(const struct pcap_pkthdr *header, |
1009 | @@ -81,7 +81,8 @@ | |||
1010 | 81 | if (size_mysql == 0 && | 81 | if (size_mysql == 0 && |
1011 | 82 | ((tcp->th_flags & TH_FIN) || (tcp->th_flags & TH_RST))) | 82 | ((tcp->th_flags & TH_FIN) || (tcp->th_flags & TH_RST))) |
1012 | 83 | { | 83 | { |
1014 | 84 | g_dispatcher.finish_and_wait(addr_port.ThreadId()); | 84 | g_dispatcher_plugin->finish_and_wait(addr_port.ThreadId()); |
1015 | 85 | RemoveConnectionState(addr_port.ThreadId()); | ||
1016 | 85 | return; | 86 | return; |
1017 | 86 | } | 87 | } |
1018 | 87 | 88 | ||
1019 | @@ -97,37 +98,50 @@ | |||
1020 | 97 | } | 98 | } |
1021 | 98 | 99 | ||
1022 | 99 | /* If there is no DBThread with such id create it */ | 100 | /* If there is no DBThread with such id create it */ |
1028 | 100 | boost::shared_ptr<DBThreadState> | 101 | boost::shared_ptr<ConnectionState> |
1029 | 101 | state= g_dispatcher.get_thread_state(addr_port.ThreadId(), | 102 | state= GetConnectionState(addr_port.ThreadId()); |
1025 | 102 | boost::bind(&PcapPacketsParser::CreateConnectionState, | ||
1026 | 103 | this, | ||
1027 | 104 | _1)); | ||
1030 | 105 | 103 | ||
1031 | 106 | assert(state.get()); | 104 | assert(state.get()); |
1032 | 107 | 105 | ||
1039 | 108 | ((ConnectionState *)state.get())->SetCurrentOrigin(origin); | 106 | state->SetCurrentOrigin(origin); |
1040 | 109 | ((ConnectionState *)state.get())->ProcessMysqlPkts(mysql, | 107 | state->ProcessMysqlPkts(state, |
1041 | 110 | size_mysql, | 108 | mysql, |
1042 | 111 | header->ts, | 109 | size_mysql, |
1043 | 112 | addr_port, | 110 | header->ts, |
1044 | 113 | stats); | 111 | addr_port, |
1045 | 112 | stats); | ||
1046 | 114 | } | 113 | } |
1047 | 115 | 114 | ||
1048 | 116 | void | 115 | void |
1049 | 117 | PcapPacketsParser::WaitForUnfinishedTasks() | 116 | PcapPacketsParser::WaitForUnfinishedTasks() |
1050 | 118 | { | 117 | { |
1059 | 119 | g_dispatcher.finish_all_and_wait(); | 118 | g_dispatcher_plugin->finish_all_and_wait(); |
1060 | 120 | } | 119 | } |
1061 | 121 | 120 | ||
1062 | 122 | void | 121 | boost::shared_ptr<ConnectionState> |
1063 | 123 | PcapPacketsParser::CreateConnectionState(DBThread *db_thread) | 122 | PcapPacketsParser::GetConnectionState(uint64_t thread_id) |
1064 | 124 | { | 123 | { |
1065 | 125 | assert(db_thread); | 124 | Connections::iterator it = connections.find(thread_id); |
1066 | 126 | boost::shared_ptr<ConnectionState> state(new ConnectionState()); | 125 | if (it == connections.end()) |
1067 | 126 | return CreateConnectionState(thread_id); | ||
1068 | 127 | return it->second; | ||
1069 | 128 | } | ||
1070 | 129 | |||
1071 | 130 | boost::shared_ptr<ConnectionState> | ||
1072 | 131 | PcapPacketsParser::CreateConnectionState(uint64_t thread_id) | ||
1073 | 132 | { | ||
1074 | 133 | boost::shared_ptr<ConnectionState> state(new ConnectionState(thread_id)); | ||
1075 | 127 | state->last_executed_query_info.end_pcap_timestamp= | 134 | state->last_executed_query_info.end_pcap_timestamp= |
1076 | 128 | first_packet_pcap_timestamp; | 135 | first_packet_pcap_timestamp; |
1077 | 129 | state->last_executed_query_info.end_timestamp= | 136 | state->last_executed_query_info.end_timestamp= |
1078 | 130 | first_packet_timestamp; | 137 | first_packet_timestamp; |
1082 | 131 | db_thread->set_state(state); | 138 | connections[thread_id] = state; |
1083 | 132 | state->SetDBThread(db_thread); | 139 | return state; |
1084 | 133 | } | 140 | } |
1085 | 141 | |||
1086 | 142 | void | ||
1087 | 143 | PcapPacketsParser::RemoveConnectionState(uint64_t thread_id) | ||
1088 | 144 | { | ||
1089 | 145 | connections.erase(thread_id); | ||
1090 | 146 | } | ||
1091 | 147 | |||
1092 | 134 | 148 | ||
1093 | === modified file 'percona_playback/tcpdump/pcap_packets_parser.h' | |||
1094 | --- percona_playback/tcpdump/pcap_packets_parser.h 2012-06-19 15:26:53 +0000 | |||
1095 | +++ percona_playback/tcpdump/pcap_packets_parser.h 2012-11-23 20:15:24 +0000 | |||
1096 | @@ -23,6 +23,8 @@ | |||
1097 | 23 | #include <tbb/concurrent_queue.h> | 23 | #include <tbb/concurrent_queue.h> |
1098 | 24 | #include <boost/thread.hpp> | 24 | #include <boost/thread.hpp> |
1099 | 25 | #include <boost/bind.hpp> | 25 | #include <boost/bind.hpp> |
1100 | 26 | #include <boost/unordered_map.hpp> | ||
1101 | 27 | #include <boost/shared_ptr.hpp> | ||
1102 | 26 | 28 | ||
1103 | 27 | class DBThread; | 29 | class DBThread; |
1104 | 28 | 30 | ||
1105 | @@ -32,10 +34,14 @@ | |||
1106 | 32 | timeval first_packet_timestamp; | 34 | timeval first_packet_timestamp; |
1107 | 33 | timeval first_packet_pcap_timestamp; | 35 | timeval first_packet_pcap_timestamp; |
1108 | 34 | bool was_first_packet; | 36 | bool was_first_packet; |
1109 | 37 | typedef boost::unordered_map<uint64_t, | ||
1110 | 38 | boost::shared_ptr<ConnectionState> > | ||
1111 | 39 | Connections; | ||
1112 | 40 | Connections connections; | ||
1113 | 35 | 41 | ||
1114 | 36 | public: | 42 | public: |
1115 | 37 | 43 | ||
1117 | 38 | PcapPacketsParser() : was_first_packet(false) | 44 | PcapPacketsParser() : was_first_packet(false), connections(10000) |
1118 | 39 | { | 45 | { |
1119 | 40 | first_packet_timestamp.tv_sec= first_packet_timestamp.tv_usec= 0; | 46 | first_packet_timestamp.tv_sec= first_packet_timestamp.tv_usec= 0; |
1120 | 41 | first_packet_pcap_timestamp.tv_sec= first_packet_pcap_timestamp.tv_usec= 0; | 47 | first_packet_pcap_timestamp.tv_sec= first_packet_pcap_timestamp.tv_usec= 0; |
1121 | @@ -52,9 +58,11 @@ | |||
1122 | 52 | 58 | ||
1123 | 53 | void WaitForUnfinishedTasks(); | 59 | void WaitForUnfinishedTasks(); |
1124 | 54 | 60 | ||
1125 | 55 | void CreateConnectionState(DBThread *db_thread); | ||
1126 | 56 | 61 | ||
1127 | 57 | private: | 62 | private: |
1128 | 63 | boost::shared_ptr<ConnectionState> CreateConnectionState(uint64_t thread_id); | ||
1129 | 64 | boost::shared_ptr<ConnectionState> GetConnectionState(uint64_t thread_id); | ||
1130 | 65 | void RemoveConnectionState(uint64_t thread_id); | ||
1131 | 58 | 66 | ||
1132 | 59 | void ParsePkt(const struct pcap_pkthdr *header, | 67 | void ParsePkt(const struct pcap_pkthdr *header, |
1133 | 60 | const u_char *packet); | 68 | const u_char *packet); |
1134 | 61 | 69 | ||
1135 | === modified file 'percona_playback/tcpdump/tcpdump_query_entries.cc' | |||
1136 | --- percona_playback/tcpdump/tcpdump_query_entries.cc 2012-07-09 11:54:52 +0000 | |||
1137 | +++ percona_playback/tcpdump/tcpdump_query_entries.cc 2012-11-23 20:15:24 +0000 | |||
1138 | @@ -30,15 +30,10 @@ | |||
1139 | 30 | QueryResult expected_result; | 30 | QueryResult expected_result; |
1140 | 31 | timeval start_time; | 31 | timeval start_time; |
1141 | 32 | timeval end_time; | 32 | timeval end_time; |
1142 | 33 | boost::shared_ptr<DBThreadState> thr_state; | ||
1143 | 34 | 33 | ||
1144 | 35 | assert(t); | 34 | assert(t); |
1145 | 36 | 35 | ||
1151 | 37 | thr_state= t->get_state(); | 36 | ConnectionState &state= *connection_state; |
1147 | 38 | |||
1148 | 39 | assert(thr_state.get()); | ||
1149 | 40 | |||
1150 | 41 | ConnectionState &state= (ConnectionState &)*thr_state; | ||
1152 | 42 | 37 | ||
1153 | 43 | LastExecutedQueryInfo &last_query_info= state.last_executed_query_info; | 38 | LastExecutedQueryInfo &last_query_info= state.last_executed_query_info; |
1154 | 44 | 39 | ||
1155 | @@ -96,12 +91,7 @@ | |||
1156 | 96 | 91 | ||
1157 | 97 | assert(t); | 92 | assert(t); |
1158 | 98 | 93 | ||
1165 | 99 | thr_state= t->get_state(); | 94 | ConnectionState &state= *connection_state; |
1160 | 100 | |||
1161 | 101 | assert(thr_state.get()); | ||
1162 | 102 | |||
1163 | 103 | |||
1164 | 104 | ConnectionState &state= (ConnectionState &)*thr_state; | ||
1166 | 105 | LastExecutedQueryInfo &last_query_info= state.last_executed_query_info; | 95 | LastExecutedQueryInfo &last_query_info= state.last_executed_query_info; |
1167 | 106 | 96 | ||
1168 | 107 | timersub(&pcap_timestamp, | 97 | timersub(&pcap_timestamp, |
1169 | 108 | 98 | ||
1170 | === modified file 'percona_playback/tcpdump/tcpdump_query_entries.h' | |||
1171 | --- percona_playback/tcpdump/tcpdump_query_entries.h 2012-07-03 07:41:49 +0000 | |||
1172 | +++ percona_playback/tcpdump/tcpdump_query_entries.h 2012-11-23 20:15:24 +0000 | |||
1173 | @@ -21,9 +21,12 @@ | |||
1174 | 21 | #include "connection_state.h" | 21 | #include "connection_state.h" |
1175 | 22 | 22 | ||
1176 | 23 | #include <sys/time.h> | 23 | #include <sys/time.h> |
1177 | 24 | #include <boost/shared_ptr.hpp> | ||
1178 | 24 | 25 | ||
1179 | 25 | class TcpdumpQueryEntry : public QueryEntry | 26 | class TcpdumpQueryEntry : public QueryEntry |
1180 | 26 | { | 27 | { |
1181 | 28 | |||
1182 | 29 | boost::shared_ptr<ConnectionState> connection_state; | ||
1183 | 27 | timeval pcap_timestamp; | 30 | timeval pcap_timestamp; |
1184 | 28 | std::string query; | 31 | std::string query; |
1185 | 29 | 32 | ||
1186 | @@ -34,10 +37,12 @@ | |||
1187 | 34 | pcap_timestamp.tv_sec= pcap_timestamp.tv_usec= 0; | 37 | pcap_timestamp.tv_sec= pcap_timestamp.tv_usec= 0; |
1188 | 35 | } | 38 | } |
1189 | 36 | 39 | ||
1191 | 37 | TcpdumpQueryEntry(const timeval &_pcap_timestamp, | 40 | TcpdumpQueryEntry(boost::shared_ptr<ConnectionState> _connection_state, |
1192 | 41 | const timeval &_pcap_timestamp, | ||
1193 | 38 | const std::string &_query, | 42 | const std::string &_query, |
1194 | 39 | const AddrPort &addr_port) : | 43 | const AddrPort &addr_port) : |
1195 | 40 | QueryEntry(addr_port.ThreadId(), false), | 44 | QueryEntry(addr_port.ThreadId(), false), |
1196 | 45 | connection_state(_connection_state), | ||
1197 | 41 | pcap_timestamp(_pcap_timestamp), | 46 | pcap_timestamp(_pcap_timestamp), |
1198 | 42 | query(_query) | 47 | query(_query) |
1199 | 43 | {} | 48 | {} |
1200 | @@ -50,15 +55,18 @@ | |||
1201 | 50 | class TcpdumpResultEntry : public QueryEntry | 55 | class TcpdumpResultEntry : public QueryEntry |
1202 | 51 | { | 56 | { |
1203 | 52 | 57 | ||
1204 | 58 | boost::shared_ptr<ConnectionState> connection_state; | ||
1205 | 53 | timeval pcap_timestamp; | 59 | timeval pcap_timestamp; |
1206 | 54 | QueryResult expected_result; | 60 | QueryResult expected_result; |
1207 | 55 | 61 | ||
1208 | 56 | public: | 62 | public: |
1209 | 57 | TcpdumpResultEntry() {} | 63 | TcpdumpResultEntry() {} |
1211 | 58 | TcpdumpResultEntry(const AddrPort &addr_port, | 64 | TcpdumpResultEntry(boost::shared_ptr<ConnectionState> _connection_state, |
1212 | 65 | const AddrPort &addr_port, | ||
1213 | 59 | const timeval &_pcap_timestamp, | 66 | const timeval &_pcap_timestamp, |
1214 | 60 | const QueryResult &_expected_result) : | 67 | const QueryResult &_expected_result) : |
1215 | 61 | QueryEntry(addr_port.ThreadId(), false), | 68 | QueryEntry(addr_port.ThreadId(), false), |
1216 | 69 | connection_state(_connection_state), | ||
1217 | 62 | pcap_timestamp(_pcap_timestamp), | 70 | pcap_timestamp(_pcap_timestamp), |
1218 | 63 | expected_result(_expected_result) | 71 | expected_result(_expected_result) |
1219 | 64 | {} | 72 | {} |
1220 | 65 | 73 | ||
1221 | === added file 'percona_playback/test/thread-pool-sysbench-slow.cc' | |||
1222 | --- percona_playback/test/thread-pool-sysbench-slow.cc 1970-01-01 00:00:00 +0000 | |||
1223 | +++ percona_playback/test/thread-pool-sysbench-slow.cc 2012-11-23 20:15:24 +0000 | |||
1224 | @@ -0,0 +1,60 @@ | |||
1225 | 1 | /* BEGIN LICENSE | ||
1226 | 2 | * Copyright (C) 2011 Stewart Smith <stewart@flamingspork.com> | ||
1227 | 3 | * This program is free software: you can redistribute it and/or modify it | ||
1228 | 4 | * under the terms of the GNU General Public License version 3, as published | ||
1229 | 5 | * by the Free Software Foundation. | ||
1230 | 6 | * | ||
1231 | 7 | * This program is distributed in the hope that it will be useful, but | ||
1232 | 8 | * WITHOUT ANY WARRANTY; without even the implied warranties of | ||
1233 | 9 | * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | ||
1234 | 10 | * PURPOSE. See the GNU General Public License for more details. | ||
1235 | 11 | * | ||
1236 | 12 | * You should have received a copy of the GNU General Public License along | ||
1237 | 13 | * with this program. If not, see <http://www.gnu.org/licenses/>. | ||
1238 | 14 | * END LICENSE */ | ||
1239 | 15 | |||
1240 | 16 | #include "config.h" | ||
1241 | 17 | |||
1242 | 18 | #include <iostream> | ||
1243 | 19 | #include <cstdio> | ||
1244 | 20 | #include <string> | ||
1245 | 21 | #include <cstring> | ||
1246 | 22 | #include <assert.h> | ||
1247 | 23 | #include <unistd.h> | ||
1248 | 24 | |||
1249 | 25 | #include <percona_playback/percona_playback.h> | ||
1250 | 26 | |||
1251 | 27 | /** | ||
1252 | 28 | * @TODO Actually write a real test suite here | ||
1253 | 29 | */ | ||
1254 | 30 | int main(int argc, char **argv) | ||
1255 | 31 | { | ||
1256 | 32 | (void)argc; (void)argv; | ||
1257 | 33 | |||
1258 | 34 | fprintf(stderr, "Working dir: %s\n\n", get_current_dir_name()); | ||
1259 | 35 | |||
1260 | 36 | percona_playback_st *the_percona_playback= percona_playback_create("test_Percona Playback"); | ||
1261 | 37 | assert(the_percona_playback); | ||
1262 | 38 | |||
1263 | 39 | char dbplugin[]="--db-plugin=null"; | ||
1264 | 40 | char querylog[]="--query-log-file="SRCDIR"/percona_playback/test/sysbench-slow.log"; | ||
1265 | 41 | char threadpool[]="--dispatcher-plugin=thread-pool" | ||
1266 | 42 | |||
1267 | 43 | char **dbplugin_argv= new char*[4]; | ||
1268 | 44 | dbplugin_argv[0]= argv[0]; | ||
1269 | 45 | dbplugin_argv[1]= dbplugin; | ||
1270 | 46 | dbplugin_argv[2]= querylog; | ||
1271 | 47 | dbplugin_argv[3]= threadpool; | ||
1272 | 48 | |||
1273 | 49 | |||
1274 | 50 | assert(0 == percona_playback_argv(the_percona_playback, 3, dbplugin_argv)); | ||
1275 | 51 | |||
1276 | 52 | struct percona_playback_run_result *r= percona_playback_run(the_percona_playback); | ||
1277 | 53 | |||
1278 | 54 | assert(r->err == 0); | ||
1279 | 55 | assert(r->n_queries == 10150); | ||
1280 | 56 | assert(r->n_log_entries = 10149); | ||
1281 | 57 | |||
1282 | 58 | percona_playback_destroy(&the_percona_playback); | ||
1283 | 59 | return r->err; | ||
1284 | 60 | } | ||
1285 | 0 | 61 | ||
1286 | === added directory 'percona_playback/thread_per_connection' | |||
1287 | === added file 'percona_playback/thread_per_connection/plugin.ini' | |||
1288 | --- percona_playback/thread_per_connection/plugin.ini 1970-01-01 00:00:00 +0000 | |||
1289 | +++ percona_playback/thread_per_connection/plugin.ini 2012-11-23 20:15:24 +0000 | |||
1290 | @@ -0,0 +1,6 @@ | |||
1291 | 1 | [plugin] | ||
1292 | 2 | title=Thread-per-connection dispatcher plugin | ||
1293 | 3 | description=Creates thread per each DB connection | ||
1294 | 4 | version=0.1 | ||
1295 | 5 | static=yes | ||
1296 | 6 | load_by_default=yes | ||
1297 | 0 | 7 | ||
1298 | === added file 'percona_playback/thread_per_connection/thread_per_connection.cc' | |||
1299 | --- percona_playback/thread_per_connection/thread_per_connection.cc 1970-01-01 00:00:00 +0000 | |||
1300 | +++ percona_playback/thread_per_connection/thread_per_connection.cc 2012-11-23 20:15:24 +0000 | |||
1301 | @@ -0,0 +1,109 @@ | |||
1302 | 1 | /* BEGIN LICENSE | ||
1303 | 2 | * Copyright (C) 2011-2012 Percona Inc. | ||
1304 | 3 | * This program is free software: you can redistribute it and/or modify it | ||
1305 | 4 | * under the terms of the GNU General Public License version 2, as published | ||
1306 | 5 | * by the Free Software Foundation. | ||
1307 | 6 | * | ||
1308 | 7 | * This program is distributed in the hope that it will be useful, but | ||
1309 | 8 | * WITHOUT ANY WARRANTY; without even the implied warranties of | ||
1310 | 9 | * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | ||
1311 | 10 | * PURPOSE. See the GNU General Public License for more details. | ||
1312 | 11 | * | ||
1313 | 12 | * You should have received a copy of the GNU General Public License along | ||
1314 | 13 | * with this program. If not, see <http://www.gnu.org/licenses/>. | ||
1315 | 14 | * END LICENSE */ | ||
1316 | 15 | |||
1317 | 16 | #include "percona_playback/plugin.h" | ||
1318 | 17 | #include "percona_playback/db_thread.h" | ||
1319 | 18 | |||
1320 | 19 | #include <tbb/concurrent_hash_map.h> | ||
1321 | 20 | |||
1322 | 21 | class ThreadPerConnectionDispatcher : | ||
1323 | 22 | public percona_playback::DispatcherPlugin | ||
1324 | 23 | { | ||
1325 | 24 | typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable; | ||
1326 | 25 | DBExecutorsTable executors; | ||
1327 | 26 | void db_thread_func(DBThread *thread); | ||
1328 | 27 | void start_thread(DBThread *thread); | ||
1329 | 28 | |||
1330 | 29 | public: | ||
1331 | 30 | ThreadPerConnectionDispatcher(std::string _name) : | ||
1332 | 31 | DispatcherPlugin(_name) {} | ||
1333 | 32 | |||
1334 | 33 | void dispatch(QueryEntryPtr query_entry); | ||
1335 | 34 | bool finish_and_wait(uint64_t thread_id); | ||
1336 | 35 | void finish_all_and_wait(); | ||
1337 | 36 | }; | ||
1338 | 37 | |||
1339 | 38 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; | ||
1340 | 39 | |||
1341 | 40 | void | ||
1342 | 41 | ThreadPerConnectionDispatcher::dispatch(QueryEntryPtr query_entry) | ||
1343 | 42 | { | ||
1344 | 43 | uint64_t thread_id= query_entry->getThreadId(); | ||
1345 | 44 | { | ||
1346 | 45 | DBExecutorsTable::accessor a; | ||
1347 | 46 | if (executors.insert(a, thread_id)) | ||
1348 | 47 | { | ||
1349 | 48 | DBThread *db_thread= g_dbclient_plugin->create(thread_id); | ||
1350 | 49 | a->second= db_thread; | ||
1351 | 50 | db_thread->start_thread(); | ||
1352 | 51 | } | ||
1353 | 52 | a->second->queries->push(query_entry); | ||
1354 | 53 | } | ||
1355 | 54 | } | ||
1356 | 55 | |||
1357 | 56 | bool | ||
1358 | 57 | ThreadPerConnectionDispatcher::finish_and_wait(uint64_t thread_id) | ||
1359 | 58 | { | ||
1360 | 59 | DBThread *db_thread= NULL; | ||
1361 | 60 | { | ||
1362 | 61 | DBExecutorsTable::accessor a; | ||
1363 | 62 | if (executors.find(a, thread_id)) | ||
1364 | 63 | { | ||
1365 | 64 | db_thread= a->second; | ||
1366 | 65 | executors.erase(a); | ||
1367 | 66 | } | ||
1368 | 67 | } | ||
1369 | 68 | |||
1370 | 69 | if (!db_thread) | ||
1371 | 70 | return false; | ||
1372 | 71 | |||
1373 | 72 | db_thread->queries->push(QueryEntryPtr(new FinishEntry(thread_id))); | ||
1374 | 73 | db_thread->join(); | ||
1375 | 74 | |||
1376 | 75 | delete db_thread; | ||
1377 | 76 | |||
1378 | 77 | return true; | ||
1379 | 78 | } | ||
1380 | 79 | |||
1381 | 80 | void | ||
1382 | 81 | ThreadPerConnectionDispatcher::finish_all_and_wait() | ||
1383 | 82 | { | ||
1384 | 83 | QueryEntryPtr shutdown_command(new FinishEntry(0)); | ||
1385 | 84 | |||
1386 | 85 | while(executors.size()) | ||
1387 | 86 | { | ||
1388 | 87 | uint64_t thread_id; | ||
1389 | 88 | DBThread *t; | ||
1390 | 89 | { | ||
1391 | 90 | DBExecutorsTable::const_iterator iter= executors.begin(); | ||
1392 | 91 | thread_id= (*iter).first; | ||
1393 | 92 | t= (*iter).second; | ||
1394 | 93 | } | ||
1395 | 94 | executors.erase(thread_id); | ||
1396 | 95 | |||
1397 | 96 | t->queries->push(shutdown_command); | ||
1398 | 97 | t->join(); | ||
1399 | 98 | |||
1400 | 99 | delete t; | ||
1401 | 100 | } | ||
1402 | 101 | } | ||
1403 | 102 | |||
1404 | 103 | static void init_plugin(percona_playback::PluginRegistry &r) | ||
1405 | 104 | { | ||
1406 | 105 | r.add("thread-per-connection", | ||
1407 | 106 | new ThreadPerConnectionDispatcher("thread-per-connection")); | ||
1408 | 107 | } | ||
1409 | 108 | |||
1410 | 109 | PERCONA_PLAYBACK_PLUGIN(init_plugin); | ||
1411 | 0 | 110 | ||
1412 | === added directory 'percona_playback/thread_pool' | |||
1413 | === added file 'percona_playback/thread_pool/plugin.ini' | |||
1414 | --- percona_playback/thread_pool/plugin.ini 1970-01-01 00:00:00 +0000 | |||
1415 | +++ percona_playback/thread_pool/plugin.ini 2012-11-23 20:15:24 +0000 | |||
1416 | @@ -0,0 +1,6 @@ | |||
1417 | 1 | [plugin] | ||
1418 | 2 | title=Thread-pool dispatcher plugin | ||
1419 | 3 | description=Creates threads pool to process connections to server | ||
1420 | 4 | version=0.1 | ||
1421 | 5 | static=yes | ||
1422 | 6 | load_by_default=yes | ||
1423 | 0 | 7 | ||
1424 | === added file 'percona_playback/thread_pool/thread_pool.cc' | |||
1425 | --- percona_playback/thread_pool/thread_pool.cc 1970-01-01 00:00:00 +0000 | |||
1426 | +++ percona_playback/thread_pool/thread_pool.cc 2012-11-23 20:15:24 +0000 | |||
1427 | @@ -0,0 +1,130 @@ | |||
1428 | 1 | /* BEGIN LICENSE | ||
1429 | 2 | * Copyright (C) 2011-2012 Percona Inc. | ||
1430 | 3 | * This program is free software: you can redistribute it and/or modify it | ||
1431 | 4 | * under the terms of the GNU General Public License version 2, as published | ||
1432 | 5 | * by the Free Software Foundation. | ||
1433 | 6 | * | ||
1434 | 7 | * This program is distributed in the hope that it will be useful, but | ||
1435 | 8 | * WITHOUT ANY WARRANTY; without even the implied warranties of | ||
1436 | 9 | * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR | ||
1437 | 10 | * PURPOSE. See the GNU General Public License for more details. | ||
1438 | 11 | * | ||
1439 | 12 | * You should have received a copy of the GNU General Public License along | ||
1440 | 13 | * with this program. If not, see <http://www.gnu.org/licenses/>. | ||
1441 | 14 | * END LICENSE */ | ||
1442 | 15 | |||
1443 | 16 | #include "percona_playback/plugin.h" | ||
1444 | 17 | #include "percona_playback/db_thread.h" | ||
1445 | 18 | #include <percona_playback/gettext.h> | ||
1446 | 19 | #include <boost/program_options.hpp> | ||
1447 | 20 | #include <boost/shared_ptr.hpp> | ||
1448 | 21 | #include <boost/crc.hpp> | ||
1449 | 22 | #include <vector> | ||
1450 | 23 | |||
1451 | 24 | #include <tbb/concurrent_hash_map.h> | ||
1452 | 25 | #include <stdio.h> | ||
1453 | 26 | |||
1454 | 27 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; | ||
1455 | 28 | |||
1456 | 29 | class ThreadPoolDispatcher : | ||
1457 | 30 | public percona_playback::DispatcherPlugin | ||
1458 | 31 | { | ||
1459 | 32 | typedef std::vector<boost::shared_ptr<DBThread> > Workers; | ||
1460 | 33 | |||
1461 | 34 | unsigned threads_count; | ||
1462 | 35 | boost:: | ||
1463 | 36 | program_options:: | ||
1464 | 37 | options_description options; | ||
1465 | 38 | Workers workers; | ||
1466 | 39 | |||
1467 | 40 | public: | ||
1468 | 41 | ThreadPoolDispatcher(std::string _name) : | ||
1469 | 42 | DispatcherPlugin(_name), | ||
1470 | 43 | threads_count(0), | ||
1471 | 44 | options("Threads-pool Options") {} | ||
1472 | 45 | |||
1473 | 46 | virtual void dispatch(QueryEntryPtr query_entry); | ||
1474 | 47 | virtual bool finish_and_wait(uint64_t) { return true; } | ||
1475 | 48 | virtual void finish_all_and_wait(); | ||
1476 | 49 | virtual void run(); | ||
1477 | 50 | |||
1478 | 51 | boost::program_options::options_description* getProgramOptions(); | ||
1479 | 52 | int processOptions(boost::program_options::variables_map &vm); | ||
1480 | 53 | |||
1481 | 54 | }; | ||
1482 | 55 | |||
1483 | 56 | void ThreadPoolDispatcher::run() | ||
1484 | 57 | { | ||
1485 | 58 | for (unsigned i = 0; i < threads_count; ++i) | ||
1486 | 59 | { | ||
1487 | 60 | boost::shared_ptr<DBThread> db_thread(g_dbclient_plugin->create(i)); | ||
1488 | 61 | workers.push_back(db_thread); | ||
1489 | 62 | db_thread->start_thread(); | ||
1490 | 63 | } | ||
1491 | 64 | } | ||
1492 | 65 | |||
1493 | 66 | void ThreadPoolDispatcher::dispatch(QueryEntryPtr query_entry) | ||
1494 | 67 | { | ||
1495 | 68 | /* | ||
1496 | 69 | Each worker has its own queue. For some types of input plugins | ||
1497 | 70 | it is important to execute query entries with the same thread id | ||
1498 | 71 | by the same worker. That is why we choose worker by simple hash from | ||
1499 | 72 | thread id. | ||
1500 | 73 | */ | ||
1501 | 74 | uint64_t thread_id= query_entry->getThreadId(); | ||
1502 | 75 | boost::crc_32_type crc; | ||
1503 | 76 | crc.process_bytes(&thread_id, sizeof(thread_id)); | ||
1504 | 77 | uint32_t worker_index = crc.checksum() % workers.size(); | ||
1505 | 78 | workers[worker_index]->queries->push(query_entry); | ||
1506 | 79 | } | ||
1507 | 80 | |||
1508 | 81 | void ThreadPoolDispatcher::finish_all_and_wait() | ||
1509 | 82 | { | ||
1510 | 83 | QueryEntryPtr shutdown_command(new FinishEntry(0)); | ||
1511 | 84 | for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i) | ||
1512 | 85 | (*i)->queries->push(shutdown_command); | ||
1513 | 86 | for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i) | ||
1514 | 87 | (*i)->join(); | ||
1515 | 88 | workers.clear(); | ||
1516 | 89 | } | ||
1517 | 90 | |||
1518 | 91 | |||
1519 | 92 | boost::program_options::options_description* | ||
1520 | 93 | ThreadPoolDispatcher::getProgramOptions() | ||
1521 | 94 | { | ||
1522 | 95 | unsigned default_threads_count = | ||
1523 | 96 | boost::thread::hardware_concurrency(); | ||
1524 | 97 | options.add_options() | ||
1525 | 98 | ("thread-pool-threads-count", | ||
1526 | 99 | boost::program_options::value<unsigned>(&threads_count) | ||
1527 | 100 | ->default_value(default_threads_count), | ||
1528 | 101 | _("The number of threads in thread pool. If this options is omitted " | ||
1529 | 102 | "the number of threads equals to hardware concurency.")) | ||
1530 | 103 | ; | ||
1531 | 104 | |||
1532 | 105 | return &options; | ||
1533 | 106 | |||
1534 | 107 | } | ||
1535 | 108 | |||
1536 | 109 | int | ||
1537 | 110 | ThreadPoolDispatcher::processOptions(boost::program_options::variables_map &vm) | ||
1538 | 111 | { | ||
1539 | 112 | if (!active && | ||
1540 | 113 | !vm["thread-pool-threads-count"].defaulted()) | ||
1541 | 114 | { | ||
1542 | 115 | fprintf(stderr, _("thread-pool plugin is not selected, " | ||
1543 | 116 | "you shouldn't use this plugin-related " | ||
1544 | 117 | "command line options\n")); | ||
1545 | 118 | return -1; | ||
1546 | 119 | } | ||
1547 | 120 | |||
1548 | 121 | return 0; | ||
1549 | 122 | } | ||
1550 | 123 | |||
1551 | 124 | static void init_plugin(percona_playback::PluginRegistry &r) | ||
1552 | 125 | { | ||
1553 | 126 | r.add("thread-pool", | ||
1554 | 127 | new ThreadPoolDispatcher("thread-pool")); | ||
1555 | 128 | } | ||
1556 | 129 | |||
1557 | 130 | PERCONA_PLAYBACK_PLUGIN(init_plugin); |