Merge lp:~vlad-lesin/percona-playback/connections_pool into lp:percona-playback
- connections_pool
- Merge into trunk
Proposed by
Vlad Lesin
Status: | Merged |
---|---|
Approved by: | Stewart Smith |
Approved revision: | 178 |
Merged at revision: | 174 |
Proposed branch: | lp:~vlad-lesin/percona-playback/connections_pool |
Merge into: | lp:percona-playback |
Diff against target: |
1557 lines (+617/-299) 24 files modified
Makefile.am (+6/-3) percona_playback/db_thread.cc (+8/-10) percona_playback/db_thread.h (+18/-23) percona_playback/dispatcher.cc (+0/-105) percona_playback/dispatcher.h (+0/-46) percona_playback/libdrizzle_client/libdrizzle_client.cc (+4/-3) percona_playback/mysql_client/mysql_client.cc (+69/-27) percona_playback/mysql_client/mysql_client.h (+5/-2) percona_playback/null_dbclient/null_dbclient.cc (+4/-3) percona_playback/percona_playback.cc (+46/-1) percona_playback/plugin.h (+29/-0) percona_playback/query_entry.h (+2/-1) percona_playback/query_log/query_log.cc (+25/-8) percona_playback/tcpdump/connection_state.cc (+20/-15) percona_playback/tcpdump/connection_state.h (+10/-12) percona_playback/tcpdump/pcap_packets_parser.cc (+38/-24) percona_playback/tcpdump/pcap_packets_parser.h (+10/-2) percona_playback/tcpdump/tcpdump_query_entries.cc (+2/-12) percona_playback/tcpdump/tcpdump_query_entries.h (+10/-2) percona_playback/test/thread-pool-sysbench-slow.cc (+60/-0) percona_playback/thread_per_connection/plugin.ini (+6/-0) percona_playback/thread_per_connection/thread_per_connection.cc (+109/-0) percona_playback/thread_pool/plugin.ini (+6/-0) percona_playback/thread_pool/thread_pool.cc (+130/-0) |
To merge this branch: | bzr merge lp:~vlad-lesin/percona-playback/connections_pool |
Related bugs: | |
Related blueprints: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Stewart Smith (community) | Approve | ||
Review via email: mp+134884@code.launchpad.net |
Commit message
Description of the change
A bunch of changes for implementing this: https:/ (link truncated in the source page)
To post a comment you must log in.
- 171. By Vlad Lesin
-
Fixes for bug #1070824 and bug #1080654
- 172. By Vlad Lesin
-
merged with trunk
- 173. By Vlad Lesin
-
Send init session query on reconnect
- 174. By Vlad Lesin
-
Initialize session init query with the value from cmd line options.
- 175. By Vlad Lesin
-
Thread-safety fixes for mysql client plugin.
- 176. By Vlad Lesin
-
Don't push empty query log queries for execution
- 177. By Vlad Lesin
-
Convert multiline query to single line to avoid syntax errors from server.
- 178. By Vlad Lesin
-
Allow multi-statement queries.
Revision history for this message
Stewart Smith (stewart) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'Makefile.am' |
2 | --- Makefile.am 2012-11-08 22:17:07 +0000 |
3 | +++ Makefile.am 2012-11-23 20:15:24 +0000 |
4 | @@ -59,13 +59,11 @@ |
5 | libpercona_playback_la_SOURCES = \ |
6 | percona_playback/percona_playback.cc \ |
7 | percona_playback/plugin.cc \ |
8 | - percona_playback/db_thread.cc \ |
9 | - percona_playback/dispatcher.cc |
10 | + percona_playback/db_thread.cc |
11 | |
12 | nobase_include_HEADERS += \ |
13 | percona_playback/percona_playback.h \ |
14 | percona_playback/db_thread.h \ |
15 | - percona_playback/dispatcher.h \ |
16 | percona_playback/query_result.h \ |
17 | percona_playback/plugin.h \ |
18 | percona_playback/tokenize.h \ |
19 | @@ -89,6 +87,7 @@ |
20 | percona_playback/test/crashme-slow \ |
21 | percona_playback/test/sqlbench-transactions-slow \ |
22 | percona_playback/test/sysbench-slow \ |
23 | + percona_playback/test/thread-pool-sysbench-slow \ |
24 | percona_playback/test/preserve_query_time \ |
25 | percona_playback/test/tcpdump_without_handshake \ |
26 | percona_playback/test/tcpdump_fragmented_packet \ |
27 | @@ -104,6 +103,7 @@ |
28 | percona_playback_test_crashme_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
29 | percona_playback_test_sqlbench_transactions_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
30 | percona_playback_test_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
31 | +percona_playback_test_thread_pool_sysbench_slow_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
32 | percona_playback_test_preserve_query_time_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
33 | percona_playback_test_tcpdump_without_handshake_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
34 | percona_playback_test_tcpdump_fragmented_packet_CXXFLAGS= $(AM_CXXFLAGS) -DSRCDIR=\"${srcdir}\" |
35 | @@ -135,6 +135,9 @@ |
36 | percona_playback_test_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc |
37 | percona_playback_test_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS) |
38 | |
39 | +percona_playback_test_thread_pool_sysbench_slow_SOURCES= percona_playback/test/sysbench-slow.cc |
40 | +percona_playback_test_thread_pool_sysbench_slow_LDADD= $(LDADD) $(MYSQL_LIBS) |
41 | + |
42 | percona_playback_test_preserve_query_time_SOURCES= percona_playback/test/preserve_query_time.cc |
43 | percona_playback_test_preserve_query_time_LDADD= $(LDADD) $(MYSQL_LIBS) |
44 | |
45 | |
46 | === modified file 'percona_playback/db_thread.cc' |
47 | --- percona_playback/db_thread.cc 2012-11-14 11:44:59 +0000 |
48 | +++ percona_playback/db_thread.cc 2012-11-23 20:15:24 +0000 |
49 | @@ -24,25 +24,23 @@ |
50 | |
51 | extern std::string g_session_init_query; |
52 | |
53 | -void DBThread::connect_and_init_session() |
54 | +void DBThread::init_session() |
55 | { |
56 | - connect(); |
57 | - if (!g_session_init_query.empty()) |
58 | - { |
59 | - QueryResult r; |
60 | - QueryResult er; |
61 | - execute_query(g_session_init_query, &r, er); |
62 | - } |
63 | + if (g_session_init_query.empty()) |
64 | + return; |
65 | + QueryResult r; |
66 | + QueryResult er; |
67 | + execute_query(g_session_init_query, &r, er); |
68 | } |
69 | |
70 | void DBThread::run() |
71 | { |
72 | connect_and_init_session(); |
73 | |
74 | - QueryEntryPtr query; |
75 | while (true) |
76 | { |
77 | - queries.pop(query); |
78 | + QueryEntryPtr query; |
79 | + queries->pop(query); |
80 | |
81 | if (query->is_shutdown()) |
82 | break; |
83 | |
84 | === modified file 'percona_playback/db_thread.h' |
85 | --- percona_playback/db_thread.h 2012-11-14 11:44:59 +0000 |
86 | +++ percona_playback/db_thread.h 2012-11-23 20:15:24 +0000 |
87 | @@ -16,6 +16,7 @@ |
88 | #ifndef PERCONA_PLAYBACK_DB_THREAD_H |
89 | #define PERCONA_PLAYBACK_DB_THREAD_H |
90 | |
91 | +#include <memory> |
92 | #include "percona_playback/visibility.h" |
93 | #include <boost/thread.hpp> |
94 | #include <boost/shared_ptr.hpp> |
95 | @@ -33,55 +34,49 @@ |
96 | |
97 | class QueryResult; |
98 | |
99 | -class DBThreadState |
100 | -{ |
101 | -public: |
102 | - virtual ~DBThreadState(){} |
103 | -}; |
104 | - |
105 | class DBThread |
106 | { |
107 | |
108 | private: |
109 | boost::thread thread; |
110 | uint64_t thread_id; |
111 | - boost::shared_ptr<DBThreadState> state; |
112 | |
113 | public: |
114 | typedef tbb::concurrent_bounded_queue<QueryEntryPtr> Queries; |
115 | - Queries queries; |
116 | + boost::shared_ptr<Queries> queries; |
117 | |
118 | - DBThread(uint64_t _thread_id) : thread_id(_thread_id) { |
119 | - queries.set_capacity(g_db_thread_queue_depth); |
120 | + DBThread(uint64_t _thread_id, |
121 | + boost::shared_ptr<Queries> _queries) : |
122 | + thread_id(_thread_id), queries(_queries) { |
123 | + queries->set_capacity(g_db_thread_queue_depth); |
124 | } |
125 | |
126 | virtual ~DBThread() {} |
127 | |
128 | - void set_state(boost::shared_ptr<DBThreadState> new_state) |
129 | - { |
130 | - state= new_state; |
131 | - } |
132 | - |
133 | - boost::shared_ptr<DBThreadState> get_state() const |
134 | - { |
135 | - return state; |
136 | - } |
137 | - |
138 | void join() |
139 | { |
140 | thread.join(); |
141 | } |
142 | |
143 | - void connect_and_init_session(); |
144 | + bool connect_and_init_session() |
145 | + { |
146 | + if (connect()) |
147 | + { |
148 | + init_session(); |
149 | + return true; |
150 | + } |
151 | + return false; |
152 | + } |
153 | |
154 | - virtual void connect()= 0; |
155 | + void init_session(); |
156 | + virtual bool connect()= 0; |
157 | |
158 | virtual void disconnect()= 0; |
159 | virtual void execute_query(const std::string &query, |
160 | QueryResult *r, |
161 | const QueryResult &expected_result)= 0; |
162 | |
163 | - void run(); |
164 | + virtual void run(); |
165 | |
166 | void start_thread(); |
167 | }; |
168 | |
169 | === removed file 'percona_playback/dispatcher.cc' |
170 | --- percona_playback/dispatcher.cc 2012-07-03 07:41:49 +0000 |
171 | +++ percona_playback/dispatcher.cc 1970-01-01 00:00:00 +0000 |
172 | @@ -1,105 +0,0 @@ |
173 | -/* BEGIN LICENSE |
174 | - * Copyright (C) 2011-2012 Percona Inc. |
175 | - * This program is free software: you can redistribute it and/or modify it |
176 | - * under the terms of the GNU General Public License version 2, as published |
177 | - * by the Free Software Foundation. |
178 | - * |
179 | - * This program is distributed in the hope that it will be useful, but |
180 | - * WITHOUT ANY WARRANTY; without even the implied warranties of |
181 | - * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
182 | - * PURPOSE. See the GNU General Public License for more details. |
183 | - * |
184 | - * You should have received a copy of the GNU General Public License along |
185 | - * with this program. If not, see <http://www.gnu.org/licenses/>. |
186 | - * END LICENSE */ |
187 | -#include "percona_playback/dispatcher.h" |
188 | -#include "percona_playback/db_thread.h" |
189 | -#include "percona_playback/plugin.h" |
190 | - |
191 | -#include <assert.h> |
192 | - |
193 | -Dispatcher g_dispatcher; |
194 | - |
195 | -extern percona_playback::DBClientPlugin *g_dbclient_plugin; |
196 | - |
197 | -boost::shared_ptr<DBThreadState> |
198 | -Dispatcher::get_thread_state( |
199 | - uint64_t thread_id, |
200 | - boost::function1<void, DBThread *> run_on_db_thread_create) |
201 | -{ |
202 | - DBExecutorsTable::accessor a; |
203 | - |
204 | - if (executors.insert(a, thread_id)) |
205 | - { |
206 | - DBThread *db_thread= g_dbclient_plugin->create(thread_id); |
207 | - assert(db_thread); |
208 | - a->second= db_thread; |
209 | - if (!run_on_db_thread_create.empty()) |
210 | - run_on_db_thread_create(db_thread); |
211 | - db_thread->start_thread(); |
212 | - } |
213 | - |
214 | - return a->second->get_state(); |
215 | -} |
216 | - |
217 | -void |
218 | -Dispatcher::dispatch(QueryEntryPtr query_entry) |
219 | -{ |
220 | - uint64_t thread_id= query_entry->getThreadId(); |
221 | - { |
222 | - DBExecutorsTable::accessor a; |
223 | - if (executors.insert(a, thread_id)) |
224 | - { |
225 | - DBThread *db_thread= g_dbclient_plugin->create(thread_id); |
226 | - a->second= db_thread; |
227 | - db_thread->start_thread(); |
228 | - } |
229 | - a->second->queries.push(query_entry); |
230 | - } |
231 | -} |
232 | - |
233 | -bool |
234 | -Dispatcher::finish_and_wait(uint64_t thread_id) |
235 | -{ |
236 | - DBThread *db_thread= NULL; |
237 | - { |
238 | - DBExecutorsTable::accessor a; |
239 | - if (executors.find(a, thread_id)) |
240 | - { |
241 | - db_thread= a->second; |
242 | - executors.erase(a); |
243 | - } |
244 | - } |
245 | - |
246 | - if (!db_thread) |
247 | - return false; |
248 | - |
249 | - db_thread->queries.push(QueryEntryPtr(new FinishEntry())); |
250 | - db_thread->join(); |
251 | - |
252 | - delete db_thread; |
253 | - |
254 | - return true; |
255 | -} |
256 | - |
257 | -void |
258 | -Dispatcher::finish_all_and_wait() |
259 | -{ |
260 | - QueryEntryPtr shutdown_command(new FinishEntry()); |
261 | - while(executors.size()) |
262 | - { |
263 | - uint64_t thread_id; |
264 | - DBThread *t; |
265 | - { |
266 | - DBExecutorsTable::const_iterator iter= executors.begin(); |
267 | - thread_id= (*iter).first; |
268 | - t= (*iter).second; |
269 | - } |
270 | - executors.erase(thread_id); |
271 | - |
272 | - t->queries.push(shutdown_command); |
273 | - t->join(); |
274 | - |
275 | - delete t; |
276 | - } |
277 | -} |
278 | |
279 | === removed file 'percona_playback/dispatcher.h' |
280 | --- percona_playback/dispatcher.h 2012-07-06 07:15:28 +0000 |
281 | +++ percona_playback/dispatcher.h 1970-01-01 00:00:00 +0000 |
282 | @@ -1,46 +0,0 @@ |
283 | -/* BEGIN LICENSE |
284 | - * Copyright (C) 2011 Percona Inc. |
285 | - * This program is free software: you can redistribute it and/or modify it |
286 | - * under the terms of the GNU General Public License version 2, as published |
287 | - * by the Free Software Foundation. |
288 | - * |
289 | - * This program is distributed in the hope that it will be useful, but |
290 | - * WITHOUT ANY WARRANTY; without even the implied warranties of |
291 | - * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
292 | - * PURPOSE. See the GNU General Public License for more details. |
293 | - * |
294 | - * You should have received a copy of the GNU General Public License along |
295 | - * with this program. If not, see <http://www.gnu.org/licenses/>. |
296 | - * END LICENSE */ |
297 | - |
298 | -#ifndef PERCONA_PLAYBACK_DISPATCHER_H |
299 | -#define PERCONA_PLAYBACK_DISPATCHER_H |
300 | - |
301 | -#include <tbb/concurrent_hash_map.h> |
302 | -#include <boost/function.hpp> |
303 | - |
304 | -#include "percona_playback/query_entry.h" |
305 | - |
306 | -class DBThread; |
307 | -class DBThreadState; |
308 | - |
309 | -class Dispatcher |
310 | -{ |
311 | - typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable; |
312 | - DBExecutorsTable executors; |
313 | - void db_thread_func(DBThread *thread); |
314 | - void start_thread(DBThread *thread); |
315 | - |
316 | -public: |
317 | - boost::shared_ptr<DBThreadState> |
318 | - get_thread_state(uint64_t thread_id, |
319 | - boost::function1<void, DBThread *> |
320 | - run_on_db_thread_create); |
321 | - void dispatch(QueryEntryPtr query_entry); |
322 | - bool finish_and_wait(uint64_t thread_id); |
323 | - void finish_all_and_wait(); |
324 | -}; |
325 | - |
326 | -extern Dispatcher g_dispatcher; |
327 | - |
328 | -#endif /* PERCONA_PLAYBACK_DISPATCHER_H */ |
329 | |
330 | === modified file 'percona_playback/libdrizzle_client/libdrizzle_client.cc' |
331 | --- percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-07-23 07:08:23 +0000 |
332 | +++ percona_playback/libdrizzle_client/libdrizzle_client.cc 2012-11-23 20:15:24 +0000 |
333 | @@ -39,13 +39,13 @@ |
334 | |
335 | public: |
336 | LibDrizzleDBThread(uint64_t _thread_id, LibDrizzleOptions *opt) : |
337 | - DBThread(_thread_id), |
338 | + DBThread(_thread_id, boost::shared_ptr<Queries>(new Queries())), |
339 | driz(NULL), |
340 | options(opt) |
341 | { |
342 | } |
343 | |
344 | - void connect(); |
345 | + bool connect(); |
346 | void disconnect(); |
347 | void execute_query(const std::string &query, QueryResult *r, |
348 | const QueryResult &expected_result); |
349 | @@ -70,7 +70,7 @@ |
350 | unsigned int port; |
351 | }; |
352 | |
353 | -void LibDrizzleDBThread::connect() |
354 | +bool LibDrizzleDBThread::connect() |
355 | { |
356 | driz= drizzle_create(NULL); |
357 | assert(driz != NULL); |
358 | @@ -81,6 +81,7 @@ |
359 | options->user.c_str(), |
360 | options->password.c_str(), |
361 | options->schema.c_str(), DRIZZLE_CON_MYSQL); |
362 | + return true; |
363 | } |
364 | |
365 | void LibDrizzleDBThread::disconnect() |
366 | |
367 | === modified file 'percona_playback/mysql_client/mysql_client.cc' |
368 | --- percona_playback/mysql_client/mysql_client.cc 2012-07-04 09:10:16 +0000 |
369 | +++ percona_playback/mysql_client/mysql_client.cc 2012-11-23 20:15:24 +0000 |
370 | @@ -42,17 +42,23 @@ |
371 | unsigned int port; |
372 | }; |
373 | |
374 | -void MySQLDBThread::connect() |
375 | +bool MySQLDBThread::connect() |
376 | { |
377 | mysql_init(&handle); |
378 | - mysql_real_connect(&handle, |
379 | - options->host.c_str(), |
380 | - options->user.c_str(), |
381 | - options->password.c_str(), |
382 | - options->schema.c_str(), |
383 | - options->port, |
384 | - "", |
385 | - 0); |
386 | + if (!mysql_real_connect(&handle, |
387 | + options->host.c_str(), |
388 | + options->user.c_str(), |
389 | + options->password.c_str(), |
390 | + options->schema.c_str(), |
391 | + options->port, |
392 | + "", |
393 | + CLIENT_MULTI_STATEMENTS)) |
394 | + { |
395 | + fprintf(stderr, "Can't connect to server: %s\n", |
396 | + mysql_error(&handle)); |
397 | + return false; |
398 | + } |
399 | + return true; |
400 | } |
401 | |
402 | void MySQLDBThread::disconnect() |
403 | @@ -63,29 +69,50 @@ |
404 | void MySQLDBThread::execute_query(const std::string &query, QueryResult *r, |
405 | const QueryResult &) |
406 | { |
407 | - int mr= mysql_real_query(&handle, query.c_str(), query.length()); |
408 | - if(mr != 0) |
409 | - { |
410 | - r->setError(mr); |
411 | - } |
412 | - else |
413 | - { |
414 | - MYSQL_RES* mysql_res= NULL; |
415 | - |
416 | - r->setError(mr); |
417 | - r->setWarningCount(mysql_warning_count(&handle)); |
418 | - |
419 | - mysql_res= mysql_store_result(&handle); |
420 | - |
421 | - if (mysql_res != NULL) |
422 | - r->setRowsSent(mysql_num_rows(mysql_res)); |
423 | + int mr; |
424 | + for(unsigned i = 0; i < max_err_num; ++i) |
425 | + { |
426 | + mr= mysql_real_query(&handle, query.c_str(), query.length()); |
427 | + r->setError(mr); |
428 | + if(mr != 0) |
429 | + { |
430 | + fprintf(stderr, |
431 | + "Error during query: %s, number of tries %u\n", |
432 | + mysql_error(&handle), |
433 | + i); |
434 | + disconnect(); |
435 | + connect_and_init_session(); |
436 | + } |
437 | else |
438 | + { |
439 | + r->setWarningCount(mysql_warning_count(&handle)); |
440 | r->setRowsSent(0); |
441 | - |
442 | - mysql_free_result(mysql_res); |
443 | + do |
444 | + { |
445 | + MYSQL_RES* mysql_res= NULL; |
446 | + |
447 | + mysql_res= mysql_store_result(&handle); |
448 | + |
449 | + if (mysql_res != NULL) |
450 | + { |
451 | + r->setRowsSent(mysql_num_rows(mysql_res)); |
452 | + mysql_free_result(mysql_res); |
453 | + } |
454 | + |
455 | + } while(!mysql_next_result(&handle)); |
456 | + |
457 | + break; |
458 | + } |
459 | } |
460 | } |
461 | |
462 | +void MySQLDBThread::run() |
463 | +{ |
464 | + mysql_thread_init(); |
465 | + DBThread::run(); |
466 | + mysql_thread_end(); |
467 | +} |
468 | + |
469 | class MySQLDBClientPlugin : public percona_playback::DBClientPlugin |
470 | { |
471 | private: |
472 | @@ -126,6 +153,21 @@ |
473 | return -1; |
474 | } |
475 | |
476 | + if (!active) |
477 | + return 0; |
478 | + |
479 | + if (!mysql_thread_safe()) |
480 | + { |
481 | + fprintf(stderr, "libmysqlclient is not thread safe\n"); |
482 | + return -1; |
483 | + } |
484 | + |
485 | + if (mysql_library_init(0, NULL, NULL)) |
486 | + { |
487 | + fprintf(stderr, "could not initialize mysql library\n"); |
488 | + return -1; |
489 | + } |
490 | + |
491 | if (vm.count("mysql-host")) |
492 | { |
493 | options.host= vm["mysql-host"].as<std::string>(); |
494 | |
495 | === modified file 'percona_playback/mysql_client/mysql_client.h' |
496 | --- percona_playback/mysql_client/mysql_client.h 2012-06-15 10:53:23 +0000 |
497 | +++ percona_playback/mysql_client/mysql_client.h 2012-11-23 20:15:24 +0000 |
498 | @@ -10,16 +10,19 @@ |
499 | private: |
500 | MYSQL handle; |
501 | MySQLOptions *options; |
502 | + static const unsigned max_err_num = 10; |
503 | |
504 | public: |
505 | MySQLDBThread(uint64_t _thread_id, MySQLOptions *opt) : |
506 | - DBThread(_thread_id), |
507 | + DBThread(_thread_id, |
508 | + boost::shared_ptr<Queries>(new Queries())), |
509 | options(opt) |
510 | { |
511 | } |
512 | |
513 | - void connect(); |
514 | + bool connect(); |
515 | void disconnect(); |
516 | void execute_query(const std::string &query, QueryResult *r, |
517 | const QueryResult &expected_result); |
518 | + void run(); |
519 | }; |
520 | |
521 | === modified file 'percona_playback/null_dbclient/null_dbclient.cc' |
522 | --- percona_playback/null_dbclient/null_dbclient.cc 2012-04-05 04:47:09 +0000 |
523 | +++ percona_playback/null_dbclient/null_dbclient.cc 2012-11-23 20:15:24 +0000 |
524 | @@ -20,10 +20,11 @@ |
525 | class NULLDBThread : public DBThread |
526 | { |
527 | public: |
528 | - NULLDBThread(uint64_t _thread_id) : DBThread(_thread_id) { |
529 | - } |
530 | + NULLDBThread(uint64_t _thread_id) : |
531 | + DBThread(_thread_id, |
532 | + boost::shared_ptr<Queries>(new Queries())) {} |
533 | |
534 | - void connect() {}; |
535 | + bool connect() { return true; }; |
536 | void disconnect() {}; |
537 | void execute_query(const std::string &, QueryResult *r, |
538 | const QueryResult &expected_result) { |
539 | |
540 | === modified file 'percona_playback/percona_playback.cc' |
541 | --- percona_playback/percona_playback.cc 2012-11-14 11:44:59 +0000 |
542 | +++ percona_playback/percona_playback.cc 2012-11-23 20:15:24 +0000 |
543 | @@ -40,6 +40,7 @@ |
544 | |
545 | percona_playback::DBClientPlugin *g_dbclient_plugin= NULL; |
546 | percona_playback::InputPlugin *g_input_plugin= NULL; |
547 | +percona_playback::DispatcherPlugin *g_dispatcher_plugin= NULL; |
548 | unsigned int g_db_thread_queue_depth; |
549 | std::string g_session_init_query; |
550 | |
551 | @@ -132,6 +133,22 @@ |
552 | std::cerr << _("Selected Input Plugin: ") |
553 | << g_input_plugin->name |
554 | << std::endl; |
555 | + |
556 | + std::cerr << std::endl << _("Loaded Dispatcher Plugins: "); |
557 | + |
558 | + BOOST_FOREACH(const PluginRegistry::DispatcherPluginPair &pp, |
559 | + PluginRegistry::singleton().dispatcher_plugins) |
560 | + { |
561 | + std::cerr << pp.first << " "; |
562 | + } |
563 | + |
564 | + std::cerr << std::endl; |
565 | + std::cerr << std::endl; |
566 | + |
567 | + assert(g_dispatcher_plugin); |
568 | + std::cerr << _("Selected dispatcher Plugin: ") |
569 | + << g_dispatcher_plugin->name |
570 | + << std::endl; |
571 | } |
572 | |
573 | int percona_playback_argv(percona_playback_st *the_percona_playback, |
574 | @@ -152,12 +169,13 @@ |
575 | db_options.add_options() |
576 | ("db-plugin", po::value<std::string>(), _("Database plugin")) |
577 | ("input-plugin", po::value<std::string>(), _("Input plugin")) |
578 | + ("dispatcher-plugin", po::value<std::string>(), _("Dispatcher plugin")) |
579 | ("queue-depth", po::value<unsigned int>(), |
580 | _("Queue depth for DB executor (thread). The larger this number the" |
581 | " greater the played-back workload can deviate from the original workload" |
582 | " as some connections may be up to queue-depth behind. (default 1)")) |
583 | ("session-init-query", |
584 | - po::value<std::string>()->default_value(""), |
585 | + po::value<std::string>(&g_session_init_query)->default_value(""), |
586 | _("This query will be executed just after each connect to db")) |
587 | ; |
588 | |
589 | @@ -237,6 +255,32 @@ |
590 | } |
591 | g_input_plugin->active= true; |
592 | |
593 | + if (vm.count("dispatcher-plugin")) |
594 | + { |
595 | + PluginRegistry::DispatcherPluginMap::iterator it; |
596 | + it= PluginRegistry::singleton().dispatcher_plugins.find( |
597 | + vm["dispatcher-plugin"].as<std::string>()); |
598 | + if (it == PluginRegistry::singleton().dispatcher_plugins.end()) |
599 | + { |
600 | + fprintf(stderr, _("Invalid Dispatcher Plugin\n")); |
601 | + return -1; |
602 | + } |
603 | + g_dispatcher_plugin= it->second; |
604 | + } |
605 | + else |
606 | + { |
607 | + PluginRegistry::DispatcherPluginMap::iterator it; |
608 | + it= PluginRegistry::singleton().dispatcher_plugins. |
609 | + find("thread-per-connection"); |
610 | + if (it == PluginRegistry::singleton().dispatcher_plugins.end()) |
611 | + { |
612 | + fprintf(stderr, _("Invalid Dispatcher plugin\n")); |
613 | + return -1; |
614 | + } |
615 | + g_dispatcher_plugin= it->second; |
616 | + } |
617 | + g_dispatcher_plugin->active= true; |
618 | + |
619 | if (vm.count("help") || argc==1) |
620 | { |
621 | help(options_description); |
622 | @@ -299,6 +343,7 @@ |
623 | std::cerr << _("Database Plugin: ") << g_dbclient_plugin->name << std::endl; |
624 | std::cerr << _(" Running...") << std::endl; |
625 | |
626 | + g_dispatcher_plugin->run(); |
627 | g_input_plugin->run(*r); |
628 | |
629 | BOOST_FOREACH(const PluginRegistry::ReportPluginPair pp, |
630 | |
631 | === modified file 'percona_playback/plugin.h' |
632 | --- percona_playback/plugin.h 2012-06-24 04:28:41 +0000 |
633 | +++ percona_playback/plugin.h 2012-11-23 20:15:24 +0000 |
634 | @@ -20,6 +20,9 @@ |
635 | #include <vector> |
636 | #include <string> |
637 | #include <map> |
638 | +#include <boost/shared_ptr.hpp> |
639 | +#include <boost/function.hpp> |
640 | +#include "percona_playback/query_entry.h" |
641 | #include <percona_playback/visibility.h> |
642 | #include <percona_playback/version.h> |
643 | |
644 | @@ -31,6 +34,7 @@ |
645 | } |
646 | |
647 | class DBThread; |
648 | +class DBThreadState; |
649 | class QueryResult; |
650 | class percona_playback_run_result; |
651 | |
652 | @@ -102,6 +106,21 @@ |
653 | |
654 | }; |
655 | |
656 | +class DispatcherPlugin : public plugin |
657 | +{ |
658 | + public: |
659 | + std::string name; |
660 | + |
661 | + DispatcherPlugin(const std::string &_name) : name(_name) {} |
662 | + |
663 | + virtual void dispatch(QueryEntryPtr query_entry)= 0; |
664 | + virtual bool finish_and_wait(uint64_t thread_id)= 0; |
665 | + virtual void finish_all_and_wait()= 0; |
666 | + |
667 | + virtual void run() {}; |
668 | + |
669 | +}; |
670 | + |
671 | class PluginRegistry |
672 | { |
673 | public: |
674 | @@ -122,6 +141,9 @@ |
675 | typedef std::map<std::string, InputPlugin*> InputPluginMap; |
676 | typedef std::pair<std::string, InputPlugin*> InputPluginPair; |
677 | |
678 | + typedef std::map<std::string, DispatcherPlugin*> DispatcherPluginMap; |
679 | + typedef std::pair<std::string, DispatcherPlugin*> DispatcherPluginPair; |
680 | + |
681 | typedef std::map<std::string, plugin*> PluginMap; |
682 | typedef std::pair<std::string, plugin*> PluginPair; |
683 | |
684 | @@ -129,6 +151,7 @@ |
685 | DBClientPluginMap dbclient_plugins; |
686 | ReportPluginMap report_plugins; |
687 | InputPluginMap input_plugins; |
688 | + DispatcherPluginMap dispatcher_plugins; |
689 | |
690 | void add(const std::string &name, plugin* plugin_object) |
691 | { |
692 | @@ -154,6 +177,12 @@ |
693 | all_plugins.insert(PluginPair(name, input_plugin)); |
694 | } |
695 | |
696 | + void add(const std::string &name, DispatcherPlugin* dispatcher_plugin) |
697 | + { |
698 | + dispatcher_plugins.insert(DispatcherPluginPair(name, dispatcher_plugin)); |
699 | + all_plugins.insert(PluginPair(name, dispatcher_plugin)); |
700 | + } |
701 | + |
702 | }; |
703 | |
704 | |
705 | |
706 | === modified file 'percona_playback/query_entry.h' |
707 | --- percona_playback/query_entry.h 2012-07-03 02:37:51 +0000 |
708 | +++ percona_playback/query_entry.h 2012-11-23 20:15:24 +0000 |
709 | @@ -45,7 +45,8 @@ |
710 | class FinishEntry : public QueryEntry |
711 | { |
712 | public: |
713 | - FinishEntry() { set_shutdown(); } |
714 | + FinishEntry(uint64_t _thread_id) : |
715 | + QueryEntry (_thread_id, true) {} |
716 | bool is_quit() { return false; } |
717 | |
718 | void execute(DBThread *) {} |
719 | |
720 | === modified file 'percona_playback/query_log/query_log.cc' |
721 | --- percona_playback/query_log/query_log.cc 2012-11-08 22:17:07 +0000 |
722 | +++ percona_playback/query_log/query_log.cc 2012-11-23 20:15:24 +0000 |
723 | @@ -39,7 +39,6 @@ |
724 | #include <percona_playback/db_thread.h> |
725 | #include <percona_playback/query_log/query_log.h> |
726 | #include <percona_playback/query_result.h> |
727 | -#include "percona_playback/dispatcher.h" |
728 | #include <percona_playback/gettext.h> |
729 | |
730 | #include <boost/foreach.hpp> |
731 | @@ -51,6 +50,8 @@ |
732 | static bool g_run_set_timestamp; |
733 | static bool g_preserve_query_time; |
734 | |
735 | +extern percona_playback::DispatcherPlugin *g_dispatcher_plugin; |
736 | + |
737 | class ParseQueryLogFunc: public tbb::filter { |
738 | public: |
739 | ParseQueryLogFunc(FILE *input_file_, |
740 | @@ -83,7 +84,7 @@ |
741 | new std::vector<boost::shared_ptr<QueryLogEntry> >(); |
742 | |
743 | boost::shared_ptr<QueryLogEntry> tmp_entry(new QueryLogEntry()); |
744 | - entries->push_back(tmp_entry); |
745 | + // entries->push_back(tmp_entry); |
746 | |
747 | char *line= NULL; |
748 | size_t buflen = 0; |
749 | @@ -130,9 +131,10 @@ |
750 | |
751 | if (strncmp(p, "# User@Host", strlen("# User@Host")) == 0) |
752 | { |
753 | + if (!tmp_entry->getQuery().empty()) |
754 | + entries->push_back(tmp_entry); |
755 | count++; |
756 | tmp_entry.reset(new QueryLogEntry()); |
757 | - entries->push_back(tmp_entry); |
758 | (*this->nr_entries)++; |
759 | } |
760 | |
761 | @@ -172,6 +174,9 @@ |
762 | p= line; |
763 | } |
764 | |
765 | + if (!tmp_entry->getQuery().empty()) |
766 | + entries->push_back(tmp_entry); |
767 | + |
768 | free(line); |
769 | return entries; |
770 | } |
771 | @@ -239,7 +244,19 @@ |
772 | && s.compare(0, timestamp_query.length(), timestamp_query) == 0) |
773 | set_timestamp_query= s; |
774 | else |
775 | - query.append(s); |
776 | + { |
777 | + //Append space insead of \r\n |
778 | + std::string::const_iterator end = s.end() - 1; |
779 | + if (s.length() >= 2 && *(s.end() - 2) == '\r') |
780 | + --end; |
781 | + //Remove initial spaces for best query viewing in reports |
782 | + std::string::const_iterator begin; |
783 | + for (begin = s.begin(); begin != end; ++begin) |
784 | + if (*begin != ' ' && *begin != '\t') |
785 | + break; |
786 | + query.append(begin, end); |
787 | + query.append(" "); |
788 | + } |
789 | } |
790 | |
791 | bool QueryLogEntry::parse_metadata(const std::string &s) |
792 | @@ -281,13 +298,13 @@ |
793 | r= true; |
794 | } |
795 | } |
796 | - |
797 | +/* |
798 | if (s[0] == '#' && strncmp(s.c_str(), "# administrator", strlen("# administrator"))) |
799 | { |
800 | query.append(s); |
801 | r= true; |
802 | } |
803 | - |
804 | +*/ |
805 | return r; |
806 | } |
807 | |
808 | @@ -300,7 +317,7 @@ |
809 | for (unsigned int i=0; i< input->size(); i++) |
810 | { |
811 | // usleep(10); |
812 | - g_dispatcher.dispatch((*input)[i]); |
813 | + g_dispatcher_plugin->dispatch((*input)[i]); |
814 | } |
815 | delete input; |
816 | return NULL; |
817 | @@ -330,7 +347,7 @@ |
818 | p.add_filter(f4); |
819 | p.run(2); |
820 | |
821 | - g_dispatcher.finish_all_and_wait(); |
822 | + g_dispatcher_plugin->finish_all_and_wait(); |
823 | |
824 | r->n_log_entries= entries; |
825 | r->n_queries= queries; |
826 | |
827 | === modified file 'percona_playback/tcpdump/connection_state.cc' |
828 | --- percona_playback/tcpdump/connection_state.cc 2012-07-02 17:51:47 +0000 |
829 | +++ percona_playback/tcpdump/connection_state.cc 2012-11-23 20:15:24 +0000 |
830 | @@ -25,9 +25,11 @@ |
831 | #include "percona_playback/plugin.h" |
832 | |
833 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; |
834 | +extern percona_playback::DispatcherPlugin *g_dispatcher_plugin; |
835 | |
836 | void |
837 | -ConnectionState::ProcessMysqlPkts(const u_char *pkts, |
838 | +ConnectionState::ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr, |
839 | + const u_char *pkts, |
840 | u_int pkts_len, |
841 | const timeval &ts, |
842 | const AddrPort &addr_port, |
843 | @@ -76,13 +78,13 @@ |
844 | { |
845 | case PKT_QUERY: |
846 | if (!query.empty()) |
847 | - DispatchQuery(ts, query, addr_port); |
848 | + DispatchQuery(cs_ptr, ts, query, addr_port); |
849 | ++stats.nr_of_parsed_queries; |
850 | ++stats.nr_of_parsed_packets; |
851 | break; |
852 | |
853 | case PKT_RESULT: |
854 | - DispatchResult(ts, addr_port); |
855 | + DispatchResult(cs_ptr, ts, addr_port); |
856 | ++stats.nr_of_parsed_packets; |
857 | break; |
858 | |
859 | @@ -107,12 +109,6 @@ |
860 | |
861 | } |
862 | |
863 | -void |
864 | -ConnectionState::ProcessFinishConnection() |
865 | -{ |
866 | - db_thread->queries.push(QueryEntryPtr(new FinishEntry())); |
867 | -} |
868 | - |
869 | PktResult |
870 | ConnectionState::ParseMysqlPkt(IN UCharBuffer &buff, |
871 | OUT std::string &query) |
872 | @@ -303,23 +299,32 @@ |
873 | } |
874 | |
875 | void |
876 | -ConnectionState::DispatchQuery(const timeval &ts, |
877 | +ConnectionState::DispatchQuery(/* Having shared pointer to "this" as an argument |
878 | + is just temporary step. We need a shared pointer |
879 | + to connection state inside of query entry. The |
880 | + better way to do this is to move code that creates |
881 | + query entries to the upper layer. |
882 | + */ |
883 | + boost::shared_ptr<ConnectionState> cs_ptr, |
884 | + const timeval &ts, |
885 | const std::string &query, |
886 | const AddrPort &addr_port) |
887 | { |
888 | boost::shared_ptr<TcpdumpQueryEntry> |
889 | - query_entry(new TcpdumpQueryEntry(ts, |
890 | + query_entry(new TcpdumpQueryEntry(cs_ptr, |
891 | + ts, |
892 | query, |
893 | addr_port)); |
894 | - db_thread->queries.push(query_entry); |
895 | + g_dispatcher_plugin->dispatch(query_entry); |
896 | } |
897 | |
898 | void |
899 | -ConnectionState::DispatchResult(const timeval &ts, |
900 | +ConnectionState::DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr, |
901 | + const timeval &ts, |
902 | const AddrPort &addr_port) |
903 | { |
904 | boost::shared_ptr<TcpdumpResultEntry> |
905 | - result_entry(new TcpdumpResultEntry(addr_port, ts, last_query_result)); |
906 | - db_thread->queries.push(result_entry); |
907 | + result_entry(new TcpdumpResultEntry(cs_ptr, addr_port, ts, last_query_result)); |
908 | + g_dispatcher_plugin->dispatch(result_entry); |
909 | } |
910 | |
911 | |
912 | === modified file 'percona_playback/tcpdump/connection_state.h' |
913 | --- percona_playback/tcpdump/connection_state.h 2012-07-03 07:41:49 +0000 |
914 | +++ percona_playback/tcpdump/connection_state.h 2012-11-23 20:15:24 +0000 |
915 | @@ -94,8 +94,7 @@ |
916 | }; |
917 | |
918 | |
919 | -class ConnectionState : public DBThreadState |
920 | - |
921 | +class ConnectionState |
922 | { |
923 | |
924 | public: |
925 | @@ -107,7 +106,7 @@ |
926 | SERVER |
927 | }; |
928 | |
929 | - ConnectionState() : |
930 | + ConnectionState(uint64_t _thread_id) : |
931 | current_origin(UNDEF), |
932 | fragmented(false), |
933 | handshake_from_client(false), |
934 | @@ -126,7 +125,7 @@ |
935 | (drizzle_con_options_t) |
936 | DRIZZLE_CON_MYSQL), |
937 | drizzle_con_free), |
938 | - db_thread(NULL) |
939 | + thread_id(_thread_id) |
940 | { |
941 | drizzle_con->result= NULL; |
942 | } |
943 | @@ -136,18 +135,15 @@ |
944 | drizzle_result_free_all(drizzle_con.get()); |
945 | } |
946 | |
947 | - void ProcessMysqlPkts(const u_char *pkts, |
948 | + void ProcessMysqlPkts(boost::shared_ptr<ConnectionState> cs_ptr, |
949 | + const u_char *pkts, |
950 | u_int total_len, |
951 | const timeval &ts, |
952 | const AddrPort &addr_port, |
953 | OUT TcpdumpMysqlParserStats &stats); |
954 | |
955 | - void ProcessFinishConnection(); |
956 | - |
957 | void SetCurrentOrigin(Origin o) { current_origin = o; } |
958 | |
959 | - void SetDBThread(DBThread *t) { db_thread= t; } |
960 | - |
961 | LastExecutedQueryInfo last_executed_query_info; |
962 | |
963 | private: |
964 | @@ -159,11 +155,13 @@ |
965 | PktResult ServerPacket(IN UCharBuffer &buff); |
966 | PktResult ClientPacket(IN UCharBuffer &buff, OUT std::string &query); |
967 | |
968 | - void DispatchQuery(const timeval &ts, |
969 | + void DispatchQuery(boost::shared_ptr<ConnectionState> cs_ptr, |
970 | + const timeval &ts, |
971 | const std::string &query, |
972 | const AddrPort &addr_port); |
973 | |
974 | - void DispatchResult(const timeval &ts, |
975 | + void DispatchResult(boost::shared_ptr<ConnectionState> cs_ptr, |
976 | + const timeval &ts, |
977 | const AddrPort &addr_port); |
978 | |
979 | Origin current_origin; |
980 | @@ -179,7 +177,7 @@ |
981 | |
982 | QueryResult last_query_result; |
983 | |
984 | - DBThread *db_thread; |
985 | + uint64_t thread_id; |
986 | |
987 | }; |
988 | |
989 | |
990 | === modified file 'percona_playback/tcpdump/pcap_packets_parser.cc' |
991 | --- percona_playback/tcpdump/pcap_packets_parser.cc 2012-07-03 07:41:49 +0000 |
992 | +++ percona_playback/tcpdump/pcap_packets_parser.cc 2012-11-23 20:15:24 +0000 |
993 | @@ -18,7 +18,6 @@ |
994 | |
995 | #include "percona_playback/db_thread.h" |
996 | #include "percona_playback/plugin.h" |
997 | -#include "percona_playback/dispatcher.h" |
998 | |
999 | #include <netinet/in.h> |
1000 | #include <arpa/inet.h> |
1001 | @@ -29,6 +28,7 @@ |
1002 | #define SNAP_LEN 16500 // pcap's max capture size |
1003 | |
1004 | extern percona_playback::DBClientPlugin *g_dbclient_plugin; |
1005 | +extern percona_playback::DispatcherPlugin *g_dispatcher_plugin; |
1006 | |
1007 | void |
1008 | PcapPacketsParser::ParsePkt(const struct pcap_pkthdr *header, |
1009 | @@ -81,7 +81,8 @@ |
1010 | if (size_mysql == 0 && |
1011 | ((tcp->th_flags & TH_FIN) || (tcp->th_flags & TH_RST))) |
1012 | { |
1013 | - g_dispatcher.finish_and_wait(addr_port.ThreadId()); |
1014 | + g_dispatcher_plugin->finish_and_wait(addr_port.ThreadId()); |
1015 | + RemoveConnectionState(addr_port.ThreadId()); |
1016 | return; |
1017 | } |
1018 | |
1019 | @@ -97,37 +98,50 @@ |
1020 | } |
1021 | |
1022 | /* If there is no DBThread with such id create it */ |
1023 | - boost::shared_ptr<DBThreadState> |
1024 | - state= g_dispatcher.get_thread_state(addr_port.ThreadId(), |
1025 | - boost::bind(&PcapPacketsParser::CreateConnectionState, |
1026 | - this, |
1027 | - _1)); |
1028 | + boost::shared_ptr<ConnectionState> |
1029 | + state= GetConnectionState(addr_port.ThreadId()); |
1030 | |
1031 | assert(state.get()); |
1032 | |
1033 | - ((ConnectionState *)state.get())->SetCurrentOrigin(origin); |
1034 | - ((ConnectionState *)state.get())->ProcessMysqlPkts(mysql, |
1035 | - size_mysql, |
1036 | - header->ts, |
1037 | - addr_port, |
1038 | - stats); |
1039 | + state->SetCurrentOrigin(origin); |
1040 | + state->ProcessMysqlPkts(state, |
1041 | + mysql, |
1042 | + size_mysql, |
1043 | + header->ts, |
1044 | + addr_port, |
1045 | + stats); |
1046 | } |
1047 | |
1048 | void |
1049 | PcapPacketsParser::WaitForUnfinishedTasks() |
1050 | { |
1051 | - g_dispatcher.finish_all_and_wait(); |
1052 | -} |
1053 | - |
1054 | -void |
1055 | -PcapPacketsParser::CreateConnectionState(DBThread *db_thread) |
1056 | -{ |
1057 | - assert(db_thread); |
1058 | - boost::shared_ptr<ConnectionState> state(new ConnectionState()); |
1059 | + g_dispatcher_plugin->finish_all_and_wait(); |
1060 | +} |
1061 | + |
1062 | +boost::shared_ptr<ConnectionState> |
1063 | +PcapPacketsParser::GetConnectionState(uint64_t thread_id) |
1064 | +{ |
1065 | + Connections::iterator it = connections.find(thread_id); |
1066 | + if (it == connections.end()) |
1067 | + return CreateConnectionState(thread_id); |
1068 | + return it->second; |
1069 | +} |
1070 | + |
1071 | +boost::shared_ptr<ConnectionState> |
1072 | +PcapPacketsParser::CreateConnectionState(uint64_t thread_id) |
1073 | +{ |
1074 | + boost::shared_ptr<ConnectionState> state(new ConnectionState(thread_id)); |
1075 | state->last_executed_query_info.end_pcap_timestamp= |
1076 | first_packet_pcap_timestamp; |
1077 | state->last_executed_query_info.end_timestamp= |
1078 | first_packet_timestamp; |
1079 | - db_thread->set_state(state); |
1080 | - state->SetDBThread(db_thread); |
1081 | -} |
1082 | + connections[thread_id] = state; |
1083 | + return state; |
1084 | +} |
1085 | + |
1086 | +void |
1087 | +PcapPacketsParser::RemoveConnectionState(uint64_t thread_id) |
1088 | +{ |
1089 | + connections.erase(thread_id); |
1090 | +} |
1091 | + |
1092 | |
1093 | === modified file 'percona_playback/tcpdump/pcap_packets_parser.h' |
1094 | --- percona_playback/tcpdump/pcap_packets_parser.h 2012-06-19 15:26:53 +0000 |
1095 | +++ percona_playback/tcpdump/pcap_packets_parser.h 2012-11-23 20:15:24 +0000 |
1096 | @@ -23,6 +23,8 @@ |
1097 | #include <tbb/concurrent_queue.h> |
1098 | #include <boost/thread.hpp> |
1099 | #include <boost/bind.hpp> |
1100 | +#include <boost/unordered_map.hpp> |
1101 | +#include <boost/shared_ptr.hpp> |
1102 | |
1103 | class DBThread; |
1104 | |
1105 | @@ -32,10 +34,14 @@ |
1106 | timeval first_packet_timestamp; |
1107 | timeval first_packet_pcap_timestamp; |
1108 | bool was_first_packet; |
1109 | + typedef boost::unordered_map<uint64_t, |
1110 | + boost::shared_ptr<ConnectionState> > |
1111 | + Connections; |
1112 | + Connections connections; |
1113 | |
1114 | public: |
1115 | |
1116 | - PcapPacketsParser() : was_first_packet(false) |
1117 | + PcapPacketsParser() : was_first_packet(false), connections(10000) |
1118 | { |
1119 | first_packet_timestamp.tv_sec= first_packet_timestamp.tv_usec= 0; |
1120 | first_packet_pcap_timestamp.tv_sec= first_packet_pcap_timestamp.tv_usec= 0; |
1121 | @@ -52,9 +58,11 @@ |
1122 | |
1123 | void WaitForUnfinishedTasks(); |
1124 | |
1125 | - void CreateConnectionState(DBThread *db_thread); |
1126 | |
1127 | private: |
1128 | + boost::shared_ptr<ConnectionState> CreateConnectionState(uint64_t thread_id); |
1129 | + boost::shared_ptr<ConnectionState> GetConnectionState(uint64_t thread_id); |
1130 | + void RemoveConnectionState(uint64_t thread_id); |
1131 | |
1132 | void ParsePkt(const struct pcap_pkthdr *header, |
1133 | const u_char *packet); |
1134 | |
1135 | === modified file 'percona_playback/tcpdump/tcpdump_query_entries.cc' |
1136 | --- percona_playback/tcpdump/tcpdump_query_entries.cc 2012-07-09 11:54:52 +0000 |
1137 | +++ percona_playback/tcpdump/tcpdump_query_entries.cc 2012-11-23 20:15:24 +0000 |
1138 | @@ -30,15 +30,10 @@ |
1139 | QueryResult expected_result; |
1140 | timeval start_time; |
1141 | timeval end_time; |
1142 | - boost::shared_ptr<DBThreadState> thr_state; |
1143 | |
1144 | assert(t); |
1145 | |
1146 | - thr_state= t->get_state(); |
1147 | - |
1148 | - assert(thr_state.get()); |
1149 | - |
1150 | - ConnectionState &state= (ConnectionState &)*thr_state; |
1151 | + ConnectionState &state= *connection_state; |
1152 | |
1153 | LastExecutedQueryInfo &last_query_info= state.last_executed_query_info; |
1154 | |
1155 | @@ -96,12 +91,7 @@ |
1156 | |
1157 | assert(t); |
1158 | |
1159 | - thr_state= t->get_state(); |
1160 | - |
1161 | - assert(thr_state.get()); |
1162 | - |
1163 | - |
1164 | - ConnectionState &state= (ConnectionState &)*thr_state; |
1165 | + ConnectionState &state= *connection_state; |
1166 | LastExecutedQueryInfo &last_query_info= state.last_executed_query_info; |
1167 | |
1168 | timersub(&pcap_timestamp, |
1169 | |
1170 | === modified file 'percona_playback/tcpdump/tcpdump_query_entries.h' |
1171 | --- percona_playback/tcpdump/tcpdump_query_entries.h 2012-07-03 07:41:49 +0000 |
1172 | +++ percona_playback/tcpdump/tcpdump_query_entries.h 2012-11-23 20:15:24 +0000 |
1173 | @@ -21,9 +21,12 @@ |
1174 | #include "connection_state.h" |
1175 | |
1176 | #include <sys/time.h> |
1177 | +#include <boost/shared_ptr.hpp> |
1178 | |
1179 | class TcpdumpQueryEntry : public QueryEntry |
1180 | { |
1181 | + |
1182 | + boost::shared_ptr<ConnectionState> connection_state; |
1183 | timeval pcap_timestamp; |
1184 | std::string query; |
1185 | |
1186 | @@ -34,10 +37,12 @@ |
1187 | pcap_timestamp.tv_sec= pcap_timestamp.tv_usec= 0; |
1188 | } |
1189 | |
1190 | - TcpdumpQueryEntry(const timeval &_pcap_timestamp, |
1191 | + TcpdumpQueryEntry(boost::shared_ptr<ConnectionState> _connection_state, |
1192 | + const timeval &_pcap_timestamp, |
1193 | const std::string &_query, |
1194 | const AddrPort &addr_port) : |
1195 | QueryEntry(addr_port.ThreadId(), false), |
1196 | + connection_state(_connection_state), |
1197 | pcap_timestamp(_pcap_timestamp), |
1198 | query(_query) |
1199 | {} |
1200 | @@ -50,15 +55,18 @@ |
1201 | class TcpdumpResultEntry : public QueryEntry |
1202 | { |
1203 | |
1204 | + boost::shared_ptr<ConnectionState> connection_state; |
1205 | timeval pcap_timestamp; |
1206 | QueryResult expected_result; |
1207 | |
1208 | public: |
1209 | TcpdumpResultEntry() {} |
1210 | - TcpdumpResultEntry(const AddrPort &addr_port, |
1211 | + TcpdumpResultEntry(boost::shared_ptr<ConnectionState> _connection_state, |
1212 | + const AddrPort &addr_port, |
1213 | const timeval &_pcap_timestamp, |
1214 | const QueryResult &_expected_result) : |
1215 | QueryEntry(addr_port.ThreadId(), false), |
1216 | + connection_state(_connection_state), |
1217 | pcap_timestamp(_pcap_timestamp), |
1218 | expected_result(_expected_result) |
1219 | {} |
1220 | |
1221 | === added file 'percona_playback/test/thread-pool-sysbench-slow.cc' |
1222 | --- percona_playback/test/thread-pool-sysbench-slow.cc 1970-01-01 00:00:00 +0000 |
1223 | +++ percona_playback/test/thread-pool-sysbench-slow.cc 2012-11-23 20:15:24 +0000 |
1224 | @@ -0,0 +1,60 @@ |
1225 | +/* BEGIN LICENSE |
1226 | + * Copyright (C) 2011 Stewart Smith <stewart@flamingspork.com> |
1227 | + * This program is free software: you can redistribute it and/or modify it |
1228 | + * under the terms of the GNU General Public License version 3, as published |
1229 | + * by the Free Software Foundation. |
1230 | + * |
1231 | + * This program is distributed in the hope that it will be useful, but |
1232 | + * WITHOUT ANY WARRANTY; without even the implied warranties of |
1233 | + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1234 | + * PURPOSE. See the GNU General Public License for more details. |
1235 | + * |
1236 | + * You should have received a copy of the GNU General Public License along |
1237 | + * with this program. If not, see <http://www.gnu.org/licenses/>. |
1238 | + * END LICENSE */ |
1239 | + |
1240 | +#include "config.h" |
1241 | + |
1242 | +#include <iostream> |
1243 | +#include <cstdio> |
1244 | +#include <string> |
1245 | +#include <cstring> |
1246 | +#include <assert.h> |
1247 | +#include <unistd.h> |
1248 | + |
1249 | +#include <percona_playback/percona_playback.h> |
1250 | + |
1251 | +/** |
1252 | + * @TODO Actually write a real test suite here |
1253 | + */ |
1254 | +int main(int argc, char **argv) |
1255 | +{ |
1256 | + (void)argc; (void)argv; |
1257 | + |
1258 | + fprintf(stderr, "Working dir: %s\n\n", get_current_dir_name()); |
1259 | + |
1260 | + percona_playback_st *the_percona_playback= percona_playback_create("test_Percona Playback"); |
1261 | + assert(the_percona_playback); |
1262 | + |
1263 | + char dbplugin[]="--db-plugin=null"; |
1264 | + char querylog[]="--query-log-file="SRCDIR"/percona_playback/test/sysbench-slow.log"; |
1265 | + char threadpool[]="--dispatcher-plugin=thread-pool" |
1266 | + |
1267 | + char **dbplugin_argv= new char*[4]; |
1268 | + dbplugin_argv[0]= argv[0]; |
1269 | + dbplugin_argv[1]= dbplugin; |
1270 | + dbplugin_argv[2]= querylog; |
1271 | + dbplugin_argv[3]= threadpool; |
1272 | + |
1273 | + |
1274 | + assert(0 == percona_playback_argv(the_percona_playback, 3, dbplugin_argv)); |
1275 | + |
1276 | + struct percona_playback_run_result *r= percona_playback_run(the_percona_playback); |
1277 | + |
1278 | + assert(r->err == 0); |
1279 | + assert(r->n_queries == 10150); |
1280 | + assert(r->n_log_entries = 10149); |
1281 | + |
1282 | + percona_playback_destroy(&the_percona_playback); |
1283 | + return r->err; |
1284 | +} |
1285 | |
1286 | === added directory 'percona_playback/thread_per_connection' |
1287 | === added file 'percona_playback/thread_per_connection/plugin.ini' |
1288 | --- percona_playback/thread_per_connection/plugin.ini 1970-01-01 00:00:00 +0000 |
1289 | +++ percona_playback/thread_per_connection/plugin.ini 2012-11-23 20:15:24 +0000 |
1290 | @@ -0,0 +1,6 @@ |
1291 | +[plugin] |
1292 | +title=Thread-per-connection dispatcher plugin |
1293 | +description=Creates thread per each DB connection |
1294 | +version=0.1 |
1295 | +static=yes |
1296 | +load_by_default=yes |
1297 | |
1298 | === added file 'percona_playback/thread_per_connection/thread_per_connection.cc' |
1299 | --- percona_playback/thread_per_connection/thread_per_connection.cc 1970-01-01 00:00:00 +0000 |
1300 | +++ percona_playback/thread_per_connection/thread_per_connection.cc 2012-11-23 20:15:24 +0000 |
1301 | @@ -0,0 +1,109 @@ |
1302 | +/* BEGIN LICENSE |
1303 | + * Copyright (C) 2011-2012 Percona Inc. |
1304 | + * This program is free software: you can redistribute it and/or modify it |
1305 | + * under the terms of the GNU General Public License version 2, as published |
1306 | + * by the Free Software Foundation. |
1307 | + * |
1308 | + * This program is distributed in the hope that it will be useful, but |
1309 | + * WITHOUT ANY WARRANTY; without even the implied warranties of |
1310 | + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1311 | + * PURPOSE. See the GNU General Public License for more details. |
1312 | + * |
1313 | + * You should have received a copy of the GNU General Public License along |
1314 | + * with this program. If not, see <http://www.gnu.org/licenses/>. |
1315 | + * END LICENSE */ |
1316 | + |
1317 | +#include "percona_playback/plugin.h" |
1318 | +#include "percona_playback/db_thread.h" |
1319 | + |
1320 | +#include <tbb/concurrent_hash_map.h> |
1321 | + |
1322 | +class ThreadPerConnectionDispatcher : |
1323 | + public percona_playback::DispatcherPlugin |
1324 | +{ |
1325 | + typedef tbb::concurrent_hash_map<uint64_t, DBThread*> DBExecutorsTable; |
1326 | + DBExecutorsTable executors; |
1327 | + void db_thread_func(DBThread *thread); |
1328 | + void start_thread(DBThread *thread); |
1329 | + |
1330 | +public: |
1331 | + ThreadPerConnectionDispatcher(std::string _name) : |
1332 | + DispatcherPlugin(_name) {} |
1333 | + |
1334 | + void dispatch(QueryEntryPtr query_entry); |
1335 | + bool finish_and_wait(uint64_t thread_id); |
1336 | + void finish_all_and_wait(); |
1337 | +}; |
1338 | + |
1339 | +extern percona_playback::DBClientPlugin *g_dbclient_plugin; |
1340 | + |
1341 | +void |
1342 | +ThreadPerConnectionDispatcher::dispatch(QueryEntryPtr query_entry) |
1343 | +{ |
1344 | + uint64_t thread_id= query_entry->getThreadId(); |
1345 | + { |
1346 | + DBExecutorsTable::accessor a; |
1347 | + if (executors.insert(a, thread_id)) |
1348 | + { |
1349 | + DBThread *db_thread= g_dbclient_plugin->create(thread_id); |
1350 | + a->second= db_thread; |
1351 | + db_thread->start_thread(); |
1352 | + } |
1353 | + a->second->queries->push(query_entry); |
1354 | + } |
1355 | +} |
1356 | + |
1357 | +bool |
1358 | +ThreadPerConnectionDispatcher::finish_and_wait(uint64_t thread_id) |
1359 | +{ |
1360 | + DBThread *db_thread= NULL; |
1361 | + { |
1362 | + DBExecutorsTable::accessor a; |
1363 | + if (executors.find(a, thread_id)) |
1364 | + { |
1365 | + db_thread= a->second; |
1366 | + executors.erase(a); |
1367 | + } |
1368 | + } |
1369 | + |
1370 | + if (!db_thread) |
1371 | + return false; |
1372 | + |
1373 | + db_thread->queries->push(QueryEntryPtr(new FinishEntry(thread_id))); |
1374 | + db_thread->join(); |
1375 | + |
1376 | + delete db_thread; |
1377 | + |
1378 | + return true; |
1379 | +} |
1380 | + |
1381 | +void |
1382 | +ThreadPerConnectionDispatcher::finish_all_and_wait() |
1383 | +{ |
1384 | + QueryEntryPtr shutdown_command(new FinishEntry(0)); |
1385 | + |
1386 | + while(executors.size()) |
1387 | + { |
1388 | + uint64_t thread_id; |
1389 | + DBThread *t; |
1390 | + { |
1391 | + DBExecutorsTable::const_iterator iter= executors.begin(); |
1392 | + thread_id= (*iter).first; |
1393 | + t= (*iter).second; |
1394 | + } |
1395 | + executors.erase(thread_id); |
1396 | + |
1397 | + t->queries->push(shutdown_command); |
1398 | + t->join(); |
1399 | + |
1400 | + delete t; |
1401 | + } |
1402 | +} |
1403 | + |
1404 | +static void init_plugin(percona_playback::PluginRegistry &r) |
1405 | +{ |
1406 | + r.add("thread-per-connection", |
1407 | + new ThreadPerConnectionDispatcher("thread-per-connection")); |
1408 | +} |
1409 | + |
1410 | +PERCONA_PLAYBACK_PLUGIN(init_plugin); |
1411 | |
1412 | === added directory 'percona_playback/thread_pool' |
1413 | === added file 'percona_playback/thread_pool/plugin.ini' |
1414 | --- percona_playback/thread_pool/plugin.ini 1970-01-01 00:00:00 +0000 |
1415 | +++ percona_playback/thread_pool/plugin.ini 2012-11-23 20:15:24 +0000 |
1416 | @@ -0,0 +1,6 @@ |
1417 | +[plugin] |
1418 | +title=Thread-pool dispatcher plugin |
1419 | +description=Creates threads pool to process connections to server |
1420 | +version=0.1 |
1421 | +static=yes |
1422 | +load_by_default=yes |
1423 | |
1424 | === added file 'percona_playback/thread_pool/thread_pool.cc' |
1425 | --- percona_playback/thread_pool/thread_pool.cc 1970-01-01 00:00:00 +0000 |
1426 | +++ percona_playback/thread_pool/thread_pool.cc 2012-11-23 20:15:24 +0000 |
1427 | @@ -0,0 +1,130 @@ |
1428 | +/* BEGIN LICENSE |
1429 | + * Copyright (C) 2011-2012 Percona Inc. |
1430 | + * This program is free software: you can redistribute it and/or modify it |
1431 | + * under the terms of the GNU General Public License version 2, as published |
1432 | + * by the Free Software Foundation. |
1433 | + * |
1434 | + * This program is distributed in the hope that it will be useful, but |
1435 | + * WITHOUT ANY WARRANTY; without even the implied warranties of |
1436 | + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
1437 | + * PURPOSE. See the GNU General Public License for more details. |
1438 | + * |
1439 | + * You should have received a copy of the GNU General Public License along |
1440 | + * with this program. If not, see <http://www.gnu.org/licenses/>. |
1441 | + * END LICENSE */ |
1442 | + |
1443 | +#include "percona_playback/plugin.h" |
1444 | +#include "percona_playback/db_thread.h" |
1445 | +#include <percona_playback/gettext.h> |
1446 | +#include <boost/program_options.hpp> |
1447 | +#include <boost/shared_ptr.hpp> |
1448 | +#include <boost/crc.hpp> |
1449 | +#include <vector> |
1450 | + |
1451 | +#include <tbb/concurrent_hash_map.h> |
1452 | +#include <stdio.h> |
1453 | + |
1454 | +extern percona_playback::DBClientPlugin *g_dbclient_plugin; |
1455 | + |
1456 | +class ThreadPoolDispatcher : |
1457 | + public percona_playback::DispatcherPlugin |
1458 | +{ |
1459 | + typedef std::vector<boost::shared_ptr<DBThread> > Workers; |
1460 | + |
1461 | + unsigned threads_count; |
1462 | + boost:: |
1463 | + program_options:: |
1464 | + options_description options; |
1465 | + Workers workers; |
1466 | + |
1467 | +public: |
1468 | + ThreadPoolDispatcher(std::string _name) : |
1469 | + DispatcherPlugin(_name), |
1470 | + threads_count(0), |
1471 | + options("Threads-pool Options") {} |
1472 | + |
1473 | + virtual void dispatch(QueryEntryPtr query_entry); |
1474 | + virtual bool finish_and_wait(uint64_t) { return true; } |
1475 | + virtual void finish_all_and_wait(); |
1476 | + virtual void run(); |
1477 | + |
1478 | + boost::program_options::options_description* getProgramOptions(); |
1479 | + int processOptions(boost::program_options::variables_map &vm); |
1480 | + |
1481 | +}; |
1482 | + |
1483 | +void ThreadPoolDispatcher::run() |
1484 | +{ |
1485 | + for (unsigned i = 0; i < threads_count; ++i) |
1486 | + { |
1487 | + boost::shared_ptr<DBThread> db_thread(g_dbclient_plugin->create(i)); |
1488 | + workers.push_back(db_thread); |
1489 | + db_thread->start_thread(); |
1490 | + } |
1491 | +} |
1492 | + |
1493 | +void ThreadPoolDispatcher::dispatch(QueryEntryPtr query_entry) |
1494 | +{ |
1495 | + /* |
1496 | + Each worker has its own queue. For some types of input plugins |
1497 | + it is important to execute query entries with the same thread id |
1498 | + by the same worker. That is why we choose worker by simple hash from |
1499 | + thread id. |
1500 | + */ |
1501 | + uint64_t thread_id= query_entry->getThreadId(); |
1502 | + boost::crc_32_type crc; |
1503 | + crc.process_bytes(&thread_id, sizeof(thread_id)); |
1504 | + uint32_t worker_index = crc.checksum() % workers.size(); |
1505 | + workers[worker_index]->queries->push(query_entry); |
1506 | +} |
1507 | + |
1508 | +void ThreadPoolDispatcher::finish_all_and_wait() |
1509 | +{ |
1510 | + QueryEntryPtr shutdown_command(new FinishEntry(0)); |
1511 | + for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i) |
1512 | + (*i)->queries->push(shutdown_command); |
1513 | + for (Workers::iterator i = workers.begin(), end = workers.end(); i != end; ++i) |
1514 | + (*i)->join(); |
1515 | + workers.clear(); |
1516 | +} |
1517 | + |
1518 | + |
1519 | +boost::program_options::options_description* |
1520 | +ThreadPoolDispatcher::getProgramOptions() |
1521 | +{ |
1522 | + unsigned default_threads_count = |
1523 | + boost::thread::hardware_concurrency(); |
1524 | + options.add_options() |
1525 | + ("thread-pool-threads-count", |
1526 | + boost::program_options::value<unsigned>(&threads_count) |
1527 | + ->default_value(default_threads_count), |
1528 | + _("The number of threads in thread pool. If this options is omitted " |
1529 | + "the number of threads equals to hardware concurency.")) |
1530 | + ; |
1531 | + |
1532 | + return &options; |
1533 | + |
1534 | +} |
1535 | + |
1536 | +int |
1537 | +ThreadPoolDispatcher::processOptions(boost::program_options::variables_map &vm) |
1538 | +{ |
1539 | + if (!active && |
1540 | + !vm["thread-pool-threads-count"].defaulted()) |
1541 | + { |
1542 | + fprintf(stderr, _("thread-pool plugin is not selected, " |
1543 | + "you shouldn't use this plugin-related " |
1544 | + "command line options\n")); |
1545 | + return -1; |
1546 | + } |
1547 | + |
1548 | + return 0; |
1549 | +} |
1550 | + |
1551 | +static void init_plugin(percona_playback::PluginRegistry &r) |
1552 | +{ |
1553 | + r.add("thread-pool", |
1554 | + new ThreadPoolDispatcher("thread-pool")); |
1555 | +} |
1556 | + |
1557 | +PERCONA_PLAYBACK_PLUGIN(init_plugin); |