Merge lp:~oleksiyk/gearmand/mysql into lp:gearmand/1.0
- mysql
- Merge into 1.0
Proposed by
Oleksiy Krivoshey
Status: | Merged |
---|---|
Merged at revision: | 500 |
Proposed branch: | lp:~oleksiyk/gearmand/mysql |
Merge into: | lp:gearmand/1.0 |
Diff against target: |
725 lines (+661/-0) 8 files modified
configure.ac (+4/-0) libgearman-server/plugins.cc (+5/-0) libgearman-server/plugins/queue.h (+2/-0) libgearman-server/plugins/queue/include.am (+1/-0) libgearman-server/plugins/queue/mysql/include.am (+19/-0) libgearman-server/plugins/queue/mysql/queue.cc (+435/-0) libgearman-server/plugins/queue/mysql/queue.h (+48/-0) m4/ax_lib_mysql.m4 (+147/-0) |
To merge this branch: | bzr merge lp:~oleksiyk/gearmand/mysql |
Related bugs: | |
Related blueprints: |
Persistent queue module using libmysql
(Undefined)
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Brian Aker | Pending | ||
Review via email: mp+92946@code.launchpad.net |
Commit message
Description of the change
Adds support for a MySQL-based persistent queue using libmysqlclient. Prepared statements are used for adding jobs to the queue and for removing them when done.
Configuration:
$ gearmand -q MySQL --mysql-host <HOSTNAME> --mysql-db <DATABASE> --mysql-user <USER> --mysql-password <PASSWORD> --mysql-table <TABLE>
Running `gearmand --help` should show the defaults for the options above.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'configure.ac' | |||
2 | --- configure.ac 2012-01-27 06:44:03 +0000 | |||
3 | +++ configure.ac 2012-02-14 10:32:50 +0000 | |||
4 | @@ -110,6 +110,9 @@ | |||
5 | 110 | SOCKET_SEND_FLAGS | 110 | SOCKET_SEND_FLAGS |
6 | 111 | AX_HAVE_LIBHIREDIS | 111 | AX_HAVE_LIBHIREDIS |
7 | 112 | 112 | ||
8 | 113 | AX_LIB_MYSQL([5.0]) | ||
9 | 114 | AM_CONDITIONAL(HAVE_MYSQL, test "x${HAVE_MYSQL}" = "x1") | ||
10 | 115 | |||
11 | 113 | # Checks for programs. | 116 | # Checks for programs. |
12 | 114 | AC_PROG_AWK | 117 | AC_PROG_AWK |
13 | 115 | AC_PROG_INSTALL | 118 | AC_PROG_INSTALL |
14 | @@ -244,5 +247,6 @@ | |||
15 | 244 | echo " * Building with libmemcached $ac_enable_libmemcached" | 247 | echo " * Building with libmemcached $ac_enable_libmemcached" |
16 | 245 | echo " * Building with libpq $ac_cv_libpq" | 248 | echo " * Building with libpq $ac_cv_libpq" |
17 | 246 | echo " * Building with tokyocabinet $ac_enable_libtokyocabinet" | 249 | echo " * Building with tokyocabinet $ac_enable_libtokyocabinet" |
18 | 250 | echo " * Building with libmysql $found_mysql" | ||
19 | 247 | echo "" | 251 | echo "" |
20 | 248 | echo "---" | 252 | echo "---" |
21 | 249 | 253 | ||
22 | === modified file 'libgearman-server/plugins.cc' | |||
23 | --- libgearman-server/plugins.cc 2012-02-10 02:49:36 +0000 | |||
24 | +++ libgearman-server/plugins.cc 2012-02-14 10:32:50 +0000 | |||
25 | @@ -78,6 +78,11 @@ | |||
26 | 78 | queue::initialize_tokyocabinet(); | 78 | queue::initialize_tokyocabinet(); |
27 | 79 | } | 79 | } |
28 | 80 | 80 | ||
29 | 81 | if (HAVE_MYSQL) | ||
30 | 82 | { | ||
31 | 83 | queue::initialize_mysql(); | ||
32 | 84 | } | ||
33 | 85 | |||
34 | 81 | gearmand::queue::load_options(all); | 86 | gearmand::queue::load_options(all); |
35 | 82 | } | 87 | } |
36 | 83 | 88 | ||
37 | 84 | 89 | ||
38 | === modified file 'libgearman-server/plugins/queue.h' | |||
39 | --- libgearman-server/plugins/queue.h 2011-11-29 05:21:54 +0000 | |||
40 | +++ libgearman-server/plugins/queue.h 2012-02-14 10:32:50 +0000 | |||
41 | @@ -48,3 +48,5 @@ | |||
42 | 48 | #include <libgearman-server/plugins/queue/tokyocabinet/queue.h> | 48 | #include <libgearman-server/plugins/queue/tokyocabinet/queue.h> |
43 | 49 | 49 | ||
44 | 50 | #include <libgearman-server/plugins/queue/redis/queue.h> | 50 | #include <libgearman-server/plugins/queue/redis/queue.h> |
45 | 51 | |||
46 | 52 | #include <libgearman-server/plugins/queue/mysql/queue.h> | ||
47 | 51 | 53 | ||
48 | === modified file 'libgearman-server/plugins/queue/include.am' | |||
49 | --- libgearman-server/plugins/queue/include.am 2011-11-29 05:21:54 +0000 | |||
50 | +++ libgearman-server/plugins/queue/include.am 2012-02-14 10:32:50 +0000 | |||
51 | @@ -18,3 +18,4 @@ | |||
52 | 18 | include libgearman-server/plugins/queue/redis/include.am | 18 | include libgearman-server/plugins/queue/redis/include.am |
53 | 19 | include libgearman-server/plugins/queue/sqlite/include.am | 19 | include libgearman-server/plugins/queue/sqlite/include.am |
54 | 20 | include libgearman-server/plugins/queue/tokyocabinet/include.am | 20 | include libgearman-server/plugins/queue/tokyocabinet/include.am |
55 | 21 | include libgearman-server/plugins/queue/mysql/include.am | ||
56 | 21 | 22 | ||
57 | === added directory 'libgearman-server/plugins/queue/mysql' | |||
58 | === added file 'libgearman-server/plugins/queue/mysql/include.am' | |||
59 | --- libgearman-server/plugins/queue/mysql/include.am 1970-01-01 00:00:00 +0000 | |||
60 | +++ libgearman-server/plugins/queue/mysql/include.am 2012-02-14 10:32:50 +0000 | |||
61 | @@ -0,0 +1,19 @@ | |||
62 | 1 | # Gearman | ||
63 | 2 | # Copyright (C) 2011 Oleksiy Krivoshey | ||
64 | 3 | # All rights reserved. | ||
65 | 4 | # | ||
66 | 5 | # Use and distribution licensed under the BSD license. See | ||
67 | 6 | # the COPYING file in the parent directory for full text. | ||
68 | 7 | # | ||
69 | 8 | # All paths should be given relative to the root | ||
70 | 9 | # | ||
71 | 10 | |||
72 | 11 | noinst_HEADERS+= \ | ||
73 | 12 | libgearman-server/plugins/queue/mysql/queue.h | ||
74 | 13 | |||
75 | 14 | libgearman_server_libgearman_server_la_SOURCES+= \ | ||
76 | 15 | libgearman-server/plugins/queue/mysql/queue.cc | ||
77 | 16 | |||
78 | 17 | libgearman_server_libgearman_server_la_LIBADD+= $(MYSQL_LDFLAGS) | ||
79 | 18 | |||
80 | 19 | |||
81 | 0 | 20 | ||
82 | === added file 'libgearman-server/plugins/queue/mysql/queue.cc' | |||
83 | --- libgearman-server/plugins/queue/mysql/queue.cc 1970-01-01 00:00:00 +0000 | |||
84 | +++ libgearman-server/plugins/queue/mysql/queue.cc 2012-02-14 10:32:50 +0000 | |||
85 | @@ -0,0 +1,435 @@ | |||
86 | 1 | /* Gearman server and library | ||
87 | 2 | * Copyright (C) 2011 Oleksiy Krivoshey | ||
88 | 3 | * All rights reserved. | ||
89 | 4 | * | ||
90 | 5 | * Use and distribution licensed under the BSD license. See | ||
91 | 6 | * the COPYING file in the parent directory for full text. | ||
92 | 7 | */ | ||
93 | 8 | |||
94 | 9 | #include <libgearman-server/common.h> | ||
95 | 10 | #include <libgearman-server/byte.h> | ||
96 | 11 | |||
97 | 12 | #include <libgearman-server/plugins/queue/mysql/queue.h> | ||
98 | 13 | #include <libgearman-server/plugins/queue/base.h> | ||
99 | 14 | |||
100 | 15 | #include <mysql/mysql.h> | ||
101 | 16 | #include <mysql/errmsg.h> | ||
102 | 17 | |||
103 | 18 | /** | ||
104 | 19 | * Default values. | ||
105 | 20 | */ | ||
106 | 21 | #define GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE "gearman_queue" | ||
107 | 22 | |||
108 | 23 | namespace gearmand { | ||
109 | 24 | namespace plugins { | ||
110 | 25 | namespace queue { | ||
111 | 26 | class MySQL; | ||
112 | 27 | } | ||
113 | 28 | } | ||
114 | 29 | } | ||
115 | 30 | |||
116 | 31 | static gearmand_error_t _initialize(gearman_server_st *server, gearmand::plugins::queue::MySQL *queue); | ||
117 | 32 | |||
118 | 33 | namespace gearmand { | ||
119 | 34 | namespace plugins { | ||
120 | 35 | namespace queue { | ||
121 | 36 | |||
122 | 37 | class MySQL : public gearmand::plugins::Queue { | ||
123 | 38 | public: | ||
124 | 39 | MySQL(); | ||
125 | 40 | ~MySQL(); | ||
126 | 41 | |||
127 | 42 | gearmand_error_t initialize(); | ||
128 | 43 | gearmand_error_t prepareAddStatement(); | ||
129 | 44 | gearmand_error_t prepareDoneStatement(); | ||
130 | 45 | |||
131 | 46 | MYSQL *con; | ||
132 | 47 | MYSQL_STMT *add_stmt; | ||
133 | 48 | MYSQL_STMT *done_stmt; | ||
134 | 49 | std::string mysql_host; | ||
135 | 50 | std::string mysql_user; | ||
136 | 51 | std::string mysql_password; | ||
137 | 52 | std::string mysql_db; | ||
138 | 53 | std::string mysql_table; | ||
139 | 54 | private: | ||
140 | 55 | }; | ||
141 | 56 | |||
142 | 57 | MySQL::MySQL() : | ||
143 | 58 | Queue("MySQL"), | ||
144 | 59 | con(NULL), | ||
145 | 60 | add_stmt(NULL) { | ||
146 | 61 | command_line_options().add_options() | ||
147 | 62 | ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.") | ||
148 | 63 | ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.") | ||
149 | 64 | ("mysql-password", boost::program_options::value(&mysql_password)->default_value(""), "MySQL user password.") | ||
150 | 65 | ("mysql-db", boost::program_options::value(&mysql_db)->default_value(""), "MySQL database.") | ||
151 | 66 | ("mysql-table", boost::program_options::value(&mysql_table)->default_value(GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE), "MySQL table name."); | ||
152 | 67 | } | ||
153 | 68 | |||
154 | 69 | MySQL::~MySQL() { | ||
155 | 70 | if (add_stmt){ | ||
156 | 71 | mysql_stmt_close(add_stmt); | ||
157 | 72 | } | ||
158 | 73 | if (con){ | ||
159 | 74 | mysql_close(con); | ||
160 | 75 | } | ||
161 | 76 | } | ||
162 | 77 | |||
163 | 78 | gearmand_error_t MySQL::initialize() { | ||
164 | 79 | return _initialize(&Gearmand()->server, this); | ||
165 | 80 | } | ||
166 | 81 | |||
167 | 82 | gearmand_error_t MySQL::prepareAddStatement() { | ||
168 | 83 | char query_buffer[1024]; | ||
169 | 84 | |||
170 | 85 | if((this->add_stmt = mysql_stmt_init(this->con)) == NULL){ | ||
171 | 86 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con)); | ||
172 | 87 | return GEARMAN_QUEUE_ERROR; | ||
173 | 88 | } | ||
174 | 89 | |||
175 | 90 | snprintf(query_buffer, sizeof(query_buffer), | ||
176 | 91 | "INSERT INTO %s " | ||
177 | 92 | "(unique_key, function_name, priority, data, when_to_run) " | ||
178 | 93 | "VALUES(?, ?, ?, ?, ?)", this->mysql_table.c_str()); | ||
179 | 94 | |||
180 | 95 | if (mysql_stmt_prepare(this->add_stmt, query_buffer, strlen(query_buffer))){ | ||
181 | 96 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con)); | ||
182 | 97 | return GEARMAN_QUEUE_ERROR; | ||
183 | 98 | } | ||
184 | 99 | |||
185 | 100 | return GEARMAN_SUCCESS; | ||
186 | 101 | } | ||
187 | 102 | |||
188 | 103 | gearmand_error_t MySQL::prepareDoneStatement() { | ||
189 | 104 | char query_buffer[1024]; | ||
190 | 105 | |||
191 | 106 | if((this->done_stmt = mysql_stmt_init(this->con)) == NULL){ | ||
192 | 107 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con)); | ||
193 | 108 | return GEARMAN_QUEUE_ERROR; | ||
194 | 109 | } | ||
195 | 110 | |||
196 | 111 | snprintf(query_buffer, sizeof(query_buffer), | ||
197 | 112 | "DELETE FROM %s " | ||
198 | 113 | "WHERE unique_key=? " | ||
199 | 114 | "AND function_name=?", this->mysql_table.c_str()); | ||
200 | 115 | |||
201 | 116 | if (mysql_stmt_prepare(this->done_stmt, query_buffer, strlen(query_buffer))){ | ||
202 | 117 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con)); | ||
203 | 118 | return GEARMAN_QUEUE_ERROR; | ||
204 | 119 | } | ||
205 | 120 | |||
206 | 121 | return GEARMAN_SUCCESS; | ||
207 | 122 | } | ||
208 | 123 | |||
209 | 124 | void initialize_mysql() { | ||
210 | 125 | static MySQL local_instance; | ||
211 | 126 | } | ||
212 | 127 | |||
213 | 128 | } // namespace queue | ||
214 | 129 | } // namespace plugin | ||
215 | 130 | } // namespace gearmand | ||
216 | 131 | |||
217 | 132 | /* Queue callback functions. */ | ||
218 | 133 | static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context, | ||
219 | 134 | const char *unique, size_t unique_size, | ||
220 | 135 | const char *function_name, | ||
221 | 136 | size_t function_name_size, | ||
222 | 137 | const void *data, size_t data_size, | ||
223 | 138 | gearmand_job_priority_t priority, | ||
224 | 139 | int64_t when); | ||
225 | 140 | |||
226 | 141 | static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, void *context); | ||
227 | 142 | |||
228 | 143 | static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context, | ||
229 | 144 | const char *unique, | ||
230 | 145 | size_t unique_size, | ||
231 | 146 | const char *function_name, | ||
232 | 147 | size_t function_name_size); | ||
233 | 148 | |||
234 | 149 | static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context, | ||
235 | 150 | gearman_queue_add_fn *add_fn, | ||
236 | 151 | void *add_context); | ||
237 | 152 | |||
238 | 153 | |||
239 | 154 | gearmand_error_t _initialize(gearman_server_st *server, | ||
240 | 155 | gearmand::plugins::queue::MySQL *queue) { | ||
241 | 156 | |||
242 | 157 | MYSQL_RES * result; | ||
243 | 158 | char query_buffer[1024]; | ||
244 | 159 | my_bool my_true = true; | ||
245 | 160 | |||
246 | 161 | gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"Initializing MySQL module"); | ||
247 | 162 | |||
248 | 163 | gearman_server_set_queue(server, queue, _mysql_queue_add, _mysql_queue_flush, _mysql_queue_done, _mysql_queue_replay); | ||
249 | 164 | |||
250 | 165 | queue->con = mysql_init(queue->con); | ||
251 | 166 | |||
252 | 167 | mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand"); | ||
253 | 168 | |||
254 | 169 | if (!mysql_real_connect(queue->con, | ||
255 | 170 | queue->mysql_host.c_str(), | ||
256 | 171 | queue->mysql_user.c_str(), | ||
257 | 172 | queue->mysql_password.c_str(), | ||
258 | 173 | queue->mysql_db.c_str(), 0, NULL, 0)) { | ||
259 | 174 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to connect to database: %s", mysql_error(queue->con)); | ||
260 | 175 | |||
261 | 176 | return GEARMAN_QUEUE_ERROR; | ||
262 | 177 | } | ||
263 | 178 | |||
264 | 179 | mysql_options(queue->con, MYSQL_OPT_RECONNECT, &my_true); | ||
265 | 180 | |||
266 | 181 | if (!(result = mysql_list_tables(queue->con, queue->mysql_table.c_str()))){ | ||
267 | 182 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_list_tables failed: %s", mysql_error(queue->con)); | ||
268 | 183 | return GEARMAN_QUEUE_ERROR; | ||
269 | 184 | } | ||
270 | 185 | |||
271 | 186 | if (mysql_num_rows(result) == 0){ | ||
272 | 187 | snprintf(query_buffer, sizeof(query_buffer), | ||
273 | 188 | "CREATE TABLE %s" | ||
274 | 189 | "(" | ||
275 | 190 | "unique_key VARCHAR(%d)," | ||
276 | 191 | "function_name VARCHAR(255)," | ||
277 | 192 | "priority INT," | ||
278 | 193 | "data LONGBLOB," | ||
279 | 194 | "when_to_run INT," | ||
280 | 195 | "unique key (unique_key, function_name)" | ||
281 | 196 | ")", | ||
282 | 197 | queue->mysql_table.c_str(), GEARMAN_UNIQUE_SIZE); | ||
283 | 198 | |||
284 | 199 | gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL module: creating table %s", queue->mysql_table.c_str()); | ||
285 | 200 | |||
286 | 201 | if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){ | ||
287 | 202 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL module: create table failed: %s", mysql_error(queue->con)); | ||
288 | 203 | return GEARMAN_QUEUE_ERROR; | ||
289 | 204 | } | ||
290 | 205 | } | ||
291 | 206 | |||
292 | 207 | mysql_free_result(result); | ||
293 | 208 | |||
294 | 209 | if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR) | ||
295 | 210 | return GEARMAN_QUEUE_ERROR; | ||
296 | 211 | |||
297 | 212 | if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR) | ||
298 | 213 | return GEARMAN_QUEUE_ERROR; | ||
299 | 214 | |||
300 | 215 | return GEARMAN_SUCCESS; | ||
301 | 216 | } | ||
302 | 217 | |||
303 | 218 | /* | ||
304 | 219 | * Static definitions | ||
305 | 220 | */ | ||
306 | 221 | |||
307 | 222 | |||
308 | 223 | static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context, | ||
309 | 224 | const char *unique, size_t unique_size, | ||
310 | 225 | const char *function_name, | ||
311 | 226 | size_t function_name_size, | ||
312 | 227 | const void *data, size_t data_size, | ||
313 | 228 | gearmand_job_priority_t priority, | ||
314 | 229 | int64_t when) { | ||
315 | 230 | |||
316 | 231 | MYSQL_BIND bind[5]; | ||
317 | 232 | |||
318 | 233 | (void) server; | ||
319 | 234 | |||
320 | 235 | gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context; | ||
321 | 236 | |||
322 | 237 | gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue add: %.*s %.*s", (uint32_t) unique_size, (char *) unique, | ||
323 | 238 | (uint32_t) function_name_size, (char *) function_name); | ||
324 | 239 | |||
325 | 240 | bind[0].buffer_type= MYSQL_TYPE_STRING; | ||
326 | 241 | bind[0].buffer= (char *)unique; | ||
327 | 242 | bind[0].buffer_length= unique_size; | ||
328 | 243 | bind[0].is_null= 0; | ||
329 | 244 | bind[0].length= (unsigned long*)&unique_size; | ||
330 | 245 | |||
331 | 246 | bind[1].buffer_type= MYSQL_TYPE_STRING; | ||
332 | 247 | bind[1].buffer= (char *)function_name; | ||
333 | 248 | bind[1].buffer_length= function_name_size; | ||
334 | 249 | bind[1].is_null= 0; | ||
335 | 250 | bind[1].length= (unsigned long*)&function_name_size; | ||
336 | 251 | |||
337 | 252 | bind[2].buffer_type= MYSQL_TYPE_LONG; | ||
338 | 253 | bind[2].buffer= (char *)&priority; | ||
339 | 254 | bind[2].is_null= 0; | ||
340 | 255 | bind[2].length= 0; | ||
341 | 256 | |||
342 | 257 | bind[3].buffer_type= MYSQL_TYPE_LONG_BLOB; | ||
343 | 258 | bind[3].buffer= (char *)data; | ||
344 | 259 | bind[3].buffer_length= data_size; | ||
345 | 260 | bind[3].is_null= 0; | ||
346 | 261 | bind[3].length= (unsigned long*)&data_size; | ||
347 | 262 | |||
348 | 263 | bind[4].buffer_type= MYSQL_TYPE_LONG; | ||
349 | 264 | bind[4].buffer= (char *)&when; | ||
350 | 265 | bind[4].is_null= 0; | ||
351 | 266 | bind[4].length= 0; | ||
352 | 267 | |||
353 | 268 | while(1){ | ||
354 | 269 | if (mysql_stmt_bind_param(queue->add_stmt, bind)){ | ||
355 | 270 | if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT ){ | ||
356 | 271 | if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR){ | ||
357 | 272 | return GEARMAN_QUEUE_ERROR; | ||
358 | 273 | } | ||
359 | 274 | continue; | ||
360 | 275 | } else { | ||
361 | 276 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con)); | ||
362 | 277 | return GEARMAN_QUEUE_ERROR; | ||
363 | 278 | } | ||
364 | 279 | } | ||
365 | 280 | |||
366 | 281 | if (mysql_stmt_execute(queue->add_stmt)){ | ||
367 | 282 | if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST ){ | ||
368 | 283 | mysql_stmt_close(queue->add_stmt); | ||
369 | 284 | if(queue->prepareAddStatement() != GEARMAN_QUEUE_ERROR){ | ||
370 | 285 | continue; | ||
371 | 286 | } | ||
372 | 287 | } | ||
373 | 288 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con)); | ||
374 | 289 | return GEARMAN_QUEUE_ERROR; | ||
375 | 290 | } | ||
376 | 291 | |||
377 | 292 | break; | ||
378 | 293 | } | ||
379 | 294 | |||
380 | 295 | return GEARMAN_SUCCESS; | ||
381 | 296 | } | ||
382 | 297 | |||
383 | 298 | static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, | ||
384 | 299 | void *context __attribute__((unused))) { | ||
385 | 300 | |||
386 | 301 | (void) server; | ||
387 | 302 | (void) context; | ||
388 | 303 | |||
389 | 304 | gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue flush"); | ||
390 | 305 | |||
391 | 306 | return GEARMAN_SUCCESS; | ||
392 | 307 | } | ||
393 | 308 | |||
394 | 309 | static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context, | ||
395 | 310 | const char *unique, | ||
396 | 311 | size_t unique_size, | ||
397 | 312 | const char *function_name, | ||
398 | 313 | size_t function_name_size) { | ||
399 | 314 | |||
400 | 315 | MYSQL_BIND bind[2]; | ||
401 | 316 | |||
402 | 317 | (void) server; | ||
403 | 318 | |||
404 | 319 | gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique, | ||
405 | 320 | (uint32_t) function_name_size, (char *) function_name); | ||
406 | 321 | |||
407 | 322 | gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context; | ||
408 | 323 | |||
409 | 324 | bind[0].buffer_type= MYSQL_TYPE_STRING; | ||
410 | 325 | bind[0].buffer= (char *)unique; | ||
411 | 326 | bind[0].buffer_length= unique_size; | ||
412 | 327 | bind[0].is_null= 0; | ||
413 | 328 | bind[0].length= (unsigned long*)&unique_size; | ||
414 | 329 | |||
415 | 330 | bind[1].buffer_type= MYSQL_TYPE_STRING; | ||
416 | 331 | bind[1].buffer= (char *)function_name; | ||
417 | 332 | bind[1].buffer_length= function_name_size; | ||
418 | 333 | bind[1].is_null= 0; | ||
419 | 334 | bind[1].length= (unsigned long*)&function_name_size; | ||
420 | 335 | |||
421 | 336 | while(1){ | ||
422 | 337 | if (mysql_stmt_bind_param(queue->done_stmt, bind)){ | ||
423 | 338 | if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT ){ | ||
424 | 339 | if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR){ | ||
425 | 340 | return GEARMAN_QUEUE_ERROR; | ||
426 | 341 | } | ||
427 | 342 | continue; | ||
428 | 343 | } else { | ||
429 | 344 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con)); | ||
430 | 345 | return GEARMAN_QUEUE_ERROR; | ||
431 | 346 | } | ||
432 | 347 | } | ||
433 | 348 | |||
434 | 349 | if (mysql_stmt_execute(queue->done_stmt)){ | ||
435 | 350 | if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST ){ | ||
436 | 351 | mysql_stmt_close(queue->done_stmt); | ||
437 | 352 | if(queue->prepareDoneStatement() != GEARMAN_QUEUE_ERROR){ | ||
438 | 353 | continue; | ||
439 | 354 | } | ||
440 | 355 | } | ||
441 | 356 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con)); | ||
442 | 357 | return GEARMAN_QUEUE_ERROR; | ||
443 | 358 | } | ||
444 | 359 | |||
445 | 360 | break; | ||
446 | 361 | } | ||
447 | 362 | |||
448 | 363 | return GEARMAN_SUCCESS; | ||
449 | 364 | } | ||
450 | 365 | |||
451 | 366 | static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context, | ||
452 | 367 | gearman_queue_add_fn *add_fn, | ||
453 | 368 | void *add_context) { | ||
454 | 369 | |||
455 | 370 | MYSQL_RES * result; | ||
456 | 371 | MYSQL_ROW row; | ||
457 | 372 | char query_buffer[1024]; | ||
458 | 373 | |||
459 | 374 | (void) server; | ||
460 | 375 | |||
461 | 376 | gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue replay"); | ||
462 | 377 | |||
463 | 378 | gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context; | ||
464 | 379 | |||
465 | 380 | snprintf(query_buffer, sizeof(query_buffer), | ||
466 | 381 | "SELECT unique_key, function_name, data, priority, when_to_run FROM %s", | ||
467 | 382 | queue->mysql_table.c_str()); | ||
468 | 383 | |||
469 | 384 | if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){ | ||
470 | 385 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_real_query failed: %s", mysql_error(queue->con)); | ||
471 | 386 | return GEARMAN_QUEUE_ERROR; | ||
472 | 387 | } | ||
473 | 388 | |||
474 | 389 | if(!(result = mysql_store_result(queue->con))){ | ||
475 | 390 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_store_result failed: %s", mysql_error(queue->con)); | ||
476 | 391 | return GEARMAN_QUEUE_ERROR; | ||
477 | 392 | } | ||
478 | 393 | |||
479 | 394 | if(mysql_num_fields(result) < 5){ | ||
480 | 395 | gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL queue: insufficient row fields in queue table"); | ||
481 | 396 | return GEARMAN_QUEUE_ERROR; | ||
482 | 397 | } | ||
483 | 398 | |||
484 | 399 | gearmand_error_t ret = GEARMAN_SUCCESS; | ||
485 | 400 | |||
486 | 401 | while ((row = mysql_fetch_row(result))) { | ||
487 | 402 | unsigned long *lengths; | ||
488 | 403 | gearmand_job_priority_t priority = (gearmand_job_priority_t)0; | ||
489 | 404 | int when = 0; | ||
490 | 405 | |||
491 | 406 | lengths = mysql_fetch_lengths(result); | ||
492 | 407 | |||
493 | 408 | /* need to make a copy here ... gearman_server_job_free will free it later */ | ||
494 | 409 | size_t data_size= lengths[2]; | ||
495 | 410 | char * data= (char *)malloc(data_size); | ||
496 | 411 | if (data == NULL){ | ||
497 | 412 | gearmand_perror("malloc failed"); | ||
498 | 413 | return GEARMAN_MEMORY_ALLOCATION_FAILURE; | ||
499 | 414 | } | ||
500 | 415 | memcpy(data, row[2], data_size); | ||
501 | 416 | |||
502 | 417 | if(lengths[3]) priority = (gearmand_job_priority_t) atoi(row[3]); | ||
503 | 418 | if(lengths[4]) when = atoi(row[4]); | ||
504 | 419 | |||
505 | 420 | ret = (*add_fn)(server, add_context, | ||
506 | 421 | row[0], (size_t) lengths[0], | ||
507 | 422 | row[1], (size_t) lengths[1], | ||
508 | 423 | data, data_size, | ||
509 | 424 | priority, | ||
510 | 425 | when); | ||
511 | 426 | |||
512 | 427 | if (ret != GEARMAN_SUCCESS) { | ||
513 | 428 | break; | ||
514 | 429 | } | ||
515 | 430 | } | ||
516 | 431 | |||
517 | 432 | mysql_free_result(result); | ||
518 | 433 | |||
519 | 434 | return ret; | ||
520 | 435 | } | ||
521 | 0 | 436 | ||
522 | === added file 'libgearman-server/plugins/queue/mysql/queue.h' | |||
523 | --- libgearman-server/plugins/queue/mysql/queue.h 1970-01-01 00:00:00 +0000 | |||
524 | +++ libgearman-server/plugins/queue/mysql/queue.h 2012-02-14 10:32:50 +0000 | |||
525 | @@ -0,0 +1,48 @@ | |||
526 | 1 | /* | ||
527 | 2 | * Gearmand client and server library. | ||
528 | 3 | * | ||
529 | 4 | * Copyright (C) 2011 Data Differential, http://datadifferential.com/ | ||
530 | 5 | * All rights reserved. | ||
531 | 6 | * | ||
532 | 7 | * Redistribution and use in source and binary forms, with or without | ||
533 | 8 | * modification, are permitted provided that the following conditions are | ||
534 | 9 | * met: | ||
535 | 10 | * | ||
536 | 11 | * * Redistributions of source code must retain the above copyright | ||
537 | 12 | * notice, this list of conditions and the following disclaimer. | ||
538 | 13 | * | ||
539 | 14 | * * Redistributions in binary form must reproduce the above | ||
540 | 15 | * copyright notice, this list of conditions and the following disclaimer | ||
541 | 16 | * in the documentation and/or other materials provided with the | ||
542 | 17 | * distribution. | ||
543 | 18 | * | ||
544 | 19 | * * The names of its contributors may not be used to endorse or | ||
545 | 20 | * promote products derived from this software without specific prior | ||
546 | 21 | * written permission. | ||
547 | 22 | * | ||
548 | 23 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
549 | 24 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
550 | 25 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
551 | 26 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
552 | 27 | * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
553 | 28 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
554 | 29 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
555 | 30 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
556 | 31 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
557 | 32 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
558 | 33 | * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
559 | 34 | * | ||
560 | 35 | */ | ||
561 | 36 | |||
562 | 37 | #pragma once | ||
563 | 38 | |||
564 | 39 | |||
565 | 40 | namespace gearmand { | ||
566 | 41 | namespace plugins { | ||
567 | 42 | namespace queue { | ||
568 | 43 | |||
569 | 44 | void initialize_mysql(); | ||
570 | 45 | |||
571 | 46 | } // namespace queue | ||
572 | 47 | } // namespace plugin | ||
573 | 48 | } // namespace gearmand | ||
574 | 0 | 49 | ||
575 | === added file 'm4/ax_lib_mysql.m4' | |||
576 | --- m4/ax_lib_mysql.m4 1970-01-01 00:00:00 +0000 | |||
577 | +++ m4/ax_lib_mysql.m4 2012-02-14 10:32:50 +0000 | |||
578 | @@ -0,0 +1,147 @@ | |||
579 | 1 | # =========================================================================== | ||
580 | 2 | # http://www.gnu.org/software/autoconf-archive/ax_lib_mysql.html | ||
581 | 3 | # =========================================================================== | ||
582 | 4 | # | ||
583 | 5 | # SYNOPSIS | ||
584 | 6 | # | ||
585 | 7 | # AX_LIB_MYSQL([MINIMUM-VERSION]) | ||
586 | 8 | # | ||
587 | 9 | # DESCRIPTION | ||
588 | 10 | # | ||
589 | 11 | # This macro provides tests of availability of MySQL client library of | ||
590 | 12 | # particular version or newer. | ||
591 | 13 | # | ||
592 | 14 | # AX_LIB_MYSQL macro takes only one argument which is optional. If there | ||
593 | 15 | # is no required version passed, then macro does not run version test. | ||
594 | 16 | # | ||
595 | 17 | # The --with-mysql option takes one of three possible values: | ||
596 | 18 | # | ||
597 | 19 | # no - do not check for MySQL client library | ||
598 | 20 | # | ||
599 | 21 | # yes - do check for MySQL library in standard locations (mysql_config | ||
600 | 22 | # should be in the PATH) | ||
601 | 23 | # | ||
602 | 24 | # path - complete path to mysql_config utility, use this option if | ||
603 | 25 | # mysql_config can't be found in the PATH | ||
604 | 26 | # | ||
605 | 27 | # This macro calls: | ||
606 | 28 | # | ||
607 | 29 | # AC_SUBST(MYSQL_CFLAGS) | ||
608 | 30 | # AC_SUBST(MYSQL_LDFLAGS) | ||
609 | 31 | # AC_SUBST(MYSQL_VERSION) | ||
610 | 32 | # | ||
611 | 33 | # And sets: | ||
612 | 34 | # | ||
613 | 35 | # HAVE_MYSQL | ||
614 | 36 | # | ||
615 | 37 | # LICENSE | ||
616 | 38 | # | ||
617 | 39 | # Copyright (c) 2008 Mateusz Loskot <mateusz@loskot.net> | ||
618 | 40 | # | ||
619 | 41 | # Copying and distribution of this file, with or without modification, are | ||
620 | 42 | # permitted in any medium without royalty provided the copyright notice | ||
621 | 43 | # and this notice are preserved. This file is offered as-is, without any | ||
622 | 44 | # warranty. | ||
623 | 45 | |||
624 | 46 | #serial 12 | ||
625 | 47 | |||
626 | 48 | AC_DEFUN([AX_LIB_MYSQL], | ||
627 | 49 | [ | ||
628 | 50 | AC_ARG_WITH([mysql], | ||
629 | 51 | AS_HELP_STRING([--with-mysql=@<:@ARG@:>@], | ||
630 | 52 | [use MySQL client library @<:@default=yes@:>@, optionally specify path to mysql_config] | ||
631 | 53 | ), | ||
632 | 54 | [ | ||
633 | 55 | if test "$withval" = "no"; then | ||
634 | 56 | want_mysql="no" | ||
635 | 57 | elif test "$withval" = "yes"; then | ||
636 | 58 | want_mysql="yes" | ||
637 | 59 | else | ||
638 | 60 | want_mysql="yes" | ||
639 | 61 | MYSQL_CONFIG="$withval" | ||
640 | 62 | fi | ||
641 | 63 | ], | ||
642 | 64 | [want_mysql="yes"] | ||
643 | 65 | ) | ||
644 | 66 | AC_ARG_VAR([MYSQL_CONFIG], [Full path to mysql_config program]) | ||
645 | 67 | |||
646 | 68 | MYSQL_CFLAGS="" | ||
647 | 69 | MYSQL_LDFLAGS="" | ||
648 | 70 | MYSQL_VERSION="" | ||
649 | 71 | |||
650 | 72 | dnl | ||
651 | 73 | dnl Check MySQL libraries | ||
652 | 74 | dnl | ||
653 | 75 | |||
654 | 76 | if test "$want_mysql" = "yes"; then | ||
655 | 77 | |||
656 | 78 | if test -z "$MYSQL_CONFIG" ; then | ||
657 | 79 | AC_PATH_PROGS([MYSQL_CONFIG], [mysql_config mysql_config5], [no]) | ||
658 | 80 | fi | ||
659 | 81 | |||
660 | 82 | if test "$MYSQL_CONFIG" != "no"; then | ||
661 | 83 | MYSQL_CFLAGS="`$MYSQL_CONFIG --cflags`" | ||
662 | 84 | MYSQL_LDFLAGS="`$MYSQL_CONFIG --libs`" | ||
663 | 85 | |||
664 | 86 | MYSQL_VERSION=`$MYSQL_CONFIG --version` | ||
665 | 87 | |||
666 | 88 | found_mysql="yes" | ||
667 | 89 | else | ||
668 | 90 | found_mysql="no" | ||
669 | 91 | fi | ||
670 | 92 | fi | ||
671 | 93 | |||
672 | 94 | dnl | ||
673 | 95 | dnl Check if required version of MySQL is available | ||
674 | 96 | dnl | ||
675 | 97 | |||
676 | 98 | |||
677 | 99 | mysql_version_req=ifelse([$1], [], [], [$1]) | ||
678 | 100 | |||
679 | 101 | if test "$found_mysql" = "yes" -a -n "$mysql_version_req"; then | ||
680 | 102 | |||
681 | 103 | AC_MSG_CHECKING([if MySQL version is >= $mysql_version_req]) | ||
682 | 104 | |||
683 | 105 | dnl Decompose required version string of MySQL | ||
684 | 106 | dnl and calculate its number representation | ||
685 | 107 | mysql_version_req_major=`expr $mysql_version_req : '\([[0-9]]*\)'` | ||
686 | 108 | mysql_version_req_minor=`expr $mysql_version_req : '[[0-9]]*\.\([[0-9]]*\)'` | ||
687 | 109 | mysql_version_req_micro=`expr $mysql_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'` | ||
688 | 110 | if test "x$mysql_version_req_micro" = "x"; then | ||
689 | 111 | mysql_version_req_micro="0" | ||
690 | 112 | fi | ||
691 | 113 | |||
692 | 114 | mysql_version_req_number=`expr $mysql_version_req_major \* 1000000 \ | ||
693 | 115 | \+ $mysql_version_req_minor \* 1000 \ | ||
694 | 116 | \+ $mysql_version_req_micro` | ||
695 | 117 | |||
696 | 118 | dnl Decompose version string of installed MySQL | ||
697 | 119 | dnl and calculate its number representation | ||
698 | 120 | mysql_version_major=`expr $MYSQL_VERSION : '\([[0-9]]*\)'` | ||
699 | 121 | mysql_version_minor=`expr $MYSQL_VERSION : '[[0-9]]*\.\([[0-9]]*\)'` | ||
700 | 122 | mysql_version_micro=`expr $MYSQL_VERSION : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'` | ||
701 | 123 | if test "x$mysql_version_micro" = "x"; then | ||
702 | 124 | mysql_version_micro="0" | ||
703 | 125 | fi | ||
704 | 126 | |||
705 | 127 | mysql_version_number=`expr $mysql_version_major \* 1000000 \ | ||
706 | 128 | \+ $mysql_version_minor \* 1000 \ | ||
707 | 129 | \+ $mysql_version_micro` | ||
708 | 130 | |||
709 | 131 | mysql_version_check=`expr $mysql_version_number \>\= $mysql_version_req_number` | ||
710 | 132 | if test "$mysql_version_check" = "1"; then | ||
711 | 133 | AC_MSG_RESULT([yes]) | ||
712 | 134 | else | ||
713 | 135 | AC_MSG_RESULT([no]) | ||
714 | 136 | fi | ||
715 | 137 | fi | ||
716 | 138 | |||
717 | 139 | if test "$found_mysql" = "yes" ; then | ||
718 | 140 | AC_DEFINE([HAVE_MYSQL], [1], | ||
719 | 141 | [Define to 1 if MySQL libraries are available]) | ||
720 | 142 | fi | ||
721 | 143 | |||
722 | 144 | AC_SUBST([MYSQL_VERSION]) | ||
723 | 145 | AC_SUBST([MYSQL_CFLAGS]) | ||
724 | 146 | AC_SUBST([MYSQL_LDFLAGS]) | ||
725 | 147 | ]) |