Merge lp:~tgates/gearmand/nuodb-queue-adapter into lp:gearmand
- nuodb-queue-adapter
- Merge into 1.2
Proposed by
Tom Gates
Status: | Needs review |
---|---|
Proposed branch: | lp:~tgates/gearmand/nuodb-queue-adapter |
Merge into: | lp:gearmand |
Diff against target: |
833 lines (+768/-0) 8 files modified
configure.ac (+6/-0) libgearman-server/plugins.cc (+7/-0) libgearman-server/plugins/queue.h (+2/-0) libgearman-server/plugins/queue/include.am (+1/-0) libgearman-server/plugins/queue/nuodb/include.am (+23/-0) libgearman-server/plugins/queue/nuodb/queue.cc (+550/-0) libgearman-server/plugins/queue/nuodb/queue.h (+49/-0) m4/ax_lib_nuodb.m4 (+130/-0) |
To merge this branch: | bzr merge lp:~tgates/gearmand/nuodb-queue-adapter |
Related bugs: | |
Related blueprints: |
Add support for NuoDB persistent queues.
(Undefined)
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Tangent Trunk | Pending | ||
Review via email: mp+219047@code.launchpad.net |
Commit message
Description of the change
Add support for using NuoDB as a persistent queue for Gearman.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'configure.ac' |
2 | --- configure.ac 2014-02-16 02:15:49 +0000 |
3 | +++ configure.ac 2014-05-09 18:35:29 +0000 |
4 | @@ -104,6 +104,11 @@ |
5 | [AC_DEFINE([HAVE_LIBSQLITE3],[0],[Have the SQLITE3 library])]) |
6 | AM_CONDITIONAL([HAVE_LIBSQLITE3],[test "x${WANT_SQLITE3}" = "xyes"]) |
7 | |
8 | +AX_LIB_NUODB |
9 | +AS_IF([test "x${WANT_NUODB}" = "xyes"], |
10 | + [AC_DEFINE([HAVE_NUODB],[1],[Have the NuoDB library])], |
11 | + [AC_DEFINE([HAVE_NUODB],[0],[Have the NuoDB library])]) |
12 | +AM_CONDITIONAL([HAVE_NUODB],[test "x${WANT_NUODB}" = "xyes"]) |
13 | |
14 | PANDORA_ENABLE_DTRACE |
15 | AX_HAVE_LIBPQ |
16 | @@ -345,6 +350,7 @@ |
17 | echo " * Debug enabled: $ax_enable_debug" |
18 | echo " * Warnings as failure: $ac_cv_warnings_as_errors" |
19 | echo " * Building with libsqlite3 $WANT_SQLITE3" |
20 | +echo " * Building with nuodb $WANT_NUODB" |
21 | echo " * Building with libdrizzle $ac_enable_libdrizzle" |
22 | echo " * Building with libmemcached $ax_enable_libmemcached" |
23 | echo " * Building with libpq $ac_cv_libpq" |
24 | |
25 | === modified file 'libgearman-server/plugins.cc' |
26 | --- libgearman-server/plugins.cc 2013-10-20 11:00:25 +0000 |
27 | +++ libgearman-server/plugins.cc 2014-05-09 18:35:29 +0000 |
28 | @@ -71,6 +71,13 @@ |
29 | } |
30 | #endif |
31 | |
32 | +#if defined(HAVE_NUODB) && HAVE_NUODB |
33 | + if (HAVE_NUODB) |
34 | + { |
35 | + queue::initialize_nuodb(); |
36 | + } |
37 | +#endif |
38 | + |
39 | #if defined(HAVE_LIBPQ) && HAVE_LIBPQ |
40 | if (HAVE_LIBPQ) |
41 | { |
42 | |
43 | === modified file 'libgearman-server/plugins/queue.h' |
44 | --- libgearman-server/plugins/queue.h 2012-11-23 09:17:06 +0000 |
45 | +++ libgearman-server/plugins/queue.h 2014-05-09 18:35:29 +0000 |
46 | @@ -52,3 +52,5 @@ |
47 | #include <libgearman-server/plugins/queue/redis/queue.h> |
48 | |
49 | #include <libgearman-server/plugins/queue/mysql/queue.h> |
50 | + |
51 | +#include <libgearman-server/plugins/queue/nuodb/queue.h> |
52 | |
53 | === modified file 'libgearman-server/plugins/queue/include.am' |
54 | --- libgearman-server/plugins/queue/include.am 2012-11-23 09:17:06 +0000 |
55 | +++ libgearman-server/plugins/queue/include.am 2014-05-09 18:35:29 +0000 |
56 | @@ -21,3 +21,4 @@ |
57 | include libgearman-server/plugins/queue/sqlite/include.am |
58 | include libgearman-server/plugins/queue/tokyocabinet/include.am |
59 | include libgearman-server/plugins/queue/mysql/include.am |
60 | +include libgearman-server/plugins/queue/nuodb/include.am |
61 | |
62 | === added directory 'libgearman-server/plugins/queue/nuodb' |
63 | === added file 'libgearman-server/plugins/queue/nuodb/include.am' |
64 | --- libgearman-server/plugins/queue/nuodb/include.am 1970-01-01 00:00:00 +0000 |
65 | +++ libgearman-server/plugins/queue/nuodb/include.am 2014-05-09 18:35:29 +0000 |
66 | @@ -0,0 +1,23 @@ |
67 | +# vim:ft=automake |
68 | +# Gearman |
69 | +# Copyright (C) 2011 Data Differential, http://datadifferential.com/ |
70 | +# All rights reserved. |
71 | +# |
72 | +# Use and distribution licensed under the BSD license. See |
73 | +# the COPYING file in the parent directory for full text. |
74 | +# |
75 | +# All paths should be given relative to the root |
76 | +# |
77 | + |
78 | +noinst_HEADERS+= libgearman-server/plugins/queue/nuodb/queue.h |
79 | + |
80 | +if HAVE_NUODB |
81 | + |
82 | +libgearman_server_libgearman_server_la_SOURCES+= libgearman-server/plugins/queue/nuodb/queue.cc |
83 | +libgearman_server_libgearman_server_la_LIBADD+= $(NUODB_LDFLAGS) |
84 | +libgearman_server_libgearman_server_la_CXXFLAGS+= $(NUODB_CFLAGS) |
85 | + |
86 | +gearmand_gearmand_LDADD+= $(NUODB_LDFLAGS) |
87 | + |
88 | +endif |
89 | + |
90 | |
91 | === added file 'libgearman-server/plugins/queue/nuodb/queue.cc' |
92 | --- libgearman-server/plugins/queue/nuodb/queue.cc 1970-01-01 00:00:00 +0000 |
93 | +++ libgearman-server/plugins/queue/nuodb/queue.cc 2014-05-09 18:35:29 +0000 |
94 | @@ -0,0 +1,550 @@ |
95 | +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
96 | + * |
97 | + * Gearmand client and server library. |
98 | + * |
99 | + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ |
100 | + * Copyright (C) 2008 Brian Aker, Eric Day |
101 | + * All rights reserved. |
102 | + * |
103 | + * Redistribution and use in source and binary forms, with or without |
104 | + * modification, are permitted provided that the following conditions are |
105 | + * met: |
106 | + * |
107 | + * * Redistributions of source code must retain the above copyright |
108 | + * notice, this list of conditions and the following disclaimer. |
109 | + * |
110 | + * * Redistributions in binary form must reproduce the above |
111 | + * copyright notice, this list of conditions and the following disclaimer |
112 | + * in the documentation and/or other materials provided with the |
113 | + * distribution. |
114 | + * |
115 | + * * The names of its contributors may not be used to endorse or |
116 | + * promote products derived from this software without specific prior |
117 | + * written permission. |
118 | + * |
119 | + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
120 | + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
121 | + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
122 | + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
123 | + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
124 | + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
125 | + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
126 | + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
127 | + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
128 | + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
129 | + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
130 | + * |
131 | + */ |
132 | + |
133 | +/** |
134 | + * @file |
135 | + * @brief NuoDB Queue Storage Definitions |
136 | + */ |
137 | + |
138 | +#include <gear_config.h> |
139 | +#include <libgearman-server/common.h> |
140 | +#include <libgearman-server/byte.h> |
141 | +#include <libgearman-server/plugins/queue/nuodb/queue.h> |
142 | +#include <libgearman-server/plugins/queue/base.h> |
143 | + |
144 | +#pragma GCC diagnostic push |
145 | +#pragma GCC diagnostic ignored "-Wundef" |
146 | +#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" |
147 | +#pragma GCC diagnostic ignored "-Woverloaded-virtual" |
148 | +#pragma GCC diagnostic ignored "-Wreorder" |
149 | +#pragma GCC diagnostic ignored "-Wunused-parameter" |
150 | +#pragma GCC diagnostic ignored "-Wold-style-cast" |
151 | + |
152 | +#include <NuoDB.h> |
153 | +#include <cerrno> |
154 | + |
155 | +/** |
156 | + * @addtogroup plugins::queue::NuoDB Static NuoDB Queue Storage Definitions |
157 | + * @ingroup gearman_queue_nuodb |
158 | + * @{ |
159 | + */ |
160 | + |
161 | +/** |
162 | + * Default values. |
163 | + */ |
164 | +#define GEARMAND_QUEUE_NUODB_DEFAULT_SCHEMA "gearman" |
165 | +#define GEARMAND_QUEUE_NUODB_DEFAULT_TABLE "queue" |
166 | + |
167 | +namespace gearmand { namespace plugins { namespace queue { class Nuodb; }}} |
168 | + |
169 | +static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Nuodb *queue); |
170 | + |
171 | +namespace gearmand { |
172 | +namespace plugins { |
173 | +namespace queue { |
174 | + |
175 | +class Nuodb : public gearmand::plugins::Queue { |
176 | +public: |
177 | + Nuodb(); |
178 | + ~Nuodb(); |
179 | + |
180 | + gearmand_error_t initialize(); |
181 | + |
182 | + const std::string &insert() { |
183 | + return _insert_query; |
184 | + } |
185 | + |
186 | + const std::string &select() { |
187 | + return _select_query; |
188 | + } |
189 | + |
190 | + const std::string &create() { |
191 | + return _create_query; |
192 | + } |
193 | + |
194 | + NuoDB::Connection *con; |
195 | + std::string nuodb_connect_string; |
196 | + std::string nuodb_user; |
197 | + std::string nuodb_pw; |
198 | + std::string schema; |
199 | + std::string table; |
200 | + NuoDB::PreparedStatement *_insert_stmt; |
201 | + NuoDB::PreparedStatement *_delete_stmt; |
202 | + NuoDB::PreparedStatement *_select_stmt; |
203 | + |
204 | +public: |
205 | + std::string _insert_query; |
206 | + std::string _select_query; |
207 | + std::string _create_query; |
208 | + |
209 | + NuoDB::PreparedStatement *insert_stmt() { |
210 | + return _insert_stmt; |
211 | + } |
212 | + |
213 | + NuoDB::PreparedStatement *delete_stmt() { |
214 | + return _delete_stmt; |
215 | + } |
216 | + |
217 | + NuoDB::PreparedStatement *select_stmt() { |
218 | + return _select_stmt; |
219 | + } |
220 | + |
221 | +}; |
222 | + |
223 | +Nuodb::Nuodb() : |
224 | + Queue("NuoDB"), |
225 | + con(NULL), |
226 | + nuodb_connect_string(""), |
227 | + nuodb_user(""), |
228 | + nuodb_pw(""), |
229 | + table(""), |
230 | + schema("") |
231 | +{ |
232 | + command_line_options().add_options() |
233 | + ("nuodb-conninfo", boost::program_options::value(&nuodb_connect_string)->default_value(""), "NuoDB connection information string.") |
234 | + ("nuodb-user", boost::program_options::value(&nuodb_user)->default_value(""), "NuoDB connection username.") |
235 | + ("nuodb-password", boost::program_options::value(&nuodb_pw)->default_value(""), "NuoDB connection password.") |
236 | + ("nuodb-table", boost::program_options::value(&table)->default_value(GEARMAND_QUEUE_NUODB_DEFAULT_TABLE), "Table to use.") |
237 | + ("nuodb-schema", boost::program_options::value(&schema)->default_value(GEARMAND_QUEUE_NUODB_DEFAULT_SCHEMA), "Schema to use."); |
238 | +} |
239 | + |
240 | +Nuodb::~Nuodb () |
241 | +{ |
242 | + if (con) { |
243 | + con->close(); |
244 | + } |
245 | + con = NULL; |
246 | +} |
247 | + |
248 | +gearmand_error_t Nuodb::initialize() |
249 | +{ |
250 | + _create_query+= "CREATE TABLE " +table +" (unique_key VARCHAR" +"(" + TOSTRING(GEARMAN_UNIQUE_SIZE) +"), "; |
251 | + _create_query+= "function_name VARCHAR(255), priority INTEGER, data BLOB, when_to_run INTEGER, UNIQUE (unique_key, function_name))"; |
252 | + _insert_query+= "INSERT INTO " +table +" (priority, unique_key, function_name, data, when_to_run) VALUES(?,?,?,?,?)"; |
253 | + _select_query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM " +table; |
254 | + gearmand_error_t ret= _initialize(Gearmand()->server, this); |
255 | + return ret; |
256 | +} |
257 | + |
258 | +void initialize_nuodb() |
259 | +{ |
260 | + static Nuodb local_instance; |
261 | +} |
262 | + |
263 | +} // namespace queue |
264 | +} // namespace plugins |
265 | +} // namespace gearmand |
266 | + |
267 | + |
268 | +/* Queue callback functions. */ |
269 | +static gearmand_error_t _nuodb_add(gearman_server_st *server, void *context, |
270 | + const char *unique, size_t unique_size, |
271 | + const char *function_name, |
272 | + size_t function_name_size, |
273 | + const void *data, size_t data_size, |
274 | + gearman_job_priority_t priority, |
275 | + int64_t when); |
276 | + |
277 | +static gearmand_error_t _nuodb_flush(gearman_server_st *server, void *context); |
278 | + |
279 | +static gearmand_error_t _nuodb_done(gearman_server_st *server, void *context, |
280 | + const char *unique, |
281 | + size_t unique_size, |
282 | + const char *function_name, |
283 | + size_t function_name_size); |
284 | + |
285 | +static gearmand_error_t _nuodb_replay(gearman_server_st *server, void *context, |
286 | + gearman_queue_add_fn *add_fn, |
287 | + void *add_context); |
288 | + |
289 | +/** @} */ |
290 | + |
291 | +/* |
292 | + * Public definitions |
293 | + */ |
294 | + |
295 | +gearmand_error_t _initialize(gearman_server_st& server, |
296 | + gearmand::plugins::queue::Nuodb *queue) |
297 | +{ |
298 | + // TODO: use nuodb_connect_string |
299 | + const char *database = queue->nuodb_connect_string.c_str(); |
300 | + const char *user = queue->nuodb_user.c_str(); |
301 | + const char *pw = queue->nuodb_pw.c_str(); |
302 | + const char *schema = queue->schema.c_str(); |
303 | + |
304 | + gearmand_info("Initializing nuodb module"); |
305 | + |
306 | + gearman_server_set_queue(server, queue, _nuodb_add, _nuodb_flush, _nuodb_done, _nuodb_replay); |
307 | + |
308 | + try { |
309 | + queue->con = NuoDB::Connection::createUtf8(); |
310 | + NuoDB::Properties* properties = queue->con->allocProperties(); |
311 | + properties->putValue("user", user); |
312 | + properties->putValue("password", pw); |
313 | + properties->putValue("schema", schema); |
314 | + queue->con->openDatabase(database, properties); |
315 | + } catch (NuoDB::SQLException &e) { |
316 | + gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL); |
317 | + return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "NuoDB Connection: [%s] %s", e.getSqlcode(), e.getText()); |
318 | + } catch (...) { |
319 | + // Unknown error |
320 | + queue->con = NULL; |
321 | + } |
322 | + if (queue->con == NULL) |
323 | + { |
324 | + gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL); |
325 | + return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "NuoDB Connection: %s", "Unknown error"); |
326 | + } |
327 | + |
328 | + |
329 | + std::string query("SELECT TABLENAME FROM SYSTEM.TABLES WHERE TYPE = 'TABLE' AND SCHEMA = '" +queue->schema +"' AND TABLENAME = '" +queue->table + "'"); |
330 | + |
331 | + NuoDB::PreparedStatement *stmt = NULL; |
332 | + NuoDB::ResultSet *rs = NULL; |
333 | + |
334 | + try { |
335 | + stmt = queue->con->prepareStatement(query.c_str()); |
336 | + stmt->execute(); |
337 | + rs = stmt->getResultSet(); |
338 | + } catch (NuoDB::SQLException &e) { |
339 | + std::string error_string= "NuoDB:"; |
340 | + error_string += "Queue error: ["; |
341 | + error_string += e.getSqlcode(); |
342 | + error_string += "] "; |
343 | + error_string += e.getText(); |
344 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
345 | + return GEARMAND_QUEUE_ERROR; |
346 | + } catch (...) { |
347 | + // Unknown error |
348 | + rs = NULL; |
349 | + } |
350 | + |
351 | + if (rs == NULL) { |
352 | + std::string error_string= "NuoDB:"; |
353 | + error_string+= "No results for query: "; |
354 | + error_string+= query.c_str(); |
355 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
356 | + return GEARMAND_QUEUE_ERROR; |
357 | + } |
358 | + |
359 | + if (!rs->next()) |
360 | + { |
361 | + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "nuodb module creating table '%s'", queue->table.c_str()); |
362 | + |
363 | + bool result = 1; |
364 | + const char *sql = queue->create().c_str(); |
365 | + try { |
366 | + stmt = queue->con->prepareStatement(sql); |
367 | + result = stmt->execute(); |
368 | + } catch (NuoDB::SQLException &e) { |
369 | + std::string error_string= "NuoDB:"; |
370 | + error_string += "Table create error: ["; |
371 | + error_string += e.getSqlcode(); |
372 | + error_string += "] "; |
373 | + error_string += e.getText(); |
374 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
375 | + return GEARMAND_QUEUE_ERROR; |
376 | + } catch (...) { |
377 | + // Unknown error |
378 | + std::string error_string= "NuoDB:"; |
379 | + error_string+= "Table create error for statement: "; |
380 | + error_string+= query.c_str(); |
381 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
382 | + return GEARMAND_QUEUE_ERROR; |
383 | + } |
384 | + |
385 | + if (result == 1) |
386 | + { |
387 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, |
388 | + "NuoDB:Problem creating table: %s", sql); |
389 | + gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL); |
390 | + return GEARMAND_QUEUE_ERROR; |
391 | + } |
392 | + } |
393 | + |
394 | + const char *insert_sql = queue->insert().c_str(); |
395 | + try { |
396 | + queue->_insert_stmt = queue->con->prepareStatement(insert_sql); |
397 | + } catch (NuoDB::SQLException &e) { |
398 | + std::string error_string= "NuoDB:"; |
399 | + error_string += "Error preparing insert statement: ["; |
400 | + error_string += e.getSqlcode(); |
401 | + error_string += "] "; |
402 | + error_string += e.getText(); |
403 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
404 | + return GEARMAND_QUEUE_ERROR; |
405 | + } catch (...) { |
406 | + // Unknown error |
407 | + std::string error_string= "NuoDB:"; |
408 | + error_string+= "Error preparing insert statement: "; |
409 | + error_string+= insert_sql; |
410 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
411 | + return GEARMAND_QUEUE_ERROR; |
412 | + } |
413 | + |
414 | + std::string delete_query; |
415 | + delete_query+= "DELETE FROM "; |
416 | + delete_query+= queue->table; |
417 | + delete_query+= " WHERE unique_key = ?"; |
418 | + delete_query+= " AND function_name = ?"; |
419 | + const char *delete_sql = delete_query.c_str(); |
420 | + try { |
421 | + queue->_delete_stmt = queue->con->prepareStatement(delete_sql); |
422 | + } catch (NuoDB::SQLException &e) { |
423 | + std::string error_string= "NuoDB:"; |
424 | + error_string += "Error preparing delete statement: ["; |
425 | + error_string += e.getSqlcode(); |
426 | + error_string += "] "; |
427 | + error_string += e.getText(); |
428 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
429 | + return GEARMAND_QUEUE_ERROR; |
430 | + } catch (...) { |
431 | + // Unknown error |
432 | + std::string error_string= "NuoDB:"; |
433 | + error_string+= "Error preparing delete statement: "; |
434 | + error_string+= insert_sql; |
435 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
436 | + return GEARMAND_QUEUE_ERROR; |
437 | + } |
438 | + |
439 | + std::string select_query("SELECT unique_key,function_name,priority,data,when_to_run FROM " + queue->table); |
440 | + const char *select_sql = select_query.c_str(); |
441 | + try { |
442 | + queue->_select_stmt = queue->con->prepareStatement(select_sql); |
443 | + } catch (NuoDB::SQLException &e) { |
444 | + std::string error_string= "NuoDB:"; |
445 | + error_string += "Error preparing select statement: ["; |
446 | + error_string += e.getSqlcode(); |
447 | + error_string += "] "; |
448 | + error_string += e.getText(); |
449 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
450 | + return GEARMAND_QUEUE_ERROR; |
451 | + } catch (...) { |
452 | + // Unknown error |
453 | + std::string error_string= "NuoDB:"; |
454 | + error_string+= "Error preparing select statement: "; |
455 | + error_string+= insert_sql; |
456 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
457 | + return GEARMAND_QUEUE_ERROR; |
458 | + } |
459 | + |
460 | + rs->close(); |
461 | + rs = NULL; |
462 | + stmt->close(); |
463 | + stmt = NULL; |
464 | + return GEARMAND_SUCCESS; |
465 | +} |
466 | + |
467 | +/* |
468 | + * Static definitions |
469 | + */ |
470 | + |
471 | +static gearmand_error_t _nuodb_add(gearman_server_st*, void *context, |
472 | + const char *unique, size_t unique_size, |
473 | + const char *function_name, |
474 | + size_t function_name_size, |
475 | + const void *data, size_t data_size, |
476 | + gearman_job_priority_t priority, |
477 | + int64_t when) |
478 | +{ |
479 | + gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context; |
480 | + |
481 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "nuodb add: %.*s", (uint32_t)unique_size, (char *)unique); |
482 | + |
483 | + |
484 | + NuoDB::PreparedStatement *stmt = queue->insert_stmt(); |
485 | + |
486 | + try { |
487 | + stmt->setInt(1, static_cast<uint32_t>(priority)); |
488 | + stmt->setString(2, unique); |
489 | + stmt->setString(3, function_name); |
490 | + stmt->setBytes(4, data_size, (const void *) data); |
491 | + stmt->setLong(5, when); |
492 | + stmt->execute(); |
493 | + } catch (NuoDB::SQLException &e) { |
494 | + std::string error_string= "NuoDB:"; |
495 | + error_string += "Add queue error: ["; |
496 | + error_string += e.getSqlcode() + "] "; |
497 | + error_string += e.getText(); |
498 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
499 | + return GEARMAND_QUEUE_ERROR; |
500 | + } catch (...) { |
501 | + // Unknown error |
502 | + std::string error_string= "NuoDB:"; |
503 | + error_string+= "Unable to add to queue "; |
504 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
505 | + return GEARMAND_QUEUE_ERROR; |
506 | + } |
507 | + |
508 | + return GEARMAND_SUCCESS; |
509 | +} |
510 | + |
511 | +static gearmand_error_t _nuodb_flush(gearman_server_st *, void *) |
512 | +{ |
513 | + gearmand_debug("nuodb flush"); |
514 | + |
515 | + return GEARMAND_SUCCESS; |
516 | +} |
517 | + |
518 | +static gearmand_error_t _nuodb_done(gearman_server_st*, void *context, |
519 | + const char *unique, |
520 | + size_t unique_size, |
521 | + const char *function_name, |
522 | + size_t function_name_size) |
523 | +{ |
524 | + (void)function_name_size; |
525 | + gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context; |
526 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "nuodb done: %.*s", (uint32_t)unique_size, (char *)unique); |
527 | + |
528 | + NuoDB::PreparedStatement *stmt = queue->delete_stmt(); |
529 | + |
530 | + try { |
531 | + stmt->setString(1, unique); |
532 | + stmt->setString(2, function_name); |
533 | + stmt->execute(); |
534 | + } catch (NuoDB::SQLException &e) { |
535 | + std::string error_string= "NuoDB:"; |
536 | + error_string += "Delete queue error: ["; |
537 | + error_string += e.getSqlcode(); |
538 | + error_string += "] "; |
539 | + error_string += e.getText(); |
540 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
541 | + return GEARMAND_QUEUE_ERROR; |
542 | + } catch (...) { |
543 | + // Unknown error |
544 | + std::string error_string= "NuoDB:"; |
545 | + error_string+= "Unable to delete from queue "; |
546 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
547 | + return GEARMAND_QUEUE_ERROR; |
548 | + } |
549 | + |
550 | + return GEARMAND_SUCCESS; |
551 | +} |
552 | + |
553 | +static gearmand_error_t _nuodb_replay(gearman_server_st *server, void *context, |
554 | + gearman_queue_add_fn *add_fn, |
555 | + void *add_context) |
556 | +{ |
557 | + gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context; |
558 | + |
559 | + gearmand_info("nuodb replay start"); |
560 | + |
561 | + |
562 | + NuoDB::PreparedStatement *stmt = queue->select_stmt(); |
563 | + NuoDB::ResultSet *rs = NULL; |
564 | + |
565 | + |
566 | + try { |
567 | + stmt->execute(); |
568 | + rs = stmt->getResultSet(); |
569 | + } catch (NuoDB::SQLException &e) { |
570 | + std::string error_string= "NuoDB:"; |
571 | + error_string += "Select error: ["; |
572 | + error_string += e.getSqlcode(); |
573 | + error_string += "] "; |
574 | + error_string += e.getText(); |
575 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
576 | + return GEARMAND_QUEUE_ERROR; |
577 | + } catch (...) { |
578 | + // Unknown error |
579 | + rs = NULL; |
580 | + } |
581 | + |
582 | + if (rs == NULL) |
583 | + { |
584 | + std::string error_string= "NuoDB:"; |
585 | + error_string+= "No results for query "; |
586 | + gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR); |
587 | + return GEARMAND_QUEUE_ERROR; |
588 | + } |
589 | + |
590 | + |
591 | + while(rs->next()) |
592 | + { |
593 | + const char *unique_key; |
594 | + const char *function_name; |
595 | + int priority; |
596 | + size_t data_length; |
597 | + char *data; |
598 | + int when_to_run; |
599 | + |
600 | + unique_key = rs->getString(1); |
601 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, |
602 | + "nuodb replay: %s", |
603 | + unique_key); |
604 | + |
605 | + NuoDB::Blob *blob = rs->getBlob(4); |
606 | + if (blob == NULL) |
607 | + { |
608 | + data= NULL; |
609 | + data_length= 0; |
610 | + } |
611 | + else |
612 | + { |
613 | + data_length = size_t(blob->length()); |
614 | + data= (char *)malloc(data_length); |
615 | + if (data == NULL) |
616 | + { |
617 | + return gearmand_perror(errno, "malloc"); |
618 | + } |
619 | + blob->getBytes(0, data_length, (unsigned char *)data); |
620 | + } |
621 | + |
622 | + gearmand_error_t ret; |
623 | + function_name = rs->getString(2); |
624 | + priority = rs->getInt(3); |
625 | + when_to_run = rs->getInt(5); |
626 | + ret= (*add_fn)(server, add_context, unique_key, |
627 | + (size_t) strlen(unique_key), |
628 | + function_name, |
629 | + (size_t) strlen(function_name), |
630 | + data, data_length, |
631 | + (gearman_job_priority_t) priority, |
632 | + when_to_run); |
633 | + if (gearmand_failed(ret)) |
634 | + { |
635 | + rs->close(); |
636 | + return ret; |
637 | + } |
638 | + } |
639 | + |
640 | + rs->close(); |
641 | + return GEARMAND_SUCCESS; |
642 | +} |
643 | + |
644 | +#pragma GCC diagnostic pop |
645 | |
646 | === added file 'libgearman-server/plugins/queue/nuodb/queue.h' |
647 | --- libgearman-server/plugins/queue/nuodb/queue.h 1970-01-01 00:00:00 +0000 |
648 | +++ libgearman-server/plugins/queue/nuodb/queue.h 2014-05-09 18:35:29 +0000 |
649 | @@ -0,0 +1,49 @@ |
650 | +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
651 | + * |
652 | + * Gearmand client and server library. |
653 | + * |
654 | + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ |
655 | + * All rights reserved. |
656 | + * |
657 | + * Redistribution and use in source and binary forms, with or without |
658 | + * modification, are permitted provided that the following conditions are |
659 | + * met: |
660 | + * |
661 | + * * Redistributions of source code must retain the above copyright |
662 | + * notice, this list of conditions and the following disclaimer. |
663 | + * |
664 | + * * Redistributions in binary form must reproduce the above |
665 | + * copyright notice, this list of conditions and the following disclaimer |
666 | + * in the documentation and/or other materials provided with the |
667 | + * distribution. |
668 | + * |
669 | + * * The names of its contributors may not be used to endorse or |
670 | + * promote products derived from this software without specific prior |
671 | + * written permission. |
672 | + * |
673 | + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
674 | + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
675 | + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
676 | + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
677 | + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
678 | + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
679 | + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
680 | + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
681 | + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
682 | + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
683 | + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
684 | + * |
685 | + */ |
686 | + |
687 | +#pragma once |
688 | + |
689 | + |
690 | +namespace gearmand { |
691 | +namespace plugins { |
692 | +namespace queue { |
693 | + |
694 | +void initialize_nuodb(); |
695 | + |
696 | +} // namespace queue |
697 | +} // namespace plugins |
698 | +} // namespace gearmand |
699 | |
700 | === added file 'm4/ax_lib_nuodb.m4' |
701 | --- m4/ax_lib_nuodb.m4 1970-01-01 00:00:00 +0000 |
702 | +++ m4/ax_lib_nuodb.m4 2014-05-09 18:35:29 +0000 |
703 | @@ -0,0 +1,130 @@ |
704 | +# |
705 | +# SYNOPSIS |
706 | +# |
707 | +# AX_LIB_NUODB([MINIMUM-VERSION]) |
708 | +# |
709 | +# DESCRIPTION |
710 | +# |
711 | +# This macro tests for the availability of the NuoDB 'libNuoRemote' library |
712 | +# of a particular version or newer. |
713 | +# |
714 | +# AX_LIB_NUODB macro takes only one argument which is optional. If |
715 | +# there is no required version passed, then macro does not run version |
716 | +# test. |
717 | +# |
718 | +# The --with-nuodb option takes one of three possible values: |
719 | +# |
720 | +# no - do not check for NuoDB client library |
721 | +# |
722 | +# yes - do check for NuoDB client library in standard locations |
723 | +# |
724 | +# path - complete path to NuoDB home directory |
725 | +# |
726 | +# This macro calls: |
727 | +# |
728 | +# AC_SUBST(NUODB_CFLAGS) |
729 | +# AC_SUBST(NUODB_LDFLAGS) |
730 | +# AC_SUBST(NUODB_VERSION) |
731 | +# |
732 | +# And sets: |
733 | +# |
734 | +# HAVE_NUODB |
735 | +# |
736 | + |
737 | +AC_DEFUN([AX_LIB_NUODB], |
738 | +[ |
739 | + AC_ARG_WITH([nuodb], |
740 | + AS_HELP_STRING([--with-nuodb=@<:@ARG@:>@], |
741 | + [use NuoDB libNuoRemote library @<:@default=yes@:>@, optionally specify path to NuoDB Home Directory] |
742 | + ), |
743 | + [ |
744 | + if test "$withval" = "no"; then |
745 | + WANT_NUODB="no" |
746 | + elif test "$withval" = "yes"; then |
747 | + WANT_NUODB="yes" |
748 | + NUOHOME="/opt/nuodb" |
749 | + else |
750 | + WANT_NUODB="yes" |
751 | + NUOHOME="$withval" |
752 | + fi |
753 | + ], |
754 | + [WANT_NUODB="yes"] |
755 | + ) |
756 | + |
757 | + NUODB_CFLAGS="" |
758 | + NUODB_LDFLAGS="" |
759 | + NUODB_VERSION="" |
760 | + |
761 | + dnl |
762 | + dnl Check NuoDB libraries (libNuoRemote) |
763 | + dnl |
764 | + |
765 | + if test "$WANT_NUODB" = "yes"; then |
766 | + |
767 | + if test -z "$NUOHOME" -o test; then |
768 | + |
769 | + AC_MSG_CHECKING([for NuoDB libraries]) |
770 | + |
771 | + NUODB_CFLAGS="-I$NUOHOME/include" |
772 | + NUODB_LDFLAGS="-L$NUOHOME/lib64 -lNuoRemote" |
773 | + NUODB_VERSION=`$NUOHOME/bin/nuochk --version | sed -e 's#NuoDB Server build ##'` |
774 | + AC_DEFINE([HAVE_NUODB], [1], |
775 | + [Define to 1 if NuoDB libraries are available]) |
776 | + |
777 | + found_nuodb="yes" |
778 | + AC_MSG_RESULT([yes]) |
779 | + else |
780 | + found_nuodb="no" |
781 | + AC_MSG_RESULT([no]) |
782 | + fi |
783 | + fi |
784 | + |
785 | + dnl |
786 | + dnl Check if required version of NuoDB is available |
787 | + dnl |
788 | + |
789 | + |
790 | + nuodb_version_req=ifelse([$1], [], [], [$1]) |
791 | + |
792 | + if test "$found_nuodb" = "yes" -a -n "$nuodb_version_req"; then |
793 | + |
794 | + AC_MSG_CHECKING([if NuoDB version is >= $nuodb_version_req]) |
795 | + |
796 | + dnl Decompose required version string of NuoDB |
797 | + dnl and calculate its number representation |
798 | + nuodb_version_req_major=`expr $nuodb_version_req : '\([[0-9]]*\)'` |
799 | + nuodb_version_req_minor=`expr $nuodb_version_req : '[[0-9]]*\.\([[0-9]]*\)'` |
800 | + nuodb_version_req_micro=`expr $nuodb_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'` |
801 | + if test "x$nuodb_version_req_micro" = "x"; then |
802 | + nuodb_version_req_micro="0" |
803 | + fi |
804 | + |
805 | + nuodb_version_req_number=`expr $nuodb_version_req_major \* 1000000 \ |
806 | + \+ $nuodb_version_req_minor \* 1000 \ |
807 | + \+ $nuodb_version_req_micro` |
808 | + |
809 | + dnl Decompose version string of installed NuoDB |
810 | + dnl and calculate its number representation |
811 | + nuodb_version_major=`expr $NUODB_VERSION : '\([[0-9]]*\)'` |
812 | + nuodb_version_minor=`expr $NUODB_VERSION : '[[0-9]]*\.\([[0-9]]*\)'` |
813 | + nuodb_version_micro=`expr $NUODB_VERSION : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'` |
814 | + if test "x$nuodb_version_micro" = "x"; then |
815 | + nuodb_version_micro="0" |
816 | + fi |
817 | + |
818 | + nuodb_version_number=`expr $nuodb_version_major \* 1000000 \ |
819 | + \+ $nuodb_version_minor \* 1000 \ |
820 | + \+ $nuodb_version_micro` |
821 | + |
822 | + nuodb_version_check=`expr $nuodb_version_number \>\= $nuodb_version_req_number` |
823 | + if test "$nuodb_version_check" = "1"; then |
824 | + AC_MSG_RESULT([yes]) |
825 | + else |
826 | + AC_MSG_RESULT([no]) |
827 | + fi |
828 | + fi |
829 | + |
830 | + AC_SUBST([NUODB_VERSION]) |
831 | + AC_SUBST([NUODB_CFLAGS]) |
832 | + AC_SUBST([NUODB_LDFLAGS]) |
833 | +]) |