Merge lp:~oleksiyk/gearmand/mysql into lp:gearmand/1.0
- mysql
- Merge into 1.0
Proposed by
Oleksiy Krivoshey
Status: | Merged |
---|---|
Merged at revision: | 500 |
Proposed branch: | lp:~oleksiyk/gearmand/mysql |
Merge into: | lp:gearmand/1.0 |
Diff against target: |
725 lines (+661/-0) 8 files modified
configure.ac (+4/-0) libgearman-server/plugins.cc (+5/-0) libgearman-server/plugins/queue.h (+2/-0) libgearman-server/plugins/queue/include.am (+1/-0) libgearman-server/plugins/queue/mysql/include.am (+19/-0) libgearman-server/plugins/queue/mysql/queue.cc (+435/-0) libgearman-server/plugins/queue/mysql/queue.h (+48/-0) m4/ax_lib_mysql.m4 (+147/-0) |
To merge this branch: | bzr merge lp:~oleksiyk/gearmand/mysql |
Related bugs: | |
Related blueprints: |
Persistent queue module using libmysql
(Undefined)
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Brian Aker | Pending | ||
Review via email: mp+92946@code.launchpad.net |
Commit message
Description of the change
Adds support for a MySQL-based persistent queue built on libmysqlclient. Job insertion and removal use prepared statements.
Configuration:
$ gearmand -q MySQL --mysql-host <HOSTNAME> --mysql-db <DATABASE> --mysql-user <USER> --mysql-password <PASSWORD> --mysql-table <TABLE>
Running `gearmand --help` should show the defaults for the options above.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'configure.ac' |
2 | --- configure.ac 2012-01-27 06:44:03 +0000 |
3 | +++ configure.ac 2012-02-14 10:32:50 +0000 |
4 | @@ -110,6 +110,9 @@ |
5 | SOCKET_SEND_FLAGS |
6 | AX_HAVE_LIBHIREDIS |
7 | |
8 | +AX_LIB_MYSQL([5.0]) |
9 | +AM_CONDITIONAL(HAVE_MYSQL, test "x${HAVE_MYSQL}" = "x1") |
10 | + |
11 | # Checks for programs. |
12 | AC_PROG_AWK |
13 | AC_PROG_INSTALL |
14 | @@ -244,5 +247,6 @@ |
15 | echo " * Building with libmemcached $ac_enable_libmemcached" |
16 | echo " * Building with libpq $ac_cv_libpq" |
17 | echo " * Building with tokyocabinet $ac_enable_libtokyocabinet" |
18 | +echo " * Building with libmysql $found_mysql" |
19 | echo "" |
20 | echo "---" |
21 | |
22 | === modified file 'libgearman-server/plugins.cc' |
23 | --- libgearman-server/plugins.cc 2012-02-10 02:49:36 +0000 |
24 | +++ libgearman-server/plugins.cc 2012-02-14 10:32:50 +0000 |
25 | @@ -78,6 +78,11 @@ |
26 | queue::initialize_tokyocabinet(); |
27 | } |
28 | |
29 | + if (HAVE_MYSQL) |
30 | + { |
31 | + queue::initialize_mysql(); |
32 | + } |
33 | + |
34 | gearmand::queue::load_options(all); |
35 | } |
36 | |
37 | |
38 | === modified file 'libgearman-server/plugins/queue.h' |
39 | --- libgearman-server/plugins/queue.h 2011-11-29 05:21:54 +0000 |
40 | +++ libgearman-server/plugins/queue.h 2012-02-14 10:32:50 +0000 |
41 | @@ -48,3 +48,5 @@ |
42 | #include <libgearman-server/plugins/queue/tokyocabinet/queue.h> |
43 | |
44 | #include <libgearman-server/plugins/queue/redis/queue.h> |
45 | + |
46 | +#include <libgearman-server/plugins/queue/mysql/queue.h> |
47 | |
48 | === modified file 'libgearman-server/plugins/queue/include.am' |
49 | --- libgearman-server/plugins/queue/include.am 2011-11-29 05:21:54 +0000 |
50 | +++ libgearman-server/plugins/queue/include.am 2012-02-14 10:32:50 +0000 |
51 | @@ -18,3 +18,4 @@ |
52 | include libgearman-server/plugins/queue/redis/include.am |
53 | include libgearman-server/plugins/queue/sqlite/include.am |
54 | include libgearman-server/plugins/queue/tokyocabinet/include.am |
55 | +include libgearman-server/plugins/queue/mysql/include.am |
56 | |
57 | === added directory 'libgearman-server/plugins/queue/mysql' |
58 | === added file 'libgearman-server/plugins/queue/mysql/include.am' |
59 | --- libgearman-server/plugins/queue/mysql/include.am 1970-01-01 00:00:00 +0000 |
60 | +++ libgearman-server/plugins/queue/mysql/include.am 2012-02-14 10:32:50 +0000 |
61 | @@ -0,0 +1,19 @@ |
62 | +# Gearman |
63 | +# Copyright (C) 2011 Oleksiy Krivoshey |
64 | +# All rights reserved. |
65 | +# |
66 | +# Use and distribution licensed under the BSD license. See |
67 | +# the COPYING file in the parent directory for full text. |
68 | +# |
69 | +# All paths should be given relative to the root |
70 | +# |
71 | + |
72 | +noinst_HEADERS+= \ |
73 | + libgearman-server/plugins/queue/mysql/queue.h |
74 | + |
75 | +libgearman_server_libgearman_server_la_SOURCES+= \ |
76 | + libgearman-server/plugins/queue/mysql/queue.cc |
77 | + |
78 | +libgearman_server_libgearman_server_la_LIBADD+= $(MYSQL_LDFLAGS) |
79 | + |
80 | + |
81 | |
82 | === added file 'libgearman-server/plugins/queue/mysql/queue.cc' |
83 | --- libgearman-server/plugins/queue/mysql/queue.cc 1970-01-01 00:00:00 +0000 |
84 | +++ libgearman-server/plugins/queue/mysql/queue.cc 2012-02-14 10:32:50 +0000 |
85 | @@ -0,0 +1,435 @@ |
86 | +/* Gearman server and library |
87 | + * Copyright (C) 2011 Oleksiy Krivoshey |
88 | + * All rights reserved. |
89 | + * |
90 | + * Use and distribution licensed under the BSD license. See |
91 | + * the COPYING file in the parent directory for full text. |
92 | + */ |
93 | + |
94 | +#include <libgearman-server/common.h> |
95 | +#include <libgearman-server/byte.h> |
96 | + |
97 | +#include <libgearman-server/plugins/queue/mysql/queue.h> |
98 | +#include <libgearman-server/plugins/queue/base.h> |
99 | + |
100 | +#include <mysql/mysql.h> |
101 | +#include <mysql/errmsg.h> |
102 | + |
103 | +/** |
104 | + * Default values. |
105 | + */ |
106 | +#define GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE "gearman_queue" |
107 | + |
108 | +namespace gearmand { |
109 | + namespace plugins { |
110 | + namespace queue { |
111 | + class MySQL; |
112 | + } |
113 | + } |
114 | +} |
115 | + |
116 | +static gearmand_error_t _initialize(gearman_server_st *server, gearmand::plugins::queue::MySQL *queue); |
117 | + |
118 | +namespace gearmand { |
119 | + namespace plugins { |
120 | + namespace queue { |
121 | + |
122 | + class MySQL : public gearmand::plugins::Queue { |
123 | + public: |
124 | + MySQL(); |
125 | + ~MySQL(); |
126 | + |
127 | + gearmand_error_t initialize(); |
128 | + gearmand_error_t prepareAddStatement(); |
129 | + gearmand_error_t prepareDoneStatement(); |
130 | + |
131 | + MYSQL *con; |
132 | + MYSQL_STMT *add_stmt; |
133 | + MYSQL_STMT *done_stmt; |
134 | + std::string mysql_host; |
135 | + std::string mysql_user; |
136 | + std::string mysql_password; |
137 | + std::string mysql_db; |
138 | + std::string mysql_table; |
139 | + private: |
140 | + }; |
141 | + |
142 | + MySQL::MySQL() : |
143 | + Queue("MySQL"), |
144 | + con(NULL), |
145 | + add_stmt(NULL) { |
146 | + command_line_options().add_options() |
147 | + ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.") |
148 | + ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.") |
149 | + ("mysql-password", boost::program_options::value(&mysql_password)->default_value(""), "MySQL user password.") |
150 | + ("mysql-db", boost::program_options::value(&mysql_db)->default_value(""), "MySQL database.") |
151 | + ("mysql-table", boost::program_options::value(&mysql_table)->default_value(GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE), "MySQL table name."); |
152 | + } |
153 | + |
154 | + MySQL::~MySQL() { |
155 | + if (add_stmt){ |
156 | + mysql_stmt_close(add_stmt); |
157 | + } |
158 | + if (con){ |
159 | + mysql_close(con); |
160 | + } |
161 | + } |
162 | + |
163 | + gearmand_error_t MySQL::initialize() { |
164 | + return _initialize(&Gearmand()->server, this); |
165 | + } |
166 | + |
167 | + gearmand_error_t MySQL::prepareAddStatement() { |
168 | + char query_buffer[1024]; |
169 | + |
170 | + if((this->add_stmt = mysql_stmt_init(this->con)) == NULL){ |
171 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con)); |
172 | + return GEARMAN_QUEUE_ERROR; |
173 | + } |
174 | + |
175 | + snprintf(query_buffer, sizeof(query_buffer), |
176 | + "INSERT INTO %s " |
177 | + "(unique_key, function_name, priority, data, when_to_run) " |
178 | + "VALUES(?, ?, ?, ?, ?)", this->mysql_table.c_str()); |
179 | + |
180 | + if (mysql_stmt_prepare(this->add_stmt, query_buffer, strlen(query_buffer))){ |
181 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con)); |
182 | + return GEARMAN_QUEUE_ERROR; |
183 | + } |
184 | + |
185 | + return GEARMAN_SUCCESS; |
186 | + } |
187 | + |
188 | + gearmand_error_t MySQL::prepareDoneStatement() { |
189 | + char query_buffer[1024]; |
190 | + |
191 | + if((this->done_stmt = mysql_stmt_init(this->con)) == NULL){ |
192 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con)); |
193 | + return GEARMAN_QUEUE_ERROR; |
194 | + } |
195 | + |
196 | + snprintf(query_buffer, sizeof(query_buffer), |
197 | + "DELETE FROM %s " |
198 | + "WHERE unique_key=? " |
199 | + "AND function_name=?", this->mysql_table.c_str()); |
200 | + |
201 | + if (mysql_stmt_prepare(this->done_stmt, query_buffer, strlen(query_buffer))){ |
202 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con)); |
203 | + return GEARMAN_QUEUE_ERROR; |
204 | + } |
205 | + |
206 | + return GEARMAN_SUCCESS; |
207 | + } |
208 | + |
209 | + void initialize_mysql() { |
210 | + static MySQL local_instance; |
211 | + } |
212 | + |
213 | + } // namespace queue |
214 | + } // namespace plugin |
215 | +} // namespace gearmand |
216 | + |
217 | +/* Queue callback functions. */ |
218 | +static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context, |
219 | + const char *unique, size_t unique_size, |
220 | + const char *function_name, |
221 | + size_t function_name_size, |
222 | + const void *data, size_t data_size, |
223 | + gearmand_job_priority_t priority, |
224 | + int64_t when); |
225 | + |
226 | +static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, void *context); |
227 | + |
228 | +static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context, |
229 | + const char *unique, |
230 | + size_t unique_size, |
231 | + const char *function_name, |
232 | + size_t function_name_size); |
233 | + |
234 | +static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context, |
235 | + gearman_queue_add_fn *add_fn, |
236 | + void *add_context); |
237 | + |
238 | + |
239 | +gearmand_error_t _initialize(gearman_server_st *server, |
240 | + gearmand::plugins::queue::MySQL *queue) { |
241 | + |
242 | + MYSQL_RES * result; |
243 | + char query_buffer[1024]; |
244 | + my_bool my_true = true; |
245 | + |
246 | + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"Initializing MySQL module"); |
247 | + |
248 | + gearman_server_set_queue(server, queue, _mysql_queue_add, _mysql_queue_flush, _mysql_queue_done, _mysql_queue_replay); |
249 | + |
250 | + queue->con = mysql_init(queue->con); |
251 | + |
252 | + mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand"); |
253 | + |
254 | + if (!mysql_real_connect(queue->con, |
255 | + queue->mysql_host.c_str(), |
256 | + queue->mysql_user.c_str(), |
257 | + queue->mysql_password.c_str(), |
258 | + queue->mysql_db.c_str(), 0, NULL, 0)) { |
259 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to connect to database: %s", mysql_error(queue->con)); |
260 | + |
261 | + return GEARMAN_QUEUE_ERROR; |
262 | + } |
263 | + |
264 | + mysql_options(queue->con, MYSQL_OPT_RECONNECT, &my_true); |
265 | + |
266 | + if (!(result = mysql_list_tables(queue->con, queue->mysql_table.c_str()))){ |
267 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_list_tables failed: %s", mysql_error(queue->con)); |
268 | + return GEARMAN_QUEUE_ERROR; |
269 | + } |
270 | + |
271 | + if (mysql_num_rows(result) == 0){ |
272 | + snprintf(query_buffer, sizeof(query_buffer), |
273 | + "CREATE TABLE %s" |
274 | + "(" |
275 | + "unique_key VARCHAR(%d)," |
276 | + "function_name VARCHAR(255)," |
277 | + "priority INT," |
278 | + "data LONGBLOB," |
279 | + "when_to_run INT," |
280 | + "unique key (unique_key, function_name)" |
281 | + ")", |
282 | + queue->mysql_table.c_str(), GEARMAN_UNIQUE_SIZE); |
283 | + |
284 | + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL module: creating table %s", queue->mysql_table.c_str()); |
285 | + |
286 | + if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){ |
287 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL module: create table failed: %s", mysql_error(queue->con)); |
288 | + return GEARMAN_QUEUE_ERROR; |
289 | + } |
290 | + } |
291 | + |
292 | + mysql_free_result(result); |
293 | + |
294 | + if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR) |
295 | + return GEARMAN_QUEUE_ERROR; |
296 | + |
297 | + if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR) |
298 | + return GEARMAN_QUEUE_ERROR; |
299 | + |
300 | + return GEARMAN_SUCCESS; |
301 | +} |
302 | + |
303 | +/* |
304 | + * Static definitions |
305 | + */ |
306 | + |
307 | + |
308 | +static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context, |
309 | + const char *unique, size_t unique_size, |
310 | + const char *function_name, |
311 | + size_t function_name_size, |
312 | + const void *data, size_t data_size, |
313 | + gearmand_job_priority_t priority, |
314 | + int64_t when) { |
315 | + |
316 | + MYSQL_BIND bind[5]; |
317 | + |
318 | + (void) server; |
319 | + |
320 | + gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context; |
321 | + |
322 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue add: %.*s %.*s", (uint32_t) unique_size, (char *) unique, |
323 | + (uint32_t) function_name_size, (char *) function_name); |
324 | + |
325 | + bind[0].buffer_type= MYSQL_TYPE_STRING; |
326 | + bind[0].buffer= (char *)unique; |
327 | + bind[0].buffer_length= unique_size; |
328 | + bind[0].is_null= 0; |
329 | + bind[0].length= (unsigned long*)&unique_size; |
330 | + |
331 | + bind[1].buffer_type= MYSQL_TYPE_STRING; |
332 | + bind[1].buffer= (char *)function_name; |
333 | + bind[1].buffer_length= function_name_size; |
334 | + bind[1].is_null= 0; |
335 | + bind[1].length= (unsigned long*)&function_name_size; |
336 | + |
337 | + bind[2].buffer_type= MYSQL_TYPE_LONG; |
338 | + bind[2].buffer= (char *)&priority; |
339 | + bind[2].is_null= 0; |
340 | + bind[2].length= 0; |
341 | + |
342 | + bind[3].buffer_type= MYSQL_TYPE_LONG_BLOB; |
343 | + bind[3].buffer= (char *)data; |
344 | + bind[3].buffer_length= data_size; |
345 | + bind[3].is_null= 0; |
346 | + bind[3].length= (unsigned long*)&data_size; |
347 | + |
348 | + bind[4].buffer_type= MYSQL_TYPE_LONG; |
349 | + bind[4].buffer= (char *)&when; |
350 | + bind[4].is_null= 0; |
351 | + bind[4].length= 0; |
352 | + |
353 | + while(1){ |
354 | + if (mysql_stmt_bind_param(queue->add_stmt, bind)){ |
355 | + if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT ){ |
356 | + if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR){ |
357 | + return GEARMAN_QUEUE_ERROR; |
358 | + } |
359 | + continue; |
360 | + } else { |
361 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con)); |
362 | + return GEARMAN_QUEUE_ERROR; |
363 | + } |
364 | + } |
365 | + |
366 | + if (mysql_stmt_execute(queue->add_stmt)){ |
367 | + if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST ){ |
368 | + mysql_stmt_close(queue->add_stmt); |
369 | + if(queue->prepareAddStatement() != GEARMAN_QUEUE_ERROR){ |
370 | + continue; |
371 | + } |
372 | + } |
373 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con)); |
374 | + return GEARMAN_QUEUE_ERROR; |
375 | + } |
376 | + |
377 | + break; |
378 | + } |
379 | + |
380 | + return GEARMAN_SUCCESS; |
381 | +} |
382 | + |
383 | +static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, |
384 | + void *context __attribute__((unused))) { |
385 | + |
386 | + (void) server; |
387 | + (void) context; |
388 | + |
389 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue flush"); |
390 | + |
391 | + return GEARMAN_SUCCESS; |
392 | +} |
393 | + |
394 | +static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context, |
395 | + const char *unique, |
396 | + size_t unique_size, |
397 | + const char *function_name, |
398 | + size_t function_name_size) { |
399 | + |
400 | + MYSQL_BIND bind[2]; |
401 | + |
402 | + (void) server; |
403 | + |
404 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique, |
405 | + (uint32_t) function_name_size, (char *) function_name); |
406 | + |
407 | + gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context; |
408 | + |
409 | + bind[0].buffer_type= MYSQL_TYPE_STRING; |
410 | + bind[0].buffer= (char *)unique; |
411 | + bind[0].buffer_length= unique_size; |
412 | + bind[0].is_null= 0; |
413 | + bind[0].length= (unsigned long*)&unique_size; |
414 | + |
415 | + bind[1].buffer_type= MYSQL_TYPE_STRING; |
416 | + bind[1].buffer= (char *)function_name; |
417 | + bind[1].buffer_length= function_name_size; |
418 | + bind[1].is_null= 0; |
419 | + bind[1].length= (unsigned long*)&function_name_size; |
420 | + |
421 | + while(1){ |
422 | + if (mysql_stmt_bind_param(queue->done_stmt, bind)){ |
423 | + if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT ){ |
424 | + if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR){ |
425 | + return GEARMAN_QUEUE_ERROR; |
426 | + } |
427 | + continue; |
428 | + } else { |
429 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con)); |
430 | + return GEARMAN_QUEUE_ERROR; |
431 | + } |
432 | + } |
433 | + |
434 | + if (mysql_stmt_execute(queue->done_stmt)){ |
435 | + if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST ){ |
436 | + mysql_stmt_close(queue->done_stmt); |
437 | + if(queue->prepareDoneStatement() != GEARMAN_QUEUE_ERROR){ |
438 | + continue; |
439 | + } |
440 | + } |
441 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con)); |
442 | + return GEARMAN_QUEUE_ERROR; |
443 | + } |
444 | + |
445 | + break; |
446 | + } |
447 | + |
448 | + return GEARMAN_SUCCESS; |
449 | +} |
450 | + |
451 | +static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context, |
452 | + gearman_queue_add_fn *add_fn, |
453 | + void *add_context) { |
454 | + |
455 | + MYSQL_RES * result; |
456 | + MYSQL_ROW row; |
457 | + char query_buffer[1024]; |
458 | + |
459 | + (void) server; |
460 | + |
461 | + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue replay"); |
462 | + |
463 | + gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context; |
464 | + |
465 | + snprintf(query_buffer, sizeof(query_buffer), |
466 | + "SELECT unique_key, function_name, data, priority, when_to_run FROM %s", |
467 | + queue->mysql_table.c_str()); |
468 | + |
469 | + if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){ |
470 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_real_query failed: %s", mysql_error(queue->con)); |
471 | + return GEARMAN_QUEUE_ERROR; |
472 | + } |
473 | + |
474 | + if(!(result = mysql_store_result(queue->con))){ |
475 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_store_result failed: %s", mysql_error(queue->con)); |
476 | + return GEARMAN_QUEUE_ERROR; |
477 | + } |
478 | + |
479 | + if(mysql_num_fields(result) < 5){ |
480 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL queue: insufficient row fields in queue table"); |
481 | + return GEARMAN_QUEUE_ERROR; |
482 | + } |
483 | + |
484 | + gearmand_error_t ret = GEARMAN_SUCCESS; |
485 | + |
486 | + while ((row = mysql_fetch_row(result))) { |
487 | + unsigned long *lengths; |
488 | + gearmand_job_priority_t priority = (gearmand_job_priority_t)0; |
489 | + int when = 0; |
490 | + |
491 | + lengths = mysql_fetch_lengths(result); |
492 | + |
493 | + /* need to make a copy here ... gearman_server_job_free will free it later */ |
494 | + size_t data_size= lengths[2]; |
495 | + char * data= (char *)malloc(data_size); |
496 | + if (data == NULL){ |
497 | + gearmand_perror("malloc failed"); |
498 | + return GEARMAN_MEMORY_ALLOCATION_FAILURE; |
499 | + } |
500 | + memcpy(data, row[2], data_size); |
501 | + |
502 | + if(lengths[3]) priority = (gearmand_job_priority_t) atoi(row[3]); |
503 | + if(lengths[4]) when = atoi(row[4]); |
504 | + |
505 | + ret = (*add_fn)(server, add_context, |
506 | + row[0], (size_t) lengths[0], |
507 | + row[1], (size_t) lengths[1], |
508 | + data, data_size, |
509 | + priority, |
510 | + when); |
511 | + |
512 | + if (ret != GEARMAN_SUCCESS) { |
513 | + break; |
514 | + } |
515 | + } |
516 | + |
517 | + mysql_free_result(result); |
518 | + |
519 | + return ret; |
520 | +} |
521 | |
522 | === added file 'libgearman-server/plugins/queue/mysql/queue.h' |
523 | --- libgearman-server/plugins/queue/mysql/queue.h 1970-01-01 00:00:00 +0000 |
524 | +++ libgearman-server/plugins/queue/mysql/queue.h 2012-02-14 10:32:50 +0000 |
525 | @@ -0,0 +1,48 @@ |
526 | +/* |
527 | + * Gearmand client and server library. |
528 | + * |
529 | + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ |
530 | + * All rights reserved. |
531 | + * |
532 | + * Redistribution and use in source and binary forms, with or without |
533 | + * modification, are permitted provided that the following conditions are |
534 | + * met: |
535 | + * |
536 | + * * Redistributions of source code must retain the above copyright |
537 | + * notice, this list of conditions and the following disclaimer. |
538 | + * |
539 | + * * Redistributions in binary form must reproduce the above |
540 | + * copyright notice, this list of conditions and the following disclaimer |
541 | + * in the documentation and/or other materials provided with the |
542 | + * distribution. |
543 | + * |
544 | + * * The names of its contributors may not be used to endorse or |
545 | + * promote products derived from this software without specific prior |
546 | + * written permission. |
547 | + * |
548 | + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
549 | + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
550 | + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
551 | + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
552 | + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
553 | + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
554 | + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
555 | + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
556 | + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
557 | + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
558 | + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
559 | + * |
560 | + */ |
561 | + |
562 | +#pragma once |
563 | + |
564 | + |
565 | +namespace gearmand { |
566 | +namespace plugins { |
567 | +namespace queue { |
568 | + |
569 | +void initialize_mysql(); |
570 | + |
571 | +} // namespace queue |
572 | +} // namespace plugin |
573 | +} // namespace gearmand |
574 | |
575 | === added file 'm4/ax_lib_mysql.m4' |
576 | --- m4/ax_lib_mysql.m4 1970-01-01 00:00:00 +0000 |
577 | +++ m4/ax_lib_mysql.m4 2012-02-14 10:32:50 +0000 |
578 | @@ -0,0 +1,147 @@ |
579 | +# =========================================================================== |
580 | +# http://www.gnu.org/software/autoconf-archive/ax_lib_mysql.html |
581 | +# =========================================================================== |
582 | +# |
583 | +# SYNOPSIS |
584 | +# |
585 | +# AX_LIB_MYSQL([MINIMUM-VERSION]) |
586 | +# |
587 | +# DESCRIPTION |
588 | +# |
589 | +# This macro provides tests of availability of MySQL client library of |
590 | +# particular version or newer. |
591 | +# |
592 | +# AX_LIB_MYSQL macro takes only one argument which is optional. If there |
593 | +# is no required version passed, then macro does not run version test. |
594 | +# |
595 | +# The --with-mysql option takes one of three possible values: |
596 | +# |
597 | +# no - do not check for MySQL client library |
598 | +# |
599 | +# yes - do check for MySQL library in standard locations (mysql_config |
600 | +# should be in the PATH) |
601 | +# |
602 | +# path - complete path to mysql_config utility, use this option if |
603 | +# mysql_config can't be found in the PATH |
604 | +# |
605 | +# This macro calls: |
606 | +# |
607 | +# AC_SUBST(MYSQL_CFLAGS) |
608 | +# AC_SUBST(MYSQL_LDFLAGS) |
609 | +# AC_SUBST(MYSQL_VERSION) |
610 | +# |
611 | +# And sets: |
612 | +# |
613 | +# HAVE_MYSQL |
614 | +# |
615 | +# LICENSE |
616 | +# |
617 | +# Copyright (c) 2008 Mateusz Loskot <mateusz@loskot.net> |
618 | +# |
619 | +# Copying and distribution of this file, with or without modification, are |
620 | +# permitted in any medium without royalty provided the copyright notice |
621 | +# and this notice are preserved. This file is offered as-is, without any |
622 | +# warranty. |
623 | + |
624 | +#serial 12 |
625 | + |
626 | +AC_DEFUN([AX_LIB_MYSQL], |
627 | +[ |
628 | + AC_ARG_WITH([mysql], |
629 | + AS_HELP_STRING([--with-mysql=@<:@ARG@:>@], |
630 | + [use MySQL client library @<:@default=yes@:>@, optionally specify path to mysql_config] |
631 | + ), |
632 | + [ |
633 | + if test "$withval" = "no"; then |
634 | + want_mysql="no" |
635 | + elif test "$withval" = "yes"; then |
636 | + want_mysql="yes" |
637 | + else |
638 | + want_mysql="yes" |
639 | + MYSQL_CONFIG="$withval" |
640 | + fi |
641 | + ], |
642 | + [want_mysql="yes"] |
643 | + ) |
644 | + AC_ARG_VAR([MYSQL_CONFIG], [Full path to mysql_config program]) |
645 | + |
646 | + MYSQL_CFLAGS="" |
647 | + MYSQL_LDFLAGS="" |
648 | + MYSQL_VERSION="" |
649 | + |
650 | + dnl |
651 | + dnl Check MySQL libraries |
652 | + dnl |
653 | + |
654 | + if test "$want_mysql" = "yes"; then |
655 | + |
656 | + if test -z "$MYSQL_CONFIG" ; then |
657 | + AC_PATH_PROGS([MYSQL_CONFIG], [mysql_config mysql_config5], [no]) |
658 | + fi |
659 | + |
660 | + if test "$MYSQL_CONFIG" != "no"; then |
661 | + MYSQL_CFLAGS="`$MYSQL_CONFIG --cflags`" |
662 | + MYSQL_LDFLAGS="`$MYSQL_CONFIG --libs`" |
663 | + |
664 | + MYSQL_VERSION=`$MYSQL_CONFIG --version` |
665 | + |
666 | + found_mysql="yes" |
667 | + else |
668 | + found_mysql="no" |
669 | + fi |
670 | + fi |
671 | + |
672 | + dnl |
673 | + dnl Check if required version of MySQL is available |
674 | + dnl |
675 | + |
676 | + |
677 | + mysql_version_req=ifelse([$1], [], [], [$1]) |
678 | + |
679 | + if test "$found_mysql" = "yes" -a -n "$mysql_version_req"; then |
680 | + |
681 | + AC_MSG_CHECKING([if MySQL version is >= $mysql_version_req]) |
682 | + |
683 | + dnl Decompose required version string of MySQL |
684 | + dnl and calculate its number representation |
685 | + mysql_version_req_major=`expr $mysql_version_req : '\([[0-9]]*\)'` |
686 | + mysql_version_req_minor=`expr $mysql_version_req : '[[0-9]]*\.\([[0-9]]*\)'` |
687 | + mysql_version_req_micro=`expr $mysql_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'` |
688 | + if test "x$mysql_version_req_micro" = "x"; then |
689 | + mysql_version_req_micro="0" |
690 | + fi |
691 | + |
692 | + mysql_version_req_number=`expr $mysql_version_req_major \* 1000000 \ |
693 | + \+ $mysql_version_req_minor \* 1000 \ |
694 | + \+ $mysql_version_req_micro` |
695 | + |
696 | + dnl Decompose version string of installed MySQL |
697 | + dnl and calculate its number representation |
698 | + mysql_version_major=`expr $MYSQL_VERSION : '\([[0-9]]*\)'` |
699 | + mysql_version_minor=`expr $MYSQL_VERSION : '[[0-9]]*\.\([[0-9]]*\)'` |
700 | + mysql_version_micro=`expr $MYSQL_VERSION : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'` |
701 | + if test "x$mysql_version_micro" = "x"; then |
702 | + mysql_version_micro="0" |
703 | + fi |
704 | + |
705 | + mysql_version_number=`expr $mysql_version_major \* 1000000 \ |
706 | + \+ $mysql_version_minor \* 1000 \ |
707 | + \+ $mysql_version_micro` |
708 | + |
709 | + mysql_version_check=`expr $mysql_version_number \>\= $mysql_version_req_number` |
710 | + if test "$mysql_version_check" = "1"; then |
711 | + AC_MSG_RESULT([yes]) |
712 | + else |
713 | + AC_MSG_RESULT([no]) |
714 | + fi |
715 | + fi |
716 | + |
717 | + if test "$found_mysql" = "yes" ; then |
718 | + AC_DEFINE([HAVE_MYSQL], [1], |
719 | + [Define to 1 if MySQL libraries are available]) |
720 | + fi |
721 | + |
722 | + AC_SUBST([MYSQL_VERSION]) |
723 | + AC_SUBST([MYSQL_CFLAGS]) |
724 | + AC_SUBST([MYSQL_LDFLAGS]) |
725 | +]) |