Merge lp:~tgates/gearmand/nuodb-queue-adapter into lp:gearmand

Proposed by Tom Gates
Status: Needs review
Proposed branch: lp:~tgates/gearmand/nuodb-queue-adapter
Merge into: lp:gearmand
Diff against target: 833 lines (+768/-0)
8 files modified
configure.ac (+6/-0)
libgearman-server/plugins.cc (+7/-0)
libgearman-server/plugins/queue.h (+2/-0)
libgearman-server/plugins/queue/include.am (+1/-0)
libgearman-server/plugins/queue/nuodb/include.am (+23/-0)
libgearman-server/plugins/queue/nuodb/queue.cc (+550/-0)
libgearman-server/plugins/queue/nuodb/queue.h (+49/-0)
m4/ax_lib_nuodb.m4 (+130/-0)
To merge this branch: bzr merge lp:~tgates/gearmand/nuodb-queue-adapter
Reviewer Review Type Date Requested Status
Tangent Trunk Pending
Review via email: mp+219047@code.launchpad.net

Description of the change

Add support for using NuoDB as a persistent queue for Gearman.

To post a comment you must log in.

Unmerged revisions

895. By Tom Gates

Add support for NuoDB persistent queues

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'configure.ac'
--- configure.ac 2014-02-16 02:15:49 +0000
+++ configure.ac 2014-05-09 18:35:29 +0000
@@ -104,6 +104,11 @@
104 [AC_DEFINE([HAVE_LIBSQLITE3],[0],[Have the SQLITE3 library])])104 [AC_DEFINE([HAVE_LIBSQLITE3],[0],[Have the SQLITE3 library])])
105AM_CONDITIONAL([HAVE_LIBSQLITE3],[test "x${WANT_SQLITE3}" = "xyes"])105AM_CONDITIONAL([HAVE_LIBSQLITE3],[test "x${WANT_SQLITE3}" = "xyes"])
106106
107AX_LIB_NUODB
108AS_IF([test "x${WANT_NUODB}" = "xyes"],
109 [AC_DEFINE([HAVE_NUODB],[1],[Have the NuoDB library])],
110 [AC_DEFINE([HAVE_NUODB],[0],[Have the NuoDB library])])
111AM_CONDITIONAL([HAVE_NUODB],[test "x${WANT_NUODB}" = "xyes"])
107112
108PANDORA_ENABLE_DTRACE113PANDORA_ENABLE_DTRACE
109AX_HAVE_LIBPQ114AX_HAVE_LIBPQ
@@ -345,6 +350,7 @@
345echo " * Debug enabled: $ax_enable_debug"350echo " * Debug enabled: $ax_enable_debug"
346echo " * Warnings as failure: $ac_cv_warnings_as_errors"351echo " * Warnings as failure: $ac_cv_warnings_as_errors"
347echo " * Building with libsqlite3 $WANT_SQLITE3"352echo " * Building with libsqlite3 $WANT_SQLITE3"
353echo " * Building with nuodb $WANT_NUODB"
348echo " * Building with libdrizzle $ac_enable_libdrizzle"354echo " * Building with libdrizzle $ac_enable_libdrizzle"
349echo " * Building with libmemcached $ax_enable_libmemcached"355echo " * Building with libmemcached $ax_enable_libmemcached"
350echo " * Building with libpq $ac_cv_libpq"356echo " * Building with libpq $ac_cv_libpq"
351357
=== modified file 'libgearman-server/plugins.cc'
--- libgearman-server/plugins.cc 2013-10-20 11:00:25 +0000
+++ libgearman-server/plugins.cc 2014-05-09 18:35:29 +0000
@@ -71,6 +71,13 @@
71 }71 }
72#endif72#endif
7373
74#if defined(HAVE_NUODB) && HAVE_NUODB
75 if (HAVE_NUODB)
76 {
77 queue::initialize_nuodb();
78 }
79#endif
80
74#if defined(HAVE_LIBPQ) && HAVE_LIBPQ81#if defined(HAVE_LIBPQ) && HAVE_LIBPQ
75 if (HAVE_LIBPQ)82 if (HAVE_LIBPQ)
76 {83 {
7784
=== modified file 'libgearman-server/plugins/queue.h'
--- libgearman-server/plugins/queue.h 2012-11-23 09:17:06 +0000
+++ libgearman-server/plugins/queue.h 2014-05-09 18:35:29 +0000
@@ -52,3 +52,5 @@
52#include <libgearman-server/plugins/queue/redis/queue.h>52#include <libgearman-server/plugins/queue/redis/queue.h>
5353
54#include <libgearman-server/plugins/queue/mysql/queue.h>54#include <libgearman-server/plugins/queue/mysql/queue.h>
55
56#include <libgearman-server/plugins/queue/nuodb/queue.h>
5557
=== modified file 'libgearman-server/plugins/queue/include.am'
--- libgearman-server/plugins/queue/include.am 2012-11-23 09:17:06 +0000
+++ libgearman-server/plugins/queue/include.am 2014-05-09 18:35:29 +0000
@@ -21,3 +21,4 @@
21include libgearman-server/plugins/queue/sqlite/include.am21include libgearman-server/plugins/queue/sqlite/include.am
22include libgearman-server/plugins/queue/tokyocabinet/include.am22include libgearman-server/plugins/queue/tokyocabinet/include.am
23include libgearman-server/plugins/queue/mysql/include.am23include libgearman-server/plugins/queue/mysql/include.am
24include libgearman-server/plugins/queue/nuodb/include.am
2425
=== added directory 'libgearman-server/plugins/queue/nuodb'
=== added file 'libgearman-server/plugins/queue/nuodb/include.am'
--- libgearman-server/plugins/queue/nuodb/include.am 1970-01-01 00:00:00 +0000
+++ libgearman-server/plugins/queue/nuodb/include.am 2014-05-09 18:35:29 +0000
@@ -0,0 +1,23 @@
1# vim:ft=automake
2# Gearman
3# Copyright (C) 2011 Data Differential, http://datadifferential.com/
4# All rights reserved.
5#
6# Use and distribution licensed under the BSD license. See
7# the COPYING file in the parent directory for full text.
8#
9# All paths should be given relative to the root
10#
11
12noinst_HEADERS+= libgearman-server/plugins/queue/nuodb/queue.h
13
14if HAVE_NUODB
15
16libgearman_server_libgearman_server_la_SOURCES+= libgearman-server/plugins/queue/nuodb/queue.cc
17libgearman_server_libgearman_server_la_LIBADD+= $(NUODB_LDFLAGS)
18libgearman_server_libgearman_server_la_CXXFLAGS+= $(NUODB_CFLAGS)
19
20gearmand_gearmand_LDADD+= $(NUODB_LDFLAGS)
21
22endif
23
024
=== added file 'libgearman-server/plugins/queue/nuodb/queue.cc'
--- libgearman-server/plugins/queue/nuodb/queue.cc 1970-01-01 00:00:00 +0000
+++ libgearman-server/plugins/queue/nuodb/queue.cc 2014-05-09 18:35:29 +0000
@@ -0,0 +1,550 @@
1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39/**
40 * @file
41 * @brief NuoDB Queue Storage Definitions
42 */
43
44#include <gear_config.h>
45#include <libgearman-server/common.h>
46#include <libgearman-server/byte.h>
47#include <libgearman-server/plugins/queue/nuodb/queue.h>
48#include <libgearman-server/plugins/queue/base.h>
49
50#pragma GCC diagnostic push
51#pragma GCC diagnostic ignored "-Wundef"
52#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
53#pragma GCC diagnostic ignored "-Woverloaded-virtual"
54#pragma GCC diagnostic ignored "-Wreorder"
55#pragma GCC diagnostic ignored "-Wunused-parameter"
56#pragma GCC diagnostic ignored "-Wold-style-cast"
57
58#include <NuoDB.h>
59#include <cerrno>
60
61/**
62 * @addtogroup plugins::queue::NuoDB Static NuoDB Queue Storage Definitions
63 * @ingroup gearman_queue_nuodb
64 * @{
65 */
66
67/**
68 * Default values.
69 */
70#define GEARMAND_QUEUE_NUODB_DEFAULT_SCHEMA "gearman"
71#define GEARMAND_QUEUE_NUODB_DEFAULT_TABLE "queue"
72
73namespace gearmand { namespace plugins { namespace queue { class Nuodb; }}}
74
75static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Nuodb *queue);
76
77namespace gearmand {
78namespace plugins {
79namespace queue {
80
81class Nuodb : public gearmand::plugins::Queue {
82public:
83 Nuodb();
84 ~Nuodb();
85
86 gearmand_error_t initialize();
87
88 const std::string &insert() {
89 return _insert_query;
90 }
91
92 const std::string &select() {
93 return _select_query;
94 }
95
96 const std::string &create() {
97 return _create_query;
98 }
99
100 NuoDB::Connection *con;
101 std::string nuodb_connect_string;
102 std::string nuodb_user;
103 std::string nuodb_pw;
104 std::string schema;
105 std::string table;
106 NuoDB::PreparedStatement *_insert_stmt;
107 NuoDB::PreparedStatement *_delete_stmt;
108 NuoDB::PreparedStatement *_select_stmt;
109
110public:
111 std::string _insert_query;
112 std::string _select_query;
113 std::string _create_query;
114
115 NuoDB::PreparedStatement *insert_stmt() {
116 return _insert_stmt;
117 }
118
119 NuoDB::PreparedStatement *delete_stmt() {
120 return _delete_stmt;
121 }
122
123 NuoDB::PreparedStatement *select_stmt() {
124 return _select_stmt;
125 }
126
127};
128
129Nuodb::Nuodb() :
130 Queue("NuoDB"),
131 con(NULL),
132 nuodb_connect_string(""),
133 nuodb_user(""),
134 nuodb_pw(""),
135 table(""),
136 schema("")
137{
138 command_line_options().add_options()
139 ("nuodb-conninfo", boost::program_options::value(&nuodb_connect_string)->default_value(""), "NuoDB connection information string.")
140 ("nuodb-user", boost::program_options::value(&nuodb_user)->default_value(""), "NuoDB connection username.")
141 ("nuodb-password", boost::program_options::value(&nuodb_pw)->default_value(""), "NuoDB connection password.")
142 ("nuodb-table", boost::program_options::value(&table)->default_value(GEARMAND_QUEUE_NUODB_DEFAULT_TABLE), "Table to use.")
143 ("nuodb-schema", boost::program_options::value(&schema)->default_value(GEARMAND_QUEUE_NUODB_DEFAULT_SCHEMA), "Schema to use.");
144}
145
146Nuodb::~Nuodb ()
147{
148 if (con) {
149 con->close();
150 }
151 con = NULL;
152}
153
154gearmand_error_t Nuodb::initialize()
155{
156 _create_query+= "CREATE TABLE " +table +" (unique_key VARCHAR" +"(" + TOSTRING(GEARMAN_UNIQUE_SIZE) +"), ";
157 _create_query+= "function_name VARCHAR(255), priority INTEGER, data BLOB, when_to_run INTEGER, UNIQUE (unique_key, function_name))";
158 _insert_query+= "INSERT INTO " +table +" (priority, unique_key, function_name, data, when_to_run) VALUES(?,?,?,?,?)";
159 _select_query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM " +table;
160 gearmand_error_t ret= _initialize(Gearmand()->server, this);
161 return ret;
162}
163
164void initialize_nuodb()
165{
166 static Nuodb local_instance;
167}
168
169} // namespace queue
170} // namespace plugins
171} // namespace gearmand
172
173
174/* Queue callback functions. */
175static gearmand_error_t _nuodb_add(gearman_server_st *server, void *context,
176 const char *unique, size_t unique_size,
177 const char *function_name,
178 size_t function_name_size,
179 const void *data, size_t data_size,
180 gearman_job_priority_t priority,
181 int64_t when);
182
183static gearmand_error_t _nuodb_flush(gearman_server_st *server, void *context);
184
185static gearmand_error_t _nuodb_done(gearman_server_st *server, void *context,
186 const char *unique,
187 size_t unique_size,
188 const char *function_name,
189 size_t function_name_size);
190
191static gearmand_error_t _nuodb_replay(gearman_server_st *server, void *context,
192 gearman_queue_add_fn *add_fn,
193 void *add_context);
194
195/** @} */
196
197/*
198 * Public definitions
199 */
200
201gearmand_error_t _initialize(gearman_server_st& server,
202 gearmand::plugins::queue::Nuodb *queue)
203{
204 // TODO: use nuodb_connnect_string
205 const char *database = queue->nuodb_connect_string.c_str();
206 const char *user = queue->nuodb_user.c_str();
207 const char *pw = queue->nuodb_pw.c_str();
208 const char *schema = queue->schema.c_str();
209
210 gearmand_info("Initializing nuodb module");
211
212 gearman_server_set_queue(server, queue, _nuodb_add, _nuodb_flush, _nuodb_done, _nuodb_replay);
213
214 try {
215 queue->con = NuoDB::Connection::createUtf8();
216 NuoDB::Properties* properties = queue->con->allocProperties();
217 properties->putValue("user", user);
218 properties->putValue("password", pw);
219 properties->putValue("schema", schema);
220 queue->con->openDatabase(database, properties);
221 } catch (NuoDB::SQLException &e) {
222 gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
223 return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "NuoDB Connection: [%s] %s", e.getSqlcode(), e.getText());
224 } catch (...) {
225 // Unknown error
226 queue->con = NULL;
227 }
228 if (queue->con == NULL)
229 {
230 gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
231 return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "NuoDB Connection: %s", "Unknown error");
232 }
233
234
235 std::string query("SELECT TABLENAME FROM SYSTEM.TABLES WHERE TYPE = 'TABLE' AND SCHEMA = '" +queue->schema +"' AND TABLENAME = '" +queue->table + "'");
236
237 NuoDB::PreparedStatement *stmt = NULL;
238 NuoDB::ResultSet *rs = NULL;
239
240 try {
241 stmt = queue->con->prepareStatement(query.c_str());
242 stmt->execute();
243 rs = stmt->getResultSet();
244 } catch (NuoDB::SQLException &e) {
245 std::string error_string= "NuoDB:";
246 error_string += "Queue error: [";
247 error_string += e.getSqlcode();
248 error_string += "] ";
249 error_string += e.getText();
250 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
251 return GEARMAND_QUEUE_ERROR;
252 } catch (...) {
253 // Unknown error
254 rs = NULL;
255 }
256
257 if (rs == NULL) {
258 std::string error_string= "NuoDB:";
259 error_string+= "No results for query: ";
260 error_string+= query.c_str();
261 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
262 return GEARMAND_QUEUE_ERROR;
263 }
264
265 if (!rs->next())
266 {
267 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "nuodb module creating table '%s'", queue->table.c_str());
268
269 bool result = 1;
270 const char *sql = queue->create().c_str();
271 try {
272 stmt = queue->con->prepareStatement(sql);
273 result = stmt->execute();
274 } catch (NuoDB::SQLException &e) {
275 std::string error_string= "NuoDB:";
276 error_string += "Table create error: [";
277 error_string += e.getSqlcode();
278 error_string += "] ";
279 error_string += e.getText();
280 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
281 return GEARMAND_QUEUE_ERROR;
282 } catch (...) {
283 // Unknown error
284 std::string error_string= "NuoDB:";
285 error_string+= "Table create error for statement: ";
286 error_string+= query.c_str();
287 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
288 return GEARMAND_QUEUE_ERROR;
289 }
290
291 if (result == 1)
292 {
293 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
294 "NuoDB:Problem creating table: %s", sql);
295 gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
296 return GEARMAND_QUEUE_ERROR;
297 }
298 }
299
300 const char *insert_sql = queue->insert().c_str();
301 try {
302 queue->_insert_stmt = queue->con->prepareStatement(insert_sql);
303 } catch (NuoDB::SQLException &e) {
304 std::string error_string= "NuoDB:";
305 error_string += "Error preparing insert statement: [";
306 error_string += e.getSqlcode();
307 error_string += "] ";
308 error_string += e.getText();
309 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
310 return GEARMAND_QUEUE_ERROR;
311 } catch (...) {
312 // Unknown error
313 std::string error_string= "NuoDB:";
314 error_string+= "Error preparing insert statement: ";
315 error_string+= insert_sql;
316 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
317 return GEARMAND_QUEUE_ERROR;
318 }
319
320 std::string delete_query;
321 delete_query+= "DELETE FROM ";
322 delete_query+= queue->table;
323 delete_query+= " WHERE unique_key = ?";
324 delete_query+= " AND function_name = ?";
325 const char *delete_sql = delete_query.c_str();
326 try {
327 queue->_delete_stmt = queue->con->prepareStatement(delete_sql);
328 } catch (NuoDB::SQLException &e) {
329 std::string error_string= "NuoDB:";
330 error_string += "Error preparing delete statement: [";
331 error_string += e.getSqlcode();
332 error_string += "] ";
333 error_string += e.getText();
334 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
335 return GEARMAND_QUEUE_ERROR;
336 } catch (...) {
337 // Unknown error
338 std::string error_string= "NuoDB:";
339 error_string+= "Error preparing delete statement: ";
340 error_string+= insert_sql;
341 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
342 return GEARMAND_QUEUE_ERROR;
343 }
344
345 std::string select_query("SELECT unique_key,function_name,priority,data,when_to_run FROM " + queue->table);
346 const char *select_sql = select_query.c_str();
347 try {
348 queue->_select_stmt = queue->con->prepareStatement(select_sql);
349 } catch (NuoDB::SQLException &e) {
350 std::string error_string= "NuoDB:";
351 error_string += "Error preparing select statement: [";
352 error_string += e.getSqlcode();
353 error_string += "] ";
354 error_string += e.getText();
355 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
356 return GEARMAND_QUEUE_ERROR;
357 } catch (...) {
358 // Unknown error
359 std::string error_string= "NuoDB:";
360 error_string+= "Error preparing select statement: ";
361 error_string+= insert_sql;
362 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
363 return GEARMAND_QUEUE_ERROR;
364 }
365
366 rs->close();
367 rs = NULL;
368 stmt->close();
369 stmt = NULL;
370 return GEARMAND_SUCCESS;
371}
372
373/*
374 * Static definitions
375 */
376
377static gearmand_error_t _nuodb_add(gearman_server_st*, void *context,
378 const char *unique, size_t unique_size,
379 const char *function_name,
380 size_t function_name_size,
381 const void *data, size_t data_size,
382 gearman_job_priority_t priority,
383 int64_t when)
384{
385 gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context;
386
387 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "nuodb add: %.*s", (uint32_t)unique_size, (char *)unique);
388
389
390 NuoDB::PreparedStatement *stmt = queue->insert_stmt();
391
392 try {
393 stmt->setInt(1, static_cast<uint32_t>(priority));
394 stmt->setString(2, unique);
395 stmt->setString(3, function_name);
396 stmt->setBytes(4, data_size, (const void *) data);
397 stmt->setLong(5, when);
398 stmt->execute();
399 } catch (NuoDB::SQLException &e) {
400 std::string error_string= "NuoDB:";
401 error_string += "Add queue error: [";
402 error_string += e.getSqlcode() + "] ";
403 error_string += e.getText();
404 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
405 return GEARMAND_QUEUE_ERROR;
406 } catch (...) {
407 // Unknown error
408 std::string error_string= "NuoDB:";
409 error_string+= "Unable to add to queue ";
410 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
411 return GEARMAND_QUEUE_ERROR;
412 }
413
414 return GEARMAND_SUCCESS;
415}
416
417static gearmand_error_t _nuodb_flush(gearman_server_st *, void *)
418{
419 gearmand_debug("nuodb flush");
420
421 return GEARMAND_SUCCESS;
422}
423
424static gearmand_error_t _nuodb_done(gearman_server_st*, void *context,
425 const char *unique,
426 size_t unique_size,
427 const char *function_name,
428 size_t function_name_size)
429{
430 (void)function_name_size;
431 gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context;
432 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "nuodb done: %.*s", (uint32_t)unique_size, (char *)unique);
433
434 NuoDB::PreparedStatement *stmt = queue->delete_stmt();
435
436 try {
437 stmt->setString(1, unique);
438 stmt->setString(2, function_name);
439 stmt->execute();
440 } catch (NuoDB::SQLException &e) {
441 std::string error_string= "NuoDB:";
442 error_string += "Delete queue error: [";
443 error_string += e.getSqlcode();
444 error_string += "] ";
445 error_string += e.getText();
446 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
447 return GEARMAND_QUEUE_ERROR;
448 } catch (...) {
449 // Unknown error
450 std::string error_string= "NuoDB:";
451 error_string+= "Unable to delete from queue ";
452 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
453 return GEARMAND_QUEUE_ERROR;
454 }
455
456 return GEARMAND_SUCCESS;
457}
458
459static gearmand_error_t _nuodb_replay(gearman_server_st *server, void *context,
460 gearman_queue_add_fn *add_fn,
461 void *add_context)
462{
463 gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context;
464
465 gearmand_info("nuodb replay start");
466
467
468 NuoDB::PreparedStatement *stmt = queue->select_stmt();
469 NuoDB::ResultSet *rs = NULL;
470
471
472 try {
473 stmt->execute();
474 rs = stmt->getResultSet();
475 } catch (NuoDB::SQLException &e) {
476 std::string error_string= "NuoDB:";
477 error_string += "Select error: [";
478 error_string += e.getSqlcode();
479 error_string += "] ";
480 error_string += e.getText();
481 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
482 return GEARMAND_QUEUE_ERROR;
483 } catch (...) {
484 // Unknown error
485 rs = NULL;
486 }
487
488 if (rs == NULL)
489 {
490 std::string error_string= "NuoDB:";
491 error_string+= "No results for query ";
492 gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
493 return GEARMAND_QUEUE_ERROR;
494 }
495
496
497 while(rs->next())
498 {
499 const char *unique_key;
500 const char *function_name;
501 int priority;
502 size_t data_length;
503 char *data;
504 int when_to_run;
505
506 unique_key = rs->getString(1);
507 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
508 "nuodb replay: %s",
509 unique_key);
510
511 NuoDB::Blob *blob = rs->getBlob(4);
512 if (blob == NULL)
513 {
514 data= NULL;
515 data_length= 0;
516 }
517 else
518 {
519 data_length = size_t(blob->length());
520 data= (char *)malloc(data_length);
521 if (data == NULL)
522 {
523 return gearmand_perror(errno, "malloc");
524 }
525 blob->getBytes(0, data_length, (unsigned char *)data);
526 }
527
528 gearmand_error_t ret;
529 function_name = rs->getString(2);
530 priority = rs->getInt(3);
531 when_to_run = rs->getInt(5);
532 ret= (*add_fn)(server, add_context, unique_key,
533 (size_t) strlen(unique_key),
534 function_name,
535 (size_t) strlen(function_name),
536 data, data_length,
537 (gearman_job_priority_t) priority,
538 when_to_run);
539 if (gearmand_failed(ret))
540 {
541 rs->close();
542 return ret;
543 }
544 }
545
546 rs->close();
547 return GEARMAND_SUCCESS;
548}
549
550#pragma GCC diagnostic pop
0551
=== added file 'libgearman-server/plugins/queue/nuodb/queue.h'
--- libgearman-server/plugins/queue/nuodb/queue.h 1970-01-01 00:00:00 +0000
+++ libgearman-server/plugins/queue/nuodb/queue.h 2014-05-09 18:35:29 +0000
@@ -0,0 +1,49 @@
1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38#pragma once
39
40
41namespace gearmand {
42namespace plugins {
43namespace queue {
44
45void initialize_nuodb();
46
47} // namespace queue
48} // namespace plugin
49} // namespace gearmand
050
=== added file 'm4/ax_lib_nuodb.m4'
--- m4/ax_lib_nuodb.m4 1970-01-01 00:00:00 +0000
+++ m4/ax_lib_nuodb.m4 2014-05-09 18:35:29 +0000
@@ -0,0 +1,130 @@
1#
2# SYNOPSIS
3#
4# AX_LIB_NUODB([MINIMUM-VERSION])
5#
6# DESCRIPTION
7#
8# This macro provides tests of availability of NuoDB 'libRemote' library
9# of particular version or newer.
10#
11# AX_LIB_NUODB macro takes only one argument which is optional. If
12# there is no required version passed, then macro does not run version
13# test.
14#
15# The --with-nuodb option takes one of three possible values:
16#
17# no - do not check for NuoDB client library
18#
19# yes - do check for NuoDB client library in standard locations
20#
21# path - complete path to NuoDB home directory
22#
23# This macro calls:
24#
25# AC_SUBST(NUODB_CFLAGS)
26# AC_SUBST(NUODB_LDFLAGS)
27# AC_SUBST(NUODB_VERSION)
28#
29# And sets:
30#
31# HAVE_NUODB
32#
33
34AC_DEFUN([AX_LIB_NUODB],
35[
36 AC_ARG_WITH([nuodb],
37 AS_HELP_STRING([--with-nuodb=@<:@ARG@:>@],
38 [use NuoDB libNuoRemote library @<:@default=yes@:>@, optionally specify path to NuoDB Home Directory]
39 ),
40 [
41 if test "$withval" = "no"; then
42 WANT_NUODB="no"
43 elif test "$withval" = "yes"; then
44 WANT_NUODB="yes"
45 NUOHOME="/opt/nuodb"
46 else
47 WANT_NUODB="yes"
48 NUOHOME="$withval"
49 fi
50 ],
51 [WANT_NUODB="yes"]
52 )
53
54 NUODB_CFLAGS=""
55 NUODB_LDFLAGS=""
56 NUODB_VERSION=""
57
58 dnl
59 dnl Check NuoDB libraries (libNuoRemote)
60 dnl
61
62 if test "$WANT_NUODB" = "yes"; then
63
64 if test -z "$NUOHOME" -o test; then
65
66 AC_MSG_CHECKING([for NuoDB libraries])
67
68 NUODB_CFLAGS="-I$NUOHOME/include"
69 NUODB_LDFLAGS="-L$NUOHOME/lib64 -lNuoRemote"
70 NUODB_VERSION=`$NUOHOME/bin/nuochk --version | sed -e 's#NuoDB Server build ##'`
71 AC_DEFINE([HAVE_NUODB], [1],
72 [Define to 1 if NuoDB libraries are available])
73
74 found_nuodb="yes"
75 AC_MSG_RESULT([yes])
76 else
77 found_nuodb="no"
78 AC_MSG_RESULT([no])
79 fi
80 fi
81
82 dnl
83 dnl Check if required version of NuoDB is available
84 dnl
85
86
87 nuodb_version_req=ifelse([$1], [], [], [$1])
88
89 if test "$found_nuodb" = "yes" -a -n "$nuodb_version_req"; then
90
91 AC_MSG_CHECKING([if NuoDB version is >= $nuodb_version_req])
92
93 dnl Decompose required version string of NuoDB
94 dnl and calculate its number representation
95 nuodb_version_req_major=`expr $nuodb_version_req : '\([[0-9]]*\)'`
96 nuodb_version_req_minor=`expr $nuodb_version_req : '[[0-9]]*\.\([[0-9]]*\)'`
97 nuodb_version_req_micro=`expr $nuodb_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
98 if test "x$nuodb_version_req_micro" = "x"; then
99 nuodb_version_req_micro="0"
100 fi
101
102 nuodb_version_req_number=`expr $nuodb_version_req_major \* 1000000 \
103 \+ $nuodb_version_req_minor \* 1000 \
104 \+ $nuodb_version_req_micro`
105
106 dnl Decompose version string of installed NuoDB
107 dnl and calculate its number representation
108 nuodb_version_major=`expr $NUODB_VERSION : '\([[0-9]]*\)'`
109 nuodb_version_minor=`expr $NUODB_VERSION : '[[0-9]]*\.\([[0-9]]*\)'`
110 nuodb_version_micro=`expr $NUODB_VERSION : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
111 if test "x$nuodb_version_micro" = "x"; then
112 nuodb_version_micro="0"
113 fi
114
115 nuodb_version_number=`expr $nuodb_version_major \* 1000000 \
116 \+ $nuodb_version_minor \* 1000 \
117 \+ $nuodb_version_micro`
118
119 nuodb_version_check=`expr $nuodb_version_number \>\= $nuodb_version_req_number`
120 if test "$nuodb_version_check" = "1"; then
121 AC_MSG_RESULT([yes])
122 else
123 AC_MSG_RESULT([no])
124 fi
125 fi
126
127 AC_SUBST([NUODB_VERSION])
128 AC_SUBST([NUODB_CFLAGS])
129 AC_SUBST([NUODB_LDFLAGS])
130])

Subscribers

People subscribed via source and target branches

to all changes: